156 lines
4.0 KiB
Python
156 lines
4.0 KiB
Python
# Module metadata.
__author__ = "Lukas Mahler"
__version__ = "0.0.1"
__date__ = "07.11.2021"
__email__ = "lm@ankerlab.de"
__status__ = "Development"
|
|
|
|
|
|
try:
|
|
# Defaults
|
|
import os
|
|
import sys
|
|
import stat
|
|
import shutil
|
|
import requests
|
|
import tempfile
|
|
import datetime
|
|
|
|
# Customs
|
|
import pyzipper
|
|
import owncloud
|
|
|
|
# Self
|
|
from src import util
|
|
|
|
except ImportError as e:
|
|
print(f"ERROR: Missing Module [{e}]")
|
|
sys.exit()
|
|
|
|
|
|
def del_rw(action, name, exc):
    """
    Error handler for shutil.rmtree: make the path writable and retry.

    :param action: the function that failed (e.g. os.remove or os.rmdir)
    :param name: the path that function was called with
    :param exc: the exception information supplied by rmtree (unused)
    """
    os.chmod(name, stat.S_IWRITE)
    # Retry the operation that actually failed instead of always calling
    # os.remove, which cannot delete a directory.
    action(name)
|
|
|
|
|
|
def create_duplicate_dir(folder):
    """
    Creates a duplicate of the folder in the tempdir.

    The duplicate is written to "<tempdir>/<YYYY-MM-DD>_Dump". If that
    path already exists it is removed first; a PermissionError during
    that removal is printed and the process exits with status 1.

    Known issue (original author's note): throws errors when there is a
    Thumbs.db blocking...

    :param folder: path of the directory to duplicate
    :return: path of the created duplicate
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d")
    dst = os.path.join(tempfile.gettempdir(), stamp + "_" + "Dump")

    print("[*] Trying to create a Duplicate, please wait...")
    if os.path.exists(dst):
        try:
            # del_rw clears the read-only flag on entries that refuse to go.
            shutil.rmtree(dst, onerror=del_rw)
        except PermissionError as e:
            print(f"[X] Error: {e}")
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to be available in every interpreter setup.
            sys.exit(1)

    shutil.copytree(folder, dst)
    print("[*] Successfully created Duplicate")

    return dst
|
|
|
|
|
|
# inspired by https://stackoverflow.com/questions/60087965/how-to-zip-a-folder-in-python-with-password
|
|
def zip_folder(folder):
    """
    Zips the duplicated folder in the tempdir.

    The archive is written next to the folder as "<folder>.zip". Entry
    names are relative to the folder's parent directory. If
    config["Other"]["zippw"] is non-empty it is used as the password and
    AES encryption is requested; otherwise the archive is written
    without encryption.

    Entries that cannot be added are reported and skipped.

    :param folder: path of the directory to archive
    :return: path of the created zip file
    """

    print("[*] Trying to create corresponding zipfile")

    zipfilename = folder + ".zip"
    parent_folder = os.path.dirname(folder)
    password = config["Other"]["zippw"]

    with pyzipper.AESZipFile(
        zipfilename,
        'w',
        compression=pyzipper.ZIP_DEFLATED,
        # Only ask for AES when there is a password to encrypt with.
        encryption=pyzipper.WZ_AES if password else None,
    ) as zf:

        if password:
            zf.pwd = bytes(password, encoding="utf8")

        for root, folders, files in os.walk(folder):
            # Folders first so that all subfolders are included,
            # including empty ones; then the files of this level.
            for entry_name in folders + files:
                absolute_path = os.path.join(root, entry_name)
                # relpath is separator-agnostic, unlike stripping a
                # hard-coded "parent\\" prefix.
                relative_path = os.path.relpath(absolute_path, parent_folder)
                try:
                    print(f" [+] {absolute_path}")
                    zf.write(absolute_path, relative_path)
                except Exception:
                    # Best effort: report the entry and keep going.
                    print(f" [-] couldn't add {absolute_path}")
                    continue

    print("[*] Successfully created corresponding zipfile")

    return zipfilename
|
|
|
|
|
|
def upload_to_nextcloud(file):
    """
    Uploads the file to Own/Nextcloud

    Host, credentials and remote path are read from the global config
    ("Connection"/"host", "Auth"/"user", "Auth"/"password",
    "Other"/"savepath"). Connection, HTTP status and login problems are
    printed and the function returns without uploading.

    :param file: local path of the file to upload
    :return: None
    """

    url = config["Connection"]["host"]
    usr = config["Auth"]["user"]
    pwd = config["Auth"]["password"]
    pth = config["Other"]["savepath"]

    # Test Connection
    try:
        # Without a timeout an unresponsive host would block forever.
        r = requests.head(url, timeout=10)
    except requests.RequestException as e:
        print(f"ERROR: Could not connect to URL [{url}]: {e}")
        return

    # 4xx and 5xx responses mean the host is not usable.
    if 400 <= r.status_code < 600:
        print(f"ERROR: HTTP Statuscode for URL [{url}] is [{r.status_code}]")
        return

    try:
        nxt = owncloud.Client(url)
        nxt.login(usr, pwd)
    except Exception as e:
        print("ERROR: ", e)
        return

    print("[*] Starting the file Upload...")
    # NOTE(review): confirm the installed owncloud client provides
    # makedirs(); kept exactly as the original call.
    nxt.makedirs(pth, exist_ok=True)
    nxt.put_file(pth, file)

    print(f"[*] Finished uploading to {url}")
|
|
|
|
|
|
def clean_temp(tempdir):
    """
    Removes the given directory tree if it exists.

    Errors during removal are ignored.

    :param tempdir: path of the directory to delete
    :return: None
    """
    if os.path.exists(tempdir):
        print("Cleaning up the tempdir [", tempdir, "]...")
        shutil.rmtree(tempdir, ignore_errors=True)
|
|
|
|
|
|
def main():
    """
    Duplicates the "Dump" folder from the Desktop, zips it and uploads it.

    Loads "prod.toml" into the module-level ``config`` that zip_folder
    and upload_to_nextcloud read. The temporary duplicate is removed
    afterwards, also when zipping or uploading fails; the zip file
    itself is left in the tempdir.
    """
    global config

    # This will be uploaded
    # USERPROFILE only exists on Windows; fall back to the home directory
    # instead of failing with a KeyError elsewhere.
    home = os.environ.get('USERPROFILE') or os.path.expanduser("~")
    folder = os.path.join(home, 'Desktop', "Dump")

    # Load toml config
    config = util.getConf("prod.toml")

    dupe = create_duplicate_dir(folder)
    try:
        zipped = zip_folder(dupe)
        upload_to_nextcloud(zipped)
    finally:
        # Do not leave a full copy of the folder behind in the tempdir.
        clean_temp(dupe)


if __name__ == "__main__":
    main()
|