#!/usr/bin/env python3
"""Synchronise fieldpoc extension data into the ywsd routing database.

Two diffsync backends are defined: ``BackendNerd`` is loaded from the
in-memory extension configuration, ``BackendYwsd`` is loaded from (and writes
back to) the SQL tables exposed by ``ywsd.objects``.  ``Routing`` consumes a
queue and syncs the database from the configuration on request.
"""

import diffsync
import logging
import sqlalchemy

from typing import Optional, List

from ywsd.objects import Extension, ForkRank, User, Yate

logger = logging.getLogger("fieldpoc.routing")


class YateModel(diffsync.DiffSyncModel):
    """A yate instance, identified by its ``guru3_identifier``."""

    _modelname = "yate"
    _identifiers = ("guru3_identifier",)
    _attributes = (
        "hostname",
        "voip_listener",
    )

    guru3_identifier: str
    hostname: str
    voip_listener: str

    # Database primary key; not part of the diff, only set by BackendYwsd.
    yate_id: Optional[int]


class YwsdYateModel(YateModel):
    """``YateModel`` whose create/update/delete are applied to ``Yate.table``."""

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert the yate row and remember its primary key as ``yate_id``."""
        # engine.begin() commits on successful exit, so the write is
        # persisted regardless of the connection's autocommit behaviour.
        with diffsync.engine.begin() as conn:
            result = conn.execute(
                Yate.table.insert().values(
                    guru3_identifier=ids["guru3_identifier"], **attrs
                )
            )
            attrs["yate_id"] = result.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row with id ``self.yate_id``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                Yate.table.update()
                .where(Yate.table.c.id == self.yate_id)
                .values(**attrs)
            )
        return super().update(attrs)

    def delete(self):
        """Delete the row with id ``self.yate_id``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(Yate.table.delete().where(Yate.table.c.id == self.yate_id))
        return super().delete()


class ExtensionModel(diffsync.DiffSyncModel):
    """A dialable extension, identified by its extension string."""

    _modelname = "extension"
    _identifiers = ("extension",)
    _attributes = (
        "yate",
        "name",
        "extension_type",
        "outgoing_extension",
        "dialout_allowed",
        "forwarding_mode",
        "lang",
    )
    _children = {"extension": "forwarding_extension"}

    # ``yate`` holds the guru3_identifier of the owning YateModel (or None).
    yate: Optional[str]
    extension: str
    name: Optional[str]
    short_name: Optional[str]
    extension_type: str
    outgoing_extension: Optional[str]
    outgoing_name: Optional[str]
    dialout_allowed: bool
    ringback: Optional[str]
    forwarding_mode: str
    forwarding_delay: Optional[int]
    forwarding_extension: List = []
    lang: str

    # Database primary key; not part of the diff, only set by BackendYwsd.
    extension_id: Optional[int]


