You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

509 lines
16 KiB
Python

#!/usr/bin/env python3
import diffsync
import logging
import sqlalchemy
from typing import Optional, List
from ywsd.objects import Extension, ForkRank, User, Yate
logger = logging.getLogger("fieldpoc.routing")
class YateModel(diffsync.DiffSyncModel):
    """DiffSync model for a single Yate instance, identified by ``guru3_identifier``."""

    _modelname = "yate"
    _identifiers = ("guru3_identifier",)
    _attributes = ("hostname", "voip_listener")

    guru3_identifier: str
    hostname: str
    voip_listener: str
    # Not listed in _identifiers/_attributes. YwsdYateModel.create stores the
    # primary key of the inserted database row here.
    yate_id: Optional[int]
class YwsdYateModel(YateModel):
    """Yate model whose create/update/delete are mirrored into ``Yate.table``."""

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert a row for the new yate, then build the model object.

        The primary key of the inserted row is written into ``attrs`` under
        ``"yate_id"`` before the parent ``create`` is called.
        """
        with diffsync.engine.connect() as connection:
            insert_stmt = Yate.table.insert().values(
                guru3_identifier=ids["guru3_identifier"], **attrs
            )
            outcome = connection.execute(insert_stmt)
            attrs["yate_id"] = outcome.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row whose id is ``self.yate_id``."""
        with self.diffsync.engine.connect() as connection:
            this_row = Yate.table.c.id == self.yate_id
            update_stmt = Yate.table.update().where(this_row).values(**attrs)
            connection.execute(update_stmt)
        return super().update(attrs)

    def delete(self):
        """Delete the row whose id is ``self.yate_id``."""
        with self.diffsync.engine.connect() as connection:
            this_row = Yate.table.c.id == self.yate_id
            connection.execute(Yate.table.delete().where(this_row))
        return super().delete()
class ExtensionModel(diffsync.DiffSyncModel):
    """DiffSync model for one extension, identified by its ``extension`` string."""

    _modelname = "extension"
    _identifiers = ("extension",)
    _attributes = (
        "yate",
        "name",
        "extension_type",
        "outgoing_extension",
        "dialout_allowed",
        "forwarding_mode",
        "lang",
    )
    _children = {"extension": "forwarding_extension"}

    # The fields below that are not named in _identifiers or _attributes
    # (short_name, outgoing_name, ringback, forwarding_delay, extension_id)
    # are carried on the model only.
    yate: Optional[str]  # guru3_identifier of the owning yate, if any
    extension: str
    name: Optional[str]
    short_name: Optional[str]
    extension_type: str
    outgoing_extension: Optional[str]
    outgoing_name: Optional[str]
    dialout_allowed: bool
    ringback: Optional[str]
    forwarding_mode: str
    forwarding_delay: Optional[int]
    forwarding_extension: List = []
    lang: str
    # Primary key of the database row; set by YwsdExtensionModel.create and
    # by BackendYwsd.load.
    extension_id: Optional[int]
