myTS3/myTS3.py

553 lines
19 KiB
Python

"""
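TeamSpeak 3 ServerQuery bot.

Connects to a TeamSpeak 3 server through the `ts3` query API, subscribes to
server and text-message events and executes chat commands that start with '.'.
It also refreshes the names of the crypto price channels periodically.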
"""
__author__ = "Lukas Mahler"
__version__ = "0.0.0"
__date__ = "11.10.2021"
__email__ = "m@hler.eu"
__status__ = "Development"
# Default
import sys
import time
import json
import os.path
# Custom
import ts3
# Self
from src import util, gecko
class TSbot:
    """
    TeamSpeak 3 ServerQuery bot.

    Creating an instance connects to the server, logs in and blocks inside
    the event loop until `self.running` is set to False (see `stop`).
    """

    # Seconds between two automatic refreshes of the crypto price channels
    CRYPTO_UPDATE_INTERVAL = 1800
    # Ids of the channels whose names carry the crypto prices
    BTC_ETH_CHANNEL_ID = 200
    DOT_ADA_CHANNEL_ID = 201
    # Longest channel name (in bytes) that editChannelname will send
    MAX_CHANNELNAME_BYTES = 40
    # Private message sent to every admin
    ADMIN_HINT = "<Keep this chat open to use commands>"

    def __init__(self, conf, log):
        """
        Create a new instance, connect to the server query interface and
        start the event loop function.

        conf -- parsed config, read as conf[section][key] for the sections
                'Connection', 'Authentication', 'Allowed' and 'Misc'
        log  -- logger object, only its log(level, msg) method is used

        This call blocks until the event loop ends. The connection is closed
        when the loop returns or raises.
        """

        # Initialize from config
        self.host = conf["Connection"]["host"]
        self.port = conf["Connection"]["port"]
        self.sid = conf["Connection"]["sid"]
        self.user = conf["Authentication"]["user"]
        self.pwd = conf["Authentication"]["pwd"]
        self.allowed_sgids = conf["Allowed"]["sgids"]
        self.nickname = conf["Misc"]["nickname"]
        self.whitelisted = conf["Misc"]["whitelisted"]

        # Initialize self
        self.gecko = gecko.GeckoAPI()
        self.log = log
        self.myid = None
        self.running = True
        self.started = time.time()
        # Backdated by one interval so the first loop pass updates the prices
        self.crypto_update = self.started - self.CRYPTO_UPDATE_INTERVAL

        self.pipeOut(f"Trying to connect to: {self.host}:{self.port}")

        # Starting the Connection
        with ts3.query.TS3Connection(self.host, self.port) as self.bot:
            try:
                self.bot.login(client_login_name=self.user, client_login_password=self.pwd)
                self.bot.use(sid=self.sid)
            except ts3.query.TS3QueryError:
                # pipeOut terminates the process for CRITICAL messages
                self.pipeOut("Invalid login credentials, please check your '.toml' file.", lvl="CRITICAL")

            try:
                self.bot.clientupdate(client_nickname=self.nickname)
            except ts3.query.TS3QueryError as e:
                # Not fatal, the bot keeps running without the rename
                self.pipeOut(e, lvl="WARNING")

            self.pipeOut(f"Successfully connected as: {self.nickname}")

            # Start the Bot loop
            self.loop()

    def loop(self):
        """
        Subscribe to event types, ping bot admins and start the event loop.
        Every time an event is triggered, the bot will identify its type and
        execute the appropriate function.
        The loop can be stopped setting self.running to False
        """

        # Find the instances client id
        me = self.bot.clientfind(pattern=self.nickname)
        if len(me) == 1:
            self.myid = me[0]["clid"]
        else:
            self.pipeOut("Can't find my own client id. terminating...", lvl="critical")
            # pipeOut already exits on CRITICAL, this is only a safeguard
            sys.exit(1)

        # To move the bot out of the default channel, call e.g.
        # self.bot.clientmove(clid=self.myid, cid=129) here.

        # ----------- SUBSCRIBE TO EVENTS -------------
        # server      -> server movement events
        # textserver  -> server chat messages
        # textchannel -> channel chat messages
        # textprivate -> private chat messages
        # (channel movement events, event="channel" with id_=0, are not subscribed)
        for event_name in ("server", "textserver", "textchannel", "textprivate"):
            self.bot.servernotifyregister(event=event_name)

        # Pause before sending more commands when the bot is not whitelisted
        if not self.whitelisted:
            time.sleep(5)

        # Notify connected admins
        self.notifyAdmin()

        # ----------- LOOP HERE -------------
        while self.running:
            self.pipeOut("Waiting for a new Event...")
            # Issued once per pass, i.e. at least every 60 seconds, presumably
            # as the keepalive against the server side idle disconnect
            self.bot.version()

            # Auto-update crypto price channels every 30 minutes
            if self.crypto_update + self.CRYPTO_UPDATE_INTERVAL < time.time():
                self.crypto_update = time.time()
                self.lookupcommand(".btc", self.myid)
                self.lookupcommand(".dot", self.myid)

            try:
                # This method blocks, the timeout of one minute makes sure
                # the loop comes back around to the command above in time.
                event = self.bot.wait_for_event(timeout=60)
            except ts3.query.TS3TimeoutError:
                continue

            self._handle_event(event)

    def _handle_event(self, event):
        """
        Dispatch a single received event by its type.
        """

        event_type = event.event
        data = event[0]
        self.pipeOut(f"Got Event | length={len(data)} | {event_type}")

        if event_type == "notifycliententerview":
            self._on_client_enter(data)
        elif event_type == "notifyclientleftview":
            self._on_client_left(data)
        elif event_type == "notifytextmessage":
            self._on_text_message(data)
        else:
            self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning")

    @staticmethod
    def _displayname(data):
        """
        Return the nickname from the event data, else its clid, else None.
        """

        for key in ("client_nickname", "clid"):
            if key in data:
                return data[key]
        return None

    def _on_client_enter(self, data):
        """
        Log a connecting client and send the hint message if it is an admin.
        """

        displayname = self._displayname(data)
        if displayname is None:
            # Neither a nickname nor a clid, nothing to resolve
            self.pipeOut(data)
            return

        self.pipeOut(f"Client [{displayname}] connected.")

        # ServerQuery clients never get the hint
        if self.isqueryclient(data["clid"]):
            return

        self.pipeOut(f"* {data}", lvl="debug")
        if self.isadmin(data["client_database_id"]):
            self._reply(data["clid"], self.ADMIN_HINT)

    def _on_client_left(self, data):
        """
        Log a disconnecting client.
        """

        displayname = self._displayname(data)
        if displayname is None:
            self.pipeOut(data)
            return

        self.pipeOut(f"Clientid [{displayname}] disconnected.")

    def _on_text_message(self, data):
        """
        Log a received text message and run it if it looks like a command.
        """

        msg = data["msg"]
        invkr = data["invokername"]
        invkr_id = data["invokerid"]
        self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"')

        if msg.startswith("."):
            # NOTE(review): commands are executed for every sender, there is
            # no isadmin check on the invoker - confirm this is intended.
            self.lookupcommand(msg, invkr_id)

    def _reply(self, target, msg):
        """
        Send a private text message (targetmode 1) to the given client id.
        """

        self.bot.sendtextmessage(targetmode=1, target=target, msg=msg)

    def _regular_clients(self, **options):
        """
        Return the clientlist entries that are not query clients
        (client_type other than "1"). Options are passed on to clientlist.
        """

        return [client for client in self.bot.clientlist(**options) if client["client_type"] != "1"]

    def pipeOut(self, msg, lvl="INFO", reprint=True):
        """
        All output pipes through this function.
        It's possible to print the output to console [set reprint to True]
        or run in silent log mode. [set reprint to False]

        lvl is the case-insensitive name of a level attribute on util.logging.
        A message of level CRITICAL terminates the process with exit code 1.
        """

        lvl = lvl.upper()
        lvln = int(getattr(util.logging, lvl))
        self.log.log(lvln, msg)

        if reprint:
            stamp = time.strftime("%H:%M:%S")
            tag = f"[{lvl}]"
            print(f"[{stamp}]{tag:10s} {msg}")

        if lvl == "CRITICAL":
            # sys.exit instead of the site helper exit(), which is not
            # guaranteed to exist in every interpreter setup
            sys.exit(1)

    def stop(self, invkr_id):
        """
        This stops the bot instance by invalidating the event loop.
        The invoker gets a goodbye message first.
        """

        msg = "Shutdown, bye bye!"
        self._reply(invkr_id, msg)
        self.pipeOut(msg)
        self.running = False

    def notifyAdmin(self):
        """
        Ping every available admin.
        """

        for client in self._regular_clients():
            if self.isadmin(client["client_database_id"]):
                self._reply(client["clid"], self.ADMIN_HINT)

            # Every client costs at least one query, so slow down per client
            # when the bot is not whitelisted
            if not self.whitelisted:
                time.sleep(1)

    def kickall(self, msg):
        """
        TODO: kicking is not implemented yet.
        For now every non-query client is only poked once with msg.
        """

        for client in self._regular_clients():
            self.bot.clientpoke(msg=msg, clid=client["clid"])

    def poke(self, msg=None, n=10, delay=0.2, usr='all'):
        """
        ping a single or multiple users for n times including a msg.

        msg   -- poke text, a placeholder is used if None
        n     -- number of rounds, -1 pokes forever and never returns
        delay -- seconds to sleep after each round
        usr   -- 'all' for every non-query client, else a clientfind pattern
        """

        if msg is None:
            msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~"

        # Get the client ids
        if usr == 'all':
            clients = [client["clid"] for client in self._regular_clients()]
        else:
            try:
                clients = [client["clid"] for client in self.bot.clientfind(pattern=usr)]
            except ts3.query.TS3QueryError as e:
                self.pipeOut(f"Couldnt execute poke, no client found using pattern {usr},"
                             f"returned error:\n{e}", lvl="ERROR")
                return

        if not clients:
            return

        for clid in clients:
            self.pipeOut(f"{self.printable_clientinfo(clid)}", lvl="debug")

        # Ping them
        i = 0
        while n == -1 or i < n:
            for clid in clients:
                try:
                    self.bot.clientpoke(msg=msg, clid=clid)
                except Exception as e:
                    # Best effort, a failed poke must not stop the others
                    self.pipeOut(e, lvl="warning")
            time.sleep(delay)
            i += 1

    def createChannel(self, name, permanent=False):
        """
        Create a teamspeak channel, the channel is non persistent by default.
        Set permanent to True if you want a persistent channel.
        Returns the channel-id of the new channel.
        """

        options = {"channel_name": name}
        if permanent:
            options["channel_flag_permanent"] = "1"

        new = self.bot.channelcreate(**options)
        return new[0]["cid"]

    def editChannelname(self, cid, name):
        """
        Using a channel-id you can set the name of the given channel.
        This will fail if the channel name is already in use.
        This will fail if the given channel name is > 40 bytes.

        Failures are only logged, nothing is raised or returned.
        """

        # Check name for bytes. The encoded length is what counts, a name
        # with non-ASCII characters has more bytes than characters.
        n = len(name.encode("utf-8"))
        if n > self.MAX_CHANNELNAME_BYTES:
            self.pipeOut(f"Can't change channelname to {name} as [{n}] exceeds the max bytes of 40.")
            return

        try:
            self.bot.channeledit(cid=cid, channel_name=name)
        except Exception as e:
            # Best effort, e.g. the name is already in use
            self.pipeOut(e, lvl="error")

    def list(self, what, invkr_id):
        """
        Message the invoker of the function either a list of channels or a list of clients.

        what -- "channel" sends {channel_order: [channel_name, cid]},
                "clients" sends {client_nickname: {"clid": .., "cldbid": ..}},
                anything else sends a usage hint.
        The mapping is sent as JSON, sorted by key.
        """

        mydict = {}

        if what == "channel":
            # NOTE(review): entries sharing a channel_order overwrite each
            # other - confirm the value is unique on the target server.
            for channel in self.bot.channellist():
                mydict[channel["channel_order"]] = [channel["channel_name"], channel["cid"]]
        elif what == "clients":
            for client in self.bot.clientlist():
                mydict[client["client_nickname"]] = {"clid": client["clid"],
                                                     "cldbid": client["client_database_id"]}
        else:
            # Never send an empty message for an unknown selector
            self._reply(invkr_id, "Please use the command like this: .list channel/clients")
            return

        self._reply(invkr_id, json.dumps(dict(sorted(mydict.items()))))

    def isqueryclient(self, clid):
        """
        Check if the given client-id is a query client.
        If the client can't be looked up the error is logged and True is returned.
        """

        try:
            client = self.bot.clientinfo(clid=clid)
        except ts3.query.TS3QueryError as e:
            self.pipeOut(f"given clid {clid} returned error:\n{e}", lvl="ERROR")
            return True

        isquery = client[0]["client_type"] == "1"
        self.pipeOut(f"[{clid}] ISQUERY: {isquery}")
        return isquery

    def isadmin(self, cldbid):
        """
        Check if the given client-databaseid is an admin.
        This is done by resolving the clients groups and check
        if any of the groups sgid match a sgid from the sgids from the '.toml' config.
        If the groups can't be resolved the error is logged and False is returned.
        """

        try:
            groups = self.bot.servergroupsbyclientid(cldbid=cldbid)
        except ts3.query.TS3QueryError as e:
            self.pipeOut(e, lvl="ERROR")
            return False

        self.pipeOut(str(groups.__dict__), lvl="DEBUG")  # DEBUG

        # Compare as strings so sgids written as integers in the config
        # still match the values of the query response
        allowed = {str(sgid) for sgid in self.allowed_sgids}
        found = any(str(group['sgid']) in allowed for group in groups)

        self.pipeOut(f"[{cldbid}] ISADMIN: {found}")
        return found

    def lookupcommand(self, msg, invkr_id):
        """
        Every message starting with '.' gets passed and evaluated in this function.
        Commands in this function are sorted alphabetically.

        Command             Parameter 1         Parameter 2
        ---------------------------------------------------------
        .admin
        .annoy              target              message
        .btc / .eth
        .dot / .ada
        .follow             target
        .info               clid
        .kickall            message
        .list               channel/clients
        .pingall            message
        .rename             nickname
        .roll
        .stop / .quit / .q
        .test

        Words are split on whitespace. For .annoy and .pingall everything
        after the fixed parameters is used as the message.
        """

        parts = msg.split()
        if not parts:
            return
        command, *parameter = parts

        self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}")

        if command == ".admin":
            self.notifyAdmin()

        elif command == ".annoy":
            if len(parameter) < 2:
                self._reply(invkr_id, "Please use the command like this: .annoy TARGET MESSAGE")
                return
            self.poke(msg=" ".join(parameter[1:]), usr=parameter[0])

        elif command in (".btc", ".eth"):
            channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('Bitcoin', decimal=0)} | " \
                          f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}"
            # editChannelname logs its own failures
            self.editChannelname(self.BTC_ETH_CHANNEL_ID, channelname)

        elif command in (".dot", ".ada"):
            channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \
                          f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}"
            self.editChannelname(self.DOT_ADA_CHANNEL_ID, channelname)

        elif command == ".follow":
            if not parameter:
                self._reply(invkr_id, "Please use the command like this: .follow TARGET")
                return

            target = parameter[0]
            try:
                clids = [client["clid"] for client in self.bot.clientfind(pattern=target)]
                if len(clids) == 1:
                    # Same access pattern as in isqueryclient
                    cid = self.bot.clientinfo(clid=clids[0])[0]["cid"]
                    self.bot.clientmove(clid=self.myid, cid=cid)
                else:
                    self.pipeOut(f"Found multiple matching clients using pattern {target}:{clids}")
            except ts3.query.TS3QueryError as e:
                self.pipeOut(f"No client found using pattern {target},"
                             f"returned error:\n{e}", lvl="ERROR")

        elif command == ".info":
            # TODO implement more then just the runtime
            self._reply(invkr_id, f"Runtime: {util.getRuntime(self.started)}")

        elif command == ".kickall":
            self.kickall("test")  # TODO

        elif command == ".list":
            if not parameter:
                self._reply(invkr_id, "Please use the command like this: .list channel/clients")
                return
            self.list(parameter[0], invkr_id)

        elif command == ".pingall":
            if not parameter:
                self._reply(invkr_id, "Please use the command like this: .pingall MESSAGE")
                return
            self.poke(msg=" ".join(parameter))

        elif command == ".rename":
            if not parameter:
                self._reply(invkr_id, "Please use the command like this: .rename NAME")
                return

            new_name = parameter[0]
            try:
                self.bot.clientupdate(client_nickname=new_name)
            except ts3.query.TS3QueryError as e:
                # Same handling as for the rename on startup
                self.pipeOut(e, lvl="WARNING")
            else:
                # Only remember the name once the server accepted it
                self.nickname = new_name

        elif command == ".roll":
            pass  # TODO needs permission for everyone + targemode implementation

        elif command in (".stop", ".quit", ".q"):
            self.stop(invkr_id)

        elif command == ".test":
            cid = self.createChannel("Test")
            self._reply(invkr_id, cid)

        else:
            self._reply(invkr_id, f"Unknown Command: [{command}]")

    def printable_clientinfo(self, clid):
        """
        Generate printable clientinfo from clid.
        Returns the attribute dict of the clientinfo response.
        """

        info = self.bot.clientinfo(clid=clid)
        return info.__dict__

    def checkgrp(self):
        """
        Print all assigned groups for every connected client.
        Query clients are left out.
        """

        clients_groups = [client["client_servergroups"] for client in self._regular_clients(groups=True)]
        self.pipeOut(f"{clients_groups}")
# ----------------------------------------------------------------------------------------------------------------------
def main():
    """
    Entry point: set up logging and the config, then run the bot.

    The logger writes below the 'log' directory next to this file,
    unhandled exceptions go to util.unhandledException and the config
    is read from 'prod.toml'. Creating the TSbot blocks until it stops.
    """

    # Start logger
    here = os.path.dirname(os.path.abspath(__file__))
    logger = util.setupLogger(here + "/log")

    # Log unhandled exception
    sys.excepthook = util.unhandledException

    # Load toml config and start the Bot Instance
    settings = util.getConf("prod.toml")
    TSbot(settings, logger)


if __name__ == "__main__":
    main()