This repository has been archived on 2025-06-04. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
fl0w-old/Shared/Highway.py
Philip Trauner 1a918b5843 Added selective debug suppression
"pipe" can get quite noisy (main reason was recursion because of output)
2017-03-15 20:50:16 +01:00

422 lines
12 KiB
Python

import Logging
import gzip
import struct
import json
import binascii
import os
from Utils import capture_trace
from threading import Lock
from ws4py.websocket import WebSocket
from ws4py.client.threadedclient import WebSocketClient
# Marker used as a pseudo "type" for dictionaries whose keys are converted
# back to integers on receipt; it doubles as its own wire identifier.
INDEXED_DICT = 5
NoneType = type(None)
# Wire identifier of every payload type. dict and list share an identifier.
DATA_TYPES = {
    str: 0,
    dict: 1,
    list: 1,
    bytes: 2,
    int: 3,
    float: 4,
    INDEXED_DICT: 5,
    NoneType: 6,
}
def reverse_dict(dict):
    """Return a new dictionary that maps every value of *dict* to its key.

    If several keys share a value, the key that comes last in iteration
    order is the one that is kept.
    """
    swapped = {}
    for key, value in dict.items():
        swapped[value] = key
    return swapped
# Wire identifier -> payload type. dict and list share identifier 1 in
# DATA_TYPES, so identifier 1 resolves to list (the later entry wins).
REVERSE_DATA_TYPES = reverse_dict(DATA_TYPES)
# Error codes exchanged as plain integers over the meta route
INVALID_ROUTE, INVALID_METADATA_LAYOUT, INVALID_DATA_TYPE = 1, 2, 3

META_ROUTE_INDEX = 0
META_ROUTE = "meta"
PIPE_ROUTE = "pipe"

# Header of a regular message: data type identifier, exchange route identifier
PACK_FORMAT = "BH"
METADATA_LENGTH = struct.calcsize(PACK_FORMAT)

# Precise enough for now
PIPE_ID_LENGTH = 8

# Perspective: Packaged by client 1 for server
PIPE_SRC_PACK_FORMAT = "BH{:d}s".format(PIPE_ID_LENGTH)
PIPE_SRC_METADATA_LENGTH = struct.calcsize(PIPE_SRC_PACK_FORMAT)

# Perspective: Packaged by server for client 2
PIPE_DEST_PACK_FORMAT = "B{:d}s".format(PIPE_ID_LENGTH)
PIPE_DEST_METADATA_LENGTH = struct.calcsize(PIPE_DEST_PACK_FORMAT)
# Routing related
class Route:
    """Base class for message handlers; every hook is a no-op by default."""

    def run(self, data, handler):
        """Handle *data* that arrived on this route through *handler*."""
        return None

    def start(self, handler):
        """Hook invoked by launch_routes() with the owning *handler*."""
        return None

    def stop(self, handler):
        """Hook invoked by close_routes() with the owning *handler*."""
        return None
class Pipe(Route):
    """Route whose run() additionally receives the id of the sending peer."""

    def run(self, data, peer, handler):
        """Handle *data* sent by *peer* through *handler*; no-op by default."""
        return None
def create_routes(routes):
    """Return a copy of *routes* with every route specification instantiated.

    A value that is exactly a tuple or a list is read as
    ``(route_class, keyword_arguments)`` and replaced by
    ``route_class(**keyword_arguments)``. All other values are kept as they
    are. The mapping passed in is not modified.
    """
    instantiated = routes.copy()
    for prefix, spec in routes.items():
        if type(spec) in (tuple, list):
            route_class, keyword_arguments = spec[0], spec[1]
            instantiated[prefix] = route_class(**keyword_arguments)
    return instantiated
def launch_routes(created_routes, handler):
    """Call ``start(handler)`` on every route in *created_routes*.

    An AttributeError (for instance from a route object without a ``start``
    method) is ignored and the remaining routes are still started.
    """
    for route in created_routes.values():
        try:
            route.start(handler)
        except AttributeError:
            continue
