Added sensor locking, stop_programs, shutdown, reboot, made run_program async
Sensor locking: This fixes the iteration errors that occured during polling in case of subscrive/unsubscribe events. Pipes for shutdown, reboot. Pipe to stop all botball programs. run_program blocked all further communication during program runtime, fixed now.
This commit is contained in:
parent
e9f34ae3b0
commit
f9ccbd9ae2
1 changed files with 44 additions and 60 deletions
|
@ -10,6 +10,7 @@ import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
from random import randint
|
from random import randint
|
||||||
from _thread import start_new_thread
|
from _thread import start_new_thread
|
||||||
|
import threading
|
||||||
|
|
||||||
CHANNEL = 2
|
CHANNEL = 2
|
||||||
IS_WALLABY = Utils.is_wallaby()
|
IS_WALLABY = Utils.is_wallaby()
|
||||||
|
@ -42,6 +43,7 @@ class SensorReadout:
|
||||||
def __init__(self, handler, poll_rate=0.2):
|
def __init__(self, handler, poll_rate=0.2):
|
||||||
self.poll_rate = poll_rate
|
self.poll_rate = poll_rate
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
self.peer_lock = threading.Lock()
|
||||||
self.peers = {}
|
self.peers = {}
|
||||||
self.readout_required = {SensorReadout.ANALOG : [],
|
self.readout_required = {SensorReadout.ANALOG : [],
|
||||||
SensorReadout.DIGITAL : []}
|
SensorReadout.DIGITAL : []}
|
||||||
|
@ -69,32 +71,40 @@ class SensorReadout:
|
||||||
if port not in self.readout_required[mode]:
|
if port not in self.readout_required[mode]:
|
||||||
self.readout_required[mode].append(port)
|
self.readout_required[mode].append(port)
|
||||||
if not peer in self.peers:
|
if not peer in self.peers:
|
||||||
|
self.peer_lock.acquire()
|
||||||
self.peers[peer] = {SensorReadout.ANALOG : [],
|
self.peers[peer] = {SensorReadout.ANALOG : [],
|
||||||
SensorReadout.DIGITAL : []}
|
SensorReadout.DIGITAL : []}
|
||||||
|
self.peer_lock.release()
|
||||||
self.peers[peer][mode].append(port)
|
self.peers[peer][mode].append(port)
|
||||||
|
|
||||||
|
|
||||||
def unsubscribe(self, port, mode, peer):
|
def unsubscribe(self, port, mode, peer):
|
||||||
if peer in self.peers:
|
if peer in self.peers:
|
||||||
if port in self.peers[peer][mode]:
|
if port in self.peers[peer][mode]:
|
||||||
|
self.peer_lock.acquire()
|
||||||
del self.peers[peer][mode][self.peers[peer][mode].index(port)]
|
del self.peers[peer][mode][self.peers[peer][mode].index(port)]
|
||||||
|
self.peer_lock.release()
|
||||||
self.determine_required_readouts()
|
self.determine_required_readouts()
|
||||||
|
|
||||||
|
|
||||||
def determine_required_readouts(self):
|
def determine_required_readouts(self):
|
||||||
readout_required = {SensorReadout.ANALOG : [],
|
readout_required = {SensorReadout.ANALOG : [],
|
||||||
SensorReadout.DIGITAL : []}
|
SensorReadout.DIGITAL : []}
|
||||||
|
self.peer_lock.acquire()
|
||||||
for peer in self.peers:
|
for peer in self.peers:
|
||||||
for mode in (SensorReadout.ANALOG, SensorReadout.DIGITAL):
|
for mode in (SensorReadout.ANALOG, SensorReadout.DIGITAL):
|
||||||
for port in self.peers[peer][mode]:
|
for port in self.peers[peer][mode]:
|
||||||
if not port in readout_required[mode]:
|
if not port in readout_required[mode]:
|
||||||
readout_required[mode].append(port)
|
readout_required[mode].append(port)
|
||||||
|
self.peer_lock.release()
|
||||||
self.readout_required = readout_required
|
self.readout_required = readout_required
|
||||||
|
|
||||||
|
|
||||||
def unsubscribe_all(self, peer):
|
def unsubscribe_all(self, peer):
|
||||||
if peer in self.peers:
|
if peer in self.peers:
|
||||||
|
self.peer_lock.acquire()
|
||||||
del self.peers[peer]
|
del self.peers[peer]
|
||||||
|
self.peer_lock.release()
|
||||||
self.determine_required_readouts()
|
self.determine_required_readouts()
|
||||||
|
|
||||||
|
|
||||||
|
@ -120,6 +130,7 @@ class SensorReadout:
|
||||||
for mode in SensorReadout.MODES:
|
for mode in SensorReadout.MODES:
|
||||||
for port in self.readout_required[mode]:
|
for port in self.readout_required[mode]:
|
||||||
current_values[mode][port] = self.get_sensor_value(port, mode)
|
current_values[mode][port] = self.get_sensor_value(port, mode)
|
||||||
|
self.peer_lock.acquire()
|
||||||
for peer in self.peers:
|
for peer in self.peers:
|
||||||
readouts = 0
|
readouts = 0
|
||||||
response = {"analog" : {}, "digital" : {}}
|
response = {"analog" : {}, "digital" : {}}
|
||||||
|
@ -129,6 +140,7 @@ class SensorReadout:
|
||||||
readouts += 1
|
readouts += 1
|
||||||
if readouts != 0:
|
if readouts != 0:
|
||||||
self.handler.pipe(response, "sensor", peer)
|
self.handler.pipe(response, "sensor", peer)
|
||||||
|
self.peer_lock.release()
|
||||||
time.sleep(self.poll_rate)
|
time.sleep(self.poll_rate)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -160,6 +172,14 @@ class ListPrograms(Pipe):
|
||||||
handler.pipe(programs, handler.reverse_routes[self], peer)
|
handler.pipe(programs, handler.reverse_routes[self], peer)
|
||||||
|
|
||||||
|
|
||||||
|
class StopPrograms(Pipe):
|
||||||
|
def run(self, data, peer, handler):
|
||||||
|
if handler.debug:
|
||||||
|
Logging.info("Stopping all botball programs.")
|
||||||
|
output = subprocess.call(["killall", "botball_user_program"],
|
||||||
|
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
|
||||||
|
|
||||||
|
|
||||||
class RunProgram(Pipe):
|
class RunProgram(Pipe):
|
||||||
def __init__(self, output_unbuffer):
|
def __init__(self, output_unbuffer):
|
||||||
self.command = [output_unbuffer, "-i0", "-o0", "-e0"]
|
self.command = [output_unbuffer, "-i0", "-o0", "-e0"]
|
||||||
|
@ -173,17 +193,18 @@ class RunProgram(Pipe):
|
||||||
if os.path.isfile(path):
|
if os.path.isfile(path):
|
||||||
program = subprocess.Popen(self.command + [path],
|
program = subprocess.Popen(self.command + [path],
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||||
# Poll process for new output until finished
|
start_new_thread(self.stream_stdout, (program, peer, handler))
|
||||||
for line in iter(program.stdout.readline, b""):
|
|
||||||
handler.pipe(line.decode(), "std_stream", peer)
|
|
||||||
else:
|
else:
|
||||||
if handler.debug:
|
if handler.debug:
|
||||||
Logging.warning("Program '%s' not found." % data)
|
Logging.warning("Program '%s' not found." % data[:-1])
|
||||||
|
|
||||||
|
def stream_stdout(self, program, peer, handler):
|
||||||
class StopProgram(Pipe):
|
# Poll process for new output until finished
|
||||||
def run(self, data, peer, handler):
|
for line in iter(program.stdout.readline, b""):
|
||||||
pass
|
handler.pipe(line.decode(), "std_stream", peer)
|
||||||
|
program.wait()
|
||||||
|
exit_code = program.returncode if type(program.returncode) is int else -1
|
||||||
|
handler.pipe({"exit_code" : exit_code}, "std_stream", peer)
|
||||||
|
|
||||||
|
|
||||||
class Sensor(Pipe):
|
class Sensor(Pipe):
|
||||||
|
@ -225,59 +246,20 @@ class Sensor(Pipe):
|
||||||
self.sensor_readout = SensorReadout(handler)
|
self.sensor_readout = SensorReadout(handler)
|
||||||
|
|
||||||
|
|
||||||
class WallabyControl(Route):
|
class Shutdown(Pipe):
|
||||||
def __init__(self, output_unbuffer):
|
def run(self, data, peer, handler):
|
||||||
self.output_unbuffer = output_unbuffer
|
try:
|
||||||
self.actions_with_params = {"run" : self.run_program}
|
subprocess.check_output(["shutdown", "now"])
|
||||||
self.actions_without_params = {"disconnect" : self.disconnect,
|
except subprocess.CalledProcessError:
|
||||||
"reboot" : self.reboot, "shutdown" : self.shutdown, "stop" : self.stop}
|
Logging.warning("Shutdown could not be initiated.")
|
||||||
self.currently_running_program = None
|
|
||||||
|
|
||||||
def run(self, data, handler):
|
|
||||||
if type(data) is str:
|
|
||||||
if data in self.actions_without_params.keys():
|
|
||||||
self.actions_without_params[data](handler)
|
|
||||||
elif type(data) is dict:
|
|
||||||
for action in data:
|
|
||||||
if action in self.actions_with_params.keys():
|
|
||||||
_thread.start_new_thread(self.actions_with_params[action], (handler, data[action]))
|
|
||||||
|
|
||||||
|
|
||||||
def run_program(self, handler, program):
|
class Reboot(Pipe):
|
||||||
command = [self.output_unbuffer, "-i0", "-o0", "-e0"]
|
def run(self, data, peer, handler):
|
||||||
command.append("%s%s/botball_user_program" % (handler.sync.folder, program))
|
try:
|
||||||
self.currently_running_program = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
subprocess.check_output(["reboot"])
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
# Poll process for new output until finished
|
Logging.warning("Reboot could not be initiated.")
|
||||||
for line in iter(self.currently_running_program.stdout.readline, b""):
|
|
||||||
handler.sock.send(line.decode(), "std_stream")
|
|
||||||
|
|
||||||
self.currently_running_program.wait()
|
|
||||||
handler.sock.send({"return_code" : self.currently_running_program.returncode}, "std_stream")
|
|
||||||
self.currently_running_program = None
|
|
||||||
|
|
||||||
|
|
||||||
def stop(self, handler):
|
|
||||||
if self.currently_running_program != None:
|
|
||||||
Logging.info("Killing currently running programm.")
|
|
||||||
self.currently_running_program.kill()
|
|
||||||
else:
|
|
||||||
Logging.info("No program started by fl0w.")
|
|
||||||
|
|
||||||
|
|
||||||
def reboot(self, handler):
|
|
||||||
self.disconnect(handler)
|
|
||||||
os.system("reboot")
|
|
||||||
exit(0)
|
|
||||||
|
|
||||||
def shutdown(self, handler):
|
|
||||||
self.disconnect(handler)
|
|
||||||
os.system("shutdown -h 0")
|
|
||||||
|
|
||||||
def disconnect(self, handler):
|
|
||||||
self.stop(handler)
|
|
||||||
handler.sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Subscribe(Route):
|
class Subscribe(Route):
|
||||||
|
@ -348,7 +330,9 @@ try:
|
||||||
ws.setup({"subscribe" : Subscribe(), "hostname" : Hostname(),
|
ws.setup({"subscribe" : Subscribe(), "hostname" : Hostname(),
|
||||||
"processes" : Processes(), "sensor" : Sensor(),
|
"processes" : Processes(), "sensor" : Sensor(),
|
||||||
"identify" : Identify(), "list_programs" : ListPrograms(),
|
"identify" : Identify(), "list_programs" : ListPrograms(),
|
||||||
"whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer)},
|
"whoami" : WhoAmI(), "run_program" : RunProgram(config.output_unbuffer),
|
||||||
|
"stop_programs" : StopPrograms(), "shutdown" : Shutdown(),
|
||||||
|
"reboot" : Reboot()},
|
||||||
debug=config.debug)
|
debug=config.debug)
|
||||||
ws.connect()
|
ws.connect()
|
||||||
ws.run_forever()
|
ws.run_forever()
|
||||||
|
|
Reference in a new issue