181 lines
4.7 KiB
Python
181 lines
4.7 KiB
Python
# Module metadata (author, version, release date, contact and status).
__author__ = "Lukas Mahler"
__version__ = "0.0.2"
__date__ = "09.11.2021"
__email__ = "m@hler.eu"
__status__ = "Development"
|
|
|
|
|
|
try:
|
|
# Defaults
|
|
import os
|
|
import sys
|
|
import stat
|
|
import shutil
|
|
import getpass
|
|
import requests
|
|
import tempfile
|
|
import datetime
|
|
|
|
# Customs
|
|
import pyzipper
|
|
import owncloud
|
|
|
|
# Self
|
|
from src import util
|
|
|
|
except ImportError as e:
|
|
print(f"ERROR: Missing Module [{e}]")
|
|
sys.exit()
|
|
|
|
|
|
def del_rw(action, name, exc):
    """
    Custom shutil.rmtree on-error function.

    Makes a read-only entry writeable and then deletes it, so that
    rmtree can get past write-protected files and directories.

    action: the function that failed (unused, required by the signature)
    name:   path of the entry that could not be removed
    exc:    exception information of the failure (unused)

    Raises OSError if the entry still cannot be removed.
    """

    os.chmod(name, stat.S_IWRITE)

    # os.remove() cannot delete directories, so a write-protected directory
    # has to go through os.rmdir(); links are always removed like files.
    if os.path.isdir(name) and not os.path.islink(name):
        os.rmdir(name)
    else:
        os.remove(name)
|
|
|
|
|
|
def create_duplicate_dir(folder):
    """
    Copy the given folder into the temp directory and return the copy's path.

    The copy is named "<YYYY-MM-DD>_Dump" using today's date. A copy of the
    same name left over in the temp directory is removed before copying.
    Raises FileNotFoundError if the folder does not exist and ValueError if
    it is not a directory.
    Known issue (noted by the original author): errors are thrown when a
    Thumbs.db is blocking.
    """

    stamp = datetime.datetime.now().strftime("%Y-%m-%d")
    target = os.path.join(tempfile.gettempdir(), f"{stamp}_Dump")

    # Validate the source before touching anything in the temp directory
    if not os.path.exists(folder):
        raise FileNotFoundError(f"The given path [{folder}] was not found.")
    if not os.path.isdir(folder):
        raise ValueError(f"The given path [{folder}] is not a directory.")

    # copytree needs a non-existing destination, so drop a stale copy first
    if os.path.exists(target):
        shutil.rmtree(target, onerror=del_rw)

    print("[*] Trying to create a Duplicate, please wait...")
    shutil.copytree(folder, target)
    print("[*] Successfully created Duplicate")

    return target
|
|
|
|
|
|
def zip_folder(folder):
    """
    Zips the duplicated folder in the tempdir.

    The archive is written next to the folder as "<folder>.zip". Entry names
    are relative to the folder's parent directory, and subfolders get their
    own entries so that empty ones are kept. If config["Other"]["zippw"] is
    set, the archive is AES encrypted with that password; otherwise it is
    written unencrypted.
    Entries that cannot be added are reported and skipped.

    folder: path of the folder to zip
    Returns the path of the created zipfile.
    """

    print("[*] Trying to create corresponding zipfile")

    zipfilename = folder + ".zip"
    parent_folder = os.path.dirname(folder)
    password = config["Other"]["zippw"]

    # pyzipper's AES encryption requires a password, so only request it when
    # one is configured; otherwise every entry would fail to be written.
    zip_options = {"compression": pyzipper.ZIP_DEFLATED}
    if password:
        zip_options["encryption"] = pyzipper.WZ_AES

    with pyzipper.AESZipFile(zipfilename, 'w', **zip_options) as zf:

        if password:
            zf.pwd = bytes(password, encoding="utf8")

        for root, folders, files in os.walk(folder):
            # Folders first: include all subfolders, including empty ones.
            for entry_name in folders + files:
                absolute_path = os.path.join(root, entry_name)
                try:
                    # relpath is separator independent, unlike stripping a
                    # hard-coded "<parent>\\" prefix.
                    relative_path = os.path.relpath(absolute_path, parent_folder)
                    print(f" [+] {absolute_path}")
                    zf.write(absolute_path, relative_path)
                except Exception:
                    # Best effort: skip this entry, but let KeyboardInterrupt
                    # and SystemExit through.
                    print(f" [-] couldn't add {absolute_path}")

    print("[*] Successfully created corresponding zipfile")

    return zipfilename
|
|
|
|
|
|
def upload_to_nextcloud(file):
    """
    Uploads the file to Own/Nextcloud

    Host, credentials and remote save path are read from the global config.
    A missing username or password is asked for interactively. Connection,
    HTTP (4xx/5xx) and login errors are printed and abort the upload.

    file: local path of the file to upload
    Returns None in every case.
    """

    url = config["Connection"]["host"]
    usr = config["Auth"]["user"]
    pwd = config["Auth"]["password"]
    pth = config["Other"]["savepath"]

    if not usr:
        usr = input(f"Please provide a username to [{url}]: ")

    if not pwd:
        pwd = getpass.getpass(f"Please provide the password for [{usr}]: ")

    # If no protocol is added to the host url assume it's https://
    # (checked on the url prefix, not on the url's single characters)
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    # Test Connection; the timeout keeps an unreachable host from
    # blocking the script forever
    try:
        r = requests.head(url, timeout=30)
    except requests.RequestException as e:
        print("ERROR: ", e)
        return

    if 400 <= r.status_code < 600:
        print(f"ERROR: HTTP Statuscode for URL [{url}] is [{r.status_code}]")
        return

    try:
        nxt = owncloud.Client(url)
        nxt.login(usr, pwd)
    except Exception as e:
        print("ERROR: ", e)
        return

    print("[*] Starting the file Upload...")
    # NOTE(review): makedirs is kept as in the original - confirm that the
    # installed owncloud client provides it.
    nxt.makedirs(pth, exist_ok=True)
    nxt.put_file(pth, file)

    print(f"[*] Finished uploading to {url}")
|
|
|
|
|
|
def cleanup(files):
    """
    Delete every file or directory (full path) in the given list.

    Files and links are removed directly, directories are removed with
    their whole content. Raises ValueError for a path that is neither.
    """

    for entry in files:
        # Links are handled like files, even when they point to a directory
        if os.path.isfile(entry) or os.path.islink(entry):
            os.remove(entry)
            continue

        if not os.path.isdir(entry):
            raise ValueError(f"[X] Error: [{entry}] is not a file or directory.")

        shutil.rmtree(entry, onerror=del_rw)
|
|
|
|
|
|
def main():
    """
    Run the whole workflow: load the config, duplicate the configured
    upload directory, zip the duplicate, upload the zipfile and finally
    delete the duplicate and the zipfile again.
    """

    # Load toml config; it is global because the other functions read it
    global config
    config = util.getConf("prod.toml")

    duplicate = create_duplicate_dir(config["Other"]["uploaddir"])
    archive = zip_folder(duplicate)
    upload_to_nextcloud(archive)
    cleanup([duplicate, archive])
|
|
|
|
|
|
# Only run the workflow when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|