Inital rework for switch to WebSocket
This commit is contained in:
parent
b9454f2865
commit
ad37319f1f
1 changed files with 113 additions and 23 deletions
136
Server/Server.py
136
Server/Server.py
|
@ -1,10 +1,11 @@
|
|||
import Logging
|
||||
import Routing
|
||||
import Config
|
||||
import DataTypes
|
||||
|
||||
from Sync import SyncServer
|
||||
|
||||
from .AsyncServer import Server
|
||||
#from .AsyncServer import Server
|
||||
from .Broadcast import Broadcast
|
||||
|
||||
import json
|
||||
|
@ -12,19 +13,24 @@ import os
|
|||
import subprocess
|
||||
import re
|
||||
import platform
|
||||
import struct
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from wsgiref.simple_server import make_server
|
||||
from ws4py.websocket import WebSocket
|
||||
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
|
||||
from ws4py.server.wsgiutils import WebSocketWSGIApplication
|
||||
|
||||
"""
|
||||
WALLABY_SYNC_ROUTE = "w_sync"
|
||||
SUBLIME_SYNC_ROUTE = "s_sync"
|
||||
|
||||
"""
|
||||
|
||||
class Info(Routing.ServerRoute):
|
||||
def run(self, data, handler):
|
||||
handler.sock.send("Connected ST clients: %d\nConnected Wallabies: %d\nAvaliable routes: %s" % (
|
||||
len(handler.broadcast.channels[Handler.Channels.SUBLIME]),
|
||||
len(handler.broadcast.channels[Handler.Channels.WALLABY]),
|
||||
", ".join([route for route in list(handler.routes.keys())])), "info")
|
||||
handler.sock.send({"editor" : len(handler.broadcast.channels[Handler.Channels.EDITOR]),
|
||||
"wallaby" : len(handler.broadcast.channels[Handler.Channels.WALLABY]),
|
||||
"routes" : list(handler.routes.keys())}, "info")
|
||||
|
||||
|
||||
class StdStream(Routing.ServerRoute):
|
||||
|
@ -145,13 +151,13 @@ class Compile(Routing.ServerRoute):
|
|||
class GetInfo(Routing.ServerRoute):
|
||||
REQUIRED = [Routing.ROUTE]
|
||||
|
||||
SUBLIME_TEXT = "s"
|
||||
EDITOR = "e"
|
||||
WALLABY = "w"
|
||||
|
||||
def run(self, data, handler):
|
||||
if "type" in data:
|
||||
if data["type"] == GetInfo.SUBLIME_TEXT:
|
||||
handler.channel = Handler.Channels.SUBLIME
|
||||
if data["type"] == GetInfo.EDITOR:
|
||||
handler.channel = Handler.Channels.EDITOR
|
||||
handler.broadcast.add(handler, handler.channel)
|
||||
elif data["type"] == GetInfo.WALLABY:
|
||||
handler.channel = Handler.Channels.WALLABY
|
||||
|
@ -159,7 +165,7 @@ class GetInfo(Routing.ServerRoute):
|
|||
if "name" in data:
|
||||
handler.name = data["name"]
|
||||
Logging.info("'%s:%d' has identified as a %s client." % (handler.info[0], handler.info[1],
|
||||
"Sublime Text" if handler.channel == Handler.Channels.SUBLIME else
|
||||
"Editor" if handler.channel == Handler.Channels.EDITOR else
|
||||
"Wallaby" if handler.channel == Handler.Channels.WALLABY else
|
||||
"Unknown (will not subscribe to broadcast)"))
|
||||
|
||||
|
@ -167,27 +173,91 @@ class GetInfo(Routing.ServerRoute):
|
|||
handler.sock.send("", self.route)
|
||||
|
||||
|
||||
class Handler(Server.Handler):
|
||||
class Handler(WebSocket):
|
||||
DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json,
|
||||
list : DataTypes.json, bytes : DataTypes.bin,
|
||||
int : DataTypes.int, float : DataTypes.float}
|
||||
|
||||
INVALID_ROUTE = 1
|
||||
INVALID_METADATA_LAYOUT = 2
|
||||
INVALID_DATA_TYPE
|
||||
|
||||
class Channels:
|
||||
SUBLIME = 1
|
||||
EDITOR = 1
|
||||
WALLABY = 2
|
||||
|
||||
def setup(self, routes, broadcast):
|
||||
def setup(self, routes, broadcast, compression_level):
|
||||
Logging.info("Handler for '%s:%d' initalised." % (self.sock.address, self.sock.port))
|
||||
self.broadcast = broadcast
|
||||
self.compression_level = compression_level
|
||||
self.channel = None
|
||||
self.routes = Routing.create_routes(routes, self)
|
||||
self.exchange_routes = Routing.create_exchange_map(routes)
|
||||
self.name = "Unknown"
|
||||
self.raw_send = self.send
|
||||
self.send = self.patched_send
|
||||
|
||||
|
||||
def handle(self, data, route):
|
||||
if route in self.routes:
|
||||
self.routes[route].run(data, self)
|
||||
def patched_send(self, data, route):
|
||||
length = len(data)
|
||||
original_data_type = type(data)
|
||||
if self.debug:
|
||||
data_repr = str(data).replace("\n", " ")
|
||||
if len(data_repr) > 80:
|
||||
data_repr = data_repr[:80] + "..."
|
||||
Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" %
|
||||
(length, original_data_type.__name__, route, data_repr, self.address, self.port))
|
||||
route = self.exchange_routes[route]
|
||||
if original_data_type in ESock.DATA_TYPES:
|
||||
data_type = ESock.DATA_TYPES[original_data_type]
|
||||
else:
|
||||
self.sock.send("Invalid route '%s'" % route, "error_report")
|
||||
data_type = DataTypes.other
|
||||
if original_data_type is str:
|
||||
data = data.encode()
|
||||
elif original_data_type is dict or original_data_type is list:
|
||||
data = json.dumps(data, separators=(',',':')).encode()
|
||||
|
||||
data = gzip.compress(data, self.compression_level)
|
||||
self.raw_send(struct.pack("ch", data_type, route) + data)
|
||||
|
||||
|
||||
def finish(self):
|
||||
def opened(self):
|
||||
self.send(self.exchange_routes, "meta")
|
||||
|
||||
|
||||
def received_message(self, message):
|
||||
try:
|
||||
metadata = struct.unpack("ch", message[:4])
|
||||
except struct.error:
|
||||
Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port))
|
||||
self.send(Handler.INVALID_METADATA_LAYOUT, "meta")
|
||||
self.close()
|
||||
return
|
||||
data_type = metadata[0]
|
||||
try:
|
||||
route = self.routes[self.exchange_routes[metadata[1]]]
|
||||
except KeyError:
|
||||
self.send(Handler.INVALID_ROUTE, "meta")
|
||||
self.close()
|
||||
return
|
||||
data = message[4:]
|
||||
try:
|
||||
data = convert_data(gzip.decompress(data), data_type)
|
||||
except ConvertFailedError:
|
||||
Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port))
|
||||
self.send(Handler.INVALID_DATA_TYPE, "meta")
|
||||
self.close()
|
||||
if self.debug:
|
||||
data_repr = str(data).replace("\n", " ")
|
||||
if len(data_repr) > 80:
|
||||
data_repr = data_repr[:80] + "..."
|
||||
Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port))
|
||||
# Route needs to be reintroduced
|
||||
self.routes[route].run(data, self)
|
||||
|
||||
|
||||
|
||||
|
||||
def closed(self):
|
||||
if self.channel != None:
|
||||
self.broadcast.remove(self, self.channel)
|
||||
Logging.info("'%s:%d' disconnected." % (self.sock.address, self.sock.port))
|
||||
|
@ -221,7 +291,10 @@ except FileNotFoundError:
|
|||
config.write_to_file(CONFIG_PATH)
|
||||
config = config.read_from_file(CONFIG_PATH)
|
||||
|
||||
server = Server(config.server_address, debug=config.debug, compression_level=config.compression_level)
|
||||
|
||||
|
||||
|
||||
#server = Server(config.server_address, debug=config.debug, compression_level=config.compression_level)
|
||||
|
||||
broadcast = Broadcast()
|
||||
# Populating broadcast channels with all channels defined in Handler.Channels
|
||||
|
@ -230,20 +303,37 @@ for channel in Handler.Channels.__dict__:
|
|||
broadcast.add_channel(Handler.Channels.__dict__[channel])
|
||||
|
||||
compile = Compile(config.source_path, config.binary_path)
|
||||
"""
|
||||
w_sync = SyncServer(config.binary_path, Handler.Channels.WALLABY, debug=config.debug, deleted_db_path="deleted-w.pickle")
|
||||
s_sync = SyncServer(config.source_path, Handler.Channels.SUBLIME, debug=config.debug, deleted_db_path="deleted-s.pickle", modified_hook=compile.compile)
|
||||
"""
|
||||
|
||||
server = make_server(config.server_address[0], config.server_address[1],
|
||||
server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler,
|
||||
app=WebSocketWSGIApplication(handler_cls=Handler,
|
||||
handler_args={"broadcast" : broadcast, "compression_level" : config.compression_level,
|
||||
"routes" : {"info" : Info(), #"wallaby_control" : WallabyControl(),
|
||||
"get_info" : GetInfo(), "compile" : compile,
|
||||
"std_stream" : StdStream()}}))
|
||||
server.initialize_websockets_manager()
|
||||
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
Logging.header("Gracefully shutting down server.")
|
||||
server.server_close()
|
||||
Logging.success("Server shutdown successful.")
|
||||
"""
|
||||
try:
|
||||
Logging.header("fl0w server started on '%s:%d'" % (config.server_address[0], config.server_address[1]))
|
||||
server.run(Handler,
|
||||
{"broadcast" : broadcast,
|
||||
"routes" : {"info" : Info(), "wallaby_control" : WallabyControl(),
|
||||
"get_info" : GetInfo(), "compile" : compile, "std_stream" : StdStream(),
|
||||
WALLABY_SYNC_ROUTE : w_sync,
|
||||
SUBLIME_SYNC_ROUTE : s_sync}})
|
||||
"get_info" : GetInfo(), "compile" : compile, "std_stream" : StdStream()}})
|
||||
except KeyboardInterrupt:
|
||||
Logging.header("Gracefully shutting down server.")
|
||||
w_sync.stop()
|
||||
s_sync.stop()
|
||||
server.stop()
|
||||
Logging.success("Server shutdown successful.")
|
||||
"""
|
Reference in a new issue