#!/usr/bin/env python3 import logging import selectors import socket import socketserver import time import threading logger = logging.getLogger("fieldpoc.controller") class Controller: def __init__(self, fp): self.fp = fp self.handlers = [] # collect control sockets for handlers, so we can shut them down on demand def get_handler(self): class HandleRequest(socketserver.BaseRequestHandler): fp = self.fp controller = self def setup(self): self.handler_read, self.handler_write = socket.socketpair() self.controller.handlers.append(self.handler_write) def handle(self): self.request.sendall("FieldPOC interactive controller\n".encode("utf-8")) sel = selectors.DefaultSelector() sel.register(self.handler_read, selectors.EVENT_READ) sel.register(self.request, selectors.EVENT_READ) while True: self.request.sendall("> ".encode("utf-8")) for key, mask in sel.select(): if key.fileobj == self.handler_read: self.request.sendall("\ndisconnecting\n".encode("utf-8")) return elif key.fileobj == self.request: data = self.request.recv(1024).decode("utf-8").strip() if data == "help": self.request.sendall("""Availiable commands: help Show this info handlers Show currently running handlers sync Start syncing queues Show queue stats reload Reload extension config file claim claim dect extension exit Disconnect """.encode("utf-8")) elif data == "": continue elif data == "quit" or data == "exit": self.request.sendall("disconnecting\n".encode("utf-8")) return elif data == "handlers": self.request.sendall(("\n".join([str(h) for h in self.controller.handlers]) + "\n").encode("utf-8")) elif data == "sync": self.fp.queue_all({"type": "sync"}) elif data == "queues": self.request.sendall(("\n".join(["{} {}".format(name, queue.qsize()) for name, queue in self.fp.queues.items()]) + "\n").encode("utf-8")) elif data == "reload": self.fp.reload_config() elif data.startswith("claim"): data = data.split(" ") if len(data) == 3: self.fp.queue_all({"type": "claim", "extension": data[1], "token": data[2]}) else: self.request.sendall("error: You have to specify calling extension and token\n".encode("utf-8")) else: self.request.sendall("Unknown command, type 'help'\n".encode("utf-8")) def finish(self): self.controller.handlers.remove(self.handler_write) return HandleRequest def run(self): logger.info("starting server") class Server(socketserver.ThreadingTCPServer): allow_reuse_address = True with Server((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server: threading.Thread(target=server.serve_forever).start() while True: msg = self.fp.queues["controller"].get() self.fp.queues["controller"].task_done() if msg.get("type") == "stop": logger.info("stopping server") server.shutdown() for h in self.handlers: h.send(b'\0') break