Experimental Highway compatibility

This commit is contained in:
Philip Trauner 2016-09-18 19:49:18 +02:00
parent a583d87f6a
commit 058bc81b99

View file

@ -1,7 +1,6 @@
import Logging
import Routing
import Config
import DataTypes
#from .AsyncServer import Server
from .Broadcast import Broadcast
@ -15,10 +14,11 @@ import struct
from subprocess import Popen, PIPE
from wsgiref.simple_server import make_server
from ws4py.websocket import WebSocket
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from Highway import Server
"""
WALLABY_SYNC_ROUTE = "w_sync"
SUBLIME_SYNC_ROUTE = "s_sync"
@ -26,7 +26,7 @@ SUBLIME_SYNC_ROUTE = "s_sync"
class Info(Routing.ServerRoute):
def run(self, data, handler):
handler.sock.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]),
handler.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]),
"wallaby" : len(handler.broadcast.channels[Handler.Channels.WALLABY]),
"routes" : list(handler.routes.keys())}, "info")
@ -38,10 +38,10 @@ class StdStream(Routing.ServerRoute):
def run(self, data, handler):
if type(data) is str:
if handler in self.stream_to.keys():
self.stream_to[handler].sock.send(data, "std_stream")
self.stream_to[handler].send(data, "std_stream")
elif type(data) is dict:
if handler in self.stream_to.keys():
self.stream_to[handler].sock.send(data, "std_stream")
self.stream_to[handler].send(data, "std_stream")
del self.stream_to[handler]
@ -63,13 +63,13 @@ class WallabyControl(Routing.ServerRoute):
if data == "list_wallaby_controllers":
wallaby_controllers = {}
for wallaby_handler in handler.broadcast.channels[Handler.Channels.WALLABY]:
wallaby_controllers["%s:%d" % (wallaby_handler.sock.address, wallaby_handler.sock.port)] = wallaby_handler.name
handler.sock.send({"wallaby_controllers" : wallaby_controllers}, "wallaby_control")
wallaby_controllers["%s:%d" % (wallaby_handler.address, wallaby_handler.port)] = wallaby_handler.name
handler.send({"wallaby_controllers" : wallaby_controllers}, "wallaby_control")
elif data == "list_programs":
handler.sock.send({"programs" : self.programs}, "wallaby_control")
handler.send({"programs" : self.programs}, "wallaby_control")
elif type(data) is dict:
for wallaby_handler in handler.broadcast.channels[Handler.Channels.WALLABY]:
address_pair = "%s:%d" % (wallaby_handler.sock.address, wallaby_handler.sock.port)
address_pair = "%s:%d" % (wallaby_handler.address, wallaby_handler.port)
if address_pair in data.keys():
if type(data[address_pair]) is list:
for action in data[address_pair]:
@ -80,7 +80,7 @@ class WallabyControl(Routing.ServerRoute):
if action in self.actions_with_params.keys():
self.actions_with_params[action](data[address_pair][action], wallaby_handler, handler)
return
handler.sock.send("Wallaby not connected anymore.", "error_report")
handler.send("Wallaby not connected anymore.", "error_report")
def disconnect(self, wallaby_handler, handler):
@ -88,20 +88,20 @@ class WallabyControl(Routing.ServerRoute):
def reboot(self, wallaby_handler, handler):
wallaby_handler.sock.send("reboot", "wallaby_control")
wallaby_handler.send("reboot", "wallaby_control")
def shutdown(self, wallaby_handler, handler):
wallaby_handler.sock.send("shutdown", "wallaby_control")
wallaby_handler.send("shutdown", "wallaby_control")
def run_program(self, program, wallaby_handler, handler):
handler.routes["std_stream"].stream_to.update({wallaby_handler : handler})
wallaby_handler.sock.send({"run" : program}, "wallaby_control")
wallaby_handler.send({"run" : program}, "wallaby_control")
def stop_programs(self, wallaby_handler, handler):
wallaby_handler.sock.send("stop", "wallaby_control")
wallaby_handler.send("stop", "wallaby_control")
class Compile(Routing.ServerRoute):
@ -142,7 +142,7 @@ class Compile(Routing.ServerRoute):
for line in p.communicate():
result += line.decode()
if handler != None:
handler.sock.send({"failed" : error, "returned" : result, "relpath" : relpath}, self.route)
handler.send({"failed" : error, "returned" : result, "relpath" : relpath}, self.route)
@ -168,100 +168,31 @@ class GetInfo(Routing.ServerRoute):
"Unknown (will not subscribe to broadcast)"))
def start(self, handler):
handler.sock.send("", self.route)
handler.send("", self.route)
class Handler(WebSocket):
DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json,
list : DataTypes.json, bytes : DataTypes.bin,
int : DataTypes.int, float : DataTypes.float}
INVALID_ROUTE = 1
INVALID_METADATA_LAYOUT = 2
INVALID_DATA_TYPE = 3
class Handler(Server):
class Channels:
EDITOR = 1
WALLABY = 2
def setup(self, routes, broadcast, compression_level):
Logging.info("Handler for '%s:%d' initalised." % (self.sock.address, self.sock.port))
def setup(self, routes, broadcast, compression_level, debug=False):
super().setup(routes, compression_level, debug=debug)
Logging.info("Handler for '%s:%d' initalised." % (self.address, self.port))
self.broadcast = broadcast
self.compression_level = compression_level
self.channel = None
self.routes = Routing.create_routes(routes, self)
self.exchange_routes = Routing.create_exchange_map(routes)
self.name = "Unknown"
self.raw_send = self.send
self.send = self.patched_send
def patched_send(self, data, route):
length = len(data)
original_data_type = type(data)
if self.debug:
data_repr = str(data).replace("\n", " ")
if len(data_repr) > 80:
data_repr = data_repr[:80] + "..."
Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" %
(length, original_data_type.__name__, route, data_repr, self.address, self.port))
route = self.exchange_routes[route]
if original_data_type in ESock.DATA_TYPES:
data_type = ESock.DATA_TYPES[original_data_type]
else:
data_type = DataTypes.other
if original_data_type is str:
data = data.encode()
elif original_data_type is dict or original_data_type is list:
data = json.dumps(data, separators=(',',':')).encode()
data = gzip.compress(data, self.compression_level)
self.raw_send(struct.pack("ch", data_type, route) + data)
def opened(self):
self.send(self.exchange_routes, "meta")
def received_message(self, message):
try:
metadata = struct.unpack("ch", message[:4])
except struct.error:
Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port))
self.send(Handler.INVALID_METADATA_LAYOUT, "meta")
self.close()
return
data_type = metadata[0]
try:
route = self.routes[self.exchange_routes[metadata[1]]]
except KeyError:
self.send(Handler.INVALID_ROUTE, "meta")
self.close()
return
data = message[4:]
try:
data = convert_data(gzip.decompress(data), data_type)
except ConvertFailedError:
Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port))
self.send(Handler.INVALID_DATA_TYPE, "meta")
self.close()
if self.debug:
data_repr = str(data).replace("\n", " ")
if len(data_repr) > 80:
data_repr = data_repr[:80] + "..."
Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port))
self.routes[route].run(data, self)
def closed(self):
def closed(self, code, reason):
if self.channel != None:
self.broadcast.remove(self, self.channel)
Logging.info("'%s:%d' disconnected." % (self.sock.address, self.sock.port))
Logging.info("'%s:%d' disconnected." % (self.address, self.port))
"""
def __repr__(self):
return "%s: %s:%d" % (self.name, self.sock.address, self.sock.port)
return "%s: %s:%d" % (self.name, self.address, self.port)
"""
def folder_validator(folder):
@ -308,7 +239,8 @@ s_sync = SyncServer(config.source_path, Handler.Channels.SUBLIME, debug=config.d
server = make_server(config.server_address[0], config.server_address[1],
server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=Handler,
handler_args={"broadcast" : broadcast, "compression_level" : config.compression_level,
handler_args={"debug" : config.debug, "broadcast" : broadcast,
"compression_level" : config.compression_level,
"routes" : {"info" : Info(), #"wallaby_control" : WallabyControl(),
"get_info" : GetInfo(), "compile" : compile,
"std_stream" : StdStream()}}))