From f9ccbd9ae2f79fa4de6325142e6b1158a6f74668 Mon Sep 17 00:00:00 2001 From: Philip Trauner Date: Mon, 23 Jan 2017 09:20:59 +0100 Subject: [PATCH] Added sensor locking, stop_programs, shutdown, reboot, made run_program async Sensor locking: This fixes the iteration errors that occured during polling in case of subscrive/unsubscribe events. Pipes for shutdown, reboot. Pipe to stop all botball programs. run_program blocked all further communication during program runtime, fixed now. --- Wallaby/Wallaby.py | 104 +++++++++++++++++++-------------------------- 1 file changed, 44 insertions(+), 60 deletions(-) diff --git a/Wallaby/Wallaby.py b/Wallaby/Wallaby.py index 7c17181..5ecae34 100644 --- a/Wallaby/Wallaby.py +++ b/Wallaby/Wallaby.py @@ -10,6 +10,7 @@ import sys import subprocess from random import randint from _thread import start_new_thread +import threading CHANNEL = 2 IS_WALLABY = Utils.is_wallaby() @@ -42,6 +43,7 @@ class SensorReadout: def __init__(self, handler, poll_rate=0.2): self.poll_rate = poll_rate self.handler = handler + self.peer_lock = threading.Lock() self.peers = {} self.readout_required = {SensorReadout.ANALOG : [], SensorReadout.DIGITAL : []} @@ -69,32 +71,40 @@ class SensorReadout: if port not in self.readout_required[mode]: self.readout_required[mode].append(port) if not peer in self.peers: + self.peer_lock.acquire() self.peers[peer] = {SensorReadout.ANALOG : [], SensorReadout.DIGITAL : []} + self.peer_lock.release() self.peers[peer][mode].append(port) def unsubscribe(self, port, mode, peer): if peer in self.peers: if port in self.peers[peer][mode]: + self.peer_lock.acquire() del self.peers[peer][mode][self.peers[peer][mode].index(port)] + self.peer_lock.release() self.determine_required_readouts() def determine_required_readouts(self): readout_required = {SensorReadout.ANALOG : [], SensorReadout.DIGITAL : []} + self.peer_lock.acquire() for peer in self.peers: for mode in (SensorReadout.ANALOG, SensorReadout.DIGITAL): for port in self.peers[peer][mode]: if not port in readout_required[mode]: readout_required[mode].append(port) + self.peer_lock.release() self.readout_required = readout_required def unsubscribe_all(self, peer): if peer in self.peers: + self.peer_lock.acquire() del self.peers[peer] + self.peer_lock.release() self.determine_required_readouts() @@ -120,6 +130,7 @@ class SensorReadout: for mode in SensorReadout.MODES: for port in self.readout_required[mode]: current_values[mode][port] = self.get_sensor_value(port, mode) + self.peer_lock.acquire() for peer in self.peers: readouts = 0 response = {"analog" : {}, "digital" : {}} @@ -129,6 +140,7 @@ class SensorReadout: readouts += 1 if readouts != 0: self.handler.pipe(response, "sensor", peer) + self.peer_lock.release() time.sleep(self.poll_rate) @staticmethod @@ -160,6 +172,14 @@ class ListPrograms(Pipe): handler.pipe(programs, handler.reverse_routes[self], peer) +class StopPrograms(Pipe): + def run(self, data, peer, handler): + if handler.debug: + Logging.info("Stopping all botball programs.") + output = subprocess.call(["killall", "botball_user_program"], + stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + + class RunProgram(Pipe): def __init__(self, output_unbuffer): self.command = [output_unbuffer, "-i0", "-o0", "-e0"] @@ -173,17 +193,18 @@ class RunProgram(Pipe): if os.path.isfile(path): program = subprocess.Popen(self.command + [path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - # Poll process for new output until finished - for line in iter(program.stdout.readline, b""): - handler.pipe(line.decode(), "std_stream", peer) + start_new_thread(self.stream_stdout, (program, peer, handler)) else: if handler.debug: - Logging.warning("Program '%s' not found." % data) + Logging.warning("Program '%s' not found." % data[:-1]) - -class StopProgram(Pipe): - def run(self, data, peer, handler): - pass + def stream_stdout(self, program, peer, handler): + # Poll process for new output until finished + for line in iter(program.stdout.readline, b""): + handler.pipe(line.decode(), "std_stream", peer) + program.wait() + exit_code = program.returncode if type(program.returncode) is int else -1 + handler.pipe({"exit_code" : exit_code}, "std_stream", peer) class Sensor(Pipe): @@ -225,59 +246,20 @@ class Sensor(Pipe): self.sensor_readout = SensorReadout(handler) -class WallabyControl(Route): - def __init__(self, output_unbuffer): - self.output_unbuffer = output_unbuffer - self.actions_with_params = {"run" : self.run_program} - self.actions_without_params = {"disconnect" : self.disconnect, - "reboot" : self.reboot, "shutdown" : self.shutdown, "stop" : self.stop} - self.currently_running_program = None - - def run(self, data, handler): - if type(data) is str: - if data in self.actions_without_params.keys(): - self.actions_without_params[data](handler) - elif type(data) is dict: - for action in data: - if action in self.actions_with_params.keys(): - _thread.start_new_thread(self.actions_with_params[action], (handler, data[action])) +class Shutdown(Pipe): + def run(self, data, peer, handler): + try: + subprocess.check_output(["shutdown", "now"]) + except subprocess.CalledProcessError: + Logging.warning("Shutdown could not be initiated.") - def run_program(self, handler, program): - command = [self.output_unbuffer, "-i0", "-o0", "-e0"] - command.append("%s%s/botball_user_program" % (handler.sync.folder, program)) - self.currently_running_program = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - # Poll process for new output until finished - for line in iter(self.currently_running_program.stdout.readline, b""): - handler.sock.send(line.decode(), "std_stream") - - self.currently_running_program.wait() - handler.sock.send({"return_code" : self.currently_running_program.returncode}, "std_stream") - self.currently_running_program = None - - - def stop(self, handler): - if self.currently_running_program != None: - Logging.info("Killing currently running programm.") - self.currently_running_program.kill() - else: - Logging.info("No program started by fl0w.") - - - def reboot(self, handler): - self.disconnect(handler) - os.system("reboot") - exit(0) - - def shutdown(self, handler): - self.disconnect(handler) - os.system("shutdown -h 0") - - def disconnect(self, handler): - self.stop(handler) - handler.sock.close() - +class Reboot(Pipe): + def run(self, data, peer, handler): + try: + subprocess.check_output(["reboot"]) + except subprocess.CalledProcessError: + Logging.warning("Reboot could not be initiated.") class Subscribe(Route): @@ -348,7 +330,9 @@ try: ws.setup({"subscribe" : Subscribe(), "hostname" : Hostname(), "processes" : Processes(), "sensor" : Sensor(), "identify" : Identify(), "list_programs" : ListPrograms(), - "whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer)}, + "whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer), + "stop_programs" : StopPrograms(), "shutdown" : Shutdown(), + "reboot" : Reboot()}, debug=config.debug) ws.connect() ws.run_forever()