""" TBD """ __author__ = "Lukas Mahler" __version__ = "0.3.0" __date__ = "25.09.2021" __email__ = "m@hler.eu" __status__ = "Development" # Default import os import sys import time import json # Custom import ts3 import dotenv # python-dotenv # Self import gecko import util class TSbot: def __init__(self, conf, log): """ Create a new instance, connect to the server via telnet and start the event loop function. """ self.host = conf["host"] self.port = conf["port"] self.user = conf["user"] self.pwd = conf["pwd"] self.sid = conf["sid"] self.nickname = conf["name"] self.gecko = gecko.GeckoAPI() self.log = log self.myid = None self.running = True self.intro = "" self.last_crypto_update = time.time() - 1800 self.pipeOut(f"Trying to connect to: {self.host}:{self.port}") with ts3.query.TS3Connection(self.host, self.port) as self.bot: self.bot.login(client_login_name=self.user, client_login_password=self.pwd) self.bot.use(sid=self.sid) try: self.bot.clientupdate(client_nickname=self.nickname) except ts3.query.TS3QueryError: pass self.pipeOut(f"Successfully connected as: {self.nickname}") # Start the Bot self.loop() def loop(self): """ Subscribe to event types, ping bot admins and start the event loop. Every time an event is triggered, the bot will identify its type and execute the appropriate function. The loop can be stopped setting self.running to False """ # Find my client id me = self.bot.clientfind(pattern=self.nickname) if len(me) == 1: self.myid = me[0]["clid"] else: self.pipeOut("Can't find my own client id.", lvl="critical") raise ValueError("Can't find my own client id.") ''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: ''' # self.bot.clientmove(clid=self.myid,cid=129) # ----------- SUBSCRIBE TO EVENTS ------------- # Subscribe to a server movement events self.bot.servernotifyregister(event="server") # Subscribe to server channel messages self.bot.servernotifyregister(event="textserver") # Subscribe to channel messages self.bot.servernotifyregister(event="textchannel") # Subscribe to privat chat messages self.bot.servernotifyregister(event="textprivate") # Subscribe to channel movement events # self.bot.servernotifyregister(event="channel", id_=0) time.sleep(5) # This can be removed if the Query Client is Whitelisted # Notify connected admins self.notifyAdmin() # ----------- LOOP HERE ------------- while self.running: # self.bot.send_keepalive() self.pipeOut(f"Waiting for a new Event...") self.bot.version() # Auto-update crypto price channels every 30 minutes if self.last_crypto_update + 1800 < time.time(): self.last_crypto_update = time.time() self.lookupcommand(".btc", self.myid) self.lookupcommand(".dot", self.myid) try: # This method blocks, but we must sent the keepalive message at # least once in 5 minutes to avoid the sever side idle client # disconnect. So we set the timeout parameter simply to 1 minute. event = self.bot.wait_for_event(timeout=60) except ts3.query.TS3TimeoutError: pass else: event_type = event.event self.pipeOut(f"Got Event | length={len(event[0])} | {event_type}") # Client connected if event_type == "notifycliententerview": if 'client_nickname' in event[0]: displayname = event[0]['client_nickname'] else: displayname = event[0]['clid'] self.pipeOut(f"Client [{displayname}] connected.") # Check if the connector is a ServerQuery or not if not self.isqueryclient(event[0]["client_unique_identifier"]): self.pipeOut(f"* {event[0]}", lvl="debug") # Check if the connector is an Admin if self.isadmin(event[0]["client_database_id"]): self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro) else: pass else: pass # Client disconnected elif event_type == "notifyclientleftview": if 'client_nickname' in event[0]: displayname = event[0]['client_nickname'] else: displayname = event[0]['clid'] self.pipeOut(f"Clientid [{displayname}] disconnected.") # New text message elif event_type == "notifytextmessage": msg = event[0]["msg"] invkr = event[0]["invokername"] invkr_id = event[0]["invokerid"] self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"') if msg.startswith("."): self.lookupcommand(msg, invkr_id) # Unknown Event else: self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning") def pipeOut(self, msg, lvl="info", reprint=True): """ All output pipes through this function. It's possible to print the output to console [set reprint to True] or run in silent log mode. [set reprint to False] """ if lvl.lower() == "debug": self.log.debug(msg) elif lvl.lower() == "info": self.log.info(msg) elif lvl.lower() == "warn": self.log.warning(msg) elif lvl.lower() == "error": self.log.error(msg) else: self.log.critical(msg) if reprint: print(f"[{time.strftime('%H:%M:%S')}][{lvl.upper()}] {msg}") def stop(self, invkr_id): """ This stops the bot instance by invalidating the event loop. """ msg = "Shutdown, bye bye!" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) self.pipeOut(msg) self.running = False def notifyAdmin(self): """ Ping every available admin """ clients = self.bot.clientlist() clients = [client for client in clients if client["client_type"] != "1"] for client in clients: cldbid = client["client_database_id"] clid = client["clid"] if self.isadmin(cldbid): self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro) time.sleep(1) # This can be removed if the Query Client is Whitelisted def kickall(self, msg): """ TODO """ clients = self.bot.clientlist() clients = [client["clid"] for client in clients if client["client_type"] != "1"] for clid in clients: try: self.bot.clientpoke(msg=msg, clid=clid) # TODO except: pass def poke(self, msg=None, n=10, delay=0.2, usr='all'): """ ping a single or multiple users for n times including a msg. """ if msg is None: msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~" # Get the client ids if usr == 'all': clients = self.bot.clientlist() clients = [client["clid"] for client in clients if client["client_type"] != "1"] else: clients = self.bot.clientfind(pattern=usr) clients = [client["clid"] for client in clients] # Ping them if len(clients) > 0: for clid in clients: data = self.printable_clientinfo(clid) self.pipeOut(f"{data}", lvl="debug") i = 0 while n == -1 or i < n: for clid in clients: self.pipeOut(f"{clid}", lvl="debug") try: self.bot.clientpoke(msg=msg, clid=clid) except: pass time.sleep(delay) i += 1 def createChannel(self, name, permanent=False): """ Create a teamspeak channel, the channel is non persistent by default. Set permanent to True if you want a persistant channel. """ if permanent: new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1") else: new = self.bot.channelcreate(channel_name=name) return new[0]["cid"] def editChannelname(self, cid, name): """ Using a channel-id you can set the name of the given channel. This will fail if the channel name is already in use. This will fail if the given channel name is > 40 bytes. """ # Check name for bytes n = len(name) if n > 40: self.pipeOut(f"Can't change channelname to {name} as [{n}] exceeds the max bytes of 40.") return try: self.bot.channeledit(cid=cid, channel_name=name) except Exception as e: self.pipeOut(e, lvl="error") def list(self, what, invkr_id): """ Message the invoker of the function either a list of channels or a list of clients """ if what == "channel": mydict = {} channels = self.bot.channellist() for channel in channels: order = channel["channel_order"] mydict[order] = [channel["channel_name"], channel["cid"]] mydict = dict(sorted(mydict.items())) msg = json.dumps(mydict) elif what == "clients": msg = self.bot.clientlist() # TODO else: msg = None self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) def isqueryclient(self, cluid): """ Check if the given client-uid is a query client """ # client = self.bot.clientlist(uid=uid) # print(client[0]) # if client[0]["client_type"] == "1": if cluid == "ServerQuery": self.pipeOut(f"[{cluid}] ISQUERY: True") return True else: self.pipeOut(f"[{cluid}] ISQUERY: False") return False def isadmin(self, cldbid): """ Check if the given client-databaseid is an admin """ groups = self.bot.servergroupsbyclientid(cldbid=cldbid) # [print(group["sgid"]) for group in groups] for group in groups: # 6 Server Admin/ 13 Operator / 15 Root # if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"): if group["sgid"] == "15": self.pipeOut(f"[{cldbid}] ISADMIN: True") return True else: continue self.pipeOut(f"[{cldbid}] ISADMIN: False") return False def lookupcommand(self, msg, invkr_id): """ """ commandstring = msg.split(" ") command = commandstring[0] parameter = commandstring[1:] self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}") if command == ".annoy": try: target = parameter[0] msg = parameter[1] self.poke(msg=msg, usr=target) except IndexError: err = "Please use the command like this: .annoy TARGET MESSAGE" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) pass elif command == ".kickall": self.kickall("test") # TODO elif command == ".test": cid = self.createChannel("Test") self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) elif command == ".btc" or command == ".eth": channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('Bitcoin', decimal=0)} | " \ f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}" try: self.editChannelname(200, channelname) except ts3.query.TS3QueryError: pass elif command == ".dot" or command == ".ada": channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \ f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}" try: self.editChannelname(201, channelname) except ts3.query.TS3QueryError: pass elif command == ".list": try: self.list(parameter[0], invkr_id) except IndexError: err = "Please use the command like this: .list channel/clients" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) pass elif command == ".follow": pass # TODO elif command == ".rename": try: self.nickname = parameter[0] self.bot.clientupdate(client_nickname=self.nickname) except IndexError: err = "Please use the command like this: .rename NAME" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) pass elif command == ".stop" or command == ".quit" or command == ".q": self.stop(invkr_id) elif command == ".pingall": try: msg = parameter[0] self.poke(msg=msg) except IndexError: err = "Please use the command like this: .pingall MESSAGE" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) pass else: err = f"Unknown Command: [{command}]" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) def printable_clientinfo(self, clid): """ """ usrd = {} info = self.bot.clientinfo(clid=clid) temp = info._data[0].split() for t1 in temp: t2 = t1.decode("utf-8") t3 = t2.split("=") try: t3[1] except Exception: t3.append("None") pass usrd[t3[0]] = t3[1] return usrd def checkgrp(self): """ """ clients = self.bot.clientlist() clients_cldbid = [client["client_database_id"] for client in clients if client["client_type"] != "1"] clients = self.bot.clientlist(groups=True) clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"] self.pipeOut(f"{clients_groups}") # ---------------------------------------------------------------------------------------------------------------------- def main(): # Start logger log = util.setupLogger() # Load Dotenv dotenv_file = dotenv.find_dotenv() if not dotenv_file: log.critical("could not locate .env file") raise FileNotFoundError("could not locate .env file") dotenv.load_dotenv(dotenv_file) dotend_keys = dotenv.dotenv_values() if not {"_HOST", "_PORT", "_USER", "_PWD", "_SID"} <= dotend_keys.keys(): log.critical("missing keys in your .env file.") raise ValueError("missing keys in your .env file.") # Config conf = dict(host=os.getenv("_HOST"), port=os.getenv("_PORT"), user=os.getenv("_USER"), pwd=os.getenv("_PWD"), sid=os.getenv("_SID"), name=os.getenv("_NAME")) # Start the Bot Instance TSbot(conf, log) if __name__ == "__main__": main()