class YwsdExtensionModel(ExtensionModel):
    """Extension model whose changes are written to ``Extension.table``.

    Two model attributes do not map onto a column of the same name:
    ``extension_type`` is stored in column ``type``, and ``yate`` (a guru3
    identifier) is stored as ``yate_id``, looked up through the ``yate``
    objects already loaded into the adapter.
    """

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert a row for the new extension, then build the model object.

        The primary key of the inserted row is written into ``attrs`` under
        ``"extension_id"`` before the parent ``create`` is called.
        """
        yate = attrs.get("yate")
        plain_columns = {
            key: value
            for key, value in attrs.items()
            if key not in ("extension_type", "yate")
        }
        with diffsync.engine.connect() as conn:
            result = conn.execute(
                Extension.table.insert().values(
                    extension=ids["extension"],
                    type=attrs["extension_type"],
                    yate_id=diffsync.get("yate", yate).yate_id if yate else None,
                    **plain_columns,
                )
            )
            attrs["extension_id"] = result.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row whose id is ``self.extension_id``.

        ``attrs`` holds only the attributes being changed, so each translated
        column is touched only when its source attribute is present.
        """
        values = {
            key: value
            for key, value in attrs.items()
            if key not in ("extension_type", "yate")
        }
        if "extension_type" in attrs:
            # The original read attrs["extension_typ"], which raised KeyError.
            values["type"] = attrs["extension_type"]
        if "yate" in attrs:
            yate = attrs["yate"]
            # Store a scalar id (the original wrapped it in a 1-tuple), and
            # clear the column when the extension no longer has a yate,
            # mirroring what create() writes for a missing yate.
            values["yate_id"] = (
                self.diffsync.get("yate", yate).yate_id if yate else None
            )
        if values:
            with self.diffsync.engine.connect() as conn:
                conn.execute(
                    Extension.table.update()
                    .where(Extension.table.c.id == self.extension_id)
                    .values(**values)
                )
        return super().update(attrs)

    def delete(self):
        """Delete the row whose id is ``self.extension_id``."""
        with self.diffsync.engine.connect() as conn:
            conn.execute(
                Extension.table.delete().where(
                    Extension.table.c.id == self.extension_id
                )
            )
        return super().delete()
class UserModel(diffsync.DiffSyncModel):
    """DiffSync model for one user account, identified by ``username``."""

    _modelname = "user"
    _identifiers = ("username",)
    _attributes = (
        "displayname",
        "password",
        "user_type",
        "dect_displaymode",
        "trunk",
        "static_target",
    )

    username: str
    displayname: Optional[str]
    password: str
    inuse: Optional[bool]  # not listed in _attributes
    user_type: str
    dect_displaymode: Optional[str]
    trunk: Optional[bool]
    static_target: Optional[str]
class YwsdUserModel(UserModel):
    """User model whose changes are written to ``User.table``.

    The model attribute ``user_type`` is stored in the column ``type``; rows
    are addressed by ``username``.
    """

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert a row for the new user, then build the model object."""
        with diffsync.engine.connect() as connection:
            other_columns = {
                key: value for key, value in attrs.items() if key != "user_type"
            }
            insert_stmt = User.table.insert().values(
                username=ids["username"],
                type=attrs["user_type"],
                **other_columns,
            )
            connection.execute(insert_stmt)
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to this user's row.

        ``type`` is always written: the new ``user_type`` when it is among the
        changes, otherwise the current one.
        """
        with self.diffsync.engine.connect() as connection:
            other_columns = {
                key: value for key, value in attrs.items() if key != "user_type"
            }
            this_user = User.table.c.username == self.username
            update_stmt = (
                User.table.update()
                .where(this_user)
                .values(
                    type=attrs.get("user_type", self.user_type),
                    **other_columns,
                )
            )
            connection.execute(update_stmt)
        return super().update(attrs)

    def delete(self):
        """Delete this user's row."""
        with self.diffsync.engine.connect() as connection:
            this_user = User.table.c.username == self.username
            connection.execute(User.table.delete().where(this_user))
        return super().delete()
class ForkRankModel(diffsync.DiffSyncModel):
    """DiffSync model for a fork rank, identified by ``(extension, index)``."""

    _modelname = "forkrank"
    _identifiers = ("extension", "index")
    _attributes = ("mode", "delay")

    # Primary key of the database row; set by YwsdForkRankModel.create and by
    # BackendYwsd.load.
    forkrank_id: Optional[int]
    extension: str
    index: int
    mode: str
    delay: Optional[int]
