Initial commit
This commit is contained in:
commit
673b8c3686
11 changed files with 612 additions and 0 deletions
1
.gitattributes
vendored
Normal file
1
.gitattributes
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
*.wav filter=lfs diff=lfs merge=lfs -text
|
13
.gitignore
vendored
Normal file
13
.gitignore
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
.DS_Store
|
||||
*.pyc
|
||||
__pycache__
|
||||
Testing
|
||||
*.cfg
|
||||
server.info
|
||||
server.cfg
|
||||
wallaby_testing*
|
||||
Binaries
|
||||
Source
|
||||
deleted*.db*
|
||||
.fl0w
|
||||
*.pickle
|
2
AUTHORS.MD
Normal file
2
AUTHORS.MD
Normal file
|
@ -0,0 +1,2 @@
|
|||
**Authors:**
|
||||
[Philip Trauner](http://github.com/PhilipTrauner) <[philip.trauner@arztpraxis.io](mailto:philip.trauner@arztpraxis.io)>
|
21
LICENSE
Normal file
21
LICENSE
Normal file
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2017 Philip Trauner
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
12
Pipfile
Normal file
12
Pipfile
Normal file
|
@ -0,0 +1,12 @@
|
|||
[[source]]
|
||||
url = "https://pypi.python.org/simple"
|
||||
verify_ssl = true
|
||||
name = "pypi"
|
||||
|
||||
[packages]
|
||||
|
||||
"highway.py" = "==0.2.2"
|
||||
Meh = "==1.2.1"
|
||||
bottle = "==0.12.13"
|
||||
|
||||
[dev-packages]
|
81
Pipfile.lock
generated
Normal file
81
Pipfile.lock
generated
Normal file
|
@ -0,0 +1,81 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "e3369dc6de17fcf7c41860f7590e3dd5c8e72f3dfce1f55a02ae96f079b01aa1"
|
||||
},
|
||||
"host-environment-markers": {
|
||||
"implementation_name": "cpython",
|
||||
"implementation_version": "3.6.3",
|
||||
"os_name": "posix",
|
||||
"platform_machine": "x86_64",
|
||||
"platform_python_implementation": "CPython",
|
||||
"platform_release": "17.2.0",
|
||||
"platform_system": "Darwin",
|
||||
"platform_version": "Darwin Kernel Version 17.2.0: Fri Sep 29 18:27:05 PDT 2017; root:xnu-4570.20.62~3/RELEASE_X86_64",
|
||||
"python_full_version": "3.6.3",
|
||||
"python_version": "3.6",
|
||||
"sys_platform": "darwin"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {},
|
||||
"sources": [
|
||||
{
|
||||
"name": "pypi",
|
||||
"url": "https://pypi.python.org/simple",
|
||||
"verify_ssl": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"default": {
|
||||
"bottle": {
|
||||
"hashes": [
|
||||
"sha256:39b751aee0b167be8dffb63ca81b735bbf1dd0905b3bc42761efedee8f123355"
|
||||
],
|
||||
"version": "==0.12.13"
|
||||
},
|
||||
"highway.py": {
|
||||
"hashes": [
|
||||
"sha256:0bf001ebaaed14590702dc90f18c6120957544ba0168a85a874023387125beea"
|
||||
],
|
||||
"version": "==0.2.2"
|
||||
},
|
||||
"meh": {
|
||||
"hashes": [
|
||||
"sha256:c1dd9d92eda20a98f0fd91fe6889390ba5cf04107be8789c0368e178f11e8f43"
|
||||
],
|
||||
"version": "==1.2.1"
|
||||
},
|
||||
"ujson": {
|
||||
"hashes": [
|
||||
"sha256:f66073e5506e91d204ab0c614a148d5aa938bdbf104751be66f8ad7a222f5f86"
|
||||
],
|
||||
"version": "==1.35"
|
||||
},
|
||||
"websockets": {
|
||||
"hashes": [
|
||||
"sha256:e9c1cdbb591432c59d0b5ca64fd30b6d517024767f152fc169563b26e7bcc9da",
|
||||
"sha256:85ae1e4b36aa2e90de56d211d2de36d7c093d00277a9afdd9b4f81e69c0214ab",
|
||||
"sha256:2aa6d52264cecb08d39741e8fda49f5ac4872aef02617230c84d02e861f3cc5a",
|
||||
"sha256:8a29100079f5b91a72bcd25d35a7354db985d3babae42d00b9d629f9a0aaa8ac",
|
||||
"sha256:de743ef26b002efceea7d7756e99e5d38bf5d4f27563b8d27df2a9a5cc57340a",
|
||||
"sha256:aa42ecef3aed807e23218c264b1e82004cdd131a6698a10b57fc3d8af8f651fc",
|
||||
"sha256:c4c5b5ce2d66cb0cf193c14bc9726adca095febef0f7b2c04e5e3fa3487a97a4",
|
||||
"sha256:e1e568136ad5cb6768504be36d470a136b072acbf3ea882303aee6361be01941",
|
||||
"sha256:e8992f1db371f2a1c5af59e032d9dc7c1aa92f16241efcda695b7d955b4de0c2",
|
||||
"sha256:3d38f76f71654268e5533b45df125ff208fee242a102d4b5ca958da5cf5fb345",
|
||||
"sha256:4128212ab6f91afda03a0c697add261bdf6946b47928db83f07298ea2cd8d937",
|
||||
"sha256:b19e7ede1ba80ee9de6f5b8ccd31beee25402e68bef7c13eeb0b8bc46bc4b7b7",
|
||||
"sha256:7347af28fcc70eb45be409760c2a428f8199e7f73c04a621916c3c219ed7ad27",
|
||||
"sha256:2f5b7f3920f29609086fb0b63552bb1f86a04b8cbdcc0dbf3775cc90d489dfc8",
|
||||
"sha256:4a932c17cb11c361c286c04842dc2385cc7157019bbba8b64808acbc89a95584",
|
||||
"sha256:5ddc5fc121eb76771e990f071071d9530e27d20e8cfb804d9f5823de055837af",
|
||||
"sha256:a7e7585c8e3c0f9277ad7d6ee6ccddc69649cd216255d5e255d68f90482aeefa",
|
||||
"sha256:3fcc7dfb365e81ff8206f950c86d1e73accdf3be2f9110c0cb73be32d2e7a9a5",
|
||||
"sha256:09dfec40e9b73e8808c39ecdbc1733e33915a2b26b90c54566afc0af546a9ec3",
|
||||
"sha256:43e5b9f51dd0000a4c6f646e2ade0c886bd14a784ffac08b9e079bd17a63bcc5"
|
||||
],
|
||||
"version": "==3.4"
|
||||
}
|
||||
},
|
||||
"develop": {}
|
||||
}
|
11
README.md
Normal file
11
README.md
Normal file
|
@ -0,0 +1,11 @@
|
|||
<img src="https://cloud.githubusercontent.com/assets/9287847/23527929/2e61683c-ff98-11e6-8f3a-2ae440d63663.png" alt="fl0w logo" height="100">
|
||||
|
||||
# c0re
|
||||
|
||||
### Installation
|
||||
|
||||
```bash
|
||||
git clone https://github.com/robot0nfire/c0re.git --recursive --depth=1
|
||||
cd c0re
|
||||
pip3 install -r requirements.txt
|
||||
```
|
236
app.py
Normal file
236
app.py
Normal file
|
@ -0,0 +1,236 @@
|
|||
from os.path import isfile, isdir
|
||||
from threading import Lock
|
||||
from re import compile as re_compile
|
||||
from subprocess import check_output, CalledProcessError, Popen, PIPE, STDOUT
|
||||
from _thread import start_new_thread
|
||||
from time import sleep
|
||||
from asyncio import get_event_loop, ensure_future
|
||||
|
||||
# bottle (web-server)
|
||||
from bottle import route, static_file
|
||||
from bottle import run as run_bottle
|
||||
|
||||
# highway
|
||||
from highway import Server
|
||||
from highway import Handler as Handler_
|
||||
|
||||
# highway utilities
|
||||
from highway import log as logging
|
||||
from highway import ConnectionClosed
|
||||
|
||||
from meh import Config, Option, ExceptionInConfigError
|
||||
|
||||
from sensor_readout import SensorReadout
|
||||
from sensor_readout import REVERSE_NAMED_MODES
|
||||
from sensor_readout import valid_port as valid_sensor_port
|
||||
|
||||
from utils import play_sound, PlaybackFailure, valid_port
|
||||
|
||||
CONFIG_PATH = "fl0w.cfg"
|
||||
|
||||
INDEX_PATH = "dashb0ard"
|
||||
STATIC_PATH = INDEX_PATH + "/static"
|
||||
|
||||
config = Config()
|
||||
config += Option("address", "127.0.0.1")
|
||||
config += Option("ws_port", 3077, validator=valid_port)
|
||||
config += Option("http_port", 8080, validator=valid_port)
|
||||
config += Option("highway_debug", False)
|
||||
config += Option("bottle_debug", False)
|
||||
config += Option("fl0w_debug", False)
|
||||
config += Option("identify_sound", "resources/identify.wav", validator=isfile)
|
||||
config += Option("output_unbufferer", "stdbuf -o 0")
|
||||
|
||||
try:
|
||||
config = config.load(CONFIG_PATH)
|
||||
except (IOError, ExceptionInConfigError):
|
||||
config.dump(CONFIG_PATH)
|
||||
config = config.load(CONFIG_PATH)
|
||||
|
||||
|
||||
class Handler(Handler_):
|
||||
def on_close(self, code, reason):
|
||||
logging.info("Unsubscribing '%s:%i' from sensor readouts." % (*self.remote_address, ))
|
||||
sensor_readout.handler_disconnected(self)
|
||||
|
||||
|
||||
server = Server(Handler, debug=config.highway_debug)
|
||||
sensor_readout = SensorReadout()
|
||||
|
||||
|
||||
loop = get_event_loop()
|
||||
ensure_future(sensor_readout.run())
|
||||
|
||||
"""
|
||||
{"analog" : {1, 2, 3}, "digital" : {1, 2, 3}}
|
||||
"""
|
||||
async def process_sensor_request(data, handler, route, action):
|
||||
successful = True
|
||||
for mode in REVERSE_NAMED_MODES:
|
||||
if mode in data:
|
||||
for port in data[mode]:
|
||||
if type(port) is int:
|
||||
if valid_sensor_port(port,
|
||||
REVERSE_NAMED_MODES[mode]):
|
||||
|
||||
action(port, REVERSE_NAMED_MODES[mode],
|
||||
handler)
|
||||
else:
|
||||
successful = False
|
||||
break
|
||||
else:
|
||||
successful = False
|
||||
break
|
||||
await handler.send(successful, route)
|
||||
|
||||
|
||||
@server.route("identify")
|
||||
async def identify(data, handler):
|
||||
successful = False
|
||||
try:
|
||||
play_sound(config.identify_sound)
|
||||
successful = True
|
||||
except PlaybackFailure:
|
||||
pass
|
||||
await handler.send(successful, "identify")
|
||||
|
||||
|
||||
@server.route("sensor_poll_rate")
|
||||
async def sensor_poll_rate(data, handler):
|
||||
successful = False
|
||||
if type(data) in (int, float) and data >= 0.1:
|
||||
sensor_readout.poll_rate = data
|
||||
successful = True
|
||||
await handler.send(successful, "sensor_poll_rate")
|
||||
|
||||
|
||||
@server.route("sensor_unsubscribe_all")
|
||||
async def sensor_unsubscribe_all(data, handler):
|
||||
sensor_readout.unsubscribe_all(handler)
|
||||
|
||||
|
||||
|
||||
@server.route("sensor_subscribe")
|
||||
async def sensor_subscribe(data, handler):
|
||||
await process_sensor_request(data, handler, "sensor_subscribe",
|
||||
sensor_readout.subscribe)
|
||||
|
||||
|
||||
@server.route("sensor_unsubscribe")
|
||||
async def sensor_unsubscribe(data, handler):
|
||||
await process_sensor_request(data, handler, "sensor_unsubscribe",
|
||||
sensor_readout.unsubscribe)
|
||||
|
||||
|
||||
@server.route("shutdown")
|
||||
async def shutdown(data, handler):
|
||||
successful = False
|
||||
try:
|
||||
check_output(["shutdown", "now"])
|
||||
successful = True
|
||||
except CalledProcessError:
|
||||
pass
|
||||
await handler.send(successful, "shutdown")
|
||||
|
||||
|
||||
@server.route("reboot")
|
||||
async def reboot(data, handler):
|
||||
successful = False
|
||||
try:
|
||||
check_output(["reboot"])
|
||||
successful = True
|
||||
except CalledProcessError:
|
||||
pass
|
||||
await handler.send(successful, "reboot")
|
||||
|
||||
|
||||
@server.route("upgrade")
|
||||
async def upgrade(data, handler):
|
||||
await stream_program_output("apt-get update && apt-get upgrade",
|
||||
"upgrade", handler)
|
||||
|
||||
|
||||
@server.route("kill_botball")
|
||||
async def kill_botball(data, handler):
|
||||
await stream_program_output("killall botball_user_program",
|
||||
"kill_botball", handler)
|
||||
|
||||
|
||||
@server.route("reset_coproc")
|
||||
async def reset_coproc(data, handler):
|
||||
await stream_program_output("wallaby_reset_coproc",
|
||||
"reset_coproc", handler)
|
||||
|
||||
|
||||
@server.route("reset_coproc")
|
||||
async def restart_x11(data, handler):
|
||||
await stream_program_output("systemctl restart x11",
|
||||
"restart_x11", handler)
|
||||
|
||||
|
||||
@server.route("restart_harrogate")
|
||||
async def restart_harrogate(data, handler):
|
||||
await stream_program_output("systemctl restart harrogate",
|
||||
"restart_harrogate", handler)
|
||||
|
||||
|
||||
@server.route("restart_networking")
|
||||
async def restart_networking(data, handler):
|
||||
await stream_program_output("systemctl restart networking",
|
||||
"restart_networking", handler)
|
||||
|
||||
|
||||
async def stream_program_output(command, route, handler):
|
||||
await handler.send("> %s\n" % command, route + "_output")
|
||||
command = "%s %s" % (config.output_unbufferer, command)
|
||||
program = Popen(command, stdout=PIPE, stderr=STDOUT, shell=True)
|
||||
|
||||
# Stream output async
|
||||
loop = get_event_loop()
|
||||
ensure_future(_stream_program_output(program, route, handler))
|
||||
|
||||
|
||||
async def _stream_program_output(program, route, handler):
|
||||
has_disconnected = False
|
||||
# Poll process for new output until finished
|
||||
for line in iter(program.stdout.readline, b""):
|
||||
line = line.decode()
|
||||
try:
|
||||
await handler.send(line, route + "_output")
|
||||
except ConnectionClosed:
|
||||
has_disconnected = True
|
||||
logging.warning(line.rstrip("\n"))
|
||||
program.wait()
|
||||
if not has_disconnected:
|
||||
exit_code = program.returncode if type(program.returncode) is int else -1
|
||||
await handler.send(exit_code, route + "_exit")
|
||||
|
||||
|
||||
|
||||
@route("/")
|
||||
def index():
|
||||
return static_file("index.html", root="dashb0ard")
|
||||
|
||||
@route("/static/<filepath:path>")
|
||||
def static(filepath):
|
||||
return static_file(filepath, root="dashb0ard/static")
|
||||
|
||||
|
||||
if isdir(STATIC_PATH) and isdir(INDEX_PATH):
|
||||
start_new_thread(run_bottle, (), {"host" : config.address,
|
||||
"port" : config.http_port, "quiet" : not config.bottle_debug})
|
||||
|
||||
logging.header("Serving dashb0ard on 'http://%s:%s'" % (config.address,
|
||||
config.http_port))
|
||||
|
||||
else:
|
||||
logging.error("dashb0ard not found.")
|
||||
|
||||
|
||||
try:
|
||||
logging.header("Starting fl0w on 'ws://%s:%s'" % (config.address,
|
||||
config.ws_port))
|
||||
server.start(config.address, config.ws_port)
|
||||
except KeyboardInterrupt:
|
||||
# server.stop()
|
||||
pass
|
BIN
resources/identify.wav
(Stored with Git LFS)
Normal file
BIN
resources/identify.wav
(Stored with Git LFS)
Normal file
Binary file not shown.
158
sensor_readout.py
Normal file
158
sensor_readout.py
Normal file
|
@ -0,0 +1,158 @@
|
|||
from random import randint
|
||||
from threading import Lock
|
||||
from _thread import start_new_thread
|
||||
from os.path import exists
|
||||
from asyncio import sleep
|
||||
|
||||
|
||||
from utils import is_wallaby
|
||||
|
||||
from highway import log as logging
|
||||
|
||||
|
||||
|
||||
ANALOG = 1
|
||||
DIGITAL = 2
|
||||
NAMED_MODES = {ANALOG : "analog", DIGITAL : "digital"}
|
||||
REVERSE_NAMED_MODES = {v : k for k, v in NAMED_MODES.items()}
|
||||
MODES = tuple(NAMED_MODES.keys())
|
||||
|
||||
CONFIG_PATH = "fl0w.cfg"
|
||||
IS_WALLABY = is_wallaby()
|
||||
LIB_WALLABY = "/usr/lib/libwallaby.so"
|
||||
|
||||
READOUT_TEMPLATE = {
|
||||
ANALOG : [],
|
||||
DIGITAL : []
|
||||
}
|
||||
|
||||
def valid_port(port, mode):
|
||||
if mode == ANALOG:
|
||||
return port >= 0 and port <= 5
|
||||
elif mode == DIGITAL:
|
||||
return port >= 0 and port <= 9
|
||||
return False
|
||||
|
||||
class SensorReadout:
|
||||
def __init__(self, poll_rate=0.2):
|
||||
self.poll_rate = poll_rate
|
||||
|
||||
self.handler_lock = Lock()
|
||||
self.handlers = {}
|
||||
|
||||
self.running = True
|
||||
|
||||
self.readout_required = READOUT_TEMPLATE.copy()
|
||||
|
||||
# Wallaby library avaliable?
|
||||
if not exists(LIB_WALLABY):
|
||||
logging.warning("Wallaby library unavaliable.")
|
||||
self.get_sensor_value = self.__get_random_value
|
||||
else:
|
||||
self.wallaby_library = cdll.LoadLibrary(LIB_WALLABY)
|
||||
self.get_sensor_value = self.__get_sensor_value
|
||||
|
||||
|
||||
async def run(self):
|
||||
while self.running:
|
||||
current_values = {
|
||||
ANALOG : {},
|
||||
DIGITAL : {}
|
||||
}
|
||||
|
||||
for mode in MODES:
|
||||
for port in self.readout_required[mode]:
|
||||
current_values[mode][port] = self.get_sensor_value(port, mode)
|
||||
|
||||
self.handler_lock.acquire()
|
||||
for handler in self.handlers:
|
||||
readouts = 0
|
||||
|
||||
response = {ANALOG : {}, DIGITAL : {}}
|
||||
|
||||
for mode in MODES:
|
||||
for port in self.handlers[handler][mode]:
|
||||
response[mode][port] = current_values[mode][port]
|
||||
readouts += 1
|
||||
if readouts != 0:
|
||||
await handler.send(response, "sensor_stream")
|
||||
self.handler_lock.release()
|
||||
|
||||
await sleep(self.poll_rate)
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
|
||||
def subscribe(self, port, mode, handler):
|
||||
if port not in self.readout_required[mode]:
|
||||
self.readout_required[mode].append(port)
|
||||
if not handler in self.handlers:
|
||||
self.handler_lock.acquire()
|
||||
self.handlers[handler] = READOUT_TEMPLATE.copy()
|
||||
self.handler_lock.release()
|
||||
|
||||
self.handler_lock.acquire()
|
||||
self.handlers[handler][mode].append(port)
|
||||
self.handler_lock.release()
|
||||
|
||||
|
||||
def unsubscribe(self, port, mode, handler):
|
||||
if handler in self.handlers:
|
||||
if port in self.handlers[handler][mode]:
|
||||
self.handler_lock.acquire()
|
||||
|
||||
del self.handlers[handler][mode][ \
|
||||
self.handlers[handler][mode].index(port)]
|
||||
|
||||
self.handler_lock.release()
|
||||
self.determine_required_readouts()
|
||||
|
||||
|
||||
def determine_required_readouts(self):
|
||||
readout_required = READOUT_TEMPLATE.copy()
|
||||
|
||||
self.handler_lock.acquire()
|
||||
for handler in self.handlers:
|
||||
for mode in MODES:
|
||||
for port in self.handlers[handler][mode]:
|
||||
if not port in readout_required[mode]:
|
||||
readout_required[mode].append(port)
|
||||
self.handler_lock.release()
|
||||
self.readout_required = readout_required
|
||||
|
||||
|
||||
async def unsubscribe_all(self, handler):
|
||||
successful = False
|
||||
if handler in self.handlers:
|
||||
self.handler_lock.acquire()
|
||||
del self.handlers[handler]
|
||||
self.handler_lock.release()
|
||||
successful = True
|
||||
self.determine_required_readouts()
|
||||
await handler.send(successful, "sensor_unsubscribe")
|
||||
|
||||
|
||||
def handler_disconnected(self, handler):
|
||||
if handler in self.handlers:
|
||||
self.handler_lock.acquire()
|
||||
del self.handlers[handler]
|
||||
self.handler_lock.release()
|
||||
self.determine_required_readouts()
|
||||
|
||||
|
||||
def __get_sensor_value(self, port, mode):
|
||||
if mode == SensorReadout.ANALOG:
|
||||
return self.wallaby_library.analog(port)
|
||||
elif mode == SensorReadout.DIGITAL:
|
||||
return self.wallaby_library.digital(port)
|
||||
|
||||
|
||||
def __get_random_value(self, port, mode):
|
||||
if mode == ANALOG:
|
||||
return randint(0, 4095)
|
||||
elif mode == DIGITAL:
|
||||
return randint(0, 1)
|
||||
|
||||
|
74
utils.py
Normal file
74
utils.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
from traceback import print_exception
|
||||
from sys import exc_info, byteorder
|
||||
import platform
|
||||
import os
|
||||
import struct
|
||||
import subprocess
|
||||
import urllib
|
||||
|
||||
HIGHEST_PORT = 65535
|
||||
|
||||
class HostnameNotChangedError(PermissionError):
|
||||
def __init__(self):
|
||||
super(PermissionError, self).__init__("hostname could not be changed")
|
||||
|
||||
class NotSupportedOnPlatform(OSError):
|
||||
def __init__(self):
|
||||
super(OSError, self).__init__("feature not avaliable on OS")
|
||||
|
||||
class PlaybackFailure(OSError):
|
||||
def __init__(self):
|
||||
super(OSError, self).__init__("audio playback failed")
|
||||
|
||||
|
||||
def is_wallaby():
|
||||
return "3.18.21-custom" in platform.uname().release
|
||||
|
||||
|
||||
def is_linux():
|
||||
return platform.uname().system == "Linux"
|
||||
|
||||
|
||||
def is_darwin():
|
||||
return platform.uname().system == "Darwin"
|
||||
|
||||
|
||||
def is_windows():
|
||||
return platform.uname().system == "Windows"
|
||||
|
||||
|
||||
def set_hostname(hostname):
|
||||
if is_linux():
|
||||
if os.geteuid() == 0:
|
||||
open("/etc/hostname", "w").write(hostname)
|
||||
else:
|
||||
raise HostnameNotChangedError()
|
||||
elif is_darwin():
|
||||
if os.geteuid() == 0:
|
||||
subprocess.check_call(["scutil", "--set", "HostName", hostname])
|
||||
else:
|
||||
raise HostnameNotChangedError()
|
||||
else:
|
||||
raise HostnameNotChangedError()
|
||||
|
||||
|
||||
def get_hostname():
|
||||
return platform.uname().node
|
||||
|
||||
|
||||
def get_ip_from_url(url):
|
||||
return urllib.parse.urlsplit(url).netloc.split(':')[0]
|
||||
|
||||
|
||||
def play_sound(path):
|
||||
if is_linux() or is_darwin():
|
||||
try:
|
||||
subprocess.check_call(["aplay" if is_linux() else "afplay", path])
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise PlaybackFailure()
|
||||
else:
|
||||
raise NotSupportedOnPlatform()
|
||||
|
||||
|
||||
def valid_port(port):
|
||||
return type(port) is int and port <= HIGHEST_PORT
|
Reference in a new issue