def close_routes(created_routes, handler):
    """Call ``stop(handler)`` on every route in *created_routes*.

    An AttributeError (for instance from a route object without a ``stop``
    method) is ignored and the remaining routes are still stopped.
    """
    for route in created_routes.values():
        try:
            route.stop(handler)
        except AttributeError:
            continue
def create_exchange_map(routes):
    """Assign a numeric exchange identifier to every route name.

    The meta route always gets identifier 0; the remaining route names are
    numbered from 1 upwards in iteration order of *routes*.
    """
    other_routes = [route for route in routes if route != META_ROUTE]
    exchange_map = {0: META_ROUTE}
    exchange_map.update(enumerate(other_routes, start=1))
    return exchange_map
def convert_exchange_map(routes):
    """Convert a peer-supplied exchange map into ``{int: route_name}``.

    *routes* is expected to be a dictionary whose keys are decimal strings
    (integer keys turned into strings by JSON) and whose values are route
    names.

    Returns the converted dictionary, or None if *routes* is not a
    dictionary or any entry is malformed.
    """
    if not isinstance(routes, dict):
        return None
    exchange_map = {}
    for key, route in routes.items():
        # str.isdecimal() matches exactly what int() can parse here;
        # str.isnumeric() also accepts characters such as "²" or "½",
        # which make int() raise ValueError.
        if not (isinstance(key, str) and key.isdecimal()
                and isinstance(route, str)):
            return None
        exchange_map[int(key)] = route
    return exchange_map
class ConvertFailedError(ValueError):
    """ValueError carrying a fixed "conversion failed" message."""

    def __init__(self):
        # super() without arguments starts the lookup after this class, so
        # ValueError is no longer skipped in the method resolution order.
        super().__init__("conversion failed (invalid data type supplied)")
# Built-in routes
class Meta(Route):
    """Built-in route for protocol housekeeping.

    Dictionaries may carry the peer's exchange routes (key "routes") and a
    list of peers that could not be reached (key "unavaliable"). Integers
    are error codes reported by the peer.
    """

    # Log message for every error code defined by the protocol.
    _ERROR_MESSAGES = {
        INVALID_ROUTE: "Last route was invalid.",
        INVALID_METADATA_LAYOUT: "Last metadata layout was invalid.",
        INVALID_DATA_TYPE: "Last data type was invalid.",
    }

    def run(self, data, handler):
        """Dispatch *data* received on the meta route of *handler*."""
        if type(data) is dict:
            if "routes" in data:
                self._handle_routes(data, handler)
            if "unavaliable" in data:
                self._handle_unavailable(data["unavaliable"], handler)
        elif type(data) is int:
            Logging.error(self._ERROR_MESSAGES.get(data, "Unknown error code."))

    def _handle_routes(self, data, handler):
        """Store the peer's exchange routes and launch the local routes."""
        peer_exchange_routes = convert_exchange_map(data["routes"])
        if peer_exchange_routes is None:
            Logging.error("Received invalid exchange routes.")
            return
        handler.peer_exchange_routes = peer_exchange_routes
        if handler.debug:
            Logging.success("Received peer exchange routes: %s" % str(data))
        handler.peer_reverse_exchange_routes = reverse_dict(peer_exchange_routes)
        # A client answers with its own exchange routes.
        if isinstance(handler, Client):
            handler.send({"routes": handler.exchange_routes}, META_ROUTE)
        # ready() is an optional hook. It is looked up explicitly so that an
        # AttributeError raised inside the hook is not silently discarded.
        ready = getattr(handler, "ready", None)
        if ready is not None:
            ready()
        if handler.debug:
            Logging.info("Launching routes.")
        launch_routes(handler.routes, handler)
        if handler.debug:
            Logging.info("Routes launched.")

    def _handle_unavailable(self, peers, handler):
        """Notify *handler* about every peer id in *peers*."""
        # Optional hook, looked up explicitly so that the warning is only
        # logged when the method really is missing.
        notify = getattr(handler, "peer_unavaliable", None)
        if notify is None:
            Logging.warning("Handler does not implement a 'peer_unavaliable' method.")
            return
        for peer in peers:
            notify(peer)
