4th version (actually working :D)

This commit is contained in:
PhilipTrauner 2016-03-21 00:17:33 +01:00
parent 731e5f4e1e
commit fda8eb2d41

View file

@ -2,13 +2,68 @@ import os
import hashlib import hashlib
import time import time
import base64 import base64
import random
import Routing import Routing
import sqlite3
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
class File:
def __init__(self, relpath, hash, mtime):
self.relpath = relpath
self.hash = hash
self.mtime = mtime
@property
def db_repr(self):
return [self.relpath, self.hash, self.mtime]
def __eq__(self, other):
if type(other) is File:
return self.__dict__ == other.__dict__
return False
def __repr__(self):
return "%s: %s, %d" % (self.relpath, self.hash, self.mtime)
class DeletedStorage:
def __init__(self, db_path="deleted.db"):
self.db = sqlite3.connect(db_path)
self.cursor = self.db.cursor()
self.files = []
self.cursor.execute("create table if not exists files (file_id integer primary key autoincrement, relpath text, hash text, mtime integer)")
for row in self.cursor.execute("select * from files"):
self.files.append(File(row[1], row[2], row[3]))
self.cursor.execute("delete from files")
def add(self, file):
if type(file) is File:
if file not in self.files:
self.files.append(file)
else:
raise TypeError("only objects of type File can be added")
def remove(self, file):
if type(file) is File:
if file in self.files:
del self.files[self.files.index(file)]
else:
raise TypeError("only objects of type File can be removed")
def close(self):
for file in self.files:
self.cursor.execute("insert into files (relpath, hash, mtime) values (?, ?, ?)", file.db_repr)
self.db.commit()
self.db.close()
def relative_recursive_ls(path, relative_to, exclude=[]): def relative_recursive_ls(path, relative_to, exclude=[]):
if path[-1] != "/": if path[-1] != "/":
path += "/" path += "/"
@ -25,6 +80,9 @@ def relative_recursive_ls(path, relative_to, exclude=[]):
def md5(path): def md5(path):
return hashlib.md5(open(path, "rb").read()).hexdigest() return hashlib.md5(open(path, "rb").read()).hexdigest()
def get_mtime(path):
return os.path.getmtime(path)
def get_name_from_path(path): def get_name_from_path(path):
name = os.path.basename(path) name = os.path.basename(path)
@ -76,13 +134,9 @@ class SyncClient(Routing.ClientRoute):
def start(self): def start(self):
out = {"list" : {}} out = {"list" : {}}
for file in self.files: for file in self.files:
out["list"].update({file : {"mtime" : os.path.getmtime(self.folder + file), out["list"].update({file : {"mtime" : get_mtime(self.folder + file),
"md5" : md5(self.folder + file)}}) "md5" : md5(self.folder + file)}})
self.sock.send(out, self.route) self.sock.send(out, self.route)
self.started = True
def stop(self):
self.started = False
def suppress_fs_event(self, file): def suppress_fs_event(self, file):
@ -113,10 +167,12 @@ class SyncClient(Routing.ClientRoute):
self.unsuppress_fs_event(file) self.unsuppress_fs_event(file)
elif "del" in data: elif "del" in data:
for file in data["del"]: for file in data["del"]:
if not file in self.suppressed_fs_events: self.suppress_fs_event(file)
try:
os.remove(self.folder + file) os.remove(self.folder + file)
else: except FileNotFoundError:
self.unsuppress_fs_event(file) self.unsuppress_fs_event(file)
Logging.warning("Possible server misbehaviour")
elif "req" in data: elif "req" in data:
for file in data["req"]: for file in data["req"]:
if file in self.files: if file in self.files:
@ -128,9 +184,11 @@ class SyncClient(Routing.ClientRoute):
relpath = os.path.relpath(event.src_path, self.folder) relpath = os.path.relpath(event.src_path, self.folder)
if relpath not in self.files: if relpath not in self.files:
self.files.append(relpath) self.files.append(relpath)
if not relpath in self.suppressed_fs_events:
self.sock.send({"add" : {relpath : {"content" : base64_str_decode(event.src_path), self.sock.send({"add" : {relpath : {"content" : base64_str_decode(event.src_path),
"mtime" : os.path.getmtime(event.src_path)}}}, self.route) "mtime" : os.path.getmtime(event.src_path)}}}, self.route)
self.suppress_fs_event(relpath) else:
self.unsuppress_fs_event(relpath)
@ -142,83 +200,85 @@ class SyncClient(Routing.ClientRoute):
relpath = os.path.relpath(event.src_path, self.folder) relpath = os.path.relpath(event.src_path, self.folder)
if relpath in self.files: if relpath in self.files:
del self.files[self.files.index(relpath)] del self.files[self.files.index(relpath)]
if not relpath in self.suppressed_fs_events:
self.sock.send({"del" : [relpath]}, self.route) self.sock.send({"del" : [relpath]}, self.route)
self.suppress_fs_event(relpath) else:
self.unsuppress_fs_event(relpath)
class SyncServer(Routing.ServerRoute): class SyncServer(Routing.ServerRoute):
REQUIRED = (Routing.BROADCAST, Routing.ROUTE) REQUIRED = (Routing.BROADCAST, Routing.ROUTE)
def __init__(self, folder, channel, exclude=[".DS_Store", ".git"]): def __init__(self, folder, channel, exclude=[".DS_Store", ".git", ".keep"], deleted_db_path="deleted.db", modified_hook=None, deleted_hook=None):
self.folder = folder if folder[-1] == "/" else folder + "/" self.folder = folder if folder[-1] == "/" else folder + "/"
self.channel = channel self.channel = channel
self.exclude = exclude self.exclude = exclude
self.suppressed_fs_events = [] self.deleted_storage = DeletedStorage(deleted_db_path)
self.modified_hook = modified_hook
self.deleted_hook = deleted_hook
self.broadcast_file_excludes = {}
self.route = None # Set by REQUIRED self.route = None # Set by REQUIRED
self.broadcast = None # Set by REQUIRED self.broadcast = None # Set by REQUIRED
def start(self): def start(self, handler):
self.files = relative_recursive_ls(self.folder, self.folder, exclude=self.exclude) self.files = relative_recursive_ls(self.folder, self.folder, exclude=self.exclude)
observer = Observer() observer = Observer()
observer.schedule(ReloadHandler(self), path=self.folder, recursive=True) observer.schedule(ReloadHandler(self), path=self.folder, recursive=True)
observer.start() observer.start()
def suppress_fs_event(self, file):
if not file in self.suppressed_fs_events:
self.suppressed_fs_events.append(file)
def unsuppress_fs_event(self, file):
if file in self.suppressed_fs_events:
del self.suppressed_fs_events[self.suppressed_fs_events.index(file)]
def run(self, data, handler): def run(self, data, handler):
if type(data) is dict: if type(data) is dict:
if "list" in data: if "list" in data:
client_files = []
for file in data["list"]: for file in data["list"]:
if data["list"][file]["mtime"] < handler.last_stop: client_file = File(file, data["list"][file]["md5"], data["list"][file]["mtime"])
if file in self.files: client_files.append(client_file)
if data["list"][file]["mtime"] < os.path.getmtime(self.folder + file): if not file in self.files:
handler.sock.send({"add" : {relpath : { file = File(file, data["list"][file]["md5"], data["list"][file]["mtime"])
"content" : base64_str_decode(event.src_path), if file in self.deleted_storage.files:
"mtime" : os.path.getmtime(event.src_path)}}}, self.route) handler.sock.send({"del" : [file.relpath]}, self.route)
elif data["list"][file]["mtime"] > os.path.getmtime(self.folder + file):
handler.sock.send({"req" : [file]}, self.route)
else: else:
handler.sock.send({"del" : [file]}, self.route) handler.sock.send({"req" : [file.relpath]}, self.route)
else: for file in self.files:
if file not in self.files: file = File(file, md5(self.folder + file), get_mtime(self.folder + file))
handler.sock.send({"req" : [file]}, self.route) if file not in client_files:
handler.sock.send({"add" : {file.relpath : {"content" : base64_str_decode(self.folder + file.relpath),
"mtime" : os.path.getmtime(self.folder + file.relpath)}}}, self.route)
elif "add" in data: elif "add" in data:
for file in data["add"]: for file in data["add"]:
if not file in self.suppressed_fs_events:
self.suppress_fs_event(file)
folder_path = self.folder + os.path.dirname(file) folder_path = self.folder + os.path.dirname(file)
if not os.path.exists(folder_path): if not os.path.exists(folder_path):
os.makedirs(folder_path) os.makedirs(folder_path)
if os.path.exists: mode = "wb+" if os.path.exists(self.folder + file) else "ab+"
open(self.folder + file, "wb+").write(base64.b64decode(data["add"][file]["content"])) self.broadcast_file_excludes[file] = handler
else: open(self.folder + file, mode).write(base64.b64decode(data["add"][file]["content"]))
open(self.folder + file, "ab+").write(base64.b64decode(data["add"][file]["content"]))
os.utime(self.folder + file, (data["add"][file]["mtime"], data["add"][file]["mtime"])) os.utime(self.folder + file, (data["add"][file]["mtime"], data["add"][file]["mtime"]))
else:
self.unsuppress_fs_event(file)
elif "del" in data: elif "del" in data:
for file in data["del"]: for file in data["del"]:
self.broadcast_file_excludes[file] = handler
self.deleted_storage.add(File(file, md5(self.folder + file), get_mtime(self.folder + file)))
try:
os.remove(self.folder + file) os.remove(self.folder + file)
except FileNotFoundError:
del self.broadcast_file_excludes[file]
Logging.warning("Possible client misbehaviour (%s:%d)" % (handler.info[0], handler.info[1]))
def modified(self, event): def modified(self, event):
relpath = os.path.relpath(event.src_path, self.folder) relpath = os.path.relpath(event.src_path, self.folder)
if relpath not in self.files: if relpath not in self.files:
self.files.append(relpath) self.files.append(relpath)
exclude = []
if relpath in self.broadcast_file_excludes:
exclude.append(self.broadcast_file_excludes[relpath])
self.broadcast.broadcast({"add" : {relpath : { self.broadcast.broadcast({"add" : {relpath : {
"content" : base64_str_decode(event.src_path), "content" : base64_str_decode(event.src_path),
"mtime" : os.path.getmtime(event.src_path)}}}, self.route, self.channel) "mtime" : os.path.getmtime(event.src_path)}}}, self.route, self.channel, exclude=exclude)
suppress_fs_event(relpath) if exclude != []:
del self.broadcast_file_excludes[relpath]
def created(self, event): def created(self, event):
@ -229,5 +289,9 @@ class SyncServer(Routing.ServerRoute):
relpath = os.path.relpath(event.src_path, self.folder) relpath = os.path.relpath(event.src_path, self.folder)
if relpath in self.files: if relpath in self.files:
del self.files[self.files.index(relpath)] del self.files[self.files.index(relpath)]
self.broadcast.broadcast({"del" : [relpath]}, self.route, self.channel) exclude = []
self.suppress_fs_event(relpath) if relpath in self.broadcast_file_excludes:
exclude.append(self.broadcast_file_excludes[relpath])
self.broadcast.broadcast({"del" : [relpath]}, self.route, self.channel, exclude=exclude)
if exclude != []:
del self.broadcast_file_excludes[relpath]