class YwsdForkRankModel(ForkRankModel):
    """Fork rank model whose changes are written to ``ForkRank.table``."""

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert a row for the new fork rank, then build the model object.

        The owning extension's database id is taken from the ``extension``
        object already loaded into the adapter. The primary key of the
        inserted row is written into ``attrs`` under ``"forkrank_id"``.
        """
        with diffsync.engine.connect() as connection:
            owner = diffsync.get("extension", ids["extension"])
            insert_stmt = ForkRank.table.insert().values(
                extension_id=owner.extension_id,
                index=ids["index"],
                **attrs,
            )
            outcome = connection.execute(insert_stmt)
            attrs["forkrank_id"] = outcome.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row whose id is ``self.forkrank_id``."""
        with self.diffsync.engine.connect() as connection:
            this_row = ForkRank.table.c.id == self.forkrank_id
            update_stmt = ForkRank.table.update().where(this_row).values(**attrs)
            connection.execute(update_stmt)
        return super().update(attrs)

    def delete(self):
        """Delete the row whose id is ``self.forkrank_id``."""
        with self.diffsync.engine.connect() as connection:
            this_row = ForkRank.table.c.id == self.forkrank_id
            connection.execute(ForkRank.table.delete().where(this_row))
        return super().delete()
class ForkRankMemberModel(diffsync.DiffSyncModel):
    """DiffSync model for a fork rank member, identified by ``(forkrank, extension)``."""

    _modelname = "forkrankmember"
    _identifiers = ("forkrank", "extension")
    _attributes = ("rankmember_type", "active")

    # The adapters in this module fill this with the fork rank's
    # get_unique_id() value.
    forkrank: str
    extension: str
    rankmember_type: str
    active: bool
class YwsdForkRankMemberModel(ForkRankMemberModel):
    """Fork rank member model whose changes are written to ``ForkRank.member_table``.

    Rows are addressed by the pair ``(forkrank_id, extension_id)``; both ids
    are resolved through the ``forkrank`` and ``extension`` objects already
    loaded into the adapter.
    """

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert a membership row, then build the model object."""
        with diffsync.engine.connect() as connection:
            rank = diffsync.get("forkrank", ids["forkrank"])
            member = diffsync.get("extension", ids["extension"])
            insert_stmt = ForkRank.member_table.insert().values(
                forkrank_id=rank.forkrank_id,
                extension_id=member.extension_id,
                **attrs,
            )
            connection.execute(insert_stmt)
        return super().create(diffsync, ids=ids, attrs=attrs)

    def _row_filter(self):
        """Build the WHERE clause that selects this membership's row."""
        members = ForkRank.member_table
        rank_id = self.diffsync.get("forkrank", self.forkrank).forkrank_id
        extension_id = self.diffsync.get("extension", self.extension).extension_id
        return sqlalchemy.and_(
            members.c.forkrank_id == rank_id,
            members.c.extension_id == extension_id,
        )

    def update(self, attrs):
        """Write the changed attributes to this membership's row."""
        with self.diffsync.engine.connect() as connection:
            update_stmt = (
                ForkRank.member_table.update()
                .where(self._row_filter())
                .values(**attrs)
            )
            connection.execute(update_stmt)
        return super().update(attrs)

    def delete(self):
        """Delete this membership's row."""
        with self.diffsync.engine.connect() as connection:
            delete_stmt = ForkRank.member_table.delete().where(self._row_filter())
            connection.execute(delete_stmt)
        return super().delete()
