From 44f92236f564747c95c5b899a5ca9a51fd0f0358 Mon Sep 17 00:00:00 2001 From: Lukas Date: Sat, 7 May 2022 13:45:31 +0200 Subject: [PATCH] [init] --- main.py | 96 +++++++++++++++++++++++++++++++++++ src/swapgg.py | 97 +++++++++++++++++++++++++++++++++++ src/util.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 331 insertions(+) create mode 100644 main.py create mode 100644 src/swapgg.py create mode 100644 src/util.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..54d01ca --- /dev/null +++ b/main.py @@ -0,0 +1,96 @@ +__author__ = "Lukas Mahler" +__version__ = "0.0.0" +__date__ = "06.05.2022" +__email__ = "m@hler.eu" +__status__ = "Development" + +# Default +import re +import sys +import time + +# Custom +import discord +import requests +from discord.ext import commands + +# Self +from src import util, swapgg + + +class DiscordBot(commands.Bot): + + def __init__(self, log, config, swap): + + self.member = [] + self.log = log + self.log.pipeOut("Bot initializing") + + super().__init__(command_prefix=".") + + @self.command(name='register') + async def register_to_notifications(ctx, *args): + await self.send_dm(ctx, ctx.message.author, content="test") + + @self.command(name='ss', + help="Provides a Screenshot using swap.gg's API (https://docs.swap.gg/#create-screenshot)") + async def doScreen(ctx, *args): + if len(args) < 1: + await ctx.channel.send("Error Missing Inspection Link!") + return + elif len(args) == 1: + # Check if it's a valid inspection Link + inspectionlnk = requests.utils.unquote(args[0]) + pattern = re.compile("^steam:\/\/rungame\/730\/\d+\/[+ ]csgo_econ_action_preview ([SM])(\d+)A(\d+)D(\d+)$") + if pattern.search(inspectionlnk): + await ctx.message.add_reaction("✅") + swap.getScreenshot(inspectionlnk) + while swap.imglnk == "": + time.sleep(0.2) + e = discord.Embed(description=swap.imglnk) + e.set_image(url=swap.imglnk) + await ctx.channel.send(f"{ctx.message.author.mention} -> {swap.imglnk}", embed=e) + else: + await ctx.channel.send(f"Invalid Inspection Link") + else: + await ctx.channel.send(f"The command 'ss' doesn't support multiple arguments") + + async def send_dm(self, ctx, member: discord.Member, *, content): + await member.send(content) + self.log.pipeOut(f"DM sent to [{ctx.message.author}] [{content}]") + + async def on_ready(self): + self.log.pipeOut("Bot is ready") + # guild = discord.utils.get(self.guilds, name=self.GUILD) + # guilds = [[guild.name, guild.id] for guild in self.guilds] + self.log.pipeOut(f"{self.user} is connected to the following Servers:") + for idx, guild in enumerate(self.guilds): + self.log.pipeOut(f"{idx + 1}. {guild.name} [server_id: {guild.id}]") + + async def on_command_error(self, ctx, error): + self.log.pipeOut(f'msg: {ctx} | error: {error}', lvl='ERROR') + + +def main(): + # Start logger + logpath = "./log" + logname = "discord-bot.log" + log = util.Logger(logpath, logname) + + # Log any unhandled exception + sys.excepthook = log.unhandledException + + # Load toml config + config = util.getConf("prod.toml", log) + + # Change loglevel from config + log.changeLevel(config['Log']['level']) + + swap = swapgg.Swapgg(log, config) + bot = DiscordBot(log, config, swap) + + bot.run(config['Auth']['token']) + + +if __name__ == '__main__': + main() diff --git a/src/swapgg.py b/src/swapgg.py new file mode 100644 index 0000000..4e57970 --- /dev/null +++ b/src/swapgg.py @@ -0,0 +1,97 @@ +__author__ = "Lukas Mahler" +__version__ = "0.0.0" +__date__ = "06.05.2022" +__email__ = "m@hler.eu" +__status__ = "Development" + + +# Default +import json + +# Custom +import requests +import socketio + + +class Swapgg: + + def __init__(self, log, config): + self.apikey = config['Swap']['apikey'] + self.headers = {'Content-type': 'application/json', 'Authorization': f'{self.apikey}'} + self.sio = socketio.Client() + self.log = log + + self.baseurl = "https://market-api.swap.gg/v1/" + self.socketToken = None + self.imglnk = None + self.requested = [] + + self._getSocketToken() + + def _getSocketToken(self): + endpoint = "user/websocket" + r = requests.get(self.baseurl + endpoint, headers=self.headers) + if r.status_code == 200: + response = r.json() + if response['status'] == "OK": + self.socketToken = response['result']['token'] + self._initSocket() + else: + self.log.pipeOut({response['status']}, lvl='ERROR') + else: + self.log.pipeOut({r.status_code}, lvl='ERROR') + + def _initSocket(self): + url = "https://market-ws.swap.gg/" + self.sio.connect(url) + self.sio.emit('auth', self.socketToken) + self.sio.on("screenshot:ready", self.passScreenshot) + + def _post(self, endpoint: str, payload: dict) -> requests.Response: + url = self.baseurl + endpoint + self.log.pipeOut(f"Posting to [{url}] / Payload [{payload}]", lvl='DEBUG') + req = requests.post(url, headers=self.headers, data=json.dumps(payload)) + self.log.pipeOut(f"Returned [{req.status_code}] - {req.text}", lvl='DEBUG') + + return req + + def getScreenshot(self, inspect: str): + self.imglnk = "" + endpoint = "screenshot" + + # Rebuild inspect link + inspectlnk = inspect.replace("%20", " ") + + payload = {"inspectLink": inspectlnk} + req = self._post(endpoint, payload) + response = req.json() + self.log.pipeOut(response) + + if response['status'] == "OK": + rslt = response['result'] + + # Looks like swap provides no more requestId. sadge + # rid = rslt['requestId'] + rid = inspectlnk + + if rslt['state'] == "COMPLETED": + self.imglnk = response['result']['imageLink'] + self.log.pipeOut(self.imglnk) + elif rslt['state'] == "IN_QUEUE": + self.requested.append(rid) + pass + else: + self.log.pipeOut({response['status']}, lvl='ERROR') + + def passScreenshot(self, data: dict): + # print(data) + if 'inspectLink' in data: + if data['inspectLink'] in self.requested: + self.imglnk = data['imageLink'] + self.requested.remove(data['inspectLink']) + else: + self.log.pipeOut(data, lvl='WARNING') + + +if __name__ == '__main__': + exit() diff --git a/src/util.py b/src/util.py new file mode 100644 index 0000000..58a8840 --- /dev/null +++ b/src/util.py @@ -0,0 +1,138 @@ +""" +TBD +""" + +__author__ = "Lukas Mahler" +__version__ = "0.0.0" +__date__ = "05.05.2022" +__email__ = "m@hler.eu" +__status__ = "Development" + + +# Default +import sys +import time +import shutil +import inspect +import logging +import pathlib +from logging.handlers import RotatingFileHandler + +# Custom +import toml + + +def getConf(fname, log): + """ + + """ + p = pathlib.Path(fname) + + if p.suffix == ".toml": + if p.is_file() and p.exists(): + try: + config = toml.load(fname) + if isValidConf(config): + return config + else: + log.pipeOut(f"The provided '.toml' is invalid", lvl="critical") + except ValueError as e: + log.pipeOut(f"The provided '.toml' is probably invalid, returned error: [{e}]", lvl="critical") + else: + log.pipeOut(f"Couldn't locate the '.toml' file [{fname}].", lvl="error") + log.pipeOut("Creating a new '.toml' file from template, please edit and restart.") + shutil.copy("src/template.toml", fname) + exit(1) + else: + log.pipeOut(f"The provided config file [{fname}] is not a '.toml' file.", lvl="error") + log.pipeOut("Creating a new '.toml' file from template, please edit and restart.") + shutil.copy("src/template.toml", "prod.toml") + exit(1) + + +def isValidConf(config): + """ + + """ + valid = True + + required = {'Auth': ['token', 'server'], 'Swap': ['apikey'], 'Log': ['level']} + + for required_header in required: + if required_header in config: + for required_key in required[required_header]: + if required_key not in config[required_header]: + print(f"[Err] Missing key {required_key}") + valid = False + else: + if config[required_header][required_key] == "": + print(f"[Err] Key {required_key} can't be empty") + valid = False + else: + print(f"[Err] Missing header {required_header}") + valid = False + + return valid + + +class Logger: + """ + Create a rotating log in a log folder + """ + + def __init__(self, logpath, logname, reprint=True, lvl="INFO"): + + self.reprint = reprint + self.log = logging.getLogger("mylog") + p = pathlib.Path(logpath) + p.mkdir(parents=True, exist_ok=True) + + handler = RotatingFileHandler(pathlib.Path(logpath, logname), + encoding='utf-8', + maxBytes=1 * 1024 * 1024, + backupCount=10) + logformat = logging.Formatter("%(asctime)s %(levelname)8s %(message)s", "%Y-%m-%d %H:%M:%S") + handler.setFormatter(logformat) + self.log.addHandler(handler) + self.log.setLevel(lvl) + + def changeLevel(self, lvl): + """ + Change the loglevel after creation + """ + self.log.setLevel(logging.getLevelName(lvl)) + + def pipeOut(self, msg, lvl="INFO"): + """ + All output pipes through this function. + It's possible to print the output to console [set reprint to True] + or run in silent log mode. [set reprint to False] + """ + + # caller = inspect.currentframe().f_back.f_code.co_name # Returns caller function name + caller = pathlib.Path(inspect.stack()[1].filename).stem # Returns caller filename without extension + + lvl = lvl.upper() + lvln = int(getattr(logging, lvl)) + self.log.log(lvln, msg) + + if self.reprint and lvln >= self.log.getEffectiveLevel(): + print(f"[{time.strftime('%H:%M:%S')}]{f'[{lvl}]':10s}[{caller}] {msg}") + + if lvl == "CRITICAL": + exit(1) + + def unhandledException(self, exc_type, exc_value, exc_traceback): + """ + src = https://stackoverflow.com/a/16993115/5593051 + """ + + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + self.log.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)) + + +if __name__ == "__main__": + exit()