diff --git a/myTS3.py b/myTS3.py index cf2adb2..138c85f 100644 --- a/myTS3.py +++ b/myTS3.py @@ -3,15 +3,15 @@ TBD """ __author__ = "Lukas Mahler" -__version__ = "0.2.0" -__date__ = "23.09.2021" +__version__ = "0.3.0" +__date__ = "25.09.2021" __email__ = "m@hler.eu" __status__ = "Development" # Default import os +import time import json -from time import sleep from functools import partial # Custom @@ -23,11 +23,12 @@ import gecko import util -class MyTeamspeakBot: +class TSbot: - def __init__(self, conf): + def __init__(self, conf, log): """ - + Create a new instance, connect to the server via telnet and + start the event loop function. """ self.host = conf["host"] @@ -38,11 +39,12 @@ class MyTeamspeakBot: self.nickname = conf["name"] self.gecko = gecko.GeckoAPI() + self.log = log self.myid = None self.running = True self.intro = "" - print(f"* Trying to connect to: {self.host}:{self.port}") + self.pipeOut(f"Trying to connect to: {self.host}:{self.port}") with ts3.query.TS3Connection(self.host, self.port) as self.bot: self.bot.login(client_login_name=self.user, client_login_password=self.pwd) @@ -51,14 +53,18 @@ class MyTeamspeakBot: self.bot.clientupdate(client_nickname=self.nickname) except ts3.query.TS3QueryError: pass - print(f"* Successfully connected as: {self.nickname}") + self.pipeOut(f"Successfully connected as: {self.nickname}") # Start the Bot self.loop() def loop(self): """ + Subscribe to event types, ping bot admins and start the event loop. + Every time an event is triggered, the bot will identify its type and + execute the appropriate function. + The loop can be stopped setting self.running to False """ # Find my client id @@ -66,11 +72,13 @@ class MyTeamspeakBot: if len(me) == 1: self.myid = me[0]["clid"] else: - raise ValueError("x Can't find my own client id.") + self.pipeOut("Can't find my own client id.", lvl="critical") + raise ValueError("Can't find my own client id.") ''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: ''' # self.bot.clientmove(clid=self.myid,cid=129) + # ----------- SUBSCRIBE TO EVENTS ------------- # Subscribe to a server movement events self.bot.servernotifyregister(event="server") @@ -86,21 +94,25 @@ class MyTeamspeakBot: # Subscribe to channel movement events # self.bot.servernotifyregister(event="channel", id_=0) + """ # Start the timer for auto-updating crypto channels channelname = f"{'[cspacerBTC]Bitcoin:':<33}" + f"{self.gecko.getSymbol('BTC', decimal=0):>5}€" btc_timer = util.maketimer(60, partial(self.editChannelname, 200, channelname)) btc_timer.start() + sleep(10) channelname = f"{'[cspacerETH]Ethereum:':<30}" + f"{self.gecko.getSymbol('ETH', decimal=0):>5}€" eth_timer = util.maketimer(60, partial(self.editChannelname, 201, channelname)) eth_timer.start() + """ + time.sleep(5) # This can be removed if the Query Client is Whitelisted # Notify connected admins self.notifyAdmin() # ----------- LOOP HERE ------------- while self.running: # self.bot.send_keepalive() - print("* Waiting for a new Event...") + self.pipeOut(f"Waiting for a new Event...") self.bot.version() try: @@ -113,15 +125,21 @@ class MyTeamspeakBot: pass else: event_type = event.event - print(f"* Got Event | length={len(event[0])} | {event_type}") + self.pipeOut(f"Got Event | length={len(event[0])} | {event_type}") - # Client Connect + # Client connected if event_type == "notifycliententerview": - print(f"* Client [{event[0]['client_nickname']}] connected.") + + if 'client_nickname' in event[0]: + displayname = event[0]['client_nickname'] + else: + displayname = event[0]['clid'] + + self.pipeOut(f"Client [{displayname}] connected.") # Check if the connector is a ServerQuery or not if not self.isqueryclient(event[0]["client_unique_identifier"]): - print(f"* {event[0]}") + self.pipeOut(f"* {event[0]}", lvl="debug") # Check if the connector is an Admin if self.isadmin(event[0]["client_database_id"]): self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro) @@ -132,29 +150,63 @@ class MyTeamspeakBot: # Client disconnected elif event_type == "notifyclientleftview": - print(f"* Clientid [{event[0]['clid']}] disconnected.") - pass - # Text Message + if 'client_nickname' in event[0]: + displayname = event[0]['client_nickname'] + else: + displayname = event[0]['clid'] + + self.pipeOut(f"Clientid [{displayname}] disconnected.") + + # New text message elif event_type == "notifytextmessage": msg = event[0]["msg"] invkr = event[0]["invokername"] invkr_id = event[0]["invokerid"] - print(f'* From: "{invkr}" | Message: "{msg}"') - self.lookupcommand(msg, invkr_id) + self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"') + if msg.startswith("."): + self.lookupcommand(msg, invkr_id) + # Unknown Event else: - print(f"* Unknown Event: {event.__dict__}") + self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning") + + def pipeOut(self, msg, lvl="info", reprint=True): + """ + All output pipes through this function. + It's possible to print the output to console [set reprint to True] + or run in silent log mode. [set reprint to False] + """ + + if lvl.lower() == "debug": + self.log.debug(msg) + elif lvl.lower() == "info": + self.log.info(msg) + elif lvl.lower() == "warn": + self.log.warning(msg) + elif lvl.lower() == "error": + self.log.error(msg) + else: + self.log.critical(msg) + + if reprint: + print(f"[{time.strftime('%H:%M:%S')}][{lvl.upper()}] {msg}") def stop(self, invkr_id): """ - + This stops the bot instance by invalidating the event loop. """ - msg = "I'm out, bye bye!" + + msg = "Shutdown, bye bye!" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) + self.pipeOut(msg) self.running = False def notifyAdmin(self): + """ + Ping every available admin + """ + clients = self.bot.clientlist() clients = [client for client in clients if client["client_type"] != "1"] for client in clients: @@ -162,11 +214,11 @@ class MyTeamspeakBot: clid = client["clid"] if self.isadmin(cldbid): self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro) - sleep(1) # This can be removed if the Query Client is Whitelisted + time.sleep(1) # This can be removed if the Query Client is Whitelisted def kickall(self, msg): """ - + TODO """ clients = self.bot.clientlist() @@ -178,13 +230,13 @@ class MyTeamspeakBot: except: pass - def poke(self, msg=None, num=10, delay=0.2, usr='all'): + def poke(self, msg=None, n=10, delay=0.2, usr='all'): """ - + ping a single or multiple users for n times including a msg. """ if msg is None: - msg = "-~-~-~-~-~-~-~-~-~-~-~" + msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~" # Get the client ids if usr == 'all': @@ -194,34 +246,30 @@ class MyTeamspeakBot: clients = self.bot.clientfind(pattern=usr) clients = [client["clid"] for client in clients] - # Break, if there's no client. - if not clients: - return None - else: - for client in clients: - data = self.printable_clientinfo(client) - print(f"* {data}") + # Ping them + if len(clients) > 0: - # Nopokeatm - # return - - # Poke them - i = 0 - while num == -1 or i < num: for clid in clients: - print(f"* {clid}") - try: - self.bot.clientpoke(msg=msg, clid=clid) - except: - pass - sleep(delay) - i += 1 - return None + data = self.printable_clientinfo(clid) + self.pipeOut(f"{data}", lvl="debug") + + i = 0 + while n == -1 or i < n: + for clid in clients: + self.pipeOut(f"{clid}", lvl="debug") + try: + self.bot.clientpoke(msg=msg, clid=clid) + except: + pass + time.sleep(delay) + i += 1 def createChannel(self, name, permanent=False): """ - + Create a teamspeak channel, the channel is non persistent by default. + Set permanent to True if you want a persistant channel. """ + if permanent: new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1") else: @@ -231,15 +279,21 @@ class MyTeamspeakBot: def editChannelname(self, cid, name): """ - + Using a channel-id you can set the name of the given channel. + This will fail if the channel name is already in use. """ - self.bot.channeledit(cid=cid, channel_name=name) - sleep(5) + + try: + self.bot.channeledit(cid=cid, channel_name=name) + except Exception as e: + self.pipeOut(e, lvl="error") + pass def list(self, what, invkr_id): """ - + Message the invoker of the function either a list of channels or a list of clients """ + if what == "channel": mydict = {} channels = self.bot.channellist() @@ -258,8 +312,7 @@ class MyTeamspeakBot: self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) - @staticmethod - def isqueryclient(cluid): + def isqueryclient(self, cluid): """ Check if the given client-uid is a query client """ @@ -268,10 +321,10 @@ class MyTeamspeakBot: # print(client[0]) # if client[0]["client_type"] == "1": if cluid == "ServerQuery": - print("* ISQUERY: True") + self.pipeOut(f"[{cluid}] ISQUERY: True") return True else: - print("* ISQUERY: False") + self.pipeOut(f"[{cluid}] ISQUERY: False") return False def isadmin(self, cldbid): @@ -285,12 +338,12 @@ class MyTeamspeakBot: # 6 Server Admin/ 13 Operator / 15 Root # if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"): if group["sgid"] == "15": - print("* ISADMIN: True") + self.pipeOut(f"[{cldbid}] ISADMIN: True") return True else: continue - print("* ISADMIN: False") + self.pipeOut(f"[{cldbid}] ISADMIN: False") return False def lookupcommand(self, msg, invkr_id): @@ -298,80 +351,85 @@ class MyTeamspeakBot: """ - if msg.startswith("."): - commandstring = msg.split(" ") - command = commandstring[0] - parameter = commandstring[1:] - print(f"* command: {command} / parameter: {parameter} / invkr_id: {invkr_id}") + commandstring = msg.split(" ") + command = commandstring[0] + parameter = commandstring[1:] + self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}") - if command == ".annoy": - try: - target = parameter[0] - msg = parameter[1] - self.poke(msg=msg, usr=target) - except IndexError: - err = "Please use the command like this: .annoy TARGET MESSAGE" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".kickall": - self.kickall("test") # TODO - - elif command == ".test": - cid = self.createChannel("Test") - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) - - elif command == ".btc": - channelname = f"{'[cspacerBTC]Bitcoin:':<33}" + f"{self.gecko.getSymbol('BTC', decimal=0):>5}€" - try: - self.editChannelname(200, channelname) - except ts3.query.TS3QueryError: - pass - - elif command == ".eth": - channelname = f"{'[cspacerETH]Ethereum:':<30}" + f"{self.gecko.getSymbol('ETH', decimal=0):>5}€" - try: - self.editChannelname(201, channelname) - except ts3.query.TS3QueryError: - pass - - elif command == ".list": - try: - self.list(parameter[0], invkr_id) - except IndexError: - err = "Please use the command like this: .list channel/clients" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - elif command == ".follow": - pass # TODO - - elif command == ".rename": - pass # TODO - - elif command == ".stop" or command == ".quit" or command == ".q": - self.stop(invkr_id) - - elif command == ".pingall": - try: - msg = parameter[0] - self.poke(msg=msg) - except IndexError: - err = "Please use the command like this: .pingall MESSAGE" - self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) - pass - - else: - err = f"Unknown Command: [{command}]" + if command == ".annoy": + try: + target = parameter[0] + msg = parameter[1] + self.poke(msg=msg, usr=target) + except IndexError: + err = "Please use the command like this: .annoy TARGET MESSAGE" self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + pass - def printable_clientinfo(self, client): + elif command == ".kickall": + self.kickall("test") # TODO + + elif command == ".test": + cid = self.createChannel("Test") + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid) + + elif command == ".btc": + channelname = f"{'[cspacerBTC]Bitcoin:':<33}" + f"{self.gecko.getSymbol('BTC', decimal=0):>5}€" + try: + self.editChannelname(200, channelname) + except ts3.query.TS3QueryError: + pass + + elif command == ".eth": + channelname = f"{'[cspacerETH]Ethereum:':<30}" + f"{self.gecko.getSymbol('ETH', decimal=0):>5}€" + try: + self.editChannelname(201, channelname) + except ts3.query.TS3QueryError: + pass + + elif command == ".list": + try: + self.list(parameter[0], invkr_id) + except IndexError: + err = "Please use the command like this: .list channel/clients" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + pass + + elif command == ".follow": + pass # TODO + + elif command == ".rename": + try: + self.nickname = parameter[0] + self.bot.clientupdate(client_nickname=self.nickname) + except IndexError: + err = "Please use the command like this: .rename NAME" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + pass + + elif command == ".stop" or command == ".quit" or command == ".q": + self.stop(invkr_id) + + elif command == ".pingall": + try: + msg = parameter[0] + self.poke(msg=msg) + except IndexError: + err = "Please use the command like this: .pingall MESSAGE" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + pass + + else: + err = f"Unknown Command: [{command}]" + self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) + + def printable_clientinfo(self, clid): """ """ usrd = {} - info = self.bot.clientinfo(clid=client) + info = self.bot.clientinfo(clid=clid) temp = info._data[0].split() for t1 in temp: t2 = t1.decode("utf-8") @@ -396,20 +454,25 @@ class MyTeamspeakBot: clients = self.bot.clientlist(groups=True) clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"] - print(f"* {clients_groups}") + self.pipeOut(f"{clients_groups}") # ---------------------------------------------------------------------------------------------------------------------- def main(): + # Start logger + log = util.setupLogger() + # Load Dotenv dotenv_file = dotenv.find_dotenv() if not dotenv_file: + log.critical("could not locate .env file") raise FileNotFoundError("could not locate .env file") dotenv.load_dotenv(dotenv_file) dotend_keys = dotenv.dotenv_values() if not {"_HOST", "_PORT", "_USER", "_PWD", "_SID"} <= dotend_keys.keys(): + log.critical("missing keys in your .env file.") raise ValueError("missing keys in your .env file.") # Config @@ -421,7 +484,7 @@ def main(): name=os.getenv("_NAME")) # Start the Bot Instance - abot = MyTeamspeakBot(conf) + TSbot(conf, log) if __name__ == "__main__": diff --git a/util.py b/util.py index c0877b3..8390066 100644 --- a/util.py +++ b/util.py @@ -3,20 +3,27 @@ TBD """ __author__ = "Lukas Mahler" -__version__ = "0.2.0" -__date__ = "23.09.2021" +__version__ = "0.0.0" +__date__ = "25.09.2021" __email__ = "m@hler.eu" __status__ = "Development" # Default import threading +import logging +from datetime import date class MyTimer(threading.Timer): def run(self): + print(self.__dict__) + print(f"{self._name} Run once") while not self.finished.wait(self.interval): + print(f"{self._name} Run x") self.function(*self.args, **self.kwargs) + print(f"{self._name} Ran x") + print(f"{self._name} Run end") def maketimer(cooldown, func): @@ -25,5 +32,20 @@ def maketimer(cooldown, func): return timer +def setupLogger(): + """ + + """ + log = logging.getLogger() + now = date.today().strftime("%Y-%m-%d") + handler = logging.FileHandler(f"myTS3_{now}_{__version__}.log", encoding="utf-8") + logformat = logging.Formatter("%(asctime)s %(levelname)7s %(message)s", "%d-%m-%Y %H:%M:%S") + handler.setFormatter(logformat) + log.addHandler(handler) + log.setLevel(logging.DEBUG) + + return log + + if __name__ == "__main__": exit()