class YwsdExtensionModel(ExtensionModel):
    """``ExtensionModel`` whose changes are applied to ``Extension.table``.

    The model attributes ``extension_type`` and ``yate`` are stored in the
    columns ``type`` and ``yate_id`` respectively; all other attributes map
    to columns of the same name.
    """

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert the extension row and remember its primary key."""
        columns = {
            k: v for k, v in attrs.items() if k not in ("extension_type", "yate")
        }
        with diffsync.engine.begin() as conn:
            result = conn.execute(
                Extension.table.insert().values(
                    extension=ids["extension"],
                    type=attrs["extension_type"],
                    yate_id=diffsync.get("yate", attrs["yate"]).yate_id
                    if attrs.get("yate")
                    else None,
                    **columns,
                )
            )
            attrs["extension_id"] = result.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row with id ``self.extension_id``."""
        columns = {
            k: v for k, v in attrs.items() if k not in ("extension_type", "yate")
        }
        if "extension_type" in attrs:
            columns["type"] = attrs["extension_type"]
        if "yate" in attrs:
            # A yate changed to None must clear the column rather than be
            # skipped; otherwise the same diff reappears on every sync.
            columns["yate_id"] = (
                self.diffsync.get("yate", attrs["yate"]).yate_id
                if attrs["yate"]
                else None
            )
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                Extension.table.update()
                .where(Extension.table.c.id == self.extension_id)
                .values(**columns)
            )
        return super().update(attrs)

    def delete(self):
        """Delete the row with id ``self.extension_id``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                Extension.table.delete().where(
                    Extension.table.c.id == self.extension_id
                )
            )
        return super().delete()


class UserModel(diffsync.DiffSyncModel):
    """A user account, identified by its username."""

    _modelname = "user"
    _identifiers = ("username",)
    _attributes = (
        "displayname",
        "password",
        "user_type",
        "dect_displaymode",
        "trunk",
        "static_target",
    )

    username: str
    displayname: Optional[str]
    password: str
    inuse: Optional[bool]
    user_type: str
    dect_displaymode: Optional[str]
    trunk: Optional[bool]
    static_target: Optional[str]


class YwsdUserModel(UserModel):
    """``UserModel`` whose changes are applied to ``User.table``.

    The model attribute ``user_type`` is stored in the column ``type``.
    """

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert the user row."""
        columns = {k: v for k, v in attrs.items() if k != "user_type"}
        with diffsync.engine.begin() as conn:
            conn.execute(
                User.table.insert().values(
                    username=ids["username"],
                    type=attrs["user_type"],
                    **columns,
                )
            )
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row matching ``self.username``."""
        columns = {k: v for k, v in attrs.items() if k != "user_type"}
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                User.table.update()
                .where(User.table.c.username == self.username)
                .values(
                    # Falls back to the current value when the type is unchanged.
                    type=attrs.get("user_type", self.user_type),
                    **columns,
                )
            )
        return super().update(attrs)

    def delete(self):
        """Delete the row matching ``self.username``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                User.table.delete().where(User.table.c.username == self.username)
            )
        return super().delete()


class ForkRankModel(diffsync.DiffSyncModel):
    """A fork rank, identified by the owning extension and its index."""

    _modelname = "forkrank"
    _identifiers = (
        "extension",
        "index",
    )
    _attributes = (
        "mode",
        "delay",
    )

    # Database primary key; not part of the diff, only set by BackendYwsd.
    forkrank_id: Optional[int]
    extension: str
    index: int
    mode: str
    delay: Optional[int]


class YwsdForkRankModel(ForkRankModel):
    """``ForkRankModel`` whose changes are applied to ``ForkRank.table``."""

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert the fork rank row and remember its primary key."""
        with diffsync.engine.begin() as conn:
            result = conn.execute(
                ForkRank.table.insert().values(
                    extension_id=diffsync.get(
                        "extension", ids["extension"]
                    ).extension_id,
                    index=ids["index"],
                    **attrs,
                )
            )
            attrs["forkrank_id"] = result.inserted_primary_key[0]
        return super().create(diffsync, ids=ids, attrs=attrs)

    def update(self, attrs):
        """Write the changed attributes to the row with id ``self.forkrank_id``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                ForkRank.table.update()
                .where(ForkRank.table.c.id == self.forkrank_id)
                .values(**attrs)
            )
        return super().update(attrs)

    def delete(self):
        """Delete the row with id ``self.forkrank_id``."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                ForkRank.table.delete().where(ForkRank.table.c.id == self.forkrank_id)
            )
        return super().delete()


class ForkRankMemberModel(diffsync.DiffSyncModel):
    """Membership of an extension in a fork rank."""

    _modelname = "forkrankmember"
    _identifiers = (
        "forkrank",
        "extension",
    )
    _attributes = (
        "rankmember_type",
        "active",
    )

    # ``forkrank`` is the unique id of a ForkRankModel, ``extension`` the
    # unique id of an ExtensionModel (both are looked up via diffsync.get).
    forkrank: str
    extension: str
    rankmember_type: str
    active: bool


class YwsdForkRankMemberModel(ForkRankMemberModel):
    """``ForkRankMemberModel`` whose changes are applied to ``ForkRank.member_table``."""

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Insert the membership row, resolving both references to database ids."""
        with diffsync.engine.begin() as conn:
            conn.execute(
                ForkRank.member_table.insert().values(
                    forkrank_id=diffsync.get("forkrank", ids["forkrank"]).forkrank_id,
                    extension_id=diffsync.get(
                        "extension", ids["extension"]
                    ).extension_id,
                    **attrs,
                )
            )
        return super().create(diffsync, ids=ids, attrs=attrs)

    def _member_clause(self):
        """Return the WHERE clause selecting this membership's row."""
        members = ForkRank.member_table
        return sqlalchemy.and_(
            members.c.forkrank_id
            == self.diffsync.get("forkrank", self.forkrank).forkrank_id,
            members.c.extension_id
            == self.diffsync.get("extension", self.extension).extension_id,
        )

    def update(self, attrs):
        """Write the changed attributes to this membership's row."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(
                ForkRank.member_table.update()
                .where(self._member_clause())
                .values(**attrs)
            )
        return super().update(attrs)

    def delete(self):
        """Delete this membership's row."""
        with self.diffsync.engine.begin() as conn:
            conn.execute(ForkRank.member_table.delete().where(self._member_clause()))
        return super().delete()


