Made it work (finally)
Replaced receive_routes with a shared route called meta. Highway manages itself with it's own networking capabilities. (Meta as fuck) Added NoneType to sendable types (gzip doesn't like None). Removed old, non-reachable code. Gave up on multiple inheritence. To inherit from Shared override_methods has to be called. It will monkey-patch all required methods. (less hacky than before) Ripped out tons of server/client specific code and made it shared.
This commit is contained in:
parent
42c08217dc
commit
37976049a6
1 changed files with 59 additions and 62 deletions
|
@ -3,16 +3,19 @@ import gzip
|
||||||
import Routing
|
import Routing
|
||||||
import struct
|
import struct
|
||||||
import json
|
import json
|
||||||
|
from Utils import capture_trace
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
from ws4py.websocket import WebSocket
|
from ws4py.websocket import WebSocket
|
||||||
from ws4py.client.threadedclient import WebSocketClient
|
from ws4py.client.threadedclient import WebSocketClient
|
||||||
|
|
||||||
INDEXED_DICT = 5
|
INDEXED_DICT = 5
|
||||||
|
NoneType = None.__class__
|
||||||
|
|
||||||
DATA_TYPES = {str : 0, dict : 1,
|
DATA_TYPES = {str : 0, dict : 1,
|
||||||
list : 1, bytes : 2,
|
list : 1, bytes : 2,
|
||||||
int : 3, float : 4, INDEXED_DICT : 5}
|
int : 3, float : 4, INDEXED_DICT : 5,
|
||||||
|
NoneType : 6}
|
||||||
|
|
||||||
|
|
||||||
def reverse_dict(dict):
|
def reverse_dict(dict):
|
||||||
|
@ -31,6 +34,7 @@ COMPRESSION_LEVEL = 3
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ConvertFailedError(ValueError):
|
class ConvertFailedError(ValueError):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ValueError, self).__init__("conversion failed (invalid data type supplied)")
|
super(ValueError, self).__init__("conversion failed (invalid data type supplied)")
|
||||||
|
@ -42,6 +46,22 @@ class Metadata:
|
||||||
self.m_route = m_route
|
self.m_route = m_route
|
||||||
|
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
def run(self, data, handler):
|
||||||
|
if type(data) is dict:
|
||||||
|
handler.peer_exchange_routes = data
|
||||||
|
Logging.info("Received peer exchange routes: %s" % str(data))
|
||||||
|
handler.peer_reverse_exchange_routes = reverse_dict(handler.peer_exchange_routes)
|
||||||
|
if issubclass(handler.__class__, Client):
|
||||||
|
handler.send(handler.exchange_routes, META_ROUTE, indexed_dict=True)
|
||||||
|
Routing.launch_routes(handler.routes, handler)
|
||||||
|
try:
|
||||||
|
handler.ready()
|
||||||
|
except AttributeError:
|
||||||
|
Logging.warning("Consider to add a ready method to your handler. "
|
||||||
|
"Without one, it's pretty much useless.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def create_metadata(data_type, converted_route, indexed_dict=False):
|
def create_metadata(data_type, converted_route, indexed_dict=False):
|
||||||
return struct.pack("bh",
|
return struct.pack("bh",
|
||||||
|
@ -59,6 +79,8 @@ def prepare_data(data):
|
||||||
data = data.encode()
|
data = data.encode()
|
||||||
elif original_data_type is dict or original_data_type is list:
|
elif original_data_type is dict or original_data_type is list:
|
||||||
data = json.dumps(data, separators=(',',':')).encode()
|
data = json.dumps(data, separators=(',',':')).encode()
|
||||||
|
elif original_data_type is NoneType:
|
||||||
|
data = "".encode()
|
||||||
return data, original_data_type
|
return data, original_data_type
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,14 +88,6 @@ def pack_message(data, exchange_route, compression_level,
|
||||||
debug=False, indexed_dict=False):
|
debug=False, indexed_dict=False):
|
||||||
data, original_data_type = prepare_data(data)
|
data, original_data_type = prepare_data(data)
|
||||||
data = gzip.compress(data, compression_level)
|
data = gzip.compress(data, compression_level)
|
||||||
"""
|
|
||||||
if debug:
|
|
||||||
data_repr = str(data).replace("\n", " ")
|
|
||||||
if len(data_repr) > 80:
|
|
||||||
data_repr = data_repr[:80] + "..."
|
|
||||||
Logging.info("Packaged '%s' on route '%s': %s" %
|
|
||||||
(original_data_type.__name__, route, data_repr))
|
|
||||||
"""
|
|
||||||
return create_metadata(original_data_type, exchange_route,
|
return create_metadata(original_data_type, exchange_route,
|
||||||
indexed_dict=indexed_dict) + data
|
indexed_dict=indexed_dict) + data
|
||||||
|
|
||||||
|
@ -84,8 +98,6 @@ def parse_metadata(message):
|
||||||
return Metadata(metadata[0], metadata[1])
|
return Metadata(metadata[0], metadata[1])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def parse_message(data, data_type):
|
def parse_message(data, data_type):
|
||||||
data = convert_data(gzip.decompress(data[4:]), data_type)
|
data = convert_data(gzip.decompress(data[4:]), data_type)
|
||||||
return data
|
return data
|
||||||
|
@ -107,7 +119,11 @@ def convert_data(data, data_type, debug=False):
|
||||||
for key in data:
|
for key in data:
|
||||||
indexed_data[int(key)] = data[key]
|
indexed_data[int(key)] = data[key]
|
||||||
data = indexed_data
|
data = indexed_data
|
||||||
|
elif data_type == NoneType:
|
||||||
|
data = None
|
||||||
except Exception:
|
except Exception:
|
||||||
|
Logging.error("Data conversion failed.")
|
||||||
|
capture_trace()
|
||||||
return None
|
return None
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ -115,38 +131,58 @@ def convert_data(data, data_type, debug=False):
|
||||||
class Shared:
|
class Shared:
|
||||||
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
||||||
self.routes = routes
|
self.routes = routes
|
||||||
|
self.routes["meta"] = Meta()
|
||||||
self.compression_level = compression_level
|
self.compression_level = compression_level
|
||||||
self.address, self.port = self.peer_address
|
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
|
|
||||||
|
|
||||||
def opened(self):
|
def override_methods(self):
|
||||||
|
self.opened = self.patched_opened
|
||||||
|
self.received_message = self.patched_received_message
|
||||||
|
self.raw_send = self.send
|
||||||
|
self.send = self.patched_send
|
||||||
|
|
||||||
|
|
||||||
|
def patched_opened(self):
|
||||||
|
self.address, self.port = self.peer_address
|
||||||
self.routes = Routing.create_routes(self.routes, self)
|
self.routes = Routing.create_routes(self.routes, self)
|
||||||
|
self.exchange_routes = Routing.create_exchange_map(self.routes)
|
||||||
|
self.reverse_exchange_routes = reverse_dict(self.exchange_routes)
|
||||||
|
# Peer routes have not been received yet. As per convention the meta route
|
||||||
|
# has to exist and we need it for our first send to succeed (otherwise it
|
||||||
|
# would fail during route lookup
|
||||||
|
self.peer_exchange_routes = {-1 : META_ROUTE}
|
||||||
|
self.peer_reverse_exchange_routes = reverse_dict(self.peer_exchange_routes)
|
||||||
|
try:
|
||||||
|
self.post_opened()
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def received_message(self, message):
|
def patched_received_message(self, message):
|
||||||
message = message.data
|
message = message.data
|
||||||
metadata = parse_metadata(message)
|
metadata = parse_metadata(message)
|
||||||
try:
|
try:
|
||||||
route = self.routes[self.exchange_routes[metadata.m_route]]
|
route = self.exchange_routes[metadata.m_route]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.send(Handler.INVALID_ROUTE, META_ROUTE)
|
self.send(Handler.INVALID_ROUTE, META_ROUTE)
|
||||||
Logging.error("Received message with non-existing route '%d' from '%s:%d'" % (
|
Logging.error("Received message with non-existing route '%d' from '%s:%d'" % (
|
||||||
metadata.m_route, self.address, self.port))
|
metadata.m_route, self.address, self.port))
|
||||||
return
|
return
|
||||||
data = convert_data(message, metadata.data_type)
|
data = parse_message(message, metadata.data_type)
|
||||||
if self.debug:
|
if self.debug:
|
||||||
data_repr = str(data).replace("\n", " ")
|
data_repr = str(data).replace("\n", " ")
|
||||||
if len(data_repr) > 80:
|
if len(data_repr) > 80:
|
||||||
data_repr = data_repr[:80] + "..."
|
data_repr = data_repr[:80] + "..."
|
||||||
Logging.info("Received '%s' on route '%s': %s (%s:%d)" % (
|
Logging.info("Received '%s' on route '%s': %s (%s:%d)" % (
|
||||||
type(data).__name__, route, data_repr, self.address,
|
type(data).__name__ if not metadata.data_type == INDEXED_DICT else "indexed_dict",
|
||||||
|
route, data_repr, self.address,
|
||||||
self.port))
|
self.port))
|
||||||
self.routes[route].run(data, self)
|
self.routes[route].run(data, self)
|
||||||
|
|
||||||
|
|
||||||
def patched_send(self, data, route, indexed_dict=False):
|
def patched_send(self, data, route, indexed_dict=False):
|
||||||
self.raw_send(pack_message(data, self.reverse_exchange_routes[route],
|
self.raw_send(pack_message(data, self.peer_reverse_exchange_routes[route],
|
||||||
self.compression_level, debug=self.debug,
|
self.compression_level, debug=self.debug,
|
||||||
indexed_dict=indexed_dict), binary=True)
|
indexed_dict=indexed_dict), binary=True)
|
||||||
if self.debug:
|
if self.debug:
|
||||||
|
@ -159,56 +195,17 @@ class Shared:
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Server(WebSocket, Shared):
|
||||||
class Server(Shared, WebSocket):
|
|
||||||
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
||||||
super().setup(routes, compression_level, debug=debug)
|
super().setup(routes, compression_level, debug=debug)
|
||||||
# Send replacement can't be done in the parent setup method because
|
self.override_methods()
|
||||||
# both client and server use a method from a different module.
|
|
||||||
self.raw_send = self.send
|
|
||||||
self.send = self.patched_send
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def opened(self):
|
|
||||||
super().opened()
|
|
||||||
self.exchange_routes = Routing.create_exchange_map(self.routes)
|
|
||||||
self.reverse_exchange_routes = reverse_dict(self.exchange_routes)
|
|
||||||
self.patched_send(self.exchange_routes, META_ROUTE, indexed_dict=True)
|
|
||||||
Routing.launch_routes(self.routes, self)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def post_opened(self):
|
||||||
|
self.send(self.exchange_routes, META_ROUTE, indexed_dict=True)
|
||||||
|
|
||||||
|
|
||||||
class Client(WebSocketClient, Shared):
|
class Client(WebSocketClient, Shared):
|
||||||
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
def setup(self, routes, compression_level=COMPRESSION_LEVEL, debug=False):
|
||||||
super().setup(routes, compression_level, debug=debug)
|
super().setup(routes, compression_level, debug=debug)
|
||||||
# Send replacement can't be done in the parent setup method because
|
self.override_methods()
|
||||||
# both client and server use a method from a different module.
|
|
||||||
self.metadata_lock = Lock()
|
|
||||||
self.raw_send = self.send
|
|
||||||
self.send = self.patched_send
|
|
||||||
self._received_message = self.received_message
|
|
||||||
self.received_message = self.receive_routes
|
|
||||||
|
|
||||||
|
|
||||||
def receive_routes(self, message):
|
|
||||||
self.metadata_lock.acquire()
|
|
||||||
message = message.data
|
|
||||||
metadata = parse_metadata(message)
|
|
||||||
if metadata.m_route != -1:
|
|
||||||
Logging.error("Invalid route on first message (not meta)")
|
|
||||||
self.close()
|
|
||||||
return
|
|
||||||
data = parse_message(message, metadata.data_type)
|
|
||||||
if Routing.validate_exchange_map(data):
|
|
||||||
self.exchange_routes = data
|
|
||||||
self.reverse_exchange_routes = reverse_dict(self.exchange_routes)
|
|
||||||
self.received_message = self._received_message
|
|
||||||
Logging.info("Routes successfully exchanged: %s" % str(self.exchange_routes))
|
|
||||||
else:
|
|
||||||
Logging.error("Invalid exchange map.")
|
|
||||||
self.close()
|
|
||||||
return
|
|
||||||
self.metadata_lock.release()
|
|
||||||
|
|
Reference in a new issue