diff --git a/Wallaby/Wallaby.py b/Wallaby/Wallaby.py index 1f64a6f..4961bc2 100644 --- a/Wallaby/Wallaby.py +++ b/Wallaby/Wallaby.py @@ -13,6 +13,7 @@ from _thread import start_new_thread from ctypes import cdll import threading import json +import re from behem0th import Client as SyncClient from behem0th import EventHandler @@ -320,15 +321,66 @@ class Processes(Pipe): "processes", peer) +class Output(Pipe): + def __init__(self): + self.content = [] + self.handler = None + self.subscribers = [] + + self.escape_regex = re.compile(r"(\[\d+m?)") + + self.stdout = Output.Std(Logging.stdout, self.push) + self.stderr = Output.Std(Logging.stderr, self.push) + + Logging.stdout = self.stdout + Logging.stderr = self.stderr + + def run(self, data, peer, handler): + if data == "subscribe": + self.subscribers.append(peer) + elif data == "unsubscribe": + self.unsubscribe(peer) + + + def start(self, handler): + self.handler = handler + + + def push(self, s): + s = self.escape_regex.sub("", s) + for peer in self.subscribers: + self.handler.pipe(s, "output", peer) + + + def unsubscribe(self, peer): + if peer in self.subscribers: + del self.subscribers[self.subscribers.index(peer)] + + class Std: + def __init__(self, old_std, write_callback): + self.old_std = old_std + self.write_callback = write_callback + + + def write(self, s): + self.old_std.write(s) + self.write_callback(s) + + + def flush(self): + self.old_std.flush() + + class Handler(Client): def setup(self, routes, debug=False): - super().setup(routes, debug=debug) + super().setup(routes, debug_suppress_routes=["pipe"], debug=debug) def peer_unavaliable(self, peer): if self.debug: Logging.info("Unsubscribing '%s' from all sensor updates." % peer) self.routes["sensor"].sensor_readout.unsubscribe_all(peer) + self.routes["output"].unsubscribe(peer) if IS_WALLABY: @@ -343,6 +395,9 @@ else: if len(sys.argv) == 2: if os.path.exists(sys.argv[1]): PATH = os.path.abspath(sys.argv[1]) + else: + Logging.error("Location does not exist.") + exit(1) else: Logging.error("Location has to be provided in dev-env.") exit(1) @@ -377,7 +432,8 @@ try: "identify" : Identify(), "list_programs" : ListPrograms(), "whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer), "stop_programs" : StopPrograms(), "shutdown" : Shutdown(), - "reboot" : Reboot(), "binary_received" : BinaryReceived()}, + "reboot" : Reboot(), "binary_received" : BinaryReceived(), + "output" : Output()}, debug=config.debug) ws.connect() sync_client = SyncClient(path=PATH, verbose_log=config.debug,