#!/usr/bin/env python3 import logging import mitel_ommclient2 import time import hashlib logger = logging.getLogger("fieldpoc.dect") class Dect: def __init__(self, fp): self.fp = fp def _init_client(self): self.c = mitel_ommclient2.OMMClient2( host=self.fp.config.dect.host, username=self.fp.config.dect.username, password=self.fp.config.dect.password, ommsync=True, ) @property def temp_num_prefix(self): return next(self.fp.extensions.extensions_by_type("temp")).num def load_temp_extensions(self): current_temp_extension = 0 used_temp_extensions = self.c.find_users(lambda u: u.num.startswith(self.temp_num_prefix)) for u in used_temp_extensions: temp_num = u.num self.fp.temp_extensions[temp_num] = { "name": f"Temp {temp_num[4:]}", "type": "dect", "trunk": False, "dialout_allowed": False, } def get_temp_number(self): current_temp_extension = 0 used_temp_extensions = [num[len(self.temp_num_prefix):] for num, ext in self.fp.temp_extensions.items()] while "{:0>4}".format(current_temp_extension) in used_temp_extensions: current_temp_extension += 1 return "{}{:0>4}".format(self.temp_num_prefix, current_temp_extension) def get_sip_password_for_number(self, num): return hashlib.sha256(bytes.fromhex((self.fp.config.dect.sipsecret + str(num))[-16:])).hexdigest()[:16] def create_and_bind_user(self, d, num): u = self.c.create_user(num) self.c.attach_user_device(u.uid, d.ppn) self.c.set_user_relation_fixed(u.uid) self.c.set_user_sipauth(u.uid, num, self.get_sip_password_for_number(num)) return u def run(self): logger.info("initialising connection to OMM") self._init_client() self.load_temp_extensions() while True: msg = self.fp.queues["dect"].get() self.fp.queues["dect"].task_done() if msg.get("type") == "stop": break elif msg.get("type") == "sync": logger.info("syncing") extensions = self.fp.extensions.extensions_by_type("dect") extensions_by_num = {e.num: e for e in extensions} extensions_by_ipei = {e._c['dect_ipei']: e for _, e in extensions_by_num.items() if e._c.get('dect_ipei')} created_tmp_ext = False users_by_ext = {} users_by_uid = {} for user in self.c.get_users(): e = extensions_by_num.get(user.num) if not e: # user in omm, but not as dect in nerd if user.num.startswith(next(self.fp.extensions.extensions_by_type("temp")).num): users_by_ext[user.num] = user users_by_uid[user.uid] = user else: pass # TODO: delete in omm continue elif e._c['name'] != user.name: self.c.set_user_name(user.uid, e._c['name']) if e._c.get('dect_ipei') and user.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): d = self.c.get_device(user.ppn) if d.ipei != e._c['dect_ipei']: logger.debug(f"Detaching {user} {d}") self.c.detach_user_device(user.uid, user.ppn) self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) users_by_ext[user.num] = user users_by_uid[user.uid] = user for d in self.c.get_devices(): e = extensions_by_ipei.get(d.ipei) if e: # device is in nerd u = users_by_ext.get(e.num) if u and d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): logger.debug(f'Binding user for {d}') self.c.attach_user_device(u.uid, d.ppn) self.c.set_user_relation_fixed(u.uid) elif d.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): ui = users_by_uid.get(d.uid) if ui.num != e.num: logger.debug(f'User for {d} has wrong number') if self.fp.temp_extensions.get(ui.num): self.fp.temp_extensions.pop(ui.num) self.c.set_user_num(d.uid, e.num) self.c.set_user_sipauth(d.uid, e.num, self.get_sip_password_for_number(e.num)) self.c.set_user_name(user.uid, e._c['name']) else: logger.debug(f'Creating and binding user for {d}') user = self.create_and_bind_user(d, e.num) self.c.set_user_name(user.uid, e._c['name']) elif d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): temp_num = self.get_temp_number() logger.debug(f'Creating and binding tmp-user for {d}: {temp_num}') user = self.create_and_bind_user(d, temp_num) self.c.set_user_name(user.uid, f"Temp {temp_num[4:]}") self.fp.temp_extensions[temp_num] = { "name": f"Temp {temp_num[4:]}", "type": "dect", "trunk": False, "dialout_allowed": False, } created_tmp_ext = True if created_tmp_ext: self.fp.queues['routing'].put({"type": "sync"}) elif msg.get("type") == "claim": e = None for ext in self.fp.extensions.extensions_by_type("dect"): if ext._c.get('dect_claim_token') and ext._c['dect_claim_token'] == msg.get("token")[4:]: e = ext break if e: user = next(self.c.find_users(lambda u: u.num == msg.get("extension"))) if self.fp.temp_extensions.get(user.num): self.fp.temp_extensions.pop(user.num) self.c.set_user_num(user.uid, e.num) self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) self.c.set_user_name(user.uid, e._c['name']) self.c.connection.close()