"""Duplicate a folder, zip the copy and upload the archive to Own/Nextcloud.

The settings are read from ``prod.toml`` (see ``main``). The temporary copy
and the archive are created in the system temp directory and removed again
once the run is over.
"""

__author__ = "Lukas Mahler"
__version__ = "0.0.2"
__date__ = "09.11.2021"
__email__ = "m@hler.eu"
__status__ = "Development"

# Defaults
import os
import sys
import stat
import shutil
import getpass
import tempfile
import datetime

try:
    # Customs
    import requests
    import pyzipper
    import owncloud

    # Self
    from src import util
except ImportError as e:
    print(f"ERROR: Missing Module [{e}]")
    # A missing dependency is a failure, so exit with a non-zero status.
    sys.exit(1)

# Seconds to wait for the server during the connection test.
HTTP_TIMEOUT = 30

# Parsed configuration; assigned by main() before any other function runs.
config = None


def del_rw(action, name, exc):
    """
    this custom shutil.rmtree on error function
    makes read-only entries writeable so we can delete them

    :param action: the function that raised (unused, required by shutil)
    :param name: path of the entry that could not be processed
    :param exc: exception information passed by shutil (unused)
    """

    os.chmod(name, stat.S_IWRITE)

    # os.remove() cannot delete directories, so pick the matching call.
    if os.path.isdir(name) and not os.path.islink(name):
        os.rmdir(name)
    else:
        os.remove(name)


def create_duplicate_dir(folder):
    """
    Creates a duplicate of the folder in the tempdir.
    Throws errors when there is a Thumbs.db blocking...

    :param folder: path of the directory to copy
    :return: path of the copy, named "<YYYY-MM-DD>_Dump" inside the tempdir
    :raises FileNotFoundError: if the given path does not exist
    :raises ValueError: if the given path is not a directory
    """

    src = folder
    if not os.path.exists(src):
        raise FileNotFoundError(f"The given path [{src}] was not found.")
    if not os.path.isdir(src):
        raise ValueError(f"The given path [{src}] is not a directory.")

    stamp = datetime.datetime.now().strftime("%Y-%m-%d")
    dst = os.path.join(tempfile.gettempdir(), f"{stamp}_Dump")

    # A copy from an earlier run on the same day would make copytree fail.
    if os.path.exists(dst):
        shutil.rmtree(dst, onerror=del_rw)

    print("[*] Trying to create a Duplicate, please wait...")
    shutil.copytree(src, dst)
    print("[*] Successfully created Duplicate")

    return dst


def zip_folder(folder):
    """
    Zips the duplicated folder in the tempdir.

    The archive is written next to the folder as "<folder>.zip". Entries that
    cannot be added are reported and skipped. The archive is AES encrypted
    when config["Other"]["zippw"] is set.

    :param folder: path of the directory to zip
    :return: path of the created zipfile
    """

    print("[*] Trying to create corresponding zipfile")

    zipfilename = folder + ".zip"
    parent_folder = os.path.dirname(folder)
    password = config["Other"]["zippw"]

    with pyzipper.AESZipFile(
        zipfilename,
        'w',
        compression=pyzipper.ZIP_DEFLATED,
        # Only ask for encryption when there is a password to encrypt with.
        encryption=pyzipper.WZ_AES if password else None,
    ) as zf:

        if password:
            zf.pwd = bytes(password, encoding="utf8")

        for root, folders, files in os.walk(folder):
            # Subfolders first, so empty ones end up in the archive as well.
            for entry_name in folders + files:
                absolute_path = os.path.join(root, entry_name)
                # Name inside the archive, independent of the path separator.
                relative_path = os.path.relpath(absolute_path, parent_folder)
                try:
                    print(f" [+] {absolute_path}")
                    zf.write(absolute_path, relative_path)
                except Exception:
                    # Best effort: one unreadable entry must not stop the run.
                    print(f" [-] couldn't add {absolute_path}")

    print("[*] Successfully created corresponding zipfile")

    return zipfilename


def upload_to_nextcloud(file):
    """
    Uploads the file to Own/Nextcloud

    Host, credentials and target path are read from the config. Missing
    credentials are asked for interactively. Connection and login problems
    are printed and end the upload without raising.

    :param file: path of the local file to upload
    """

    url = config["Connection"]["host"]
    usr = config["Auth"]["user"]
    pwd = config["Auth"]["password"]
    pth = config["Other"]["savepath"]

    if not usr:
        usr = input(f"Please provide a username to [{url}]: ")

    if not pwd:
        pwd = getpass.getpass(f"Please provide the password for [{usr}]: ")

    # If no protocol is added to the host url assume it's https://
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    # Test Connection
    try:
        r = requests.head(url, timeout=HTTP_TIMEOUT)
    except requests.RequestException as e:
        print(f"ERROR: Could not connect to URL [{url}]: {e}")
        return

    # 4xx and 5xx are the client and server error ranges.
    if r.status_code >= 400:
        print(f"ERROR: HTTP Statuscode for URL [{url}] is [{r.status_code}]")
        return

    try:
        nxt = owncloud.Client(url)
        nxt.login(usr, pwd)
    except Exception as e:
        print("ERROR: ", e)
        return

    print("[*] Starting the file Upload...")
    nxt.makedirs(pth, exist_ok=True)
    nxt.put_file(pth, file)
    print(f"[*] Finished uploading to {url}")


def cleanup(files):
    """
    given a list of files/directories (full path) delete them

    :param files: iterable of paths to delete
    :raises ValueError: if a path is neither a file, a link nor a directory
    """

    for file in files:
        if os.path.isfile(file) or os.path.islink(file):
            os.remove(file)
        elif os.path.isdir(file):
            shutil.rmtree(file, onerror=del_rw)
        else:
            raise ValueError(f"[X] Error: [{file}] is not a file or directory.")


def main():
    """
    Loads the config, then duplicates, zips and uploads the configured folder.
    The temporary copy and zipfile are removed even when a step fails.
    """

    # Load toml config
    global config
    config = util.getConf("prod.toml")

    folder = config["Other"]["uploaddir"]

    created = []
    try:
        dupe = create_duplicate_dir(folder)
        created.append(dupe)

        zipped = zip_folder(dupe)
        created.append(zipped)

        upload_to_nextcloud(zipped)
    finally:
        # Never leave copies of the data behind in the tempdir.
        cleanup([path for path in created if os.path.lexists(path)])


if __name__ == "__main__":
    main()