""" TBD """ __author__ = "Lukas Mahler" __version__ = "0.0.0" __date__ = "24.10.2021" __email__ = "m@hler.eu" __status__ = "Development" # Default import sys import time import os.path # Custom import ts3 # Self from src import util, gecko, commands class TSbot: def __init__(self, conf, log): """ Create a new instance, connect to the server via telnet and start the event loop function. """ # Initialize from config self.host = conf["Connection"]["host"] self.port = conf["Connection"]["port"] self.sid = conf["Connection"]["sid"] self.user = conf["Authentication"]["user"] self.pwd = conf["Authentication"]["pwd"] self.sgid_admin = conf["Groups"]["admins"] self.sgid_mute = conf["Groups"]["mute"] self.nickname = conf["Misc"]["nickname"] self.whitelisted = conf["Misc"]["whitelisted"] # Initialize self self.gecko = gecko.GeckoAPI() self.log = log self.myid = None self.running = True self.version = __version__ self.started = time.time() self.crypto_update = self.started - 1800 self.pipeOut(f"Trying to connect to: {self.host}:{self.port}") # Starting the Connection with ts3.query.TS3Connection(self.host, self.port) as self.bot: try: self.bot.login(client_login_name=self.user, client_login_password=self.pwd) self.bot.use(sid=self.sid) except ts3.query.TS3QueryError: self.pipeOut("Invalid login credentials, please check your '.toml' file.", lvl="CRITICAL") try: self.bot.clientupdate(client_nickname=self.nickname) except ts3.query.TS3QueryError as e: self.pipeOut(e, lvl="WARNING") pass self.pipeOut(f"Successfully connected as: {self.nickname}") # Start the Bot loop self.loop() def loop(self): """ Subscribe to event types, ping bot admins and start the event loop. Every time an event is triggered, the bot will identify its type and execute the appropriate function. The loop can be stopped setting self.running to False """ # Find the instances client id me = self.bot.clientfind(pattern=self.nickname) if len(me) == 1: self.myid = me[0]["clid"] else: self.pipeOut("Can't find my own client id. terminating...", lvl="critical") exit(1) ''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: ''' # self.bot.clientmove(clid=self.myid,cid=129) # ----------- SUBSCRIBE TO EVENTS ------------- # Subscribe to a server movement events self.bot.servernotifyregister(event="server") # Subscribe to server channel messages self.bot.servernotifyregister(event="textserver") # Subscribe to channel messages self.bot.servernotifyregister(event="textchannel") # Subscribe to privat chat messages self.bot.servernotifyregister(event="textprivate") # Subscribe to channel movement events # self.bot.servernotifyregister(event="channel", id_=0) if not self.whitelisted: time.sleep(5) # Notify connected admins commands.notifyAdmin(self) # ----------- LOOP HERE ------------- while self.running: # self.bot.send_keepalive() self.pipeOut(f"Waiting for a new Event...") self.bot.version() # Auto-update crypto price channels every 30 minutes if self.crypto_update + 1800 < time.time(): self.crypto_update = time.time() self.lookupcommand(".btc", self.myid) self.lookupcommand(".dot", self.myid) else: pass try: # This method blocks, but we must sent the keepalive message at # least once in 5 minutes to avoid the sever side idle client # disconnect. So we set the timeout parameter simply to 1 minute. event = self.bot.wait_for_event(timeout=60) except ts3.query.TS3TimeoutError: pass # try else! else: event_type = event.event self.pipeOut(f"Got Event | length={len(event[0])} | {event_type}") # Client connected if event_type == "notifycliententerview": if 'client_nickname' in event[0]: displayname = event[0]['client_nickname'] elif 'clid' in event[0]: displayname = event[0]['clid'] else: self.pipeOut(event[0]) # displayname = "Unresolved" continue # can't resolve no clid self.pipeOut(f"Client [{displayname}] connected.") # Check if the connector is a ServerQuery or not if not self.isqueryclient(event[0]["clid"]): self.pipeOut(f"* {event[0]}", lvl="debug") # Check if the connector is an Admin if self.isadmin(event[0]["client_database_id"]): msg = "" self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=msg) else: pass else: pass # Client disconnected elif event_type == "notifyclientleftview": if 'client_nickname' in event[0]: displayname = event[0]['client_nickname'] else: displayname = event[0]['clid'] self.pipeOut(f"Clientid [{displayname}] disconnected.") # New text message elif event_type == "notifytextmessage": msg = event[0]["msg"].replace("\n", " ") invkr = event[0]["invokername"] invkr_id = event[0]["invokerid"] self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"') if msg.startswith("."): self.lookupcommand(msg, invkr_id) # Unknown Event else: self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning") # Class Utility -------------------------------------------------------------------------------------------------------- def pipeOut(self, msg, lvl="INFO", reprint=True): """ All output pipes through this function. It's possible to print the output to console [set reprint to True] or run in silent log mode. [set reprint to False] """ lvl = lvl.upper() lvln = int(getattr(util.logging, lvl)) self.log.log(lvln, msg) if reprint: print(f"[{time.strftime('%H:%M:%S')}]{f'[{lvl}]':10s} {msg}") if lvl == "CRITICAL": exit(1) def stop(self, invkr_id): """ This stops the bot instance by invalidating the event loop. """ msg = "Shutdown, bye bye!" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) self.pipeOut(msg) self.running = False def kickall(self, msg): """ TODO """ clients = self.bot.clientlist() clients = [client["clid"] for client in clients if client["client_type"] != "1"] for clid in clients: self.bot.clientpoke(msg=msg, clid=clid) # TODO def poke(self, msg=None, n=10, delay=0.2, usr='all'): """ ping a single or multiple users for n times including a msg. """ if msg is None: msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~" # Get the client ids if usr == 'all': clients = self.bot.clientlist() clients = [client["clid"] for client in clients if client["client_type"] != "1"] else: try: clients = self.bot.clientfind(pattern=usr) clients = [client["clid"] for client in clients] except ts3.query.TS3QueryError as e: self.pipeOut(f"Couldnt execute poke, no client found using pattern {usr}," f"returned error:\n{e}", lvl="ERROR") return # Ping them if len(clients) > 0: for clid in clients: data = self.printable_clientinfo(clid) self.pipeOut(f"{data}", lvl="debug") i = 0 while n == -1 or i < n: for clid in clients: try: self.bot.clientpoke(msg=msg, clid=clid) except Exception as e: self.pipeOut(e, lvl="warning") pass time.sleep(delay) i += 1 def createChannel(self, name, permanent=False): """ Create a teamspeak channel, the channel is non persistent by default. Set permanent to True if you want a persistant channel. """ if permanent: new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1") else: new = self.bot.channelcreate(channel_name=name) return new[0]["cid"] def editChannelname(self, cid, name): """ Using a channel-id you can set the name of the given channel. This will fail if the channel name is already in use. This will fail if the given channel name is > 40 bytes. """ # Check name for bytes n = len(name) if n > 40: self.pipeOut(f"Can't change channelname to {name} as [{n}] exceeds the max bytes of 40.") return try: self.bot.channeledit(cid=cid, channel_name=name) except Exception as e: self.pipeOut(e, lvl="error") def isqueryclient(self, clid): """ Check if the given client-uid is a query client. """ try: client = self.bot.clientinfo(clid=clid) except ts3.query.TS3QueryError as e: self.pipeOut(f"given clid {clid} returned error:\n{e}", lvl="ERROR") return True try: if client[0]["client_type"] == "1": self.pipeOut(f"[{clid}] ISQUERY: True") return True else: self.pipeOut(f"[{clid}] ISQUERY: False") return False except KeyError as e: self.pipeOut(f"Error [{e}] on client [{client.__dict__}] ") return True # Da wahrscheinlich ein Query Client def isadmin(self, cldbid): """ Check if the given client-databaseid is an admin. This is done by resolving the clients groups and check if any of the groups sgid match a sgid from the sgids from the '.toml' config. """ try: groups = self.bot.servergroupsbyclientid(cldbid=cldbid) except ts3.query.TS3QueryError as e: self.pipeOut(e, lvl="ERROR") return False self.pipeOut(str(groups.__dict__), lvl="DEBUG") # DEBUG for group in groups: if any(sgid == group['sgid'] for sgid in self.sgid_admin): self.pipeOut(f"[{cldbid}] ISADMIN: True") return True self.pipeOut(f"[{cldbid}] ISADMIN: False") return False def printable_clientinfo(self, clid): """ Generate printable clientinfo from clid. """ info = self.bot.clientinfo(clid=clid) usrd = info.__dict__ return usrd def clid2cldbid(self, clid): """ Convert a given clid to a cldbid. """ cluid = self.bot.clientgetuidfromclid(clid=clid)[0]["cluid"] cldbid = self.bot.clientgetdbidfromuid(cluid=cluid)[0]["cldbid"] return cldbid def identifytarget(self, target): """ Try to find clid from nickname. """ try: clients = self.bot.clientfind(pattern=target) clids = [client["clid"] for client in clients] if len(clids) == 1: return clids[0] else: self.pipeOut(f"Found multiple matching clients using pattern {target}:{clids}") return except ts3.query.TS3QueryError as e: self.pipeOut(f"No client found using pattern [{target}], " f"returned error:\n{e}", lvl="ERROR") return # Commands ------------------------------------------------------------------------------------------------------------- def lookupcommand(self, msg, invkr_id): """ Every message starting with '.' gets passed and evaluated in this function. Commands in this function are sorted alphabetically. Command Parameter 1 Parameter 2 --------------------------------------------------------- .admin .annoy target message .btc / .eth .dot / .ada .follow target .help / .h .info clid .kickall message .list channel/clients/groups .mute target .pingall message .rename nickname .roll .stop / .quit / .q .test .unmute target """ commandstring = msg.split(" ") command = commandstring[0] parameter = commandstring[1:] self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}") if command == ".admin": commands.notifyAdmin(self) elif command == ".annoy": try: target = parameter[0] msg = parameter[1] self.poke(msg=msg, usr=target) except IndexError: err = "Please use the command like this: .annoy TARGET MESSAGE" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) elif command == ".btc" or command == ".eth": channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('Bitcoin', decimal=0)} | " \ f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}" try: self.editChannelname(200, channelname) except ts3.query.TS3QueryError: pass elif command == ".dot" or command == ".ada": channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \ f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}" try: self.editChannelname(201, channelname) except ts3.query.TS3QueryError: pass elif command == ".follow": commands.follow(self, invkr_id, parameter) elif command == ".help" or command == ".h": commands.help(self, invkr_id) elif command == ".info": commands.info(self, invkr_id) elif command == ".kickall": self.kickall("test") # TODO elif command == ".list": try: commands.msglist(self, invkr_id, parameter) except IndexError: err = "Please use the command like this: .list channel/clients/groups" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) elif command == ".mute": commands.mute(self, invkr_id, parameter) elif command == ".pingall": try: msg = parameter[0] self.poke(msg=msg) except IndexError: err = "Please use the command like this: .pingall MESSAGE" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) elif command == ".rename": try: self.nickname = parameter[0] self.bot.clientupdate(client_nickname=self.nickname) except IndexError: err = "Please use the command like this: .rename NAME" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) elif command == ".roll": pass # TODO needs permission for everyone + targemode implementation elif command == ".stop" or command == ".quit" or command == ".q": self.stop(invkr_id) elif command == ".test": cid = self.createChannel("Test") self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) elif command == ".unmute": commands.unmute(self, invkr_id, parameter) else: err = f"Unknown Command: [{command}]" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) # ---------------------------------------------------------------------------------------------------------------------- def main(): # Start logger logpath = os.path.dirname(os.path.abspath(__file__)) + r"/log" log = util.setupLogger(logpath) # Log unhandled exception sys.excepthook = util.unhandledException # Load toml config conf = util.getConf("prod.toml") # Start the Bot Instance TSbot(conf, log) if __name__ == "__main__": main()