class BackendNerd(diffsync.DiffSync):
    """DiffSync adapter holding the routing state derived from extension data.

    ``load`` turns a mapping of extension definitions into yate, extension,
    user, forkrank and forkrankmember objects.
    """

    yate = YateModel
    extension = ExtensionModel
    user = UserModel
    forkrank = ForkRankModel
    forkrankmember = ForkRankMemberModel

    top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]

    def __init__(self, fp, *args, **kwargs):
        """Keep a reference to ``fp`` and forward the remaining arguments."""
        self.fp = fp
        super().__init__(*args, **kwargs)

    def _add_callgroup(self, number, spec):
        """Add the fork rank (index 0) of a callgroup and one member per entry."""
        rank = self.forkrank(extension=number, index=0, mode="DEFAULT")
        self.add(rank)
        for member_number in spec["callgroup_members"]:
            membership = self.forkrankmember(
                forkrank=rank.get_unique_id(),
                extension=member_number,
                rankmember_type="DEFAULT",
                active=True,
            )
            self.add(membership)

    def load(self, data):
        """Populate the adapter from ``data["extensions"]``.

        Each entry maps an extension number to a definition dict. Entries of
        type ``"temp"`` are skipped entirely. Callgroups get a fork rank with
        their members. ``sip``, ``static`` and ``dect`` entries additionally
        get a user object.
        """
        # Only one yate ("sip") is created; DECT and static extensions are
        # attached to it as well rather than to a dedicated "dect" yate.
        sip_yate = self.yate(
            guru3_identifier="sip", hostname="sip", voip_listener="local"
        )
        self.add(sip_yate)

        copied_keys = ("name", "outgoing_extension", "dialout_allowed")
        user_types = {"sip": "user", "dect": "user", "static": "static"}

        for number, spec in data["extensions"].items():
            kind = spec["type"]
            if kind == "temp":
                continue

            owning_yate = None
            if kind in ("dect", "static", "sip"):
                owning_yate = sip_yate.guru3_identifier
            elif kind == "callgroup":
                self._add_callgroup(number, spec)

            copied = {k: v for k, v in spec.items() if k in copied_keys}
            if kind == "callgroup":
                extension_type = "GROUP"
            elif spec["trunk"]:
                extension_type = "TRUNK"
            else:
                extension_type = "SIMPLE"

            self.add(
                self.extension(
                    extension=number,
                    **copied,
                    extension_type=extension_type,
                    forwarding_mode="DISABLED",
                    lang="en-GB",
                    yate=owning_yate,
                )
            )

            if kind in ("sip", "static", "dect"):
                self.add(
                    self.user(
                        username=number,
                        displayname=spec["name"],
                        # The fallback password is computed even when a
                        # "sip_password" is configured.
                        password=spec.get(
                            "sip_password",
                            self.fp._dect.get_sip_password_for_number(number),
                        ),
                        user_type=user_types[kind],
                        trunk=spec["trunk"],
                        static_target=spec.get("static_target", ""),
                    )
                )
class BackendYwsd(diffsync.DiffSync):
    """DiffSync adapter reflecting the current contents of the ywsd database.

    Uses the ``Ywsd*`` model classes, so applying a diff to this adapter
    writes the changes to the database through ``self.engine``.
    """

    yate = YwsdYateModel
    extension = YwsdExtensionModel
    user = YwsdUserModel
    forkrank = YwsdForkRankModel
    forkrankmember = YwsdForkRankMemberModel

    top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]

    def load(self, database_url):
        """Create ``self.engine`` for ``database_url`` and load every table.

        Tables are read in dependency order (yates, extensions, users, fork
        ranks, fork rank members) so that database ids can be translated into
        the identifiers the models use.
        """
        self.engine = sqlalchemy.create_engine(database_url)

        # Lookup tables from database primary keys to model identifiers.
        yate_identifiers = {}  # Yate.id -> guru3_identifier
        extension_ids = {}  # Extension.id -> extension number
        forkrank_ids = {}  # ForkRank.id -> forkrank unique id

        yate_columns = ("guru3_identifier", "hostname", "voip_listener")
        extension_columns = (
            "extension",
            "name",
            "short_name",
            "outgoing_extension",
            "outgoing_name",
            "dialout_allowed",
            "ringback",
            "forwarding_mode",
            "forwarding_delay",
            "lang",
        )
        user_columns = (
            "username",
            "displayname",
            "password",
            "inuse",
            "dect_displaymode",
            "trunk",
            "static_target",
        )
        forkrank_columns = ("index", "mode", "delay")
        member_columns = ("rankmember_type", "active")

        with self.engine.connect() as conn:
            for row in conn.execute(Yate.table.select()):
                yate = self.yate(
                    **{k: v for k, v in row._mapping.items() if k in yate_columns},
                    yate_id=row.id,
                )
                yate_identifiers[row.id] = yate.guru3_identifier
                self.add(yate)

            for row in conn.execute(Extension.table.select()):
                extension = self.extension(
                    **{
                        k: v
                        for k, v in row._mapping.items()
                        if k in extension_columns
                    },
                    extension_type=row.type,
                    extension_id=row.id,
                )
                extension_ids[row.id] = row.extension
                # TODO: forwarding_extension_id
                if row.yate_id:
                    # Dict lookup instead of rescanning every loaded yate for
                    # each extension row.
                    identifier = yate_identifiers.get(row.yate_id)
                    if identifier is not None:
                        extension.yate = identifier
                self.add(extension)

            for row in conn.execute(User.table.select()):
                user = self.user(
                    **{k: v for k, v in row._mapping.items() if k in user_columns},
                    user_type=row.type,
                )
                self.add(user)

            for row in conn.execute(ForkRank.table.select()):
                fr = self.forkrank(
                    forkrank_id=row.id,
                    extension=extension_ids[row.extension_id],
                    **{
                        k: v
                        for k, v in row._mapping.items()
                        if k in forkrank_columns
                    },
                )
                forkrank_ids[row.id] = fr.get_unique_id()
                self.add(fr)

            for row in conn.execute(ForkRank.member_table.select()):
                frm = self.forkrankmember(
                    forkrank=forkrank_ids[row.forkrank_id],
                    extension=extension_ids[row.extension_id],
                    **{
                        k: v
                        for k, v in row._mapping.items()
                        if k in member_columns
                    },
                )
                self.add(frm)
