diff --git a/myTS3.py b/myTS3.py index 8243dcf..637116f 100644 --- a/myTS3.py +++ b/myTS3.py @@ -4,21 +4,20 @@ TBD __author__ = "Lukas Mahler" __version__ = "0.0.0" -__date__ = "11.10.2021" +__date__ = "30.10.2021" __email__ = "m@hler.eu" __status__ = "Development" # Default import sys import time -import json import os.path # Custom import ts3 # Self -from src import util, gecko +from src import util, gecko, commands class TSbot: @@ -35,16 +34,19 @@ class TSbot: self.sid = conf["Connection"]["sid"] self.user = conf["Authentication"]["user"] self.pwd = conf["Authentication"]["pwd"] - self.allowed_sgids = conf["Allowed"]["sgids"] + self.sgid_admin = conf["Groups"]["admins"] + self.sgid_mute = conf["Groups"]["mute"] + self.crypto = conf["Misc"]["crypto"] self.nickname = conf["Misc"]["nickname"] self.whitelisted = conf["Misc"]["whitelisted"] # Initialize self - self.gecko = gecko.GeckoAPI() self.log = log self.myid = None self.running = True + self.version = __version__ self.started = time.time() + self.gecko = gecko.GeckoAPI() self.crypto_update = self.started - 1800 self.pipeOut(f"Trying to connect to: {self.host}:{self.port}") @@ -107,7 +109,7 @@ class TSbot: time.sleep(5) # Notify connected admins - self.notifyAdmin() + commands.notifyAdmin(self) # ----------- LOOP HERE ------------- while self.running: @@ -116,11 +118,12 @@ class TSbot: self.bot.version() # Auto-update crypto price channels every 30 minutes - if self.crypto_update + 1800 < time.time(): - self.crypto_update = time.time() + if self.crypto: + if self.crypto_update + 1800 < time.time(): + self.crypto_update = time.time() + + commands.ticker(self, "") - self.lookupcommand(".btc", self.myid) - self.lookupcommand(".dot", self.myid) else: pass @@ -169,18 +172,14 @@ class TSbot: if 'client_nickname' in event[0]: displayname = event[0]['client_nickname'] - elif 'clid' in event[0]: - displayname = event[0]['clid'] else: - self.pipeOut(event[0]) - # displayname = "Unresolved" - continue # can't resolve no clid + displayname = event[0]['clid'] self.pipeOut(f"Clientid [{displayname}] disconnected.") # New text message elif event_type == "notifytextmessage": - msg = event[0]["msg"] + msg = event[0]["msg"].replace("\n", " ") invkr = event[0]["invokername"] invkr_id = event[0]["invokerid"] self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"') @@ -191,6 +190,8 @@ class TSbot: else: self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning") +# Class Utility -------------------------------------------------------------------------------------------------------- + def pipeOut(self, msg, lvl="INFO", reprint=True): """ All output pipes through this function. @@ -218,23 +219,6 @@ class TSbot: self.pipeOut(msg) self.running = False - def notifyAdmin(self): - """ - Ping every available admin. - """ - - clients = self.bot.clientlist() - clients = [client for client in clients if client["client_type"] != "1"] - for client in clients: - cldbid = client["client_database_id"] - clid = client["clid"] - if self.isadmin(cldbid): - msg = "" - self.bot.sendtextmessage(targetmode=1, target=clid, msg=msg) - - if not self.whitelisted: - time.sleep(1) - def kickall(self, msg): """ TODO @@ -246,7 +230,7 @@ class TSbot: self.bot.clientpoke(msg=msg, clid=clid) # TODO - def poke(self, msg=None, n=10, delay=0.2, usr='all'): + def poke(self, msg=None, n=1, delay=0.2, usr='all'): """ ping a single or multiple users for n times including a msg. """ @@ -316,32 +300,6 @@ class TSbot: except Exception as e: self.pipeOut(e, lvl="error") - def list(self, what, invkr_id): - """ - Message the invoker of the function either a list of channels or a list of clients. - """ - - mydict = {} - if what == "channel": - channels = self.bot.channellist() - for channel in channels: - order = channel["channel_order"] - mydict[order] = [channel["channel_name"], channel["cid"]] - - mydict = dict(sorted(mydict.items())) - msg = json.dumps(mydict) - - elif what == "clients": - clients = self.bot.clientlist() - for client in clients: - mydict[client["client_nickname"]] = {"clid": client["clid"], "cldbid": client["client_database_id"]} - mydict = dict(sorted(mydict.items())) - msg = json.dumps(mydict) - else: - msg = None - - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) - def isqueryclient(self, clid): """ Check if the given client-uid is a query client. @@ -351,13 +309,16 @@ class TSbot: except ts3.query.TS3QueryError as e: self.pipeOut(f"given clid {clid} returned error:\n{e}", lvl="ERROR") return True - - if client[0]["client_type"] == "1": - self.pipeOut(f"[{clid}] ISQUERY: True") - return True - else: - self.pipeOut(f"[{clid}] ISQUERY: False") - return False + try: + if client[0]["client_type"] == "1": + self.pipeOut(f"[{clid}] ISQUERY: True") + return True + else: + self.pipeOut(f"[{clid}] ISQUERY: False") + return False + except KeyError as e: + self.pipeOut(f"Error [{e}] on client [{client.__dict__}] ") + return True # Da wahrscheinlich ein Query Client def isadmin(self, cldbid): """ @@ -375,139 +336,13 @@ class TSbot: self.pipeOut(str(groups.__dict__), lvl="DEBUG") # DEBUG for group in groups: - if any(sgid == group['sgid'] for sgid in self.allowed_sgids): + if any(sgid == group['sgid'] for sgid in self.sgid_admin): self.pipeOut(f"[{cldbid}] ISADMIN: True") return True self.pipeOut(f"[{cldbid}] ISADMIN: False") return False - def lookupcommand(self, msg, invkr_id): - """ - Every message starting with '.' gets passed and evaluated in this function. - Commands in this function are sorted alphabetically. - - Command Parameter 1 Parameter 2 - --------------------------------------------------------- - .admin - .annoy target message - .btc / .eth - .dot / .ada - .follow target - .info clid - .kickall message - .list channel/clients - .pingall message - .rename nickname - .roll - .stop / .quit / .q - .test - """ - - commandstring = msg.split(" ") - command = commandstring[0] - parameter = commandstring[1:] - self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}") - - if command == ".admin": - self.notifyAdmin() - - elif command == ".annoy": - try: - target = parameter[0] - msg = parameter[1] - self.poke(msg=msg, usr=target) - except IndexError: - err = "Please use the command like this: .annoy TARGET MESSAGE" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".btc" or command == ".eth": - channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('Bitcoin', decimal=0)} | " \ - f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}" - try: - self.editChannelname(200, channelname) - except ts3.query.TS3QueryError: - pass - - elif command == ".dot" or command == ".ada": - channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \ - f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}" - try: - self.editChannelname(201, channelname) - except ts3.query.TS3QueryError: - pass - - elif command == ".follow": - if not parameter[0]: - err = "Please use the command like this: .follow TARGET" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - return - - target = parameter[0] - try: - clients = self.bot.clientfind(pattern=target) - clids = [client["clid"] for client in clients] - if len(clids) == 1: - cid = self.bot.clientinfo(clids[0])["cid"] - self.bot.clientmove(clid=self.myid, cid=cid) - else: - self.pipeOut(f"Found multiple matching clients using pattern {target}:{clids}") - return - - except ts3.query.TS3QueryError as e: - self.pipeOut(f"No client found using pattern {target}," - f"returned error:\n{e}", lvl="ERROR") - return - - elif command == ".info": - # TODO implement more then just the runtime - msg = f"Runtime: {util.getRuntime(self.started)}" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) - - elif command == ".kickall": - self.kickall("test") # TODO - - elif command == ".list": - try: - self.list(parameter[0], invkr_id) - except IndexError: - err = "Please use the command like this: .list channel/clients" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".pingall": - try: - msg = parameter[0] - self.poke(msg=msg) - except IndexError: - err = "Please use the command like this: .pingall MESSAGE" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".rename": - try: - self.nickname = parameter[0] - self.bot.clientupdate(client_nickname=self.nickname) - except IndexError: - err = "Please use the command like this: .rename NAME" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".roll": - pass # TODO needs permission for everyone + targemode implementation - - elif command == ".stop" or command == ".quit" or command == ".q": - self.stop(invkr_id) - - elif command == ".test": - cid = self.createChannel("Test") - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) - - else: - err = f"Unknown Command: [{command}]" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - def printable_clientinfo(self, clid): """ Generate printable clientinfo from clid. @@ -518,17 +353,118 @@ class TSbot: return usrd - def checkgrp(self): + def clid2cldbid(self, clid): """ - Print all assigned groups for every connected client. + Convert a given clid to a cldbid. """ - # clients = self.bot.clientlist() - # clients_cldbid = [client["client_database_id"] for client in clients if client["client_type"] != "1"] - clients = self.bot.clientlist(groups=True) - clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"] + cluid = self.bot.clientgetuidfromclid(clid=clid)[0]["cluid"] + cldbid = self.bot.clientgetdbidfromuid(cluid=cluid)[0]["cldbid"] - self.pipeOut(f"{clients_groups}") + return cldbid + + def identifytarget(self, target): + """ + Try to find clid from nickname. + """ + + try: + clients = self.bot.clientfind(pattern=target) + clids = [client["clid"] for client in clients] + if len(clids) == 1: + return clids[0] + else: + self.pipeOut(f"Found multiple matching clients using pattern {target}:{clids}") + return + + except ts3.query.TS3QueryError as e: + self.pipeOut(f"No client found using pattern [{target}], " + f"returned error:\n{e}", lvl="ERROR") + return + +# Commands ------------------------------------------------------------------------------------------------------------- + + def lookupcommand(self, msg, invkr_id): + """ + Every message starting with '.' gets passed and evaluated in this function. + Commands in this function are sorted alphabetically. + Parameters in brackets are optional. + + Command Parameter 1 Parameter 2 + --------------------------------------------------------- + .admin / .notifyAdmin + .annoy target message + .follow target + .help / .h + .info clid + .kickall message + .list channel/clients/commands/groups + .mute target + .pingall message + .rename nickname + .roll + .stop / .quit / .q + .test + .ticker (symbol) + .unmute target + """ + + commandstring = msg.split(" ") + command = commandstring[0] + parameter = commandstring[1:] + self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}") + + # ??? passable = {"self": self, "invkr_id": invkr_id, "parameter": parameter} + + if command == ".admin" or command == ".notifyAdmin": + commands.notifyAdmin(self) + + elif command == ".annoy": + commands.annoy(self, invkr_id, parameter) + + elif command == ".follow": + commands.follow(self, invkr_id, parameter) + + elif command == ".help" or command == ".h": + commands.help(self, invkr_id) + + elif command == ".info": + commands.info(self, invkr_id) + + elif command == ".kickall": + self.kickall("test") # TODO + + elif command == ".list": + commands.list(self, invkr_id, parameter) + + elif command == ".mute": + commands.mute(self, invkr_id, parameter) + + elif command == ".pingall": + commands.pingall(self, invkr_id, parameter) + + elif command == ".rename": + commands.rename(self, invkr_id, parameter) + + elif command == ".roll": + pass # TODO needs permission for everyone + targemode implementation + + elif command == ".stop" or command == ".quit" or command == ".q": + commands.quit(self, invkr_id) + + elif command == ".test": + cid = self.createChannel("Test") + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) + + elif command == ".ticker": + commands.ticker(self, parameter) + + elif command == ".unmute": + commands.unmute(self, invkr_id, parameter) + + else: + err = f"Unknown Command: [{command}]" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) # ---------------------------------------------------------------------------------------------------------------------- diff --git a/src/commands.py b/src/commands.py new file mode 100644 index 0000000..f5fbbc3 --- /dev/null +++ b/src/commands.py @@ -0,0 +1,248 @@ +""" +TBD +""" + +__author__ = "Lukas Mahler" +__version__ = "0.0.0" +__date__ = "30.10.2021" +__email__ = "m@hler.eu" +__status__ = "Development" + + +# Default +import sys +import json +import time +import inspect + +# Custom +import ts3 + +# Self +from src import util + + +def annoy(self, invkr_id, parameter): + """ + + """ + + if not parameter: + err = "Please use the command like this: .annoy TARGET MESSAGE" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + target = parameter[0] + msg = parameter[1] + self.poke(n=10, msg=msg, usr=target) + + +def follow(self, invkr_id, parameter): + """ + TODO sticky folgen + """ + + if not parameter: + err = "Please use the command like this: .follow TARGET" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + target = parameter[0] + clid = self.identifytarget(target) + cid = self.bot.clientinfo(clid=int(clid))[0]["cid"] + self.bot.clientmove(clid=self.myid, cid=cid) + + +def help(self, invkr_id): + """ + + """ + + # Find all functions in this submodule + cmds = [f[0] for f in inspect.getmembers(sys.modules[__name__], inspect.isfunction)] + msg = f"\n\nList of commands:\n---------------------\n.{f'{chr(10)}.'.join(cmds)}" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) + + +def info(self, invkr_id): + """ + + """ + + msg = f"\n\nRuntime: {util.getRuntime(self.started)}\n" \ + f"Version: {self.version}" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) + + +def list(self, invkr_id, parameter): + """ + Message the invoker of the function either a list of channels, clients, commands or server groups. + """ + + if not parameter: + err = "Please use the command like this: .list channel/clients/commands/groups/" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + what = parameter[0] + mydict = {} + + if what == "channel": + channels = self.bot.channellist() + for channel in channels: + order = channel["channel_order"] + mydict[order] = [channel["channel_name"], channel["cid"]] + + mydict = dict(sorted(mydict.items())) + msg = json.dumps(mydict) + + elif what == "clients": + clients = self.bot.clientlist() + for client in clients: + mydict[client["client_nickname"]] = {"clid": client["clid"], "cldbid": client["client_database_id"]} + mydict = dict(sorted(mydict.items())) + msg = json.dumps(mydict) + + elif what == "commands": + help(self, invkr_id) + return + + elif what == "groups": + groups = self.bot.servergrouplist() + for group in groups: + mydict[group["name"]] = {"sgid": group["sgid"], "sortid": group["sortid"]} + msg = json.dumps(mydict) + + else: + msg = f"The parameter [{what}] is not supported." + + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) + + +def mute(self, invkr_id, parameter): + """ + Assign the mute group to a user. + """ + + if not parameter: + err = "Please use the command like this: .mute TARGET" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + target = parameter[0] + clid = self.identifytarget(target) + cldbid = self.clid2cldbid(clid) + + try: + self.bot.servergroupaddclient(sgid=self.sgid_mute, cldbid=cldbid) + except ts3.query.TS3QueryError as e: + err = f"Failed to add to group: [{e}]" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + +def pingall(self, invkr_id, parameter): + """ + + """ + + if not parameter: + err = "Please use the command like this: .pingall MESSAGE" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + msg = parameter[0] + self.poke(msg=msg) + + +def notifyAdmin(self): + """ + Message every available admin. + """ + + clients = self.bot.clientlist() + clients = [client for client in clients if client["client_type"] != "1"] + for client in clients: + cldbid = client["client_database_id"] + clid = client["clid"] + if self.isadmin(cldbid): + msg = "" + self.bot.sendtextmessage(targetmode=1, target=clid, msg=msg) + + if not self.whitelisted: + time.sleep(1) + + +def quit(self, invkr_id): + """ + + """ + self.stop(invkr_id) + + +def rename(self, invkr_id, parameter): + """ + Rename the Bot to the given Nickname + """ + + if not parameter: + err = "Please use the command like this: .rename NICKNAME" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + self.nickname = parameter[0] + self.bot.clientupdate(client_nickname=self.nickname) + + +def ticker(self, parameter): + """ + Refresh the Crypto Ticker channels, + this is very specific to my use, you can disable this in the config. + """ + + if parameter: + symbol = parameter[0].lower() + else: + symbol = "all" + + if symbol in ["btc", "bitcoin", "eth", "ethereum", "all"]: + channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('bitcoin', decimal=0)} | " \ + f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}" + try: + self.editChannelname(200, channelname) + except ts3.query.TS3QueryError: + pass + + if symbol in ["dot", "polkadot", "ada", "cardano", "all"]: + channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \ + f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}" + try: + self.editChannelname(201, channelname) + except ts3.query.TS3QueryError: + pass + + +def unmute(self, invkr_id, parameter): + """ + Remove the mute group to a user. + """ + + if not parameter: + err = "Please use the command like this: .unmute TARGET" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + target = parameter[0] + clid = self.identifytarget(target) + cldbid = self.clid2cldbid(clid) + + try: + self.bot.servergroupdelclient(sgid=self.sgid_mute, cldbid=cldbid) + except ts3.query.TS3QueryError as e: + err = f"Failed to remove from group: [{e}]" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + return + + +if __name__ == "__main__": + exit() diff --git a/src/template.toml b/src/template.toml index 9abc392..2200916 100644 --- a/src/template.toml +++ b/src/template.toml @@ -7,9 +7,11 @@ sid = 1 user = "exampleuser" pwd = "password" -[Allowed] -sgids = ["1"] +[Groups] +admins = ["1"] +mute = "0" [Misc] nickname = "myTS3-Bot" -whitelisted = false \ No newline at end of file +whitelisted = false +crypto = false \ No newline at end of file diff --git a/src/util.py b/src/util.py index 4d89e15..83d118d 100644 --- a/src/util.py +++ b/src/util.py @@ -4,7 +4,7 @@ TBD __author__ = "Lukas Mahler" __version__ = "0.0.0" -__date__ = "30.09.2021" +__date__ = "13.10.2021" __email__ = "m@hler.eu" __status__ = "Development" @@ -64,7 +64,7 @@ def getRuntime(started): """ elapsed = time.time() - started - runtime = str(timedelta(seconds=elapsed)) + runtime = str(timedelta(seconds=elapsed)).split(".")[0] # ignore Microseconds return runtime