class ServerPipe(Route):
    """Server-side route that forwards piped messages between peers."""

    def start(self, handler):
        """Close the connection if the route was installed on a client."""
        if isinstance(handler, Client):
            Logging.error("Server-exclusive route")
            handler.close()

    def run(self, data, handler):
        """Forward a piped message to the peer named in its header.

        *data* is the raw pipe message: a header holding the data type,
        the exchange route and the destination id, followed by the payload.
        If the destination is unknown, the sender is told so on the meta
        route. An unknown exchange route is answered with INVALID_ROUTE.
        """
        data_type, m_route, id_ = parse_pipe_src_metadata(data)
        id_ = id_.decode()
        try:
            route = handler.exchange_routes[m_route]
        except KeyError:
            handler.send(INVALID_ROUTE, META_ROUTE)
            Logging.error("Received pipe message with non-existing route '%d'" % m_route)
            return
        data = convert_data(data[PIPE_SRC_METADATA_LENGTH:], data_type)
        if id_ in handler.peers:
            if handler.debug:
                Logging.info("Forwarding to '%s' on route '%s'" % (id_, route),
                    color=Logging.LIGHT_YELLOW)
            # Keep the indexed-dict marker; without it the integer keys
            # reach the destination as strings.
            forwarded = pack_pipe_dest_message(
                data, handler.id_, indexed_dict=(data_type == INDEXED_DICT))
            handler.peers[id_].send(forwarded, route)
        else:
            handler.send({"unavaliable": [id_]}, META_ROUTE)
            if handler.debug:
                Logging.error("'%s' is not present in peers." % id_)
class DummyPipe(Route):
    """Placeholder route that ignores everything it receives."""

    def run(self, data, handler):
        """Discard *data*."""
        return None
# Message packing and unpacking
def create_metadata(data_type, converted_route, indexed_dict=False):
    """Pack the header of a regular message.

    The header holds the wire identifier of *data_type* (or of
    INDEXED_DICT when *indexed_dict* is true) followed by
    *converted_route*, the numeric exchange route.
    """
    type_key = INDEXED_DICT if indexed_dict else data_type
    return struct.pack(PACK_FORMAT, DATA_TYPES[type_key], converted_route)
# Perspective: Packaged by client 1 for server
def create_pipe_src_metadata(data_type, converted_route, id_, indexed_dict=False):
    """Pack the header of a pipe message travelling from a client to the server.

    The header holds the wire identifier of *data_type* (or of
    INDEXED_DICT when *indexed_dict* is true), the numeric exchange route
    *converted_route* and the encoded destination id *id_*.
    """
    type_key = INDEXED_DICT if indexed_dict else data_type
    return struct.pack(
        PIPE_SRC_PACK_FORMAT, DATA_TYPES[type_key], converted_route, id_.encode())
# Perspective: Packaged by server for client 2
def create_pipe_dest_metadata(data_type, id_, indexed_dict=False):
    """Pack the header of a pipe message travelling from the server to a client.

    The header holds the wire identifier of *data_type* (or of
    INDEXED_DICT when *indexed_dict* is true) and the encoded id *id_*.
    """
    type_key = INDEXED_DICT if indexed_dict else data_type
    return struct.pack(PIPE_DEST_PACK_FORMAT, DATA_TYPES[type_key], id_.encode())
def pack_message(data, exchange_route, debug=False, indexed_dict=False):
    """Serialize *data* and prefix it with a regular message header.

    *debug* is accepted but not used.
    """
    payload, payload_type = prepare_data(data)
    header = create_metadata(payload_type, exchange_route,
                             indexed_dict=indexed_dict)
    return header + payload
