Added initial piping support, refactoring

A Highway pipe is a lightweight client -> client connection piped through the server.
The advantages of this approach are obvious:
* No need to implement the concept yourself
* It's incredibly useful

It's opt-in at the moment because it definitely isn't anywhere close to stable and requires a lot of boiler-plate code.
Documentation will be avaliable later on.
This commit is contained in:
Philip Trauner 2016-12-01 22:46:22 +01:00
parent 203d62db0d
commit 37817afa51

View file

@ -2,6 +2,9 @@ import Logging
import gzip import gzip
import struct import struct
import json import json
import binascii
import os
from Utils import capture_trace from Utils import capture_trace
from threading import Lock from threading import Lock
@ -30,10 +33,26 @@ INVALID_DATA_TYPE = 3
META_ROUTE = "meta" META_ROUTE = "meta"
META_ROUTE_INDEX = 0 META_ROUTE_INDEX = 0
PIPE_ROUTE = "pipe"
PACK_FORMAT = "BH" PACK_FORMAT = "BH"
METADATA_LENGTH = struct.calcsize(PACK_FORMAT) METADATA_LENGTH = struct.calcsize(PACK_FORMAT)
# Percise enough for now
PIPE_ID_LENGTH = 8
# Perspective: Packaged by client 1 for server
PIPE_SRC_PACK_FORMAT = "BH%is" % PIPE_ID_LENGTH
PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT)
PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT)
# Perspective: Packaged by server for client 2
PIPE_DEST_PACK_FORMAT = "B%is" % PIPE_ID_LENGTH
PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT)
PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT)
# Routing related
class Route: class Route:
def run(self, data, handler): def run(self, data, handler):
pass pass
@ -42,6 +61,11 @@ class Route:
pass pass
class Pipe(Route):
def run(self, data, peer, handler):
pass
def create_routes(routes): def create_routes(routes):
routes = routes.copy() routes = routes.copy()
for prefix in routes: for prefix in routes:
@ -59,10 +83,10 @@ def launch_routes(created_routes, handler):
def create_exchange_map(routes): def create_exchange_map(routes):
exchange_map = {0 : "meta"} exchange_map = {0 : META_ROUTE}
exchange_id = 1 exchange_id = 1
for route in routes: for route in routes:
if route != "meta": if route != META_ROUTE:
exchange_map[exchange_id] = route exchange_map[exchange_id] = route
exchange_id += 1 exchange_id += 1
return exchange_map return exchange_map
@ -80,6 +104,7 @@ class ConvertFailedError(ValueError):
super(ValueError, self).__init__("conversion failed (invalid data type supplied)") super(ValueError, self).__init__("conversion failed (invalid data type supplied)")
# Built-in routes
class Meta(Route): class Meta(Route):
def run(self, data, handler): def run(self, data, handler):
if type(data) is dict: if type(data) is dict:
@ -109,23 +134,53 @@ class Meta(Route):
Logging.error("Unknown error code.") Logging.error("Unknown error code.")
class ServerPipe(Route):
def start(self, handler):
if issubclass(handler.__class__, Client):
Logging.error("Server-exclusive route")
handler.close()
def run(self, data, handler):
data_type, m_route, id_ = parse_pipe_src_metadata(data)
id_ = id_.decode()
data = convert_data(data[PIPE_SRC_METADATA_LENGTH:], data_type)
route = handler.exchange_routes[m_route]
if id_ in handler.peers:
if handler.debug:
Logging.info("Forwarding to '%s' on route '%s'" % (id_, route),
color=Logging.LIGHT_YELLOW)
handler.peers[id_].send(pack_pipe_dest_message(data, handler.id_), route)
else:
Logging.error("'%s' is not present in peers." % id_)
class DummyPipe(Route):
def run(self, data, handler):
pass
# Message packing and unpacking
def create_metadata(data_type, converted_route, indexed_dict=False): def create_metadata(data_type, converted_route, indexed_dict=False):
return struct.pack(PACK_FORMAT, return struct.pack(PACK_FORMAT,
DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT], DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
converted_route) converted_route)
def prepare_data(data): # Perspective: Packaged by client 1 for server
original_data_type = type(data) def create_pipe_src_metadata(data_type, converted_route, id_, indexed_dict=False):
if original_data_type is str: return struct.pack(PIPE_SRC_PACK_FORMAT,
data = data.encode() DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
elif original_data_type in (int, float): converted_route, id_.encode())
data = str(data).encode()
elif original_data_type is dict or original_data_type is list: # Perspective: Packaged by server for client 2
data = json.dumps(data, separators=(',',':')).encode() def create_pipe_dest_metadata(data_type, id_, indexed_dict=False):
elif original_data_type is NoneType: return struct.pack(PIPE_DEST_PACK_FORMAT,
data = "".encode() DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
return data, original_data_type id_.encode())
def pack_message(data, exchange_route, def pack_message(data, exchange_route,
@ -135,12 +190,33 @@ def pack_message(data, exchange_route,
indexed_dict=indexed_dict) + data indexed_dict=indexed_dict) + data
def pack_pipe_src_message(data, exchange_route, id_, debug=False,
indexed_dict=False):
data, original_data_type = prepare_data(data)
return create_pipe_src_metadata(original_data_type, exchange_route, id_,
indexed_dict=indexed_dict) + data
def pack_pipe_dest_message(data, id_, debug=False,
indexed_dict=False):
data, original_data_type = prepare_data(data)
return create_pipe_dest_metadata(original_data_type, id_,
indexed_dict=indexed_dict) + data
def parse_metadata(message): def parse_metadata(message):
metadata = struct.unpack(PACK_FORMAT, message[:METADATA_LENGTH]) metadata = struct.unpack(PACK_FORMAT, message[:METADATA_LENGTH])
return REVERSE_DATA_TYPES[metadata[0]], metadata[1] return REVERSE_DATA_TYPES[metadata[0]], metadata[1]
def parse_pipe_src_metadata(message):
metadata = struct.unpack(PIPE_SRC_PACK_FORMAT, message[:PIPE_SRC_METADATA_LENGTH])
return REVERSE_DATA_TYPES[metadata[0]], metadata[1], metadata[2]
def parse_pipe_dest_metadata(message):
metadata = struct.unpack(PIPE_DEST_PACK_FORMAT, message[:PIPE_DEST_METADATA_LENGTH])
return REVERSE_DATA_TYPES[metadata[0]], metadata[1]
def convert_data(data, data_type, debug=False): def convert_data(data, data_type, debug=False):
try: try:
if data_type == str: if data_type == str:
@ -171,10 +247,23 @@ def convert_data(data, data_type, debug=False):
return data return data
def prepare_data(data):
original_data_type = type(data)
if original_data_type is str:
data = data.encode()
elif original_data_type in (int, float):
data = str(data).encode()
elif original_data_type is dict or original_data_type is list:
data = json.dumps(data, separators=(',',':')).encode()
elif original_data_type is NoneType:
data = "".encode()
return data, original_data_type
class Shared: class Shared:
def setup(self, routes, debug=False): def setup(self, routes, debug=False):
self.routes = routes self.routes = routes
self.routes["meta"] = Meta() self.routes[META_ROUTE] = Meta()
self.routes = create_routes(self.routes) self.routes = create_routes(self.routes)
self.reverse_routes = reverse_dict(self.routes) self.reverse_routes = reverse_dict(self.routes)
self.exchange_routes = create_exchange_map(self.routes) self.exchange_routes = create_exchange_map(self.routes)
@ -222,7 +311,14 @@ class Shared:
route, data_repr, self.address, route, data_repr, self.address,
self.port)) self.port))
try: try:
self.routes[route].run(data, self) route = self.routes[route]
if not issubclass(route.__class__, Pipe):
route.run(data, self)
else:
data_type, peer = parse_pipe_dest_metadata(data)
peer = peer.decode()
data = convert_data(data[PIPE_DEST_METADATA_LENGTH:], data_type, debug=self.debug)
route.run(data, peer, self)
except KeyError: except KeyError:
Logging.warning("'%s' does not exist." % route) Logging.warning("'%s' does not exist." % route)
@ -247,16 +343,50 @@ class Shared:
class Server(WebSocket, Shared): class Server(WebSocket, Shared):
def setup(self, routes, debug=False): def setup(self, routes, websockets, piping=False, debug=False):
if piping:
routes[PIPE_ROUTE] = ServerPipe()
super().setup(routes, debug=debug) super().setup(routes, debug=debug)
self.websockets = websockets
self._last_websockets = self.websockets.copy()
self._peers = {}
id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode()
while id_ in self.peers:
id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode()
self.id_ = id_
self.override_methods() self.override_methods()
@property
def peers(self):
if self.websockets != self._last_websockets:
if self.debug:
Logging.info("Rebuilding peers. (currently connected: %i)" %
len(self.websockets))
self._peers = {}
for websocket in self.websockets:
self._peers[self.websockets[websocket].id_] = self.websockets[websocket]
self._last_websockets = self.websockets.copy()
return self._peers
def post_opened(self): def post_opened(self):
self.send(self.exchange_routes, META_ROUTE, indexed_dict=True) self.send(self.exchange_routes, META_ROUTE, indexed_dict=True)
class Client(WebSocketClient, Shared): class Client(WebSocketClient, Shared):
def setup(self, routes, debug=False): def setup(self, routes, piping=False, debug=False):
if piping:
self.pipe = self.__pipe
routes[PIPE_ROUTE] = DummyPipe()
super().setup(routes, debug=debug) super().setup(routes, debug=debug)
self.override_methods() self.override_methods()
def __pipe(self, data, route, id_):
try:
self.send(pack_pipe_src_message(data,
self.peer_reverse_exchange_routes[route], id_, debug=self.debug),
PIPE_ROUTE)
except KeyError:
Logging.warning("'%s' does not exist." % route)