add logging / add some docstrings

This commit is contained in:
Lukas 2021-09-25 00:49:51 +02:00
parent 305ad6cffb
commit 14fbc7f827
2 changed files with 214 additions and 129 deletions

181
myTS3.py
View File

@ -3,15 +3,15 @@ TBD
""" """
__author__ = "Lukas Mahler" __author__ = "Lukas Mahler"
__version__ = "0.2.0" __version__ = "0.3.0"
__date__ = "23.09.2021" __date__ = "25.09.2021"
__email__ = "m@hler.eu" __email__ = "m@hler.eu"
__status__ = "Development" __status__ = "Development"
# Default # Default
import os import os
import time
import json import json
from time import sleep
from functools import partial from functools import partial
# Custom # Custom
@ -23,11 +23,12 @@ import gecko
import util import util
class MyTeamspeakBot: class TSbot:
def __init__(self, conf): def __init__(self, conf, log):
""" """
Create a new instance, connect to the server via telnet and
start the event loop function.
""" """
self.host = conf["host"] self.host = conf["host"]
@ -38,11 +39,12 @@ class MyTeamspeakBot:
self.nickname = conf["name"] self.nickname = conf["name"]
self.gecko = gecko.GeckoAPI() self.gecko = gecko.GeckoAPI()
self.log = log
self.myid = None self.myid = None
self.running = True self.running = True
self.intro = "<Keep this chat open to use commands>" self.intro = "<Keep this chat open to use commands>"
print(f"* Trying to connect to: {self.host}:{self.port}") self.pipeOut(f"Trying to connect to: {self.host}:{self.port}")
with ts3.query.TS3Connection(self.host, self.port) as self.bot: with ts3.query.TS3Connection(self.host, self.port) as self.bot:
self.bot.login(client_login_name=self.user, client_login_password=self.pwd) self.bot.login(client_login_name=self.user, client_login_password=self.pwd)
@ -51,14 +53,18 @@ class MyTeamspeakBot:
self.bot.clientupdate(client_nickname=self.nickname) self.bot.clientupdate(client_nickname=self.nickname)
except ts3.query.TS3QueryError: except ts3.query.TS3QueryError:
pass pass
print(f"* Successfully connected as: {self.nickname}") self.pipeOut(f"Successfully connected as: {self.nickname}")
# Start the Bot # Start the Bot
self.loop() self.loop()
def loop(self): def loop(self):
""" """
Subscribe to event types, ping bot admins and start the event loop.
Every time an event is triggered, the bot will identify its type and
execute the appropriate function.
The loop can be stopped setting self.running to False
""" """
# Find my client id # Find my client id
@ -66,11 +72,13 @@ class MyTeamspeakBot:
if len(me) == 1: if len(me) == 1:
self.myid = me[0]["clid"] self.myid = me[0]["clid"]
else: else:
raise ValueError("x Can't find my own client id.") self.pipeOut("Can't find my own client id.", lvl="critical")
raise ValueError("Can't find my own client id.")
''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: ''' ''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: '''
# self.bot.clientmove(clid=self.myid,cid=129) # self.bot.clientmove(clid=self.myid,cid=129)
# ----------- SUBSCRIBE TO EVENTS -------------
# Subscribe to a server movement events # Subscribe to a server movement events
self.bot.servernotifyregister(event="server") self.bot.servernotifyregister(event="server")
@ -86,21 +94,25 @@ class MyTeamspeakBot:
# Subscribe to channel movement events # Subscribe to channel movement events
# self.bot.servernotifyregister(event="channel", id_=0) # self.bot.servernotifyregister(event="channel", id_=0)
"""
# Start the timer for auto-updating crypto channels # Start the timer for auto-updating crypto channels
channelname = f"{'[cspacerBTC]Bitcoin:':<33}" + f"{self.gecko.getSymbol('BTC', decimal=0):>5}" channelname = f"{'[cspacerBTC]Bitcoin:':<33}" + f"{self.gecko.getSymbol('BTC', decimal=0):>5}"
btc_timer = util.maketimer(60, partial(self.editChannelname, 200, channelname)) btc_timer = util.maketimer(60, partial(self.editChannelname, 200, channelname))
btc_timer.start() btc_timer.start()
sleep(10)
channelname = f"{'[cspacerETH]Ethereum:':<30}" + f"{self.gecko.getSymbol('ETH', decimal=0):>5}" channelname = f"{'[cspacerETH]Ethereum:':<30}" + f"{self.gecko.getSymbol('ETH', decimal=0):>5}"
eth_timer = util.maketimer(60, partial(self.editChannelname, 201, channelname)) eth_timer = util.maketimer(60, partial(self.editChannelname, 201, channelname))
eth_timer.start() eth_timer.start()
"""
time.sleep(5) # This can be removed if the Query Client is Whitelisted
# Notify connected admins # Notify connected admins
self.notifyAdmin() self.notifyAdmin()
# ----------- LOOP HERE ------------- # ----------- LOOP HERE -------------
while self.running: while self.running:
# self.bot.send_keepalive() # self.bot.send_keepalive()
print("* Waiting for a new Event...") self.pipeOut(f"Waiting for a new Event...")
self.bot.version() self.bot.version()
try: try:
@ -113,15 +125,21 @@ class MyTeamspeakBot:
pass pass
else: else:
event_type = event.event event_type = event.event
print(f"* Got Event | length={len(event[0])} | {event_type}") self.pipeOut(f"Got Event | length={len(event[0])} | {event_type}")
# Client Connect # Client connected
if event_type == "notifycliententerview": if event_type == "notifycliententerview":
print(f"* Client [{event[0]['client_nickname']}] connected.")
if 'client_nickname' in event[0]:
displayname = event[0]['client_nickname']
else:
displayname = event[0]['clid']
self.pipeOut(f"Client [{displayname}] connected.")
# Check if the connector is a ServerQuery or not # Check if the connector is a ServerQuery or not
if not self.isqueryclient(event[0]["client_unique_identifier"]): if not self.isqueryclient(event[0]["client_unique_identifier"]):
print(f"* {event[0]}") self.pipeOut(f"* {event[0]}", lvl="debug")
# Check if the connector is an Admin # Check if the connector is an Admin
if self.isadmin(event[0]["client_database_id"]): if self.isadmin(event[0]["client_database_id"]):
self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro) self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro)
@ -132,29 +150,63 @@ class MyTeamspeakBot:
# Client disconnected # Client disconnected
elif event_type == "notifyclientleftview": elif event_type == "notifyclientleftview":
print(f"* Clientid [{event[0]['clid']}] disconnected.")
pass
# Text Message if 'client_nickname' in event[0]:
displayname = event[0]['client_nickname']
else:
displayname = event[0]['clid']
self.pipeOut(f"Clientid [{displayname}] disconnected.")
# New text message
elif event_type == "notifytextmessage": elif event_type == "notifytextmessage":
msg = event[0]["msg"] msg = event[0]["msg"]
invkr = event[0]["invokername"] invkr = event[0]["invokername"]
invkr_id = event[0]["invokerid"] invkr_id = event[0]["invokerid"]
print(f'* From: "{invkr}" | Message: "{msg}"') self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"')
if msg.startswith("."):
self.lookupcommand(msg, invkr_id) self.lookupcommand(msg, invkr_id)
# Unknown Event
else: else:
print(f"* Unknown Event: {event.__dict__}") self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning")
def pipeOut(self, msg, lvl="info", reprint=True):
"""
All output pipes through this function.
It's possible to print the output to console [set reprint to True]
or run in silent log mode. [set reprint to False]
"""
if lvl.lower() == "debug":
self.log.debug(msg)
elif lvl.lower() == "info":
self.log.info(msg)
elif lvl.lower() == "warn":
self.log.warning(msg)
elif lvl.lower() == "error":
self.log.error(msg)
else:
self.log.critical(msg)
if reprint:
print(f"[{time.strftime('%H:%M:%S')}][{lvl.upper()}] {msg}")
def stop(self, invkr_id): def stop(self, invkr_id):
""" """
This stops the bot instance by invalidating the event loop.
""" """
msg = "I'm out, bye bye!"
msg = "Shutdown, bye bye!"
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg)
self.pipeOut(msg)
self.running = False self.running = False
def notifyAdmin(self): def notifyAdmin(self):
"""
Ping every available admin
"""
clients = self.bot.clientlist() clients = self.bot.clientlist()
clients = [client for client in clients if client["client_type"] != "1"] clients = [client for client in clients if client["client_type"] != "1"]
for client in clients: for client in clients:
@ -162,11 +214,11 @@ class MyTeamspeakBot:
clid = client["clid"] clid = client["clid"]
if self.isadmin(cldbid): if self.isadmin(cldbid):
self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro) self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro)
sleep(1) # This can be removed if the Query Client is Whitelisted time.sleep(1) # This can be removed if the Query Client is Whitelisted
def kickall(self, msg): def kickall(self, msg):
""" """
TODO
""" """
clients = self.bot.clientlist() clients = self.bot.clientlist()
@ -178,13 +230,13 @@ class MyTeamspeakBot:
except: except:
pass pass
def poke(self, msg=None, num=10, delay=0.2, usr='all'): def poke(self, msg=None, n=10, delay=0.2, usr='all'):
""" """
ping a single or multiple users for n times including a msg.
""" """
if msg is None: if msg is None:
msg = "-~-~-~-~-~-~-~-~-~-~-~" msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~"
# Get the client ids # Get the client ids
if usr == 'all': if usr == 'all':
@ -194,34 +246,30 @@ class MyTeamspeakBot:
clients = self.bot.clientfind(pattern=usr) clients = self.bot.clientfind(pattern=usr)
clients = [client["clid"] for client in clients] clients = [client["clid"] for client in clients]
# Break, if there's no client. # Ping them
if not clients: if len(clients) > 0:
return None
else:
for client in clients:
data = self.printable_clientinfo(client)
print(f"* {data}")
# Nopokeatm
# return
# Poke them
i = 0
while num == -1 or i < num:
for clid in clients: for clid in clients:
print(f"* {clid}") data = self.printable_clientinfo(clid)
self.pipeOut(f"{data}", lvl="debug")
i = 0
while n == -1 or i < n:
for clid in clients:
self.pipeOut(f"{clid}", lvl="debug")
try: try:
self.bot.clientpoke(msg=msg, clid=clid) self.bot.clientpoke(msg=msg, clid=clid)
except: except:
pass pass
sleep(delay) time.sleep(delay)
i += 1 i += 1
return None
def createChannel(self, name, permanent=False): def createChannel(self, name, permanent=False):
""" """
Create a teamspeak channel, the channel is non persistent by default.
Set permanent to True if you want a persistant channel.
""" """
if permanent: if permanent:
new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1") new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1")
else: else:
@ -231,15 +279,21 @@ class MyTeamspeakBot:
def editChannelname(self, cid, name): def editChannelname(self, cid, name):
""" """
Using a channel-id you can set the name of the given channel.
This will fail if the channel name is already in use.
""" """
try:
self.bot.channeledit(cid=cid, channel_name=name) self.bot.channeledit(cid=cid, channel_name=name)
sleep(5) except Exception as e:
self.pipeOut(e, lvl="error")
pass
def list(self, what, invkr_id): def list(self, what, invkr_id):
""" """
Message the invoker of the function either a list of channels or a list of clients
""" """
if what == "channel": if what == "channel":
mydict = {} mydict = {}
channels = self.bot.channellist() channels = self.bot.channellist()
@ -258,8 +312,7 @@ class MyTeamspeakBot:
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg) self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg)
@staticmethod def isqueryclient(self, cluid):
def isqueryclient(cluid):
""" """
Check if the given client-uid is a query client Check if the given client-uid is a query client
""" """
@ -268,10 +321,10 @@ class MyTeamspeakBot:
# print(client[0]) # print(client[0])
# if client[0]["client_type"] == "1": # if client[0]["client_type"] == "1":
if cluid == "ServerQuery": if cluid == "ServerQuery":
print("* ISQUERY: True") self.pipeOut(f"[{cluid}] ISQUERY: True")
return True return True
else: else:
print("* ISQUERY: False") self.pipeOut(f"[{cluid}] ISQUERY: False")
return False return False
def isadmin(self, cldbid): def isadmin(self, cldbid):
@ -285,12 +338,12 @@ class MyTeamspeakBot:
# 6 Server Admin/ 13 Operator / 15 Root # 6 Server Admin/ 13 Operator / 15 Root
# if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"): # if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"):
if group["sgid"] == "15": if group["sgid"] == "15":
print("* ISADMIN: True") self.pipeOut(f"[{cldbid}] ISADMIN: True")
return True return True
else: else:
continue continue
print("* ISADMIN: False") self.pipeOut(f"[{cldbid}] ISADMIN: False")
return False return False
def lookupcommand(self, msg, invkr_id): def lookupcommand(self, msg, invkr_id):
@ -298,11 +351,10 @@ class MyTeamspeakBot:
""" """
if msg.startswith("."):
commandstring = msg.split(" ") commandstring = msg.split(" ")
command = commandstring[0] command = commandstring[0]
parameter = commandstring[1:] parameter = commandstring[1:]
print(f"* command: {command} / parameter: {parameter} / invkr_id: {invkr_id}") self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}")
if command == ".annoy": if command == ".annoy":
try: try:
@ -347,7 +399,13 @@ class MyTeamspeakBot:
pass # TODO pass # TODO
elif command == ".rename": elif command == ".rename":
pass # TODO try:
self.nickname = parameter[0]
self.bot.clientupdate(client_nickname=self.nickname)
except IndexError:
err = "Please use the command like this: .rename NAME"
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
pass
elif command == ".stop" or command == ".quit" or command == ".q": elif command == ".stop" or command == ".quit" or command == ".q":
self.stop(invkr_id) self.stop(invkr_id)
@ -365,13 +423,13 @@ class MyTeamspeakBot:
err = f"Unknown Command: [{command}]" err = f"Unknown Command: [{command}]"
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err) self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
def printable_clientinfo(self, client): def printable_clientinfo(self, clid):
""" """
""" """
usrd = {} usrd = {}
info = self.bot.clientinfo(clid=client) info = self.bot.clientinfo(clid=clid)
temp = info._data[0].split() temp = info._data[0].split()
for t1 in temp: for t1 in temp:
t2 = t1.decode("utf-8") t2 = t1.decode("utf-8")
@ -396,20 +454,25 @@ class MyTeamspeakBot:
clients = self.bot.clientlist(groups=True) clients = self.bot.clientlist(groups=True)
clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"] clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"]
print(f"* {clients_groups}") self.pipeOut(f"{clients_groups}")
# ---------------------------------------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------------------------------------
def main(): def main():
# Start logger
log = util.setupLogger()
# Load Dotenv # Load Dotenv
dotenv_file = dotenv.find_dotenv() dotenv_file = dotenv.find_dotenv()
if not dotenv_file: if not dotenv_file:
log.critical("could not locate .env file")
raise FileNotFoundError("could not locate .env file") raise FileNotFoundError("could not locate .env file")
dotenv.load_dotenv(dotenv_file) dotenv.load_dotenv(dotenv_file)
dotend_keys = dotenv.dotenv_values() dotend_keys = dotenv.dotenv_values()
if not {"_HOST", "_PORT", "_USER", "_PWD", "_SID"} <= dotend_keys.keys(): if not {"_HOST", "_PORT", "_USER", "_PWD", "_SID"} <= dotend_keys.keys():
log.critical("missing keys in your .env file.")
raise ValueError("missing keys in your .env file.") raise ValueError("missing keys in your .env file.")
# Config # Config
@ -421,7 +484,7 @@ def main():
name=os.getenv("_NAME")) name=os.getenv("_NAME"))
# Start the Bot Instance # Start the Bot Instance
abot = MyTeamspeakBot(conf) TSbot(conf, log)
if __name__ == "__main__": if __name__ == "__main__":