def pack_pipe_src_message(data, exchange_route, id_, debug=False,
                          indexed_dict=False):
    """Serialize *data* and prefix it with a client-to-server pipe header.

    *debug* is accepted but not used.
    """
    payload, payload_type = prepare_data(data)
    header = create_pipe_src_metadata(payload_type, exchange_route, id_,
                                      indexed_dict=indexed_dict)
    return header + payload
def pack_pipe_dest_message(data, id_, debug=False, indexed_dict=False):
    """Serialize *data* and prefix it with a server-to-client pipe header.

    *debug* is accepted but not used.
    """
    payload, payload_type = prepare_data(data)
    header = create_pipe_dest_metadata(payload_type, id_,
                                       indexed_dict=indexed_dict)
    return header + payload
def parse_metadata(message):
    """Return ``(data_type, exchange_route)`` read from the start of *message*."""
    header = message[:METADATA_LENGTH]
    type_id, route_id = struct.unpack(PACK_FORMAT, header)
    return REVERSE_DATA_TYPES[type_id], route_id
def parse_pipe_src_metadata(message):
    """Return ``(data_type, exchange_route, id)`` read from the start of *message*.

    The id is returned as the raw bytes stored in the header.
    """
    header = message[:PIPE_SRC_METADATA_LENGTH]
    type_id, route_id, raw_id = struct.unpack(PIPE_SRC_PACK_FORMAT, header)
    return REVERSE_DATA_TYPES[type_id], route_id, raw_id
def parse_pipe_dest_metadata(message):
    """Return ``(data_type, id)`` read from the start of *message*.

    The id is returned as the raw bytes stored in the header.
    """
    header = message[:PIPE_DEST_METADATA_LENGTH]
    type_id, raw_id = struct.unpack(PIPE_DEST_PACK_FORMAT, header)
    return REVERSE_DATA_TYPES[type_id], raw_id
def convert_data(data, data_type, debug=False):
    """Decode the payload bytes *data* into a value of *data_type*.

    Any *data_type* not handled below (for instance bytes) returns *data*
    unchanged. If the conversion raises, the error is logged and None is
    returned. *debug* is accepted but not used.
    """
    try:
        if data_type == str:
            try:
                return data.decode()
            except UnicodeDecodeError:
                Logging.warning("Unicode characters are not properly encoded. "
                    "Falling back to unicode_escape.")
                return data.decode("unicode_escape")
        if data_type == int:
            return int(data.decode())
        if data_type == float:
            return float(data.decode())
        if data_type in (dict, list):
            return json.loads(data.decode())
        if data_type == INDEXED_DICT:
            # JSON only knows string keys; turn them back into integers.
            decoded = json.loads(data.decode())
            return {int(key): decoded[key] for key in decoded}
        if data_type == NoneType:
            return None
    except Exception:
        Logging.error("Data conversion failed.")
        capture_trace()
        return None
    return data
def prepare_data(data):
    """Serialize *data* to bytes and return ``(payload, original_type)``.

    Strings are encoded, numbers are sent as their string representation,
    dictionaries and lists as compact JSON, and None as an empty payload.
    Values of any other type are returned unchanged.
    """
    original_data_type = type(data)
    if original_data_type is str:
        return data.encode(), original_data_type
    if original_data_type in (int, float):
        return str(data).encode(), original_data_type
    if original_data_type is dict or original_data_type is list:
        compact_json = json.dumps(data, separators=(',', ':'))
        return compact_json.encode(), original_data_type
    if original_data_type is NoneType:
        return b"", original_data_type
    return data, original_data_type
