You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

99 lines
4.1 KiB
Python

#!/usr/bin/env python3
import logging
import selectors
import socket
import socketserver
import time
import threading
# Module-level logger for the controller, registered under the fixed name
# "fieldpoc.controller" so logging configuration can address it directly.
logger = logging.getLogger("fieldpoc.controller")
class Controller:
    """Interactive TCP control console for a FieldPOC instance.

    ``run()`` starts a threaded TCP server whose connections are served by the
    request handler class built in ``get_handler()``.  Each connection gets a
    text prompt; commands are forwarded to the ``fp`` object passed to the
    constructor.
    """

    def __init__(self, fp):
        """Store the FieldPOC object the console operates on.

        Args:
            fp: object providing ``queue_all()``, ``reload_config()``,
                ``queues`` and ``config.controller.host``/``port`` (these are
                the attributes this class accesses).
        """
        self.fp = fp
        # Write ends of the per-connection control socket pairs.  Sending a
        # byte on one of them wakes the matching handler so it disconnects.
        self.handlers = []

    def get_handler(self):
        """Return a request handler class bound to this controller.

        Returns:
            A ``socketserver.BaseRequestHandler`` subclass whose ``fp`` and
            ``controller`` class attributes refer to ``self.fp`` and ``self``.
        """

        class HandleRequest(socketserver.BaseRequestHandler):
            fp = self.fp
            controller = self

            _HELP_TEXT = (
                "Availiable commands:\n"
                "help Show this info\n"
                "handlers Show currently running handlers\n"
                "sync Start syncing\n"
                "queues Show queue stats\n"
                "reload Reload extension config file\n"
                "claim <ext> <token> claim dect extension\n"
                "exit Disconnect\n"
            )

            def setup(self):
                """Create the control socket pair and register its write end."""
                self.handler_read, self.handler_write = socket.socketpair()
                self.controller.handlers.append(self.handler_write)

            def _send(self, text):
                """Send ``text`` to the client, UTF-8 encoded."""
                self.request.sendall(text.encode("utf-8"))

            def _dispatch(self, line):
                """Execute one stripped command line.

                Returns:
                    ``False`` if the client asked to disconnect, else ``True``.
                """
                if line == "":
                    return True
                if line in ("quit", "exit"):
                    self._send("disconnecting\n")
                    return False
                if line == "help":
                    self._send(self._HELP_TEXT)
                elif line == "handlers":
                    listing = "\n".join(str(h) for h in self.controller.handlers)
                    self._send(listing + "\n")
                elif line == "sync":
                    self.fp.queue_all({"type": "sync"})
                elif line == "queues":
                    stats = "\n".join(
                        f"{name} {q.qsize()}" for name, q in self.fp.queues.items()
                    )
                    self._send(stats + "\n")
                elif line == "reload":
                    self.fp.reload_config()
                else:
                    # Split on any whitespace so repeated spaces between the
                    # arguments are accepted, and compare the first word
                    # exactly so e.g. "claimfoo" is not mistaken for "claim".
                    parts = line.split()
                    if parts[0] == "claim":
                        if len(parts) == 3:
                            self.fp.queue_all(
                                {"type": "claim", "extension": parts[1], "token": parts[2]}
                            )
                        else:
                            self._send(
                                "error: You have to specify calling extension and token\n"
                            )
                    else:
                        self._send("Unknown command, type 'help'\n")
                return True

            def handle(self):
                """Serve the prompt until the client leaves or a stop is signalled."""
                self._send("FieldPOC interactive controller\n")
                # The context manager closes the selector on every exit path.
                with selectors.DefaultSelector() as sel:
                    sel.register(self.handler_read, selectors.EVENT_READ)
                    sel.register(self.request, selectors.EVENT_READ)
                    while True:
                        self._send("> ")
                        for key, _mask in sel.select():
                            if key.fileobj is self.handler_read:
                                # Wake-up byte from Controller.run(): shut down.
                                self._send("\ndisconnecting\n")
                                return
                            if key.fileobj is self.request:
                                raw = self.request.recv(1024)
                                if not raw:
                                    # Empty read means the peer closed the
                                    # connection; without this check the loop
                                    # would spin re-sending the prompt.
                                    return
                                # Undecodable bytes (e.g. telnet negotiation)
                                # become replacement characters instead of
                                # raising and killing the session.
                                line = raw.decode("utf-8", errors="replace").strip()
                                if not self._dispatch(line):
                                    return

            def finish(self):
                """Unregister the control socket and release both of its ends."""
                self.controller.handlers.remove(self.handler_write)
                self.handler_write.close()
                self.handler_read.close()

        return HandleRequest

    def run(self):
        """Run the control server until a ``stop`` message arrives.

        Blocks on ``self.fp.queues["controller"]``; every message is marked
        done immediately and only ``{"type": "stop"}`` is acted upon: the
        server is shut down and every connected handler is told to disconnect.
        """
        logger.info("starting server")

        class Server(socketserver.ThreadingTCPServer):
            allow_reuse_address = True

        address = (self.fp.config.controller.host, self.fp.config.controller.port)
        with Server(address, self.get_handler()) as server:
            threading.Thread(target=server.serve_forever).start()
            while True:
                msg = self.fp.queues["controller"].get()
                self.fp.queues["controller"].task_done()
                if msg.get("type") == "stop":
                    logger.info("stopping server")
                    server.shutdown()
                    # Iterate over a snapshot: handler threads remove their
                    # entry from self.handlers as they finish.
                    for h in list(self.handlers):
                        try:
                            h.send(b"\0")
                        except OSError:
                            # The handler finished and closed its control
                            # socket in the meantime; nothing left to wake.
                            logger.debug("handler already gone: %s", h)
                    break