Added output streaming
stdout as well as stderr can now be subscribed to. ANSI control char filtering is still wip
This commit is contained in:
parent
1a918b5843
commit
526695e209
1 changed files with 58 additions and 2 deletions
|
@ -13,6 +13,7 @@ from _thread import start_new_thread
|
|||
from ctypes import cdll
|
||||
import threading
|
||||
import json
|
||||
import re
|
||||
|
||||
from behem0th import Client as SyncClient
|
||||
from behem0th import EventHandler
|
||||
|
@ -320,15 +321,66 @@ class Processes(Pipe):
|
|||
"processes", peer)
|
||||
|
||||
|
||||
class Output(Pipe):
|
||||
def __init__(self):
|
||||
self.content = []
|
||||
self.handler = None
|
||||
self.subscribers = []
|
||||
|
||||
self.escape_regex = re.compile(r"(\[\d+m?)")
|
||||
|
||||
self.stdout = Output.Std(Logging.stdout, self.push)
|
||||
self.stderr = Output.Std(Logging.stderr, self.push)
|
||||
|
||||
Logging.stdout = self.stdout
|
||||
Logging.stderr = self.stderr
|
||||
|
||||
def run(self, data, peer, handler):
|
||||
if data == "subscribe":
|
||||
self.subscribers.append(peer)
|
||||
elif data == "unsubscribe":
|
||||
self.unsubscribe(peer)
|
||||
|
||||
|
||||
def start(self, handler):
|
||||
self.handler = handler
|
||||
|
||||
|
||||
def push(self, s):
|
||||
s = self.escape_regex.sub("", s)
|
||||
for peer in self.subscribers:
|
||||
self.handler.pipe(s, "output", peer)
|
||||
|
||||
|
||||
def unsubscribe(self, peer):
|
||||
if peer in self.subscribers:
|
||||
del self.subscribers[self.subscribers.index(peer)]
|
||||
|
||||
class Std:
|
||||
def __init__(self, old_std, write_callback):
|
||||
self.old_std = old_std
|
||||
self.write_callback = write_callback
|
||||
|
||||
|
||||
def write(self, s):
|
||||
self.old_std.write(s)
|
||||
self.write_callback(s)
|
||||
|
||||
|
||||
def flush(self):
|
||||
self.old_std.flush()
|
||||
|
||||
|
||||
class Handler(Client):
|
||||
def setup(self, routes, debug=False):
|
||||
super().setup(routes, debug=debug)
|
||||
super().setup(routes, debug_suppress_routes=["pipe"], debug=debug)
|
||||
|
||||
|
||||
def peer_unavaliable(self, peer):
|
||||
if self.debug:
|
||||
Logging.info("Unsubscribing '%s' from all sensor updates." % peer)
|
||||
self.routes["sensor"].sensor_readout.unsubscribe_all(peer)
|
||||
self.routes["output"].unsubscribe(peer)
|
||||
|
||||
|
||||
if IS_WALLABY:
|
||||
|
@ -343,6 +395,9 @@ else:
|
|||
if len(sys.argv) == 2:
|
||||
if os.path.exists(sys.argv[1]):
|
||||
PATH = os.path.abspath(sys.argv[1])
|
||||
else:
|
||||
Logging.error("Location does not exist.")
|
||||
exit(1)
|
||||
else:
|
||||
Logging.error("Location has to be provided in dev-env.")
|
||||
exit(1)
|
||||
|
@ -377,7 +432,8 @@ try:
|
|||
"identify" : Identify(), "list_programs" : ListPrograms(),
|
||||
"whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer),
|
||||
"stop_programs" : StopPrograms(), "shutdown" : Shutdown(),
|
||||
"reboot" : Reboot(), "binary_received" : BinaryReceived()},
|
||||
"reboot" : Reboot(), "binary_received" : BinaryReceived(),
|
||||
"output" : Output()},
|
||||
debug=config.debug)
|
||||
ws.connect()
|
||||
sync_client = SyncClient(path=PATH, verbose_log=config.debug,
|
||||
|
|
Reference in a new issue