class BackendNerd(diffsync.DiffSync):
    """Desired state, built from the extension configuration."""

    yate = YateModel
    extension = ExtensionModel
    user = UserModel
    forkrank = ForkRankModel
    forkrankmember = ForkRankMemberModel

    top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]

    def load(self, data):
        """Populate the backend from ``data["extensions"]``.

        ``data["extensions"]`` maps an extension string to a dict.  Each dict
        is read for ``type`` and, depending on the type, ``trunk``, ``name``,
        ``callgroup_members``, ``outgoing_extension``, ``dialout_allowed``,
        ``sip_password`` and ``static_target``.  Entries of type ``temp`` are
        skipped entirely.
        """
        # yate_dect = self.yate(
        #     guru3_identifier="dect", hostname="dect", voip_listener="local"
        # )
        # self.add(yate_dect)
        yate_sip = self.yate(
            guru3_identifier="sip", hostname="sip", voip_listener="local"
        )
        self.add(yate_sip)

        user_types = {"sip": "user", "dect": "user", "static": "static"}
        copied_keys = ("name", "outgoing_extension", "dialout_allowed")

        for key, value in data["extensions"].items():
            yate = None
            if value["type"] in ["dect", "static"]:
                # yate = yate_dect.guru3_identifier
                yate = yate_sip.guru3_identifier
            elif value["type"] in ["sip"]:
                yate = yate_sip.guru3_identifier
            elif value["type"] in ["callgroup"]:
                forkrank = self.forkrank(extension=key, index=0, mode="DEFAULT")
                self.add(forkrank)
                for member in value["callgroup_members"]:
                    frm = self.forkrankmember(
                        forkrank=forkrank.get_unique_id(),
                        extension=member,
                        rankmember_type="DEFAULT",
                        active=True,
                    )
                    self.add(frm)
            elif value["type"] in ["temp"]:
                continue

            # "trunk" is only consulted for non-callgroup entries.
            if value["type"] == "callgroup":
                extension_type = "GROUP"
            elif value["trunk"]:
                extension_type = "TRUNK"
            else:
                extension_type = "SIMPLE"

            extension = self.extension(
                extension=key,
                **{k: v for k, v in value.items() if k in copied_keys},
                extension_type=extension_type,
                forwarding_mode="DISABLED",
                lang="en-GB",
                yate=yate,
            )
            self.add(extension)

            if value["type"] in ["sip", "static", "dect"]:
                user = self.user(
                    username=key,
                    displayname=value["name"],
                    # Without an explicit SIP password the extension itself is used.
                    password=value.get("sip_password", key),
                    user_type=user_types[value["type"]],
                    trunk=value["trunk"],
                    static_target=value.get("static_target", ""),
                )
                self.add(user)


