diff --git a/Shared/Highway.py b/Shared/Highway.py index 511feda..91c7de4 100644 --- a/Shared/Highway.py +++ b/Shared/Highway.py @@ -3,16 +3,19 @@ import gzip import Routing import struct import json +from Utils import capture_trace from threading import Lock from ws4py.websocket import WebSocket from ws4py.client.threadedclient import WebSocketClient INDEXED_DICT = 5 +NoneType = None.__class__ DATA_TYPES = {str : 0, dict : 1, list : 1, bytes : 2, - int : 3, float : 4, INDEXED_DICT : 5} + int : 3, float : 4, INDEXED_DICT : 5, + NoneType : 6} def reverse_dict(dict): @@ -31,6 +34,7 @@ COMPRESSION_LEVEL = 3 + class ConvertFailedError(ValueError): def __init__(self): super(ValueError, self).__init__("conversion failed (invalid data type supplied)") @@ -42,6 +46,22 @@ class Metadata: self.m_route = m_route +class Meta: + def run(self, data, handler): + if type(data) is dict: + handler.peer_exchange_routes = data + Logging.info("Received peer exchange routes: %s" % str(data)) + handler.peer_reverse_exchange_routes = reverse_dict(handler.peer_exchange_routes) + if issubclass(handler.__class__, Client): + handler.send(handler.exchange_routes, META_ROUTE, indexed_dict=True) + Routing.launch_routes(handler.routes, handler) + try: + handler.ready() + except AttributeError: + Logging.warning("Consider to add a ready method to your handler. " + "Without one, it's pretty much useless.") + + def create_metadata(data_type, converted_route, indexed_dict=False): return struct.pack("bh", @@ -59,6 +79,8 @@ def prepare_data(data): data = data.encode() elif original_data_type is dict or original_data_type is list: data = json.dumps(data, separators=(',',':')).encode() + elif original_data_type is NoneType: + data = "".encode() return data, original_data_type @@ -66,14 +88,6 @@ def pack_message(data, exchange_route, compression_level, debug=False, indexed_dict=False): data, original_data_type = prepare_data(data) data = gzip.compress(data, compression_level) - """ - if debug: - data_repr = str(data).replace("\n", " ") - if len(data_repr) > 80: - data_repr = data_repr[:80] + "..." - Logging.info("Packaged '%s' on route '%s': %s" % - (original_data_type.__name__, route, data_repr)) - """ return create_metadata(original_data_type, exchange_route, indexed_dict=indexed_dict) + data @@ -84,8 +98,6 @@ def parse_metadata(message): return Metadata(metadata[0], metadata[1]) - - def parse_message(data, data_type): data = convert_data(gzip.decompress(data[4:]), data_type) return data @@ -107,7 +119,11 @@ def convert_data(data, data_type, debug=False): for key in data: indexed_data[int(key)] = data[key] data = indexed_data + elif data_type == NoneType: + data = None except Exception: + Logging.error("Data conversion failed.") + capture_trace() return None return data @@ -115,38 +131,58 @@ def convert_data(data, data_type, debug=False): class Shared: def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False): self.routes = routes + self.routes["meta"] = Meta() self.compression_level = compression_level - self.address, self.port = self.peer_address self.debug = debug - def opened(self): + def override_methods(self): + self.opened = self.patched_opened + self.received_message = self.patched_received_message + self.raw_send = self.send + self.send = self.patched_send + + + def patched_opened(self): + self.address, self.port = self.peer_address self.routes = Routing.create_routes(self.routes, self) + self.exchange_routes = Routing.create_exchange_map(self.routes) + self.reverse_exchange_routes = reverse_dict(self.exchange_routes) + # Peer routes have not been received yet. As per convention the meta route + # has to exist and we need it for our first send to succeed (otherwise it + # would fail during route lookup + self.peer_exchange_routes = {-1 : META_ROUTE} + self.peer_reverse_exchange_routes = reverse_dict(self.peer_exchange_routes) + try: + self.post_opened() + except AttributeError: + pass - def received_message(self, message): + def patched_received_message(self, message): message = message.data metadata = parse_metadata(message) try: - route = self.routes[self.exchange_routes[metadata.m_route]] + route = self.exchange_routes[metadata.m_route] except KeyError: self.send(Handler.INVALID_ROUTE, META_ROUTE) Logging.error("Received message with non-existing route '%d' from '%s:%d'" % ( metadata.m_route, self.address, self.port)) return - data = convert_data(message, metadata.data_type) + data = parse_message(message, metadata.data_type) if self.debug: data_repr = str(data).replace("\n", " ") if len(data_repr) > 80: data_repr = data_repr[:80] + "..." Logging.info("Received '%s' on route '%s': %s (%s:%d)" % ( - type(data).__name__, route, data_repr, self.address, + type(data).__name__ if not metadata.data_type == INDEXED_DICT else "indexed_dict", + route, data_repr, self.address, self.port)) self.routes[route].run(data, self) def patched_send(self, data, route, indexed_dict=False): - self.raw_send(pack_message(data, self.reverse_exchange_routes[route], + self.raw_send(pack_message(data, self.peer_reverse_exchange_routes[route], self.compression_level, debug=self.debug, indexed_dict=indexed_dict), binary=True) if self.debug: @@ -159,56 +195,17 @@ class Shared: - -class Server(Shared, WebSocket): +class Server(WebSocket, Shared): def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False): super().setup(routes, compression_level, debug=debug) - # Send replacement can't be done in the parent setup method because - # both client and server use a method from a different module. - self.raw_send = self.send - self.send = self.patched_send - - - - def opened(self): - super().opened() - self.exchange_routes = Routing.create_exchange_map(self.routes) - self.reverse_exchange_routes = reverse_dict(self.exchange_routes) - self.patched_send(self.exchange_routes, META_ROUTE, indexed_dict=True) - Routing.launch_routes(self.routes, self) - + self.override_methods() + def post_opened(self): + self.send(self.exchange_routes, META_ROUTE, indexed_dict=True) class Client(WebSocketClient, Shared): def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False): super().setup(routes, compression_level, debug=debug) - # Send replacement can't be done in the parent setup method because - # both client and server use a method from a different module. - self.metadata_lock = Lock() - self.raw_send = self.send - self.send = self.patched_send - self._received_message = self.received_message - self.received_message = self.receive_routes - - - def receive_routes(self, message): - self.metadata_lock.acquire() - message = message.data - metadata = parse_metadata(message) - if metadata.m_route != -1: - Logging.error("Invalid route on first message (not meta)") - self.close() - return - data = parse_message(message, metadata.data_type) - if Routing.validate_exchange_map(data): - self.exchange_routes = data - self.reverse_exchange_routes = reverse_dict(self.exchange_routes) - self.received_message = self._received_message - Logging.info("Routes successfully exchanged: %s" % str(self.exchange_routes)) - else: - Logging.error("Invalid exchange map.") - self.close() - return - self.metadata_lock.release() + self.override_methods()