Routes are just regular dicts now, which means that their ids are strings and they are bound to the key "routes". This allows the next feature to work: Error reporting on disconnected peers. Define a peer_unavaliable method in your handler and you will be notified when your pipe call has failed due to an unavaliable peer. Handled in the "meta" route. Piping has become such an integral feature to Highway that it doesn't make sense to disable it anymore. This is why the argument is missing in the constructor from now on.
418 lines
12 KiB
Python
418 lines
12 KiB
Python
import Logging
|
|
import gzip
|
|
import struct
|
|
import json
|
|
import binascii
|
|
import os
|
|
|
|
from Utils import capture_trace
|
|
from threading import Lock
|
|
|
|
from ws4py.websocket import WebSocket
|
|
from ws4py.client.threadedclient import WebSocketClient
|
|
|
|
INDEXED_DICT = 5
|
|
NoneType = None.__class__
|
|
|
|
DATA_TYPES = {str : 0, dict : 1,
|
|
list : 1, bytes : 2,
|
|
int : 3, float : 4, INDEXED_DICT : 5,
|
|
NoneType : 6}
|
|
|
|
|
|
def reverse_dict(dict):
|
|
return {v : k for k, v in dict.items()}
|
|
|
|
|
|
REVERSE_DATA_TYPES = reverse_dict(DATA_TYPES)
|
|
|
|
INVALID_ROUTE = 1
|
|
INVALID_METADATA_LAYOUT = 2
|
|
INVALID_DATA_TYPE = 3
|
|
|
|
META_ROUTE = "meta"
|
|
META_ROUTE_INDEX = 0
|
|
|
|
PIPE_ROUTE = "pipe"
|
|
|
|
PACK_FORMAT = "BH"
|
|
METADATA_LENGTH = struct.calcsize(PACK_FORMAT)
|
|
|
|
# Percise enough for now
|
|
PIPE_ID_LENGTH = 8
|
|
|
|
# Perspective: Packaged by client 1 for server
|
|
PIPE_SRC_PACK_FORMAT = "BH%is" % PIPE_ID_LENGTH
|
|
PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT)
|
|
PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT)
|
|
|
|
# Perspective: Packaged by server for client 2
|
|
PIPE_DEST_PACK_FORMAT = "B%is" % PIPE_ID_LENGTH
|
|
PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT)
|
|
PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT)
|
|
|
|
|
|
# Routing related
|
|
class Route:
|
|
def run(self, data, handler):
|
|
pass
|
|
|
|
def start(self, handler):
|
|
pass
|
|
|
|
def stop(self, handler):
|
|
pass
|
|
|
|
|
|
class Pipe(Route):
|
|
def run(self, data, peer, handler):
|
|
pass
|
|
|
|
|
|
def create_routes(routes):
|
|
routes = routes.copy()
|
|
for prefix in routes:
|
|
if type(routes[prefix]) is tuple or type(routes[prefix]) is list:
|
|
routes[prefix] = routes[prefix][0](**routes[prefix][1])
|
|
return routes
|
|
|
|
|
|
def launch_routes(created_routes, handler):
|
|
for prefix in created_routes:
|
|
try:
|
|
created_routes[prefix].start(handler)
|
|
except AttributeError:
|
|
pass
|
|
|
|
|
|
def close_routes(created_routes, handler):
|
|
for prefix in created_routes:
|
|
try:
|
|
created_routes[prefix].stop(handler)
|
|
except AttributeError:
|
|
pass
|
|
|
|
|
|
def create_exchange_map(routes):
|
|
exchange_map = {0 : META_ROUTE}
|
|
exchange_id = 1
|
|
for route in routes:
|
|
if route != META_ROUTE:
|
|
exchange_map[exchange_id] = route
|
|
exchange_id += 1
|
|
return exchange_map
|
|
|
|
|
|
def convert_exchange_map(routes):
|
|
exchange_map = {}
|
|
for key in routes:
|
|
if key.isnumeric() and type(routes[key]) is str:
|
|
exchange_map[int(key)] = routes[key]
|
|
else:
|
|
return None
|
|
return exchange_map
|
|
|
|
|
|
class ConvertFailedError(ValueError):
|
|
def __init__(self):
|
|
super(ValueError, self).__init__("conversion failed (invalid data type supplied)")
|
|
|
|
|
|
# Built-in routes
|
|
class Meta(Route):
|
|
def run(self, data, handler):
|
|
if type(data) is dict:
|
|
if "routes" in data:
|
|
peer_exchange_routes = convert_exchange_map(data["routes"])
|
|
if peer_exchange_routes != None:
|
|
handler.peer_exchange_routes = peer_exchange_routes
|
|
if handler.debug:
|
|
Logging.success("Received peer exchange routes: %s" % str(data))
|
|
handler.peer_reverse_exchange_routes = reverse_dict(handler.peer_exchange_routes)
|
|
if issubclass(handler.__class__, Client):
|
|
handler.send({"routes" : handler.exchange_routes}, META_ROUTE)
|
|
try:
|
|
handler.ready()
|
|
except AttributeError:
|
|
pass
|
|
if handler.debug:
|
|
Logging.info("Launching routes.")
|
|
launch_routes(handler.routes, handler)
|
|
if handler.debug:
|
|
Logging.info("Routes launched.")
|
|
else:
|
|
Logging.error("Received invalid exchange routes.")
|
|
if "unavaliable" in data:
|
|
try:
|
|
for peer in data["unavaliable"]:
|
|
handler.peer_unavaliable(peer)
|
|
except AttributeError:
|
|
Logging.warning("Handler does not implement a 'peer_unavaliabe' method.")
|
|
if type(data) is int:
|
|
if data == 1:
|
|
Logging.error("Last route was invalid.")
|
|
elif data == 2:
|
|
Logging.error("Last metadata layout was invalid.")
|
|
elif data == 3:
|
|
Logging.error("Last data type was invalid.")
|
|
else:
|
|
Logging.error("Unknown error code.")
|
|
|
|
|
|
class ServerPipe(Route):
|
|
def start(self, handler):
|
|
if issubclass(handler.__class__, Client):
|
|
Logging.error("Server-exclusive route")
|
|
handler.close()
|
|
|
|
|
|
def run(self, data, handler):
|
|
data_type, m_route, id_ = parse_pipe_src_metadata(data)
|
|
id_ = id_.decode()
|
|
data = convert_data(data[PIPE_SRC_METADATA_LENGTH:], data_type)
|
|
route = handler.exchange_routes[m_route]
|
|
if id_ in handler.peers:
|
|
if handler.debug:
|
|
Logging.info("Forwarding to '%s' on route '%s'" % (id_, route),
|
|
color=Logging.LIGHT_YELLOW)
|
|
handler.peers[id_].send(pack_pipe_dest_message(data, handler.id_), route)
|
|
else:
|
|
handler.send({"unavaliable" : [id_]}, "meta")
|
|
if handler.debug:
|
|
Logging.error("'%s' is not present in peers." % id_)
|
|
|
|
|
|
class DummyPipe(Route):
|
|
def run(self, data, handler):
|
|
pass
|
|
|
|
|
|
# Message packing and unpacking
|
|
def create_metadata(data_type, converted_route, indexed_dict=False):
|
|
return struct.pack(PACK_FORMAT,
|
|
DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
|
|
converted_route)
|
|
|
|
|
|
# Perspective: Packaged by client 1 for server
|
|
def create_pipe_src_metadata(data_type, converted_route, id_, indexed_dict=False):
|
|
return struct.pack(PIPE_SRC_PACK_FORMAT,
|
|
DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
|
|
converted_route, id_.encode())
|
|
|
|
# Perspective: Packaged by server for client 2
|
|
def create_pipe_dest_metadata(data_type, id_, indexed_dict=False):
|
|
return struct.pack(PIPE_DEST_PACK_FORMAT,
|
|
DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT],
|
|
id_.encode())
|
|
|
|
|
|
def pack_message(data, exchange_route,
|
|
debug=False, indexed_dict=False):
|
|
data, original_data_type = prepare_data(data)
|
|
return create_metadata(original_data_type, exchange_route,
|
|
indexed_dict=indexed_dict) + data
|
|
|
|
|
|
def pack_pipe_src_message(data, exchange_route, id_, debug=False,
|
|
indexed_dict=False):
|
|
data, original_data_type = prepare_data(data)
|
|
return create_pipe_src_metadata(original_data_type, exchange_route, id_,
|
|
indexed_dict=indexed_dict) + data
|
|
|
|
def pack_pipe_dest_message(data, id_, debug=False,
|
|
indexed_dict=False):
|
|
data, original_data_type = prepare_data(data)
|
|
return create_pipe_dest_metadata(original_data_type, id_,
|
|
indexed_dict=indexed_dict) + data
|
|
|
|
|
|
def parse_metadata(message):
|
|
metadata = struct.unpack(PACK_FORMAT, message[:METADATA_LENGTH])
|
|
return REVERSE_DATA_TYPES[metadata[0]], metadata[1]
|
|
|
|
|
|
def parse_pipe_src_metadata(message):
|
|
metadata = struct.unpack(PIPE_SRC_PACK_FORMAT, message[:PIPE_SRC_METADATA_LENGTH])
|
|
return REVERSE_DATA_TYPES[metadata[0]], metadata[1], metadata[2]
|
|
|
|
|
|
def parse_pipe_dest_metadata(message):
|
|
metadata = struct.unpack(PIPE_DEST_PACK_FORMAT, message[:PIPE_DEST_METADATA_LENGTH])
|
|
return REVERSE_DATA_TYPES[metadata[0]], metadata[1]
|
|
|
|
def convert_data(data, data_type, debug=False):
|
|
try:
|
|
if data_type == str:
|
|
try:
|
|
data = data.decode()
|
|
except UnicodeDecodeError:
|
|
Logging.warning("Unicode characters are not properly encoded. "
|
|
"Falling back to unicode_escape.")
|
|
data = data.decode("unicode_escape")
|
|
elif data_type == int:
|
|
data = int(data.decode())
|
|
elif data_type == float:
|
|
data = float(data.decode())
|
|
elif data_type in (dict, list):
|
|
data = json.loads(data.decode())
|
|
elif data_type == INDEXED_DICT:
|
|
data = json.loads(data.decode())
|
|
indexed_data = {}
|
|
for key in data:
|
|
indexed_data[int(key)] = data[key]
|
|
data = indexed_data
|
|
elif data_type == NoneType:
|
|
data = None
|
|
except Exception:
|
|
Logging.error("Data conversion failed.")
|
|
capture_trace()
|
|
return None
|
|
return data
|
|
|
|
|
|
def prepare_data(data):
|
|
original_data_type = type(data)
|
|
if original_data_type is str:
|
|
data = data.encode()
|
|
elif original_data_type in (int, float):
|
|
data = str(data).encode()
|
|
elif original_data_type is dict or original_data_type is list:
|
|
data = json.dumps(data, separators=(',',':')).encode()
|
|
elif original_data_type is NoneType:
|
|
data = "".encode()
|
|
return data, original_data_type
|
|
|
|
|
|
class Shared:
|
|
def setup(self, routes, debug=False):
|
|
self.routes = routes
|
|
self.routes[META_ROUTE] = Meta()
|
|
self.routes = create_routes(self.routes)
|
|
self.reverse_routes = reverse_dict(self.routes)
|
|
self.exchange_routes = create_exchange_map(self.routes)
|
|
self.reverse_exchange_routes = reverse_dict(self.exchange_routes)
|
|
# Peer routes have not been received yet. As per convention the meta route
|
|
# has to exist and we need it for our first send to succeed (otherwise it
|
|
# would fail during route lookup).
|
|
self.peer_exchange_routes = {META_ROUTE_INDEX : META_ROUTE}
|
|
self.peer_reverse_exchange_routes = reverse_dict(self.peer_exchange_routes)
|
|
self.debug = debug
|
|
|
|
|
|
def override_methods(self):
|
|
self.opened = self.patched_opened
|
|
self.received_message = self.patched_received_message
|
|
self.raw_send = self.send
|
|
self.send = self.patched_send
|
|
|
|
|
|
def patched_opened(self):
|
|
self.address, self.port = self.peer_address
|
|
try:
|
|
self.post_opened()
|
|
except AttributeError:
|
|
pass
|
|
|
|
|
|
def patched_received_message(self, message):
|
|
message = message.data
|
|
data_type, m_route = parse_metadata(message)
|
|
try:
|
|
route = self.exchange_routes[m_route]
|
|
except KeyError:
|
|
self.send(INVALID_ROUTE, META_ROUTE)
|
|
Logging.error("Received message with non-existing route '%d' from '%s:%d'" % (
|
|
m_route, self.address, self.port))
|
|
return
|
|
data = convert_data(message[METADATA_LENGTH:], data_type)
|
|
if self.debug:
|
|
data_repr = str(data).replace("\n", " ")
|
|
if len(data_repr) > 80:
|
|
data_repr = data_repr[:80] + "..."
|
|
Logging.info("Received '%s' on route '%s': %s (%s:%d)" % (
|
|
type(data).__name__ if not data_type == INDEXED_DICT else "indexed_dict",
|
|
route, data_repr, self.address,
|
|
self.port))
|
|
try:
|
|
route = self.routes[route]
|
|
except:
|
|
Logging.warning("'%s' does not exist." % route)
|
|
else:
|
|
if not issubclass(route.__class__, Pipe):
|
|
route.run(data, self)
|
|
else:
|
|
data_type, peer = parse_pipe_dest_metadata(data)
|
|
peer = peer.decode()
|
|
data = convert_data(data[PIPE_DEST_METADATA_LENGTH:], data_type, debug=self.debug)
|
|
route.run(data, peer, self)
|
|
|
|
|
|
def patched_send(self, data, route, indexed_dict=False):
|
|
try:
|
|
self.raw_send(pack_message(data,
|
|
self.peer_reverse_exchange_routes[route],
|
|
debug=self.debug, indexed_dict=indexed_dict),
|
|
binary=True)
|
|
except KeyError:
|
|
Logging.error("'%s' is not a valid peer route." % route)
|
|
else:
|
|
if self.debug:
|
|
data_repr = str(data).replace("\n", " ")
|
|
if len(data_repr) > 80:
|
|
data_repr = data_repr[:80] + "..."
|
|
Logging.info("Sent '%s' on route '%s': %s (%s:%d)" % (
|
|
type(data).__name__, route, data_repr, self.address,
|
|
self.port))
|
|
|
|
|
|
|
|
class Server(WebSocket, Shared):
|
|
def setup(self, routes, websockets, debug=False):
|
|
routes[PIPE_ROUTE] = ServerPipe()
|
|
super().setup(routes, debug=debug)
|
|
self.websockets = websockets
|
|
self._last_websockets = self.websockets.copy()
|
|
self._peers = {}
|
|
id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode()
|
|
while id_ in self.peers:
|
|
id_ = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode()
|
|
self.id_ = id_
|
|
self.override_methods()
|
|
|
|
|
|
@property
|
|
def peers(self):
|
|
if self.websockets != self._last_websockets:
|
|
if self.debug:
|
|
Logging.info("Rebuilding peers. (currently connected: %i)" %
|
|
len(self.websockets))
|
|
self._peers = {}
|
|
for websocket in self.websockets:
|
|
self._peers[self.websockets[websocket].id_] = self.websockets[websocket]
|
|
self._last_websockets = self.websockets.copy()
|
|
return self._peers
|
|
|
|
|
|
def post_opened(self):
|
|
self.send({"routes" : self.exchange_routes}, META_ROUTE)
|
|
|
|
|
|
class Client(WebSocketClient, Shared):
|
|
def setup(self, routes, debug=False):
|
|
routes[PIPE_ROUTE] = DummyPipe()
|
|
super().setup(routes, debug=debug)
|
|
|
|
self.override_methods()
|
|
|
|
def pipe(self, data, route, id_, indexed_dict=False):
|
|
try:
|
|
self.send(pack_pipe_src_message(data,
|
|
self.peer_reverse_exchange_routes[route], id_, debug=self.debug),
|
|
PIPE_ROUTE, indexed_dict=indexed_dict)
|
|
except KeyError:
|
|
Logging.warning("'%s' does not exist." % route)
|
|
|
|
|
|
def peer_unavaliable(self, peer, handler):
|
|
pass
|