Removed piping from parameters, peers now subscription based
Peers can now be subscribed to. Simply getting the current state is still supported.
This commit is contained in:
parent
2ca28cba4f
commit
d3ac8cf64c
1 changed files with 65 additions and 12 deletions
|
@ -99,23 +99,43 @@ class Subscribe(Route):
|
||||||
"Unknown (will not subscribe to broadcast)"))
|
"Unknown (will not subscribe to broadcast)"))
|
||||||
if "name" in data:
|
if "name" in data:
|
||||||
handler.name = data["name"]
|
handler.name = data["name"]
|
||||||
|
handler.routes["peers"].push_changes(handler)
|
||||||
|
|
||||||
|
|
||||||
class Peers(Route):
|
class Peers(Route):
|
||||||
|
"""
|
||||||
|
{"subscribe" : [1, 2]}
|
||||||
|
{"unsubscribe" : [1, 2]}
|
||||||
|
{"channels" : [1, 2]}
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.subscriptions = {}
|
||||||
|
|
||||||
def run(self, data, handler):
|
def run(self, data, handler):
|
||||||
|
for event in ("subscribe", "unsubscribe", "channels"):
|
||||||
|
if event in data:
|
||||||
|
channels = []
|
||||||
|
for channel in data[event]:
|
||||||
|
if channel in Subscribe.CHANNELS:
|
||||||
|
channels.append(channel)
|
||||||
|
if event == "unsubscribe":
|
||||||
|
for channel in channels:
|
||||||
|
self.unsubscribe(handler, channel)
|
||||||
|
else:
|
||||||
|
if event == "subscribe":
|
||||||
|
for channel in channels:
|
||||||
|
self.subscribe(handler, channel)
|
||||||
|
# Send on channels and on subscribe
|
||||||
|
self.send_connected_peers(handler, channels)
|
||||||
|
|
||||||
|
|
||||||
|
def send_connected_peers(self, handler, channels):
|
||||||
out = {}
|
out = {}
|
||||||
check_type = False
|
|
||||||
if type(data) is dict:
|
|
||||||
if "channel" in data:
|
|
||||||
check_type = True
|
|
||||||
# We can use the in keyword this way
|
|
||||||
if type(data["channel"]) is int:
|
|
||||||
data["channel"] = (data["channel"], )
|
|
||||||
peers = handler.peers
|
peers = handler.peers
|
||||||
for peer_id in peers:
|
for peer_id in peers:
|
||||||
# Only check for type inclusion if check_type is True
|
# Only check for type inclusion if check_type is True
|
||||||
if not check_type or peers[peer_id].channel in data["channel"]:
|
peer = peers[peer_id]
|
||||||
peer = peers[peer_id]
|
if peer.channel in channels:
|
||||||
if peer is not handler:
|
if peer is not handler:
|
||||||
out[peer_id] = {"name" : peer.name,
|
out[peer_id] = {"name" : peer.name,
|
||||||
"address" : peer.address, "port" : peer.port,
|
"address" : peer.address, "port" : peer.port,
|
||||||
|
@ -123,9 +143,41 @@ class Peers(Route):
|
||||||
handler.send(out, handler.reverse_routes[self])
|
handler.send(out, handler.reverse_routes[self])
|
||||||
|
|
||||||
|
|
||||||
|
def subscribe(self, handler, channel):
|
||||||
|
if handler not in self.subscriptions:
|
||||||
|
self.subscriptions[handler] = [channel]
|
||||||
|
else:
|
||||||
|
if channel not in self.subscriptions[handler]:
|
||||||
|
self.subscriptions[handler].append(channel)
|
||||||
|
|
||||||
|
|
||||||
|
def unsubscribe(self, handler, channel):
|
||||||
|
if handler in self.subscriptions:
|
||||||
|
if channel in self.subscriptions[handler]:
|
||||||
|
del self.subscriptions[handler][self.subscriptions[handler].index(channel)]
|
||||||
|
|
||||||
|
|
||||||
|
def unsubscribe_all(self, handler):
|
||||||
|
if handler in self.subscriptions:
|
||||||
|
del self.subscriptions[handler]
|
||||||
|
|
||||||
|
|
||||||
|
def push_changes(self, handler):
|
||||||
|
out = {}
|
||||||
|
to_unsubscribe = []
|
||||||
|
peers = handler.peers
|
||||||
|
for handler_ in self.subscriptions:
|
||||||
|
try:
|
||||||
|
self.send_connected_peers(handler_, self.subscriptions[handler_])
|
||||||
|
except RuntimeError:
|
||||||
|
to_unsubscribe.append(handler_)
|
||||||
|
for handler in to_unsubscribe:
|
||||||
|
self.unsubscribe_all(handler)
|
||||||
|
|
||||||
|
|
||||||
class Handler(Server):
|
class Handler(Server):
|
||||||
def setup(self, routes, broadcast, websockets, debug=False):
|
def setup(self, routes, broadcast, websockets, debug=False):
|
||||||
super().setup(routes, websockets, piping=True, debug=debug)
|
super().setup(routes, websockets, debug=debug)
|
||||||
self.broadcast = broadcast
|
self.broadcast = broadcast
|
||||||
self.channel = None
|
self.channel = None
|
||||||
self.name = "Unknown"
|
self.name = "Unknown"
|
||||||
|
@ -141,7 +193,7 @@ class Handler(Server):
|
||||||
self.broadcast.remove(self, self.channel)
|
self.broadcast.remove(self, self.channel)
|
||||||
if self.debug:
|
if self.debug:
|
||||||
Logging.info("'%s:%d' disconnected." % (self.address, self.port))
|
Logging.info("'%s:%d' disconnected." % (self.address, self.port))
|
||||||
|
self.routes["peers"].push_changes(self)
|
||||||
|
|
||||||
|
|
||||||
def folder_validator(folder):
|
def folder_validator(folder):
|
||||||
|
@ -195,6 +247,7 @@ server.set_app(WebSocketWSGIApplication(handler_cls=Handler,
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
Logging.header("Server loop starting.")
|
||||||
server.serve_forever()
|
server.serve_forever()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
Logging.header("Gracefully shutting down server.")
|
Logging.header("Gracefully shutting down server.")
|
||||||
|
|
Reference in a new issue