This commit is contained in:
Lukas 2022-05-07 13:45:31 +02:00
parent d363004ab3
commit 44f92236f5
3 changed files with 331 additions and 0 deletions

96
main.py Normal file
View File

@ -0,0 +1,96 @@
__author__ = "Lukas Mahler"
__version__ = "0.0.0"
__date__ = "06.05.2022"
__email__ = "m@hler.eu"
__status__ = "Development"
# Default
import re
import sys
import time
# Custom
import discord
import requests
from discord.ext import commands
# Self
from src import util, swapgg
class DiscordBot(commands.Bot):
def __init__(self, log, config, swap):
self.member = []
self.log = log
self.log.pipeOut("Bot initializing")
super().__init__(command_prefix=".")
@self.command(name='register')
async def register_to_notifications(ctx, *args):
await self.send_dm(ctx, ctx.message.author, content="test")
@self.command(name='ss',
help="Provides a Screenshot using swap.gg's API (https://docs.swap.gg/#create-screenshot)")
async def doScreen(ctx, *args):
if len(args) < 1:
await ctx.channel.send("Error Missing Inspection Link!")
return
elif len(args) == 1:
# Check if it's a valid inspection Link
inspectionlnk = requests.utils.unquote(args[0])
pattern = re.compile("^steam:\/\/rungame\/730\/\d+\/[+ ]csgo_econ_action_preview ([SM])(\d+)A(\d+)D(\d+)$")
if pattern.search(inspectionlnk):
await ctx.message.add_reaction("")
swap.getScreenshot(inspectionlnk)
while swap.imglnk == "":
time.sleep(0.2)
e = discord.Embed(description=swap.imglnk)
e.set_image(url=swap.imglnk)
await ctx.channel.send(f"{ctx.message.author.mention} -> {swap.imglnk}", embed=e)
else:
await ctx.channel.send(f"Invalid Inspection Link")
else:
await ctx.channel.send(f"The command 'ss' doesn't support multiple arguments")
async def send_dm(self, ctx, member: discord.Member, *, content):
await member.send(content)
self.log.pipeOut(f"DM sent to [{ctx.message.author}] [{content}]")
async def on_ready(self):
self.log.pipeOut("Bot is ready")
# guild = discord.utils.get(self.guilds, name=self.GUILD)
# guilds = [[guild.name, guild.id] for guild in self.guilds]
self.log.pipeOut(f"{self.user} is connected to the following Servers:")
for idx, guild in enumerate(self.guilds):
self.log.pipeOut(f"{idx + 1}. {guild.name} [server_id: {guild.id}]")
async def on_command_error(self, ctx, error):
self.log.pipeOut(f'msg: {ctx} | error: {error}', lvl='ERROR')
def main():
# Start logger
logpath = "./log"
logname = "discord-bot.log"
log = util.Logger(logpath, logname)
# Log any unhandled exception
sys.excepthook = log.unhandledException
# Load toml config
config = util.getConf("prod.toml", log)
# Change loglevel from config
log.changeLevel(config['Log']['level'])
swap = swapgg.Swapgg(log, config)
bot = DiscordBot(log, config, swap)
bot.run(config['Auth']['token'])
if __name__ == '__main__':
main()

97
src/swapgg.py Normal file
View File

@ -0,0 +1,97 @@
__author__ = "Lukas Mahler"
__version__ = "0.0.0"
__date__ = "06.05.2022"
__email__ = "m@hler.eu"
__status__ = "Development"
# Default
import json
# Custom
import requests
import socketio
class Swapgg:
def __init__(self, log, config):
self.apikey = config['Swap']['apikey']
self.headers = {'Content-type': 'application/json', 'Authorization': f'{self.apikey}'}
self.sio = socketio.Client()
self.log = log
self.baseurl = "https://market-api.swap.gg/v1/"
self.socketToken = None
self.imglnk = None
self.requested = []
self._getSocketToken()
def _getSocketToken(self):
endpoint = "user/websocket"
r = requests.get(self.baseurl + endpoint, headers=self.headers)
if r.status_code == 200:
response = r.json()
if response['status'] == "OK":
self.socketToken = response['result']['token']
self._initSocket()
else:
self.log.pipeOut({response['status']}, lvl='ERROR')
else:
self.log.pipeOut({r.status_code}, lvl='ERROR')
def _initSocket(self):
url = "https://market-ws.swap.gg/"
self.sio.connect(url)
self.sio.emit('auth', self.socketToken)
self.sio.on("screenshot:ready", self.passScreenshot)
def _post(self, endpoint: str, payload: dict) -> requests.Response:
url = self.baseurl + endpoint
self.log.pipeOut(f"Posting to [{url}] / Payload [{payload}]", lvl='DEBUG')
req = requests.post(url, headers=self.headers, data=json.dumps(payload))
self.log.pipeOut(f"Returned [{req.status_code}] - {req.text}", lvl='DEBUG')
return req
def getScreenshot(self, inspect: str):
self.imglnk = ""
endpoint = "screenshot"
# Rebuild inspect link
inspectlnk = inspect.replace("%20", " ")
payload = {"inspectLink": inspectlnk}
req = self._post(endpoint, payload)
response = req.json()
self.log.pipeOut(response)
if response['status'] == "OK":
rslt = response['result']
# Looks like swap provides no more requestId. sadge
# rid = rslt['requestId']
rid = inspectlnk
if rslt['state'] == "COMPLETED":
self.imglnk = response['result']['imageLink']
self.log.pipeOut(self.imglnk)
elif rslt['state'] == "IN_QUEUE":
self.requested.append(rid)
pass
else:
self.log.pipeOut({response['status']}, lvl='ERROR')
def passScreenshot(self, data: dict):
# print(data)
if 'inspectLink' in data:
if data['inspectLink'] in self.requested:
self.imglnk = data['imageLink']
self.requested.remove(data['inspectLink'])
else:
self.log.pipeOut(data, lvl='WARNING')
if __name__ == '__main__':
exit()