class Shared:
    """Protocol logic shared by the Server and Client websocket classes.

    The class is mixed into a websocket class. override_methods() replaces
    the websocket's ``opened``, ``received_message`` and ``send`` with the
    patched versions defined here and keeps the original ``send`` available
    as ``raw_send``.
    """

    def setup(self, routes, debug_suppress_routes=None, debug=False):
        """Prepare the routing tables.

        routes: mapping of route name to a route object or to a
            ``(route_class, keyword_arguments)`` specification. The meta
            route is added to this mapping.
        debug_suppress_routes: route names that are left out of debug
            logging. Defaults to none.
        debug: enables debug logging.
        """
        self.routes = routes
        self.routes[META_ROUTE] = Meta()
        self.routes = create_routes(self.routes)
        self.reverse_routes = reverse_dict(self.routes)
        self.exchange_routes = create_exchange_map(self.routes)
        self.reverse_exchange_routes = reverse_dict(self.exchange_routes)
        # Peer routes have not been received yet. As per convention the meta
        # route has to exist and we need it for our first send to succeed
        # (otherwise it would fail during route lookup).
        self.peer_exchange_routes = {META_ROUTE_INDEX: META_ROUTE}
        self.peer_reverse_exchange_routes = reverse_dict(self.peer_exchange_routes)
        if debug_suppress_routes is not None:
            self.debug_suppress_routes = debug_suppress_routes
        else:
            self.debug_suppress_routes = []
        self.debug = debug

    def override_methods(self):
        """Install the patched callbacks on this instance."""
        self.opened = self.patched_opened
        self.received_message = self.patched_received_message
        self.raw_send = self.send
        self.send = self.patched_send

    @staticmethod
    def _short_repr(data):
        """Return a single-line representation of *data*, cut after 80 characters."""
        data_repr = str(data).replace("\n", " ")
        if len(data_repr) > 80:
            data_repr = data_repr[:80] + "..."
        return data_repr

    def patched_opened(self):
        """Remember the peer address and call the optional post_opened() hook."""
        self.address, self.port = self.peer_address
        # Looked up explicitly so that an AttributeError raised inside the
        # hook is not silently discarded.
        post_opened = getattr(self, "post_opened", None)
        if post_opened is not None:
            post_opened()

    def patched_received_message(self, message):
        """Decode an incoming message and hand it to the matching route.

        Malformed headers, unknown data types and unknown routes are
        reported to the peer on the meta route and the message is dropped.
        """
        message = message.data
        try:
            data_type, m_route = parse_metadata(message)
        except struct.error:
            # The message is too short to hold a complete header.
            self.send(INVALID_METADATA_LAYOUT, META_ROUTE)
            Logging.error("Received message with invalid metadata layout from '%s:%d'" % (
                self.address, self.port))
            return
        except KeyError:
            # The header names a data type identifier that is not defined.
            self.send(INVALID_DATA_TYPE, META_ROUTE)
            Logging.error("Received message with unknown data type from '%s:%d'" % (
                self.address, self.port))
            return
        try:
            route = self.exchange_routes[m_route]
        except KeyError:
            self.send(INVALID_ROUTE, META_ROUTE)
            Logging.error("Received message with non-existing route '%d' from '%s:%d'" % (
                m_route, self.address, self.port))
            return
        data = convert_data(message[METADATA_LENGTH:], data_type)
        if self.debug and route not in self.debug_suppress_routes:
            Logging.info("Received '%s' on route '%s': %s (%s:%d)" % (
                type(data).__name__ if not data_type == INDEXED_DICT else "indexed_dict",
                route, self._short_repr(data), self.address,
                self.port))
        try:
            route = self.routes[route]
        except KeyError:
            Logging.warning("'%s' does not exist." % route)
            return
        if not issubclass(route.__class__, Pipe):
            route.run(data, self)
        else:
            # Piped payloads carry a second header naming the sending peer.
            data_type, peer = parse_pipe_dest_metadata(data)
            peer = peer.decode()
            data = convert_data(data[PIPE_DEST_METADATA_LENGTH:], data_type, debug=self.debug)
            route.run(data, peer, self)

    def patched_send(self, data, route, indexed_dict=False):
        """Pack *data* and send it to the peer on the route named *route*.

        indexed_dict: mark a dictionary so that the receiver converts its
            keys back to integers.

        An unknown route or an unsupported data type is logged and nothing
        is sent.
        """
        try:
            peer_route_id = self.peer_reverse_exchange_routes[route]
        except KeyError:
            Logging.error("'%s' is not a valid peer route." % route)
            return
        try:
            packed = pack_message(data, peer_route_id,
                debug=self.debug, indexed_dict=indexed_dict)
        except KeyError:
            # The type of data has no wire identifier; previously this was
            # misreported as an invalid route.
            Logging.error("Data of type '%s' can not be sent." % type(data).__name__)
            return
        self.raw_send(packed, binary=True)
        if self.debug and route not in self.debug_suppress_routes:
            Logging.info("Sent '%s' on route '%s': %s (%s:%d)" % (
                type(data).__name__, route, self._short_repr(data), self.address,
                self.port))