class Routing:
    """Queue-driven worker that syncs the ywsd database with the extension config.

    ``run`` consumes messages from ``fp.queues["routing"]``: a message of type
    ``"sync"`` triggers one synchronisation, ``"stop"`` ends the loop, and any
    other message is ignored.
    """

    def __init__(self, fp):
        self.fp = fp

    @staticmethod
    def _merged_extensions(config, temp_extensions):
        """Return a copy of ``config`` whose ``"extensions"`` include ``temp_extensions``.

        The nested ``"extensions"`` mapping is copied before it is updated.
        A shallow ``config.copy()`` alone still shares that mapping, so
        updating it in place would merge the temporary extensions into the
        caller's configuration permanently.
        """
        merged = config.copy()
        extensions = dict(merged["extensions"])
        extensions.update(temp_extensions)
        merged["extensions"] = extensions
        return merged

    def _database_url(self):
        """Build the PostgreSQL URL from ``fp.config.database``."""
        database = self.fp.config.database
        return "postgresql+psycopg2://{}:{}@{}/{}".format(
            database.username,
            database.password,
            database.hostname,
            database.database,
        )

    def _sync(self):
        """Load the desired and the current state and apply the difference."""
        logger.info("syncing")

        state_fieldpoc = BackendNerd(self.fp)
        state_fieldpoc.load(
            self._merged_extensions(
                self.fp.extensions._c, self.fp.temp_extensions
            )
        )

        state_yate = BackendYwsd()
        try:
            state_yate.load(self._database_url())
            logger.info("\n" + state_yate.diff_from(state_fieldpoc).str())
            state_yate.sync_from(state_fieldpoc)
        finally:
            # load() creates a new engine for every sync; release its pooled
            # connections once this sync is finished, even if it failed.
            engine = getattr(state_yate, "engine", None)
            if engine is not None:
                engine.dispose()

    def run(self):
        """Process routing queue messages until a ``"stop"`` message arrives."""
        while True:
            msg = self.fp.queues["routing"].get()
            # The message is marked done as soon as it is taken, before it is
            # processed.
            self.fp.queues["routing"].task_done()

            msg_type = msg.get("type")
            if msg_type == "stop":
                break
            elif msg_type == "sync":
                self._sync()