From 9394f6376b363435155b1601148c854b5e4de005 Mon Sep 17 00:00:00 2001 From: clerie Date: Wed, 6 Jul 2022 13:15:31 +0200 Subject: [PATCH] Integrate call routing management --- fieldpoc/fieldpoc.py | 6 + fieldpoc/routing.py | 497 +++++++++++++++++++++++++++++++++++++++ fieldpoc_extensions.json | 97 +++++++- 3 files changed, 597 insertions(+), 3 deletions(-) create mode 100644 fieldpoc/routing.py diff --git a/fieldpoc/fieldpoc.py b/fieldpoc/fieldpoc.py index 8ebc00b..8737c39 100644 --- a/fieldpoc/fieldpoc.py +++ b/fieldpoc/fieldpoc.py @@ -9,6 +9,7 @@ import threading from . import config from . import controller from . import dect +from . import routing from . import ywsd logger = logging.getLogger("fieldpoc.fieldpoc") @@ -28,11 +29,13 @@ class FieldPOC: self.queues = { "controller": queue.Queue(), "dect": queue.Queue(), + "routing": queue.Queue(), } logger.info("initialising components") self._controller = controller.Controller(self) self._dect = dect.Dect(self) + self._routing = routing.Routing(self) self._ywsd = ywsd.Ywsd(self) def queue_all(self, msg): @@ -64,6 +67,9 @@ class FieldPOC: self._dect_thread = threading.Thread(target=self._dect.run) self._dect_thread.start() + self._routing_thread = threading.Thread(target=self._routing.run) + self._routing_thread.start() + self._ywsd_thread = threading.Thread(target=self._ywsd.run, daemon=True) self._ywsd_thread.start() diff --git a/fieldpoc/routing.py b/fieldpoc/routing.py new file mode 100644 index 0000000..f54766e --- /dev/null +++ b/fieldpoc/routing.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python3 + +import diffsync +import logging +import sqlalchemy +from typing import Optional, List +from ywsd.objects import Extension, ForkRank, User, Yate + +logger = logging.getLogger("fieldpoc.routing") + +class YateModel(diffsync.DiffSyncModel): + _modelname = "yate" + _identifiers = ("guru3_identifier",) + _attributes = ( + "hostname", + "voip_listener", + ) + + guru3_identifier: str + hostname: str + voip_listener: str + + yate_id: Optional[int] + + +class YwsdYateModel(YateModel): + @classmethod + def create(cls, diffsync, ids, attrs): + with diffsync.engine.connect() as conn: + conn.execute( + Yate.table.insert().values( + guru3_identifier=ids["guru3_identifier"], **attrs + ) + ) + return super().create(diffsync, ids=ids, attrs=attrs) + + def update(self, attrs): + with self.diffsync.engine.connect() as conn: + conn.execute( + Yate.table.update() + .where(Yate.table.c.id == self.yate_id) + .values(**attrs) + ) + return super().update(attrs) + + def delete(self): + with self.diffsync.engine.connect() as conn: + conn.execute(Yate.table.delete().where(Yate.table.c.id == self.yate_id)) + return super().delete() + + +class ExtensionModel(diffsync.DiffSyncModel): + _modelname = "extension" + _identifiers = ("extension",) + _attributes = ( + "yate", + "name", + "extension_type", + "outgoing_extension", + "dialout_allowed", + "forwarding_mode", + "lang", + ) + _children = {"extension": "forwarding_extension"} + + yate: Optional[str] + extension: str + name: Optional[str] + short_name: Optional[str] + extension_type: str + outgoing_extension: Optional[str] + outgoing_name: Optional[str] + dialout_allowed: bool + ringback: Optional[str] + forwarding_mode: str + forwarding_delay: Optional[int] + forwarding_extension: List = [] + lang: str + + extension_id: Optional[int] + + +class YwsdExtensionModel(ExtensionModel): + @classmethod + def create(cls, diffsync, ids, attrs): + with diffsync.engine.connect() as conn: + conn.execute( + Extension.table.insert().values( + extension=ids["extension"], + type=attrs["extension_type"], + yate_id=diffsync.get("yate", attrs["yate"]).yate_id + if attrs.get("yate") + else None, + **dict( + (k, v) + for k, v in attrs.items() + if k not in ["extension_type", "yate"] + ), + ) + ) + return super().create(diffsync, ids=ids, attrs=attrs) + + def update(self, attrs): + with self.diffsync.engine.connect() as conn: + addn_attrs = {} + if attrs.get("extension_type"): + addn_attrs["type"] = attrs["extension_typ"] + if attrs.get("yate"): + addn_attrs["yate_id"] = ( + self.diffsync.get("yate", attrs["yate"]).yate_id, + ) + conn.execute( + Extension.table.update() + .where(Extension.table.c.id == self.extension_id) + .values( + **dict( + (k, v) + for k, v in attrs.items() + if k not in ["extension_type", "yate"] + ), + **addn_attrs, + ) + ) + return super().update(attrs) + + def delete(self): + with self.diffsync.engine.connect() as conn: + conn.execute( + Extension.table.delete().where( + Extension.table.c.id == self.extension_id + ) + ) + return super().delete() + + +class UserModel(diffsync.DiffSyncModel): + _modelname = "user" + _identifiers = ("username",) + _attributes = ( + "displayname", + "password", + "user_type", + "dect_displaymode", + "trunk", + "static_target", + ) + + username: str + displayname: Optional[str] + password: str + inuse: Optional[bool] + user_type: str + dect_displaymode: Optional[str] + trunk: Optional[bool] + static_target: Optional[str] + + +class YwsdUserModel(UserModel): + @classmethod + def create(cls, diffsync, ids, attrs): + with diffsync.engine.connect() as conn: + conn.execute( + User.table.insert().values( + username=ids["username"], + type=attrs["user_type"], + **dict((k, v) for k, v in attrs.items() if k not in ["user_type"]), + ) + ) + return super().create(diffsync, ids=ids, attrs=attrs) + + def update(self, attrs): + with self.diffsync.engine.connect() as conn: + conn.execute( + User.table.update() + .where(User.table.c.username == self.username) + .values( + type=attrs.get("user_type", self.user_type), + **dict((k, v) for k, v in attrs.items() if k not in ["user_type"]), + ) + ) + return super().update(attrs) + + def delete(self): + with self.diffsync.engine.connect() as conn: + conn.execute( + User.table.delete().where(User.table.c.username == self.username) + ) + return super().delete() + + +class ForkRankModel(diffsync.DiffSyncModel): + _modelname = "forkrank" + _identifiers = ( + "extension", + "index", + ) + _attributes = ( + "mode", + "delay", + ) + + forkrank_id: Optional[int] + extension: str + index: int + mode: str + delay: Optional[int] + + +class YwsdForkRankModel(ForkRankModel): + @classmethod + def create(cls, diffsync, ids, attrs): + with diffsync.engine.connect() as conn: + conn.execute( + ForkRank.table.insert().values( + extension_id=diffsync.get( + "extension", ids["extension"] + ).extension_id, + index=ids["index"], + **attrs, + ) + ) + return super().create(diffsync, ids=ids, attrs=attrs) + + def update(self, attrs): + with self.diffsync.engine.connect() as conn: + conn.execute( + ForkRank.table.update() + .where(ForkRank.table.c.id == self.forkrank_id) + .values(**attrs) + ) + return super().update(attrs) + + def delete(self): + with self.diffsync.engine.connect() as conn: + conn.execute( + ForkRank.table.delete().where(ForkRank.table.c.id == self.forkrank_id) + ) + return super().delete() + + +class ForkRankMemberModel(diffsync.DiffSyncModel): + _modelname = "forkrankmember" + _identifiers = ( + "forkrank", + "extension", + ) + _attributes = ( + "rankmember_type", + "active", + ) + + forkrank: str + extension: str + rankmember_type: str + active: bool + + +class YwsdForkRankMemberModel(ForkRankMemberModel): + @classmethod + def create(cls, diffsync, ids, attrs): + with diffsync.engine.connect() as conn: + conn.execute( + ForkRank.member_table.insert().values( + forkrank_id=diffsync.get("forkrank", ids["forkrank"]).forkrank_id, + extension_id=diffsync.get( + "extension", ids["extension"] + ).extension_id, + **attrs, + ) + ) + return super().create(diffsync, ids=ids, attrs=attrs) + + def update(self, attrs): + with self.diffsync.engine.connect() as conn: + conn.execute( + ForkRank.member_table.update() + .where( + _and( + ForkRank.member_table.c.forkrank_id + == self.diffsync.get("forkrank", self.forkrank).forkrank_id, + ForkRank.member_table.c.extension_id + == self.diffsync.get("extension", self.extension).extension_id, + ) + ) + .values(**attrs) + ) + return super().update(attrs) + + def delete(self): + with self.diffsync.engine.connect() as conn: + conn.execute( + ForkRank.member_table.delete().where( + _and( + ForkRank.member_table.c.forkrank_id + == self.diffsync.get("forkrank", self.forkrank).forkrank_id, + ForkRank.member_table.c.extension_id + == self.diffsync.get("extension", self.extension).extension_id, + ) + ) + ) + return super().delete() + + +class BackendNerd(diffsync.DiffSync): + yate = YateModel + extension = ExtensionModel + user = UserModel + forkrank = ForkRankModel + forkrankmember = ForkRankMemberModel + + top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"] + + def load(self, data): + yate_dect = self.yate( + guru3_identifier="dect", hostname="dect", voip_listener="local" + ) + self.add(yate_dect) + yate_sip = self.yate( + guru3_identifier="sip", hostname="sip", voip_listener="local" + ) + self.add(yate_sip) + + for key, value in data["extensions"].items(): + yate = None + if value["type"] in ["dect", "static"]: + # yate = yate_dect.guru3_identifier + yate = yate_sip.guru3_identifier + elif value["type"] in ["sip"]: + yate = yate_sip.guru3_identifier + elif value["type"] in ["callgroup"]: + forkrank = self.forkrank(extension=key, index=0, mode="DEFAULT") + self.add(forkrank) + + for member in value["callgroup_members"]: + frm = self.forkrankmember( + forkrank=forkrank.get_unique_id(), + extension=member, + rankmember_type="DEFAULT", + active=True, + ) + self.add(frm) + + extension = self.extension( + extension=key, + **dict( + (k, v) + for k, v in value.items() + if k in ["name", "outgoing_extension", "dialout_allowed"] + ), + extension_type=("SIMPLE" if not value["trunk"] else "TRUNK") + if not value["type"] == "callgroup" + else "GROUP", + forwarding_mode="DISABLED", + lang="en-GB", + yate=yate, + ) + self.add(extension) + + if value["type"] in ["sip", "static", "dect"]: + user_type = {"sip": "user", "dect": "user", "static": "static"} + user = self.user( + username=key, + displayname=value["name"], + password=value.get("sip_password", key), + user_type=user_type[value["type"]], + trunk=value["trunk"], + static_target=value.get("static_target", ""), + ) + self.add(user) + + +class BackendYwsd(diffsync.DiffSync): + yate = YwsdYateModel + extension = YwsdExtensionModel + user = YwsdUserModel + forkrank = YwsdForkRankModel + forkrankmember = YwsdForkRankMemberModel + + top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"] + + def load(self, database_url): + self.engine = sqlalchemy.create_engine(database_url) + + extension_ids = {} + forkrank_ids = {} + with self.engine.connect() as conn: + for row in conn.execute(Yate.table.select()): + yate = self.yate( + **dict( + (k, v) + for k, v in row._mapping.items() + if k in ["guru3_identifier", "hostname", "voip_listener"] + ), + yate_id=row.id, + ) + self.add(yate) + for row in conn.execute(Extension.table.select()): + extension = self.extension( + **dict( + (k, v) + for k, v in row._mapping.items() + if k + in [ + "extension", + "name", + "short_name", + "outgoing_extension", + "outgoing_name", + "dialout_allowed", + "ringback", + "forwarding_mode", + "forwarding_delay", + "lang", + ] + ), + extension_type=row.type, + extension_id=row.id, + ) + extension_ids[row.id] = row.extension + # TODO: forwarding_extension_id + if row.yate_id: + for yate in self.get_all("yate"): + if yate.yate_id == row.yate_id: + extension.yate = yate.guru3_identifier + self.add(extension) + for row in conn.execute(User.table.select()): + user = self.user( + **dict( + (k, v) + for k, v in row._mapping.items() + if k + in [ + "username", + "displayname", + "password", + "inuse", + "dect_displaymode", + "trunk", + "static_target", + ] + ), + user_type=row.type, + ) + self.add(user) + for row in conn.execute(ForkRank.table.select()): + fr = self.forkrank( + forkrank_id=row.id, + extension=extension_ids[row.extension_id], + **dict( + (k, v) + for k, v in row._mapping.items() + if k in ["index", "mode", "delay"] + ), + ) + forkrank_ids[row.id] = fr.get_unique_id() + self.add(fr) + for row in conn.execute(ForkRank.member_table.select()): + frm = self.forkrankmember( + forkrank=forkrank_ids[row.forkrank_id], + extension=extension_ids[row.extension_id], + **dict( + (k, v) + for k, v in row._mapping.items() + if k in ["rankmember_type", "active"] + ), + ) + self.add(frm) + + +class Routing: + def __init__(self, fp): + self.fp = fp + + def run(self): + while True: + msg = self.fp.queues["dect"].get() + self.fp.queues["dect"].task_done() + + if msg.get("type") == "stop": + break + elif msg.get("type") == "sync": + + logger.info("syncing") + + state_fieldpoc = BackendNerd() + state_fieldpoc.load(self.fp.extensions._c) + state_yate = BackendYwsd() + state_yate.load("postgresql+psycopg2://{}:{}@{}/{}".format( + self.fp.config.database.username, + self.fp.config.database.password, + self.fp.config.database.hostname, + self.fp.config.database.database, + )) + + logger.info("\n" + state_yate.diff_from(state_fieldpoc).str()) + + state_yate.sync_from(state_fieldpoc) diff --git a/fieldpoc_extensions.json b/fieldpoc_extensions.json index a890f65..6d4027a 100644 --- a/fieldpoc_extensions.json +++ b/fieldpoc_extensions.json @@ -1,17 +1,108 @@ { "extensions": { + "1000": { + "name": "strontium", + "type": "sip", + "dialout_allowed": true, + "trunk": false, + "sip_password": "AeShahS2" + }, + "2020": { + "name": "Hotline", + "type": "static", + "dialout_allowed": false, + "trunk": false, + "static_target": "external/nodata//tmp/yate-tcl/hotline.tcl" + }, + "2342": { + "name": "hydrox", + "type": "sip", + "dialout_allowed": true, + "trunk": false, + "sip_password": "Xua2liej" + }, "2574": { "name": "clerie", "type": "dect", + "trunk": false, + "dialout_allowed": true, "dect_claim_token": "2574" }, + "2789": { + "name": "bruttl", + "type": "dect", + "dialout_allowed": true, + "trunk": false, + "dect_ipei": "03586 0682136 3" + }, + "5375": { + "name": "n0emis", + "type": "callgroup", + "dialout_allowed": true, + "trunk": false, + "callgroup_members": [ + "5376", + "5377", + "5379" + ] + }, + "5376": { + "name": "n0emis SIP", + "type": "sip", + "dialout_allowed": true, + "trunk": false, + "outgoing_extension": "5375", + "sip_password": "wieK5xal" + }, + "5377": { + "name": "n0emis DECT", + "type": "dect", + "dialout_allowed": true, + "trunk": false, + "outgoing_extension": "5375", + "dect_ipei": "10345 0136625 3" + }, + "5379": { + "name": "n0emis iPhone", + "type": "sip", + "dialout_allowed": true, + "trunk": false, + "outgoing_extension": "5375", + "sip_password": "uuchahD9" + }, + "7486": { + "name": "floppy", + "type": "dect", + "dialout_allowed": true, + "trunk": false, + "dect_ipei": "12877 0914855 9" + }, + "8463": { + "name": "Time", + "type": "static", + "dialout_allowed": false, + "trunk": false, + "static_target": "external/nodata//tmp/yate-tcl/time.tcl" + }, + "8888": { + "name": "test", + "type": "static", + "dialout_allowed": true, + "trunk": false, + "static_target": "line/01754566012;line=dialout" + }, "9998": { "name": "Temporary Numbers", - "type": "temp" + "trunk": false, + "dialout_allowed": true, + "type": "static" }, "9999": { - "name": "DECT Claim Token Pool", - "type": "claim_token" + "name": "DECT Claim Extensions", + "type": "static", + "dialout_allowed": false, + "trunk": true, + "static_target": "external/nodata//opt/nerdsync/claim.py" } } }