class Server(WebSocket, Shared):
    """Server-side connection handler; one instance exists per connected peer."""

    def setup(self, routes, websockets, debug_suppress_routes=None, debug=False):
        """Install the pipe route, prepare routing and pick a unique peer id.

        websockets: mapping whose values are the handlers of the connected
            peers; it is used to build the ``peers`` lookup.
        """
        routes[PIPE_ROUTE] = ServerPipe()
        super().setup(routes, debug_suppress_routes=debug_suppress_routes, debug=debug)
        self.websockets = websockets
        self._last_websockets = self.websockets.copy()
        self._peers = {}
        # Draw random ids until one is found that no known peer uses.
        while True:
            candidate = binascii.b2a_hex(os.urandom(PIPE_ID_LENGTH // 2)).decode()
            if candidate not in self.peers:
                break
        self.id_ = candidate
        self.override_methods()

    @property
    def peers(self):
        """Mapping of peer id to handler, rebuilt when ``websockets`` changed."""
        if self.websockets == self._last_websockets:
            return self._peers
        if self.debug:
            Logging.info("Rebuilding peers. (currently connected: %i)" %
                len(self.websockets))
        self._peers = {}
        for _, peer_handler in self.websockets.items():
            self._peers[peer_handler.id_] = peer_handler
        self._last_websockets = self.websockets.copy()
        return self._peers

    def post_opened(self):
        """Announce the exchange routes to the peer that just connected."""
        self.send({"routes": self.exchange_routes}, META_ROUTE)
class Client(WebSocketClient, Shared):
    """Client-side connection handler."""

    def setup(self, routes, debug_suppress_routes=None, debug=False):
        """Install a placeholder pipe route and prepare routing."""
        routes[PIPE_ROUTE] = DummyPipe()
        super().setup(routes, debug_suppress_routes=debug_suppress_routes, debug=debug)
        self.override_methods()

    def pipe(self, data, route, id_, indexed_dict=False):
        """Send *data* through the server to the peer *id_* on *route*.

        indexed_dict: mark a dictionary so that the receiver converts its
            keys back to integers.

        An unknown route or an unsupported data type is logged and nothing
        is sent.
        """
        try:
            peer_route_id = self.peer_reverse_exchange_routes[route]
        except KeyError:
            Logging.warning("'%s' does not exist." % route)
            return
        try:
            # The marker belongs to the piped payload. The outer message
            # always carries bytes, so it must not be flagged itself.
            packed = pack_pipe_src_message(data, peer_route_id, id_,
                debug=self.debug, indexed_dict=indexed_dict)
        except KeyError:
            Logging.error("Data of type '%s' can not be sent." % type(data).__name__)
            return
        self.send(packed, PIPE_ROUTE)

    def peer_unavaliable(self, peer, handler=None):
        """Hook called when *peer* could not be reached; no-op by default.

        *handler* is optional because the meta route passes only *peer*.
        """
        pass