Cloudy/cloudy.py

181 lines
4.7 KiB
Python

# Module metadata: author, version, date, contact address and project status.
# NOTE(review): __date__ looks like DD.MM.YYYY - confirm the intended format.
__author__ = "Lukas Mahler"
__version__ = "0.0.2"
__date__ = "09.11.2021"
__email__ = "m@hler.eu"
__status__ = "Development"
try:
# Defaults
import os
import sys
import stat
import shutil
import getpass
import requests
import tempfile
import datetime
# Customs
import pyzipper
import owncloud
# Self
from src import util
except ImportError as e:
print(f"ERROR: Missing Module [{e}]")
sys.exit()
def del_rw(action, name, exc):
    """
    Error callback meant to be passed to shutil.rmtree(onerror=...).

    Marks the offending path as writeable and then deletes it as a file,
    so read-only files no longer block the removal of a tree.
    The 'action' and 'exc' arguments are accepted but not used.
    """
    write_permission = stat.S_IWRITE
    os.chmod(name, write_permission)
    os.unlink(name)
def create_duplicate_dir(folder):
    """
    Copies the given folder into the system tempdir and returns the
    path of the copy, which is named "<YYYY-MM-DD>_Dump".

    A copy with the same name that already exists is deleted first.
    Raises FileNotFoundError if the folder does not exist and
    ValueError if it is not a directory.
    Throws errors when there is a Thumbs.db blocking...
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d")
    target = os.path.join(tempfile.gettempdir(), f"{stamp}_Dump")

    # Validate the source before touching anything in the tempdir
    if not os.path.exists(folder):
        raise FileNotFoundError(f"The given path [{folder}] was not found.")
    if not os.path.isdir(folder):
        raise ValueError(f"The given path [{folder}] is not a directory.")

    # Start from a clean slate, an older dump of the same day is replaced
    if os.path.exists(target):
        shutil.rmtree(target, onerror=del_rw)

    print("[*] Trying to create a Duplicate, please wait...")
    shutil.copytree(folder, target)
    print("[*] Successfully created Duplicate")
    return target
def zip_folder(folder):
    """
    Zips the duplicated folder in the tempdir.

    The archive is written to "<folder>.zip". Every subfolder (including
    empty ones) and every file below 'folder' is added, with entry names
    relative to the parent directory of 'folder'.

    Reads config["Other"]["zippw"]: if it is non-empty the archive is AES
    encrypted with that password, otherwise encryption is not requested.
    Entries that can't be added are reported and skipped.

    :param folder: path of the directory to archive
    :return: path of the created zipfile
    """
    print("[*] Trying to create corresponding zipfile")

    zipfilename = folder + ".zip"
    parent_folder = os.path.dirname(folder)
    password = config["Other"]["zippw"]

    # Only ask for AES when there is a password to encrypt with.
    # NOTE(review): pyzipper is expected to refuse WZ_AES writes without a
    # password - confirm against the installed pyzipper version.
    encryption = pyzipper.WZ_AES if password else None

    with pyzipper.AESZipFile(zipfilename, 'w', compression=pyzipper.ZIP_DEFLATED, encryption=encryption) as zf:
        if password:
            zf.pwd = bytes(password, encoding="utf8")

        for root, folders, files in os.walk(folder):
            # Folders first so empty subfolders are included as well.
            for entry in folders + files:
                absolute_path = os.path.join(root, entry)
                # relpath works with any path separator, unlike stripping
                # a hard-coded "<parent>\\" prefix.
                relative_path = os.path.relpath(absolute_path, parent_folder)
                print(f" [+] {absolute_path}")
                try:
                    zf.write(absolute_path, relative_path)
                except Exception:
                    # Best effort: skip what can't be added, but let
                    # KeyboardInterrupt / SystemExit through.
                    print(f" [-] couldn't add {absolute_path}")

    print("[*] Successfully created corresponding zipfile")
    return zipfilename
def upload_to_nextcloud(file):
    """
    Uploads the file to Own/Nextcloud

    Host, credentials and the remote target path are read from the global
    config. A missing username or password is asked for interactively.
    On a connection problem, a 4xx/5xx answer of the host or a failed login
    an error is printed and the function returns without uploading.

    :param file: local path of the file to upload
    :return: None
    """
    url = config["Connection"]["host"]
    usr = config["Auth"]["user"]
    pwd = config["Auth"]["password"]
    pth = config["Other"]["savepath"]

    if not usr:
        usr = input(f"Please provide a username to [{url}]: ")
    if not pwd:
        pwd = getpass.getpass(f"Please provide the password for [{usr}]: ")

    # If no protocol is added to the host url assume it's https://
    # (check the prefix of the whole string, not its single characters)
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    # Test Connection, a timeout keeps an unreachable host from hanging us
    try:
        r = requests.head(url, timeout=30)
    except requests.RequestException as e:
        print("ERROR: ", e)
        return
    if 400 <= r.status_code < 600:
        print(f"ERROR: HTTP Statuscode for URL [{url}] is [{r.status_code}]")
        return

    try:
        nxt = owncloud.Client(url)
        nxt.login(usr, pwd)
    except Exception as e:
        print("ERROR: ", e)
        return

    print("[*] Starting the file Upload...")
    # NOTE(review): confirm the installed owncloud client really offers
    # makedirs(); the pyocclient docs list mkdir() for creating folders.
    nxt.makedirs(pth, exist_ok=True)
    nxt.put_file(pth, file)
    print(f"[*] Finished uploading to {url}")
def cleanup(files):
    """
    Deletes every entry of the given list of full paths.

    Files and symlinks are removed directly, directories are removed
    with their whole content (read-only files included).
    Raises ValueError for an entry that is neither of those.
    """
    for entry in files:
        is_plain_entry = os.path.isfile(entry) or os.path.islink(entry)
        if is_plain_entry:
            os.remove(entry)
            continue

        if not os.path.isdir(entry):
            raise ValueError(f"[X] Error: [{entry}] is not a file or directory.")

        shutil.rmtree(entry, onerror=del_rw)
def main():
    """
    Runs the whole backup: load the config, duplicate the configured
    folder into the tempdir, zip the duplicate, upload the zipfile and
    delete the temporary duplicate and zipfile again.

    The temporary artifacts are deleted even if zipping or uploading
    raises, so no copy of the data is left behind in the tempdir.
    """
    # Load toml config
    global config
    config = util.getConf("prod.toml")

    folder = config["Other"]["uploaddir"]

    # Remember what was actually created so only that gets cleaned up
    created = []
    try:
        dupe = create_duplicate_dir(folder)
        created.append(dupe)

        zipped = zip_folder(dupe)
        created.append(zipped)

        upload_to_nextcloud(zipped)
    finally:
        cleanup(created)


if __name__ == "__main__":
    main()