diff --git a/Shared/Highway.py b/Shared/Highway.py index eb8aef6..033a230 100644 --- a/Shared/Highway.py +++ b/Shared/Highway.py @@ -2,6 +2,9 @@ import Logging import gzip import struct import json +import binascii +import os + from Utils import capture_trace from threading import Lock @@ -30,10 +33,26 @@ INVALID_DATA_TYPE = 3 META_ROUTE = "meta" META_ROUTE_INDEX = 0 +PIPE_ROUTE = "pipe" + PACK_FORMAT = "BH" METADATA_LENGTH = struct.calcsize(PACK_FORMAT) +# Percise enough for now +PIPE_ID_LENGTH = 8 +# Perspective: Packaged by client 1 for server +PIPE_SRC_PACK_FORMAT = "BH%is" % PIPE_ID_LENGTH +PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT) +PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT) + +# Perspective: Packaged by server for client 2 +PIPE_DEST_PACK_FORMAT = "B%is" % PIPE_ID_LENGTH +PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT) +PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT) + + +# Routing related class Route: def run(self, data, handler): pass @@ -42,6 +61,11 @@ class Route: pass +class Pipe(Route): + def run(self, data, peer, handler): + pass + + def create_routes(routes): routes = routes.copy() for prefix in routes: @@ -59,10 +83,10 @@ def launch_routes(created_routes, handler): def create_exchange_map(routes): - exchange_map = {0 : "meta"} + exchange_map = {0 : META_ROUTE} exchange_id = 1 for route in routes: - if route != "meta": + if route != META_ROUTE: exchange_map[exchange_id] = route exchange_id += 1 return exchange_map @@ -80,6 +104,7 @@ class ConvertFailedError(ValueError): super(ValueError, self).__init__("conversion failed (invalid data type supplied)") +# Built-in routes class Meta(Route): def run(self, data, handler): if type(data) is dict: @@ -109,23 +134,53 @@ class Meta(Route): Logging.error("Unknown error code.") +class ServerPipe(Route): + def start(self, handler): + if issubclass(handler.__class__, Client): + Logging.error("Server-exclusive route") + handler.close() + + + def run(self, data, handler): + data_type, m_route, id_ = parse_pipe_src_metadata(data) + id_ = id_.decode() + data = convert_data(data[PIPE_SRC_METADATA_LENGTH:], data_type) + route = handler.exchange_routes[m_route] + if id_ in handler.peers: + if handler.debug: + Logging.info("Forwarding to '%s' on route '%s'" % (id_, route), + color=Logging.LIGHT_YELLOW) + handler.peers[id_].send(pack_pipe_dest_message(data, handler.id_), route) + else: + Logging.error("'%s' is not present in peers." % id_) + + +class DummyPipe(Route): + def run(self, data, handler): + pass + + + + + +# Message packing and unpacking def create_metadata(data_type, converted_route, indexed_dict=False): return struct.pack(PACK_FORMAT, DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT], converted_route) -def prepare_data(data): - original_data_type = type(data) - if original_data_type is str: - data = data.encode() - elif original_data_type in (int, float): - data = str(data).encode() - elif original_data_type is dict or original_data_type is list: - data = json.dumps(data, separators=(',',':')).encode() - elif original_data_type is NoneType: - data = "".encode() - return data, original_data_type +# Perspective: Packaged by client 1 for server +def create_pipe_src_metadata(data_type, converted_route, id_, indexed_dict=False): + return struct.pack(PIPE_SRC_PACK_FORMAT, + DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT], + converted_route, id_.encode()) + +# Perspective: Packaged by server for client 2 +def create_pipe_dest_metadata(data_type, id_, indexed_dict=False): + return struct.pack(PIPE_DEST_PACK_FORMAT, + DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT], + id_.encode()) def pack_message(data, exchange_route, @@ -135,12 +190,33 @@ def pack_message(data, exchange_route, indexed_dict=indexed_dict) + data +def pack_pipe_src_message(data, exchange_route, id_, debug=False, + indexed_dict=False): + data, original_data_type = prepare_data(data) + return create_pipe_src_metadata(original_data_type, exchange_route, id_, + indexed_dict=indexed_dict) + data + +def pack_pipe_dest_message(data, id_, debug=False, + indexed_dict=False): + data, original_data_type = prepare_data(data) + return create_pipe_dest_metadata(original_data_type, id_, + indexed_dict=indexed_dict) + data + def parse_metadata(message): metadata = struct.unpack(PACK_FORMAT, message[:METADATA_LENGTH]) return REVERSE_DATA_TYPES[metadata[0]], metadata[1] +def parse_pipe_src_metadata(message): + metadata = struct.unpack(PIPE_SRC_PACK_FORMAT, message[:PIPE_SRC_METADATA_LENGTH]) + return REVERSE_DATA_TYPES[metadata[0]], metadata[1], metadata[2] + + +def parse_pipe_dest_metadata(message): + metadata = struct.unpack(PIPE_DEST_PACK_FORMAT, message[:PIPE_DEST_METADATA_LENGTH]) + return REVERSE_DATA_TYPES[metadata[0]], metadata[1] + def convert_data(data, data_type, debug=False): try: if data_type == str: @@ -171,10 +247,23 @@ def convert_data(data, data_type, debug=False): return data +def prepare_data(data): + original_data_type = type(data) + if original_data_type is str: + data = data.encode() + elif original_data_type in (int, float): + data = str(data).encode() + elif original_data_type is dict or original_data_type is list: + data = json.dumps(data, separators=(',',':')).encode() + elif original_data_type is NoneType: + data = "".encode() + return data, original_data_type + + class Shared: def setup(self, routes, debug=False): self.routes = routes - self.routes["meta"] = Meta() + self.routes[META_ROUTE] = Meta() self.routes = create_routes(self.routes) self.reverse_routes = reverse_dict(self.routes) self.exchange_routes = create_exchange_map(self.routes) @@ -222,7 +311,14 @@ class Shared: route, data_repr, self.address, self.port)) try: - self.routes[route].run(data, self) + route = self.routes[route] + if not issubclass(route.__class__, Pipe): + route.run(data, self) + else: + data_type, peer = parse_pipe_dest_metadata(data) + peer = peer.decode() + data = convert_data(data[PIPE_DEST_METADATA_LENGTH:], data_type, debug=self.debug) + route.run(data, peer, self) except KeyError: Logging.warning("'%s' does not exist." % route) @@ -247,16 +343,50 @@ class Shared: class Server(WebSocket, Shared): - def setup(self, routes, debug=False): + def setup(self, routes, websockets, piping=False, debug=False): + if piping: + routes[PIPE_ROUTE] = ServerPipe() super().setup(routes, debug=debug) + self.websockets = websockets + self._last_websockets = self.websockets.copy() + self._peers = {} + id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode() + while id_ in self.peers: + id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode() + self.id_ = id_ self.override_methods() + @property + def peers(self): + if self.websockets != self._last_websockets: + if self.debug: + Logging.info("Rebuilding peers. (currently connected: %i)" % + len(self.websockets)) + self._peers = {} + for websocket in self.websockets: + self._peers[self.websockets[websocket].id_] = self.websockets[websocket] + self._last_websockets = self.websockets.copy() + return self._peers + + def post_opened(self): self.send(self.exchange_routes, META_ROUTE, indexed_dict=True) class Client(WebSocketClient, Shared): - def setup(self, routes, debug=False): + def setup(self, routes, piping=False, debug=False): + if piping: + self.pipe = self.__pipe + routes[PIPE_ROUTE] = DummyPipe() super().setup(routes, debug=debug) + self.override_methods() + + def __pipe(self, data, route, id_): + try: + self.send(pack_pipe_src_message(data, + self.peer_reverse_exchange_routes[route], id_, debug=self.debug), + PIPE_ROUTE) + except KeyError: + Logging.warning("'%s' does not exist." % route) \ No newline at end of file