diff --git a/Server/Server.py b/Server/Server.py index 49ae8f8..ed8e84e 100644 --- a/Server/Server.py +++ b/Server/Server.py @@ -1,10 +1,11 @@ import Logging import Routing import Config +import DataTypes from Sync import SyncServer -from .AsyncServer import Server +#from .AsyncServer import Server from .Broadcast import Broadcast import json @@ -12,19 +13,24 @@ import os import subprocess import re import platform +import struct from subprocess import Popen, PIPE +from wsgiref.simple_server import make_server +from ws4py.websocket import WebSocket +from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler +from ws4py.server.wsgiutils import WebSocketWSGIApplication +""" WALLABY_SYNC_ROUTE = "w_sync" SUBLIME_SYNC_ROUTE = "s_sync" - +""" class Info(Routing.ServerRoute): def run(self, data, handler): - handler.sock.send("Connected ST clients: %d\nConnected Wallabies: %d\nAvaliable routes: %s" % ( - len(handler.broadcast.channels[Handler.Channels.SUBLIME]), - len(handler.broadcast.channels[Handler.Channels.WALLABY]), - ", ".join([route for route in list(handler.routes.keys())])), "info") + handler.sock.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]), + "wallaby" : len(handler.broadcast.channels[Handler.Channels.WALLABY]), + "routes" : list(handler.routes.keys())}, "info") class StdStream(Routing.ServerRoute): @@ -145,13 +151,13 @@ class Compile(Routing.ServerRoute): class GetInfo(Routing.ServerRoute): REQUIRED = [Routing.ROUTE] - SUBLIME_TEXT = "s" + EDITOR = "e" WALLABY = "w" def run(self, data, handler): if "type" in data: - if data["type"] == GetInfo.SUBLIME_TEXT: - handler.channel = Handler.Channels.SUBLIME + if data["type"] == GetInfo.EDITOR: + handler.channel = Handler.Channels.EDITOR handler.broadcast.add(handler, handler.channel) elif data["type"] == GetInfo.WALLABY: handler.channel = Handler.Channels.WALLABY @@ -159,7 +165,7 @@ class GetInfo(Routing.ServerRoute): if "name" in data: handler.name = data["name"] Logging.info("'%s:%d' has identified as a %s client." % (handler.info[0], handler.info[1], - "Sublime Text" if handler.channel == Handler.Channels.SUBLIME else + "Editor" if handler.channel == Handler.Channels.EDITOR else "Wallaby" if handler.channel == Handler.Channels.WALLABY else "Unknown (will not subscribe to broadcast)")) @@ -167,27 +173,91 @@ class GetInfo(Routing.ServerRoute): handler.sock.send("", self.route) -class Handler(Server.Handler): +class Handler(WebSocket): + DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json, + list : DataTypes.json, bytes : DataTypes.bin, + int : DataTypes.int, float : DataTypes.float} + + INVALID_ROUTE = 1 + INVALID_METADATA_LAYOUT = 2 + INVALID_DATA_TYPE + class Channels: - SUBLIME = 1 + EDITOR = 1 WALLABY = 2 - def setup(self, routes, broadcast): + def setup(self, routes, broadcast, compression_level): Logging.info("Handler for '%s:%d' initalised." % (self.sock.address, self.sock.port)) self.broadcast = broadcast + self.compression_level = compression_level self.channel = None self.routes = Routing.create_routes(routes, self) + self.exchange_routes = Routing.create_exchange_map(routes) self.name = "Unknown" + self.raw_send = self.send + self.send = self.patched_send - - def handle(self, data, route): - if route in self.routes: - self.routes[route].run(data, self) + def patched_send(self, data, route): + length = len(data) + original_data_type = type(data) + if self.debug: + data_repr = str(data).replace("\n", " ") + if len(data_repr) > 80: + data_repr = data_repr[:80] + "..." + Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" % + (length, original_data_type.__name__, route, data_repr, self.address, self.port)) + route = self.exchange_routes[route] + if original_data_type in ESock.DATA_TYPES: + data_type = ESock.DATA_TYPES[original_data_type] else: - self.sock.send("Invalid route '%s'" % route, "error_report") + data_type = DataTypes.other + if original_data_type is str: + data = data.encode() + elif original_data_type is dict or original_data_type is list: + data = json.dumps(data, separators=(',',':')).encode() + + data = gzip.compress(data, self.compression_level) + self.raw_send(struct.pack("ch", data_type, route) + data) - def finish(self): + def opened(self): + self.send(self.exchange_routes, "meta") + + + def received_message(self, message): + try: + metadata = struct.unpack("ch", message[:4]) + except struct.error: + Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port)) + self.send(Handler.INVALID_METADATA_LAYOUT, "meta") + self.close() + return + data_type = metadata[0] + try: + route = self.routes[self.exchange_routes[metadata[1]]] + except KeyError: + self.send(Handler.INVALID_ROUTE, "meta") + self.close() + return + data = message[4:] + try: + data = convert_data(gzip.decompress(data), data_type) + except ConvertFailedError: + Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port)) + self.send(Handler.INVALID_DATA_TYPE, "meta") + self.close() + if self.debug: + data_repr = str(data).replace("\n", " ") + if len(data_repr) > 80: + data_repr = data_repr[:80] + "..." + Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port)) + # Route needs to be reintroduced + self.routes[route].run(data, self) + + + + + def closed(self): if self.channel != None: self.broadcast.remove(self, self.channel) Logging.info("'%s:%d' disconnected." % (self.sock.address, self.sock.port)) @@ -221,7 +291,10 @@ except FileNotFoundError: config.write_to_file(CONFIG_PATH) config = config.read_from_file(CONFIG_PATH) -server = Server(config.server_address, debug=config.debug, compression_level=config.compression_level) + + + +#server = Server(config.server_address, debug=config.debug, compression_level=config.compression_level) broadcast = Broadcast() # Populating broadcast channels with all channels defined in Handler.Channels @@ -230,20 +303,37 @@ for channel in Handler.Channels.__dict__: broadcast.add_channel(Handler.Channels.__dict__[channel]) compile = Compile(config.source_path, config.binary_path) +""" w_sync = SyncServer(config.binary_path, Handler.Channels.WALLABY, debug=config.debug, deleted_db_path="deleted-w.pickle") s_sync = SyncServer(config.source_path, Handler.Channels.SUBLIME, debug=config.debug, deleted_db_path="deleted-s.pickle", modified_hook=compile.compile) +""" +server = make_server(config.server_address[0], config.server_address[1], + server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=Handler, + handler_args={"broadcast" : broadcast, "compression_level" : config.compression_level, + "routes" : {"info" : Info(), #"wallaby_control" : WallabyControl(), + "get_info" : GetInfo(), "compile" : compile, + "std_stream" : StdStream()}})) +server.initialize_websockets_manager() + +try: + server.serve_forever() +except KeyboardInterrupt: + Logging.header("Gracefully shutting down server.") + server.server_close() + Logging.success("Server shutdown successful.") +""" try: Logging.header("fl0w server started on '%s:%d'" % (config.server_address[0], config.server_address[1])) server.run(Handler, {"broadcast" : broadcast, "routes" : {"info" : Info(), "wallaby_control" : WallabyControl(), - "get_info" : GetInfo(), "compile" : compile, "std_stream" : StdStream(), - WALLABY_SYNC_ROUTE : w_sync, - SUBLIME_SYNC_ROUTE : s_sync}}) + "get_info" : GetInfo(), "compile" : compile, "std_stream" : StdStream()}}) except KeyboardInterrupt: Logging.header("Gracefully shutting down server.") w_sync.stop() s_sync.stop() server.stop() Logging.success("Server shutdown successful.") +""" \ No newline at end of file