Routes aren't an indexed_dict anymore, piping error handling, piping on by default
Routes are just regular dicts now, which means that their ids are strings and they are bound to the key "routes". This allows the next feature to work: Error reporting on disconnected peers. Define a peer_unavaliable method in your handler and you will be notified when your pipe call has failed due to an unavaliable peer. Handled in the "meta" route. Piping has become such an integral feature to Highway that it doesn't make sense to disable it anymore. This is why the argument is missing in the constructor from now on.
This commit is contained in:
parent
d3ac8cf64c
commit
0934f6fe69
1 changed files with 47 additions and 30 deletions
|
@ -103,11 +103,14 @@ def create_exchange_map(routes):
|
||||||
return exchange_map
|
return exchange_map
|
||||||
|
|
||||||
|
|
||||||
def validate_exchange_map(routes):
|
def convert_exchange_map(routes):
|
||||||
|
exchange_map = {}
|
||||||
for key in routes:
|
for key in routes:
|
||||||
if not type(key) is int and type(routes[key]) is str:
|
if key.isnumeric() and type(routes[key]) is str:
|
||||||
return False
|
exchange_map[int(key)] = routes[key]
|
||||||
return True
|
else:
|
||||||
|
return None
|
||||||
|
return exchange_map
|
||||||
|
|
||||||
|
|
||||||
class ConvertFailedError(ValueError):
|
class ConvertFailedError(ValueError):
|
||||||
|
@ -119,21 +122,32 @@ class ConvertFailedError(ValueError):
|
||||||
class Meta(Route):
|
class Meta(Route):
|
||||||
def run(self, data, handler):
|
def run(self, data, handler):
|
||||||
if type(data) is dict:
|
if type(data) is dict:
|
||||||
handler.peer_exchange_routes = data
|
if "routes" in data:
|
||||||
if handler.debug:
|
peer_exchange_routes = convert_exchange_map(data["routes"])
|
||||||
Logging.success("Received peer exchange routes: %s" % str(data))
|
if peer_exchange_routes != None:
|
||||||
handler.peer_reverse_exchange_routes = reverse_dict(handler.peer_exchange_routes)
|
handler.peer_exchange_routes = peer_exchange_routes
|
||||||
if issubclass(handler.__class__, Client):
|
if handler.debug:
|
||||||
handler.send(handler.exchange_routes, META_ROUTE, indexed_dict=True)
|
Logging.success("Received peer exchange routes: %s" % str(data))
|
||||||
try:
|
handler.peer_reverse_exchange_routes = reverse_dict(handler.peer_exchange_routes)
|
||||||
handler.ready()
|
if issubclass(handler.__class__, Client):
|
||||||
except AttributeError:
|
handler.send({"routes" : handler.exchange_routes}, META_ROUTE)
|
||||||
pass
|
try:
|
||||||
if handler.debug:
|
handler.ready()
|
||||||
Logging.info("Launching routes.")
|
except AttributeError:
|
||||||
launch_routes(handler.routes, handler)
|
pass
|
||||||
if handler.debug:
|
if handler.debug:
|
||||||
Logging.info("Routes launched.")
|
Logging.info("Launching routes.")
|
||||||
|
launch_routes(handler.routes, handler)
|
||||||
|
if handler.debug:
|
||||||
|
Logging.info("Routes launched.")
|
||||||
|
else:
|
||||||
|
Logging.error("Received invalid exchange routes.")
|
||||||
|
if "unavaliable" in data:
|
||||||
|
try:
|
||||||
|
for peer in data["unavaliable"]:
|
||||||
|
handler.peer_unavaliable(peer)
|
||||||
|
except AttributeError:
|
||||||
|
Logging.warning("Handler does not implement a 'peer_unavaliabe' method.")
|
||||||
if type(data) is int:
|
if type(data) is int:
|
||||||
if data == 1:
|
if data == 1:
|
||||||
Logging.error("Last route was invalid.")
|
Logging.error("Last route was invalid.")
|
||||||
|
@ -163,7 +177,9 @@ class ServerPipe(Route):
|
||||||
color=Logging.LIGHT_YELLOW)
|
color=Logging.LIGHT_YELLOW)
|
||||||
handler.peers[id_].send(pack_pipe_dest_message(data, handler.id_), route)
|
handler.peers[id_].send(pack_pipe_dest_message(data, handler.id_), route)
|
||||||
else:
|
else:
|
||||||
Logging.error("'%s' is not present in peers." % id_)
|
handler.send({"unavaliable" : [id_]}, "meta")
|
||||||
|
if handler.debug:
|
||||||
|
Logging.error("'%s' is not present in peers." % id_)
|
||||||
|
|
||||||
|
|
||||||
class DummyPipe(Route):
|
class DummyPipe(Route):
|
||||||
|
@ -352,9 +368,8 @@ class Shared:
|
||||||
|
|
||||||
|
|
||||||
class Server(WebSocket, Shared):
|
class Server(WebSocket, Shared):
|
||||||
def setup(self, routes, websockets, piping=False, debug=False):
|
def setup(self, routes, websockets, debug=False):
|
||||||
if piping:
|
routes[PIPE_ROUTE] = ServerPipe()
|
||||||
routes[PIPE_ROUTE] = ServerPipe()
|
|
||||||
super().setup(routes, debug=debug)
|
super().setup(routes, debug=debug)
|
||||||
self.websockets = websockets
|
self.websockets = websockets
|
||||||
self._last_websockets = self.websockets.copy()
|
self._last_websockets = self.websockets.copy()
|
||||||
|
@ -380,22 +395,24 @@ class Server(WebSocket, Shared):
|
||||||
|
|
||||||
|
|
||||||
def post_opened(self):
|
def post_opened(self):
|
||||||
self.send(self.exchange_routes, META_ROUTE, indexed_dict=True)
|
self.send({"routes" : self.exchange_routes}, META_ROUTE)
|
||||||
|
|
||||||
|
|
||||||
class Client(WebSocketClient, Shared):
|
class Client(WebSocketClient, Shared):
|
||||||
def setup(self, routes, piping=False, debug=False):
|
def setup(self, routes, debug=False):
|
||||||
if piping:
|
routes[PIPE_ROUTE] = DummyPipe()
|
||||||
self.pipe = self.__pipe
|
|
||||||
routes[PIPE_ROUTE] = DummyPipe()
|
|
||||||
super().setup(routes, debug=debug)
|
super().setup(routes, debug=debug)
|
||||||
|
|
||||||
self.override_methods()
|
self.override_methods()
|
||||||
|
|
||||||
def __pipe(self, data, route, id_):
|
def pipe(self, data, route, id_, indexed_dict=False):
|
||||||
try:
|
try:
|
||||||
self.send(pack_pipe_src_message(data,
|
self.send(pack_pipe_src_message(data,
|
||||||
self.peer_reverse_exchange_routes[route], id_, debug=self.debug),
|
self.peer_reverse_exchange_routes[route], id_, debug=self.debug),
|
||||||
PIPE_ROUTE)
|
PIPE_ROUTE, indexed_dict=indexed_dict)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
Logging.warning("'%s' does not exist." % route)
|
Logging.warning("'%s' does not exist." % route)
|
||||||
|
|
||||||
|
|
||||||
|
def peer_unavaliable(self, peer, handler):
|
||||||
|
pass
|
||||||
|
|
Reference in a new issue