class BackendYwsd(diffsync.DiffSync):
    """Actual state, read from and written to the ywsd database."""

    yate = YwsdYateModel
    extension = YwsdExtensionModel
    user = YwsdUserModel
    forkrank = YwsdForkRankModel
    forkrankmember = YwsdForkRankMemberModel

    top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]

    def load(self, database_url):
        """Create ``self.engine`` for ``database_url`` and load all tables.

        Database ids are translated into the identifiers used by the models:
        yate ids into ``guru3_identifier``, extension ids into the extension
        string, and fork rank ids into the fork rank's unique id.
        """
        self.engine = sqlalchemy.create_engine(database_url)

        # Lookup tables from database primary key to model identifier.
        yate_identifiers = {}
        extension_ids = {}
        forkrank_ids = {}

        yate_columns = ("guru3_identifier", "hostname", "voip_listener")
        extension_columns = (
            "extension",
            "name",
            "short_name",
            "outgoing_extension",
            "outgoing_name",
            "dialout_allowed",
            "ringback",
            "forwarding_mode",
            "forwarding_delay",
            "lang",
        )
        user_columns = (
            "username",
            "displayname",
            "password",
            "inuse",
            "dect_displaymode",
            "trunk",
            "static_target",
        )
        forkrank_columns = ("index", "mode", "delay")
        member_columns = ("rankmember_type", "active")

        with self.engine.connect() as conn:
            for row in conn.execute(Yate.table.select()):
                yate = self.yate(
                    **{k: v for k, v in row._mapping.items() if k in yate_columns},
                    yate_id=row.id,
                )
                yate_identifiers[row.id] = yate.guru3_identifier
                self.add(yate)

            for row in conn.execute(Extension.table.select()):
                extension = self.extension(
                    **{
                        k: v
                        for k, v in row._mapping.items()
                        if k in extension_columns
                    },
                    extension_type=row.type,
                    extension_id=row.id,
                )
                extension_ids[row.id] = row.extension
                # TODO: forwarding_extension_id
                if row.yate_id:
                    identifier = yate_identifiers.get(row.yate_id)
                    if identifier is not None:
                        extension.yate = identifier
                self.add(extension)

            for row in conn.execute(User.table.select()):
                user = self.user(
                    **{k: v for k, v in row._mapping.items() if k in user_columns},
                    user_type=row.type,
                )
                self.add(user)

            for row in conn.execute(ForkRank.table.select()):
                fr = self.forkrank(
                    forkrank_id=row.id,
                    extension=extension_ids[row.extension_id],
                    **{
                        k: v
                        for k, v in row._mapping.items()
                        if k in forkrank_columns
                    },
                )
                forkrank_ids[row.id] = fr.get_unique_id()
                self.add(fr)

            for row in conn.execute(ForkRank.member_table.select()):
                frm = self.forkrankmember(
                    forkrank=forkrank_ids[row.forkrank_id],
                    extension=extension_ids[row.extension_id],
                    **{
                        k: v for k, v in row._mapping.items() if k in member_columns
                    },
                )
                self.add(frm)


class Routing:
    """Queue consumer that syncs the ywsd database on request."""

    def __init__(self, fp):
        """Keep a reference to ``fp``, which provides queues, config and extensions."""
        self.fp = fp

    def run(self):
        """Process messages from ``fp.queues["routing"]`` until told to stop.

        A message whose ``type`` is ``"stop"`` ends the loop; ``"sync"`` loads
        both backends, logs their diff and applies it to the database.  Other
        messages are ignored.
        """
        while True:
            msg = self.fp.queues["routing"].get()
            self.fp.queues["routing"].task_done()

            if msg.get("type") == "stop":
                break
            elif msg.get("type") == "sync":
                logger.info("syncing")

                state_fieldpoc = BackendNerd()
                state_fieldpoc.load(self.fp.extensions._c)

                state_yate = BackendYwsd()
                state_yate.load(
                    "postgresql+psycopg2://{}:{}@{}/{}".format(
                        self.fp.config.database.username,
                        self.fp.config.database.password,
                        self.fp.config.database.hostname,
                        self.fp.config.database.database,
                    )
                )

                logger.info("\n" + state_yate.diff_from(state_fieldpoc).str())
                state_yate.sync_from(state_fieldpoc)