Removed building-blocks of old networking
This commit is contained in:
parent
2622fe8837
commit
422d262ddf
2 changed files with 0 additions and 207 deletions
|
@ -1,87 +0,0 @@
|
||||||
from ESock import ESock
|
|
||||||
import DataTypes
|
|
||||||
import Logging
|
|
||||||
|
|
||||||
import sys
|
|
||||||
from Utils import capture_trace
|
|
||||||
from Utils import is_socket_related_error
|
|
||||||
import socket
|
|
||||||
import _thread
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Server:
|
|
||||||
def __init__(self, host_port_pair, debug=False, compression_level=None):
|
|
||||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
try:
|
|
||||||
self.sock.bind(host_port_pair)
|
|
||||||
except OSError as e:
|
|
||||||
Logging.error(str(e))
|
|
||||||
exit(1)
|
|
||||||
self.sock.listen(2)
|
|
||||||
self.handlers = []
|
|
||||||
self.debug = debug
|
|
||||||
self.compression_level = compression_level
|
|
||||||
|
|
||||||
|
|
||||||
def run(self, handler, handler_args={}):
|
|
||||||
self.handler = handler
|
|
||||||
while 1:
|
|
||||||
sock, info = self.sock.accept()
|
|
||||||
if self.compression_level:
|
|
||||||
sock = ESock(sock, debug=self.debug, compression_level=self.compression_level)
|
|
||||||
else:
|
|
||||||
sock = ESock(sock, debug=self.debug)
|
|
||||||
|
|
||||||
handler = self.handler(sock, info, **handler_args)
|
|
||||||
self.handlers.append(handler)
|
|
||||||
_thread.start_new_thread(self.controller, (handler, ))
|
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
for handler in self.handlers:
|
|
||||||
self.attempt_graceful_close(handler, handler.sock)
|
|
||||||
|
|
||||||
|
|
||||||
def controller(self, handler):
|
|
||||||
while 1:
|
|
||||||
try:
|
|
||||||
data, route = handler.sock.recv()
|
|
||||||
handler.handle(data, route)
|
|
||||||
except Exception as e:
|
|
||||||
if not is_socket_related_error(e):
|
|
||||||
self.print_trace(handler.sock)
|
|
||||||
self.attempt_graceful_close(handler, handler.sock)
|
|
||||||
_thread.exit()
|
|
||||||
|
|
||||||
|
|
||||||
def print_trace(self, sock):
|
|
||||||
Logging.error("An unhandled exception forced the controller for '%s:%d' to terminate." % (sock.address, sock.port))
|
|
||||||
capture_trace()
|
|
||||||
|
|
||||||
|
|
||||||
def attempt_graceful_close(self, handler, sock):
|
|
||||||
try:
|
|
||||||
handler.finish()
|
|
||||||
except Exception:
|
|
||||||
self.print_trace(sock)
|
|
||||||
finally:
|
|
||||||
if handler in self.handlers:
|
|
||||||
del self.handlers[self.handlers.index(handler)]
|
|
||||||
handler.sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
class Handler:
|
|
||||||
def __init__(self, sock, info, **kwargs):
|
|
||||||
self.sock = sock
|
|
||||||
self.info = info
|
|
||||||
self.setup(**kwargs)
|
|
||||||
|
|
||||||
def setup(self, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def handle(self, data):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def finish(self):
|
|
||||||
pass
|
|
120
Shared/ESock.py
120
Shared/ESock.py
|
@ -1,120 +0,0 @@
|
||||||
import json
|
|
||||||
import socket
|
|
||||||
import struct
|
|
||||||
import gzip
|
|
||||||
import DataTypes
|
|
||||||
import Logging
|
|
||||||
from Utils import capture_trace
|
|
||||||
|
|
||||||
MAX_ROUTE_LENGTH = 16
|
|
||||||
|
|
||||||
class ConvertFailedError(ValueError):
|
|
||||||
def __init__(self):
|
|
||||||
super(ValueError, self).__init__("conversion failed (false data type supplied)")
|
|
||||||
|
|
||||||
def convert_data(data, data_type):
|
|
||||||
try:
|
|
||||||
if data_type == DataTypes.str:
|
|
||||||
data = data.decode()
|
|
||||||
elif data_type == DataTypes.int:
|
|
||||||
data = int(data.decode())
|
|
||||||
elif data_type == DataTypes.float:
|
|
||||||
data = float(data.decode())
|
|
||||||
elif data_type == DataTypes.json:
|
|
||||||
data = json.loads(data.decode())
|
|
||||||
except Exception:
|
|
||||||
capture_trace()
|
|
||||||
raise ConvertFailedError()
|
|
||||||
return data
|
|
||||||
|
|
||||||
class ESock:
|
|
||||||
DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json, list : DataTypes.json, bytes : DataTypes.bin, int : DataTypes.int, float : DataTypes.float}
|
|
||||||
|
|
||||||
def __init__(self, sock, debug=False, disconnect_callback=None, compression_level=0):
|
|
||||||
self._sock = sock
|
|
||||||
self.address, self.port = self._sock.getpeername()
|
|
||||||
self.debug = debug
|
|
||||||
self.disconnect_callback = disconnect_callback
|
|
||||||
self.compression_level = compression_level
|
|
||||||
if debug:
|
|
||||||
Logging.info("compression_level=%i" % compression_level)
|
|
||||||
|
|
||||||
|
|
||||||
def __getattr__(self, attr):
|
|
||||||
if attr == "recv":
|
|
||||||
return self.recv
|
|
||||||
elif attr == "send":
|
|
||||||
return self.send
|
|
||||||
elif attr == "_ESock__dict":
|
|
||||||
return self.__eq__
|
|
||||||
return getattr(self._sock, attr)
|
|
||||||
|
|
||||||
|
|
||||||
def recv(self):
|
|
||||||
raw_metadata = self._sock.recv(32)
|
|
||||||
if raw_metadata == b"":
|
|
||||||
self._sock.close()
|
|
||||||
raise socket.error("Connection closed")
|
|
||||||
try:
|
|
||||||
metadata = struct.unpack("cQ16s", raw_metadata)
|
|
||||||
except struct.error:
|
|
||||||
Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port))
|
|
||||||
if self.disconnect_callback != None:
|
|
||||||
self.disconnect_callback()
|
|
||||||
self._sock.close()
|
|
||||||
return None, ""
|
|
||||||
data_type = metadata[0]
|
|
||||||
data_length = metadata[1]
|
|
||||||
route = metadata[2].rstrip(b"\x00").decode()
|
|
||||||
data = b''
|
|
||||||
bufsize = 4096
|
|
||||||
while len(data) < data_length:
|
|
||||||
if len(data) + bufsize <= data_length:
|
|
||||||
packet = self._sock.recv(bufsize)
|
|
||||||
else:
|
|
||||||
packet = self._sock.recv(data_length % bufsize)
|
|
||||||
if not packet:
|
|
||||||
return None
|
|
||||||
data += packet
|
|
||||||
try:
|
|
||||||
data = convert_data(gzip.decompress(data), data_type)
|
|
||||||
except ConvertFailedError:
|
|
||||||
Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port))
|
|
||||||
if self.disconnect_callback != None:
|
|
||||||
self.disconnect_callback()
|
|
||||||
self._sock.close()
|
|
||||||
return None, ""
|
|
||||||
if self.debug:
|
|
||||||
data_repr = str(data).replace("\n", " ")
|
|
||||||
if len(data_repr) > 80:
|
|
||||||
data_repr = data_repr[:80] + "..."
|
|
||||||
Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port))
|
|
||||||
return data, route
|
|
||||||
|
|
||||||
|
|
||||||
def send(self, data, route=""):
|
|
||||||
length = len(data)
|
|
||||||
original_data_type = type(data)
|
|
||||||
if self.debug:
|
|
||||||
data_repr = str(data).replace("\n", " ")
|
|
||||||
if len(data_repr) > 80:
|
|
||||||
data_repr = data_repr[:80] + "..."
|
|
||||||
Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" % (length, original_data_type.__name__, route, data_repr, self.address, self.port))
|
|
||||||
route = route.encode()
|
|
||||||
if original_data_type in ESock.DATA_TYPES:
|
|
||||||
data_type = ESock.DATA_TYPES[original_data_type]
|
|
||||||
else:
|
|
||||||
data_type = DataTypes.other
|
|
||||||
if original_data_type is str:
|
|
||||||
data = data.encode()
|
|
||||||
elif original_data_type is dict or original_data_type is list:
|
|
||||||
data = json.dumps(data, separators=(',',':')).encode()
|
|
||||||
|
|
||||||
data = gzip.compress(data, self.compression_level)
|
|
||||||
self.sendall(struct.pack("cQ16s", data_type, len(data), route) + data)
|
|
||||||
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
if type(other) is ESock:
|
|
||||||
return self._sock is other._sock
|
|
||||||
return False
|
|
Reference in a new issue