138
src/util.py Normal file
View File

@ -0,0 +1,138 @@
"""
TBD
"""
__author__ = "Lukas Mahler"
__version__ = "0.0.0"
__date__ = "05.05.2022"
__email__ = "m@hler.eu"
__status__ = "Development"
# Default
import sys
import time
import shutil
import inspect
import logging
import pathlib
from logging.handlers import RotatingFileHandler
# Custom
import toml
def getConf(fname, log):
"""
"""
p = pathlib.Path(fname)
if p.suffix == ".toml":
if p.is_file() and p.exists():
try:
config = toml.load(fname)
if isValidConf(config):
return config
else:
log.pipeOut(f"The provided '.toml' is invalid", lvl="critical")
except ValueError as e:
log.pipeOut(f"The provided '.toml' is probably invalid, returned error: [{e}]", lvl="critical")
else:
log.pipeOut(f"Couldn't locate the '.toml' file [{fname}].", lvl="error")
log.pipeOut("Creating a new '.toml' file from template, please edit and restart.")
shutil.copy("src/template.toml", fname)
exit(1)
else:
log.pipeOut(f"The provided config file [{fname}] is not a '.toml' file.", lvl="error")
log.pipeOut("Creating a new '.toml' file from template, please edit and restart.")
shutil.copy("src/template.toml", "prod.toml")
exit(1)
def isValidConf(config):
"""
"""
valid = True
required = {'Auth': ['token', 'server'], 'Swap': ['apikey'], 'Log': ['level']}
for required_header in required:
if required_header in config:
for required_key in required[required_header]:
if required_key not in config[required_header]:
print(f"[Err] Missing key {required_key}")
valid = False
else:
if config[required_header][required_key] == "":
print(f"[Err] Key {required_key} can't be empty")
valid = False
else:
print(f"[Err] Missing header {required_header}")
valid = False
return valid
class Logger:
"""
Create a rotating log in a log folder
"""
def __init__(self, logpath, logname, reprint=True, lvl="INFO"):
self.reprint = reprint
self.log = logging.getLogger("mylog")
p = pathlib.Path(logpath)
p.mkdir(parents=True, exist_ok=True)
handler = RotatingFileHandler(pathlib.Path(logpath, logname),
encoding='utf-8',
maxBytes=1 * 1024 * 1024,
backupCount=10)
logformat = logging.Formatter("%(asctime)s %(levelname)8s %(message)s", "%Y-%m-%d %H:%M:%S")
handler.setFormatter(logformat)
self.log.addHandler(handler)
self.log.setLevel(lvl)
def changeLevel(self, lvl):
"""
Change the loglevel after creation
"""
self.log.setLevel(logging.getLevelName(lvl))
def pipeOut(self, msg, lvl="INFO"):
"""
All output pipes through this function.
It's possible to print the output to console [set reprint to True]
or run in silent log mode. [set reprint to False]
"""
# caller = inspect.currentframe().f_back.f_code.co_name # Returns caller function name
caller = pathlib.Path(inspect.stack()[1].filename).stem # Returns caller filename without extension
lvl = lvl.upper()
lvln = int(getattr(logging, lvl))
self.log.log(lvln, msg)
if self.reprint and lvln >= self.log.getEffectiveLevel():
print(f"[{time.strftime('%H:%M:%S')}]{f'[{lvl}]':10s}[{caller}] {msg}")
if lvl == "CRITICAL":
exit(1)
def unhandledException(self, exc_type, exc_value, exc_traceback):
"""
src = https://stackoverflow.com/a/16993115/5593051
"""
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
self.log.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
if __name__ == "__main__":
exit()