diff --git a/Server/Server.py b/Server/Server.py index 9a46bbe..37d2ff6 100644 --- a/Server/Server.py +++ b/Server/Server.py @@ -1,7 +1,6 @@ import Logging import Routing import Config -import DataTypes #from .AsyncServer import Server from .Broadcast import Broadcast @@ -15,10 +14,11 @@ import struct from subprocess import Popen, PIPE from wsgiref.simple_server import make_server -from ws4py.websocket import WebSocket from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler from ws4py.server.wsgiutils import WebSocketWSGIApplication +from Highway import Server + """ WALLABY_SYNC_ROUTE = "w_sync" SUBLIME_SYNC_ROUTE = "s_sync" @@ -26,7 +26,7 @@ SUBLIME_SYNC_ROUTE = "s_sync" class Info(Routing.ServerRoute): def run(self, data, handler): - handler.sock.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]), + handler.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]), "wallaby" : len(handler.broadcast.channels[Handler.Channels.WALLABY]), "routes" : list(handler.routes.keys())}, "info") @@ -38,10 +38,10 @@ class StdStream(Routing.ServerRoute): def run(self, data, handler): if type(data) is str: if handler in self.stream_to.keys(): - self.stream_to[handler].sock.send(data, "std_stream") + self.stream_to[handler].send(data, "std_stream") elif type(data) is dict: if handler in self.stream_to.keys(): - self.stream_to[handler].sock.send(data, "std_stream") + self.stream_to[handler].send(data, "std_stream") del self.stream_to[handler] @@ -63,13 +63,13 @@ class WallabyControl(Routing.ServerRoute): if data == "list_wallaby_controllers": wallaby_controllers = {} for wallaby_handler in handler.broadcast.channels[Handler.Channels.WALLABY]: - wallaby_controllers["%s:%d" % (wallaby_handler.sock.address, wallaby_handler.sock.port)] = wallaby_handler.name - handler.sock.send({"wallaby_controllers" : wallaby_controllers}, "wallaby_control") + wallaby_controllers["%s:%d" % (wallaby_handler.address, wallaby_handler.port)] = wallaby_handler.name + handler.send({"wallaby_controllers" : wallaby_controllers}, "wallaby_control") elif data == "list_programs": - handler.sock.send({"programs" : self.programs}, "wallaby_control") + handler.send({"programs" : self.programs}, "wallaby_control") elif type(data) is dict: for wallaby_handler in handler.broadcast.channels[Handler.Channels.WALLABY]: - address_pair = "%s:%d" % (wallaby_handler.sock.address, wallaby_handler.sock.port) + address_pair = "%s:%d" % (wallaby_handler.address, wallaby_handler.port) if address_pair in data.keys(): if type(data[address_pair]) is list: for action in data[address_pair]: @@ -80,7 +80,7 @@ class WallabyControl(Routing.ServerRoute): if action in self.actions_with_params.keys(): self.actions_with_params[action](data[address_pair][action], wallaby_handler, handler) return - handler.sock.send("Wallaby not connected anymore.", "error_report") + handler.send("Wallaby not connected anymore.", "error_report") def disconnect(self, wallaby_handler, handler): @@ -88,20 +88,20 @@ class WallabyControl(Routing.ServerRoute): def reboot(self, wallaby_handler, handler): - wallaby_handler.sock.send("reboot", "wallaby_control") + wallaby_handler.send("reboot", "wallaby_control") def shutdown(self, wallaby_handler, handler): - wallaby_handler.sock.send("shutdown", "wallaby_control") + wallaby_handler.send("shutdown", "wallaby_control") def run_program(self, program, wallaby_handler, handler): handler.routes["std_stream"].stream_to.update({wallaby_handler : handler}) - wallaby_handler.sock.send({"run" : program}, "wallaby_control") + wallaby_handler.send({"run" : program}, "wallaby_control") def stop_programs(self, wallaby_handler, handler): - wallaby_handler.sock.send("stop", "wallaby_control") + wallaby_handler.send("stop", "wallaby_control") class Compile(Routing.ServerRoute): @@ -142,7 +142,7 @@ class Compile(Routing.ServerRoute): for line in p.communicate(): result += line.decode() if handler != None: - handler.sock.send({"failed" : error, "returned" : result, "relpath" : relpath}, self.route) + handler.send({"failed" : error, "returned" : result, "relpath" : relpath}, self.route) @@ -168,100 +168,31 @@ class GetInfo(Routing.ServerRoute): "Unknown (will not subscribe to broadcast)")) def start(self, handler): - handler.sock.send("", self.route) + handler.send("", self.route) -class Handler(WebSocket): - DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json, - list : DataTypes.json, bytes : DataTypes.bin, - int : DataTypes.int, float : DataTypes.float} - - INVALID_ROUTE = 1 - INVALID_METADATA_LAYOUT = 2 - INVALID_DATA_TYPE = 3 - +class Handler(Server): class Channels: EDITOR = 1 WALLABY = 2 - def setup(self, routes, broadcast, compression_level): - Logging.info("Handler for '%s:%d' initalised." % (self.sock.address, self.sock.port)) + def setup(self, routes, broadcast, compression_level, debug=False): + super().setup(routes, compression_level, debug=debug) + Logging.info("Handler for '%s:%d' initalised." % (self.address, self.port)) self.broadcast = broadcast - self.compression_level = compression_level self.channel = None - self.routes = Routing.create_routes(routes, self) - self.exchange_routes = Routing.create_exchange_map(routes) self.name = "Unknown" - self.raw_send = self.send - self.send = self.patched_send - - def patched_send(self, data, route): - length = len(data) - original_data_type = type(data) - if self.debug: - data_repr = str(data).replace("\n", " ") - if len(data_repr) > 80: - data_repr = data_repr[:80] + "..." - Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" % - (length, original_data_type.__name__, route, data_repr, self.address, self.port)) - route = self.exchange_routes[route] - if original_data_type in ESock.DATA_TYPES: - data_type = ESock.DATA_TYPES[original_data_type] - else: - data_type = DataTypes.other - if original_data_type is str: - data = data.encode() - elif original_data_type is dict or original_data_type is list: - data = json.dumps(data, separators=(',',':')).encode() - - data = gzip.compress(data, self.compression_level) - self.raw_send(struct.pack("ch", data_type, route) + data) - def opened(self): - self.send(self.exchange_routes, "meta") - - - def received_message(self, message): - try: - metadata = struct.unpack("ch", message[:4]) - except struct.error: - Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port)) - self.send(Handler.INVALID_METADATA_LAYOUT, "meta") - self.close() - return - data_type = metadata[0] - try: - route = self.routes[self.exchange_routes[metadata[1]]] - except KeyError: - self.send(Handler.INVALID_ROUTE, "meta") - self.close() - return - data = message[4:] - try: - data = convert_data(gzip.decompress(data), data_type) - except ConvertFailedError: - Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port)) - self.send(Handler.INVALID_DATA_TYPE, "meta") - self.close() - if self.debug: - data_repr = str(data).replace("\n", " ") - if len(data_repr) > 80: - data_repr = data_repr[:80] + "..." - Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port)) - self.routes[route].run(data, self) - - - - - def closed(self): + def closed(self, code, reason): if self.channel != None: self.broadcast.remove(self, self.channel) - Logging.info("'%s:%d' disconnected." % (self.sock.address, self.sock.port)) - + Logging.info("'%s:%d' disconnected." % (self.address, self.port)) + """ def __repr__(self): - return "%s: %s:%d" % (self.name, self.sock.address, self.sock.port) + return "%s: %s:%d" % (self.name, self.address, self.port) + """ def folder_validator(folder): @@ -308,7 +239,8 @@ s_sync = SyncServer(config.source_path, Handler.Channels.SUBLIME, debug=config.d server = make_server(config.server_address[0], config.server_address[1], server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler, app=WebSocketWSGIApplication(handler_cls=Handler, - handler_args={"broadcast" : broadcast, "compression_level" : config.compression_level, + handler_args={"debug" : config.debug, "broadcast" : broadcast, + "compression_level" : config.compression_level, "routes" : {"info" : Info(), #"wallaby_control" : WallabyControl(), "get_info" : GetInfo(), "compile" : compile, "std_stream" : StdStream()}}))