26
util.py
View File

@ -3,20 +3,27 @@ TBD
""" """
__author__ = "Lukas Mahler" __author__ = "Lukas Mahler"
__version__ = "0.2.0" __version__ = "0.0.0"
__date__ = "23.09.2021" __date__ = "25.09.2021"
__email__ = "m@hler.eu" __email__ = "m@hler.eu"
__status__ = "Development" __status__ = "Development"
# Default # Default
import threading import threading
import logging
from datetime import date
class MyTimer(threading.Timer): class MyTimer(threading.Timer):
def run(self): def run(self):
print(self.__dict__)
print(f"{self._name} Run once")
while not self.finished.wait(self.interval): while not self.finished.wait(self.interval):
print(f"{self._name} Run x")
self.function(*self.args, **self.kwargs) self.function(*self.args, **self.kwargs)
print(f"{self._name} Ran x")
print(f"{self._name} Run end")
def maketimer(cooldown, func): def maketimer(cooldown, func):
@ -25,5 +32,20 @@ def maketimer(cooldown, func):
return timer return timer
def setupLogger():
"""
"""
log = logging.getLogger()
now = date.today().strftime("%Y-%m-%d")
handler = logging.FileHandler(f"myTS3_{now}_{__version__}.log", encoding="utf-8")
logformat = logging.Formatter("%(asctime)s %(levelname)7s %(message)s", "%d-%m-%Y %H:%M:%S")
handler.setFormatter(logformat)
log.addHandler(handler)
log.setLevel(logging.DEBUG)
return log
if __name__ == "__main__": if __name__ == "__main__":
exit() exit()