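"""
TeamSpeak 3 ServerQuery bot.

Connects to a TeamSpeak 3 server through the `ts3` library, subscribes to
server and text-message notifications and executes chat commands that
start with a '.' (see `TSbot.lookupcommand`).
"""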
|
|
|
|
__author__ = "Lukas Mahler"
|
|
__version__ = "1.0.0"
|
|
__date__ = "14.04.2022"
|
|
__email__ = "m@hler.eu"
|
|
__status__ = "Development"
|
|
|
|
# Default
|
|
import sys
|
|
import time
|
|
import os.path
|
|
|
|
# Custom
|
|
import ts3
|
|
|
|
# Self
|
|
from src import util, gecko, commands
|
|
|
|
|
|
class TSbot:
    """
    TeamSpeak 3 ServerQuery bot.

    Constructing an instance connects to the server, logs in and then blocks
    inside the event loop until `self.running` is set to False (see `stop`)
    or a CRITICAL message terminates the process (see `pipeOut`).
    """

    # Message that triggers the flood back-off in `pipeOut`.
    # Presumably str() of the TS3QueryError raised when the server rate-limits
    # the query client -- TODO confirm against the ts3 library.
    _FLOOD_ERROR = "error id 524: client is flooding"
    # Seconds to pause after a flood error.
    _FLOOD_BACKOFF = 30
    # Seconds between two automatic crypto ticker refreshes (30 minutes).
    _CRYPTO_INTERVAL = 1800

    def __init__(self, conf, log):
        """
        Create a new instance, connect to the server via telnet and
        start the event loop function.

        :param conf: nested mapping loaded from the '.toml' config. The keys
                     read here are Connection.{host,port,sid},
                     Authentication.{user,pwd}, Groups.{admins,mute} and
                     Misc.{crypto,nickname,whitelisted}.
        :param log:  logger object; `pipeOut` uses its `log()` and
                     `getEffectiveLevel()` methods.

        This call does not return until the event loop has ended.
        """

        # Initialize from config
        self.host = conf["Connection"]["host"]
        self.port = conf["Connection"]["port"]
        self.sid = conf["Connection"]["sid"]
        self.user = conf["Authentication"]["user"]
        self.pwd = conf["Authentication"]["pwd"]
        self.sgid_admin = conf["Groups"]["admins"]
        self.sgid_mute = conf["Groups"]["mute"]
        self.crypto = conf["Misc"]["crypto"]
        self.nickname = conf["Misc"]["nickname"]
        self.whitelisted = conf["Misc"]["whitelisted"]

        # Initialize self
        self.log = log
        self.myid = None
        self.running = True
        self.version = __version__
        self.started = time.time()
        self.gecko = gecko.GeckoAPI()
        # Backdate the last update so the first loop iteration refreshes the ticker.
        self.crypto_update = self.started - self._CRYPTO_INTERVAL

        self.pipeOut(f"Trying to connect to: {self.host}:{self.port}")

        # Starting the Connection; it stays open for the lifetime of the loop.
        with ts3.query.TS3Connection(self.host, self.port) as self.bot:
            try:
                self.bot.login(client_login_name=self.user, client_login_password=self.pwd)
                self.bot.use(sid=self.sid)
            except ts3.query.TS3QueryError:
                # CRITICAL terminates the process inside pipeOut.
                # Note: a failing `use` (e.g. wrong sid) is reported with the same text.
                self.pipeOut("Invalid login credentials, please check your '.toml' file.", lvl="CRITICAL")

            try:
                self.bot.clientupdate(client_nickname=self.nickname)
            except ts3.query.TS3QueryError as e:
                # Best effort: keep running under the current nickname.
                self.pipeOut(e, lvl="WARNING")

            self.pipeOut(f"Successfully connected as: {self.nickname}")

            # Start the Bot loop
            self.loop()

    def loop(self):
        """
        Subscribe to event types, ping bot admins and start the event loop.
        Every time an event is triggered, the bot will identify its type and
        execute the appropriate function.

        The loop can be stopped setting self.running to False
        """

        # Find the instances client id
        me = self.bot.clientfind(pattern=self.nickname)
        if len(me) == 1:
            self.myid = me[0]["clid"]
        else:
            self.pipeOut("Can't find my own client id. terminating...", lvl="critical")
            # pipeOut already exits on CRITICAL; kept as an explicit safeguard.
            sys.exit(1)

        # If you want to move the Bot to a certain channel (instead of the
        # default channel), you can do:
        # self.bot.clientmove(clid=self.myid,cid=129)

        self._subscribe()

        if not self.whitelisted:
            time.sleep(5)

        # Notify connected admins
        commands.notifyAdmin(self)

        # ----------- LOOP HERE -------------
        while self.running:
            # self.bot.send_keepalive()
            self.pipeOut("Waiting for a new Event...", lvl="debug")
            # Presumably doubles as the keepalive request -- TODO confirm.
            self.bot.version()

            self._run_timers()

            try:
                # This method blocks, but we must send the keepalive message at
                # least once in 5 minutes to avoid the server side idle client
                # disconnect. So we set the timeout parameter simply to 1 minute.
                event = self.bot.wait_for_event(timeout=60)
            except ts3.query.TS3TimeoutError:
                # No event within the timeout: go around and ping again.
                continue

            self._dispatch_event(event)

    def _subscribe(self):
        """Register for the server notifications the event loop handles."""

        # ----------- SUBSCRIBE TO EVENTS -------------
        # "server":      server movement events
        # "textserver":  server channel messages
        # "textchannel": channel messages
        # "textprivate": private chat messages
        for event_name in ("server", "textserver", "textchannel", "textprivate"):
            self.bot.servernotifyregister(event=event_name)

        # Subscribe to channel movement events
        # self.bot.servernotifyregister(event="channel", id_=0)

    def _run_timers(self):
        """Run the periodic jobs that piggyback on the event loop."""

        # Auto-update crypto price channels every 30 minutes
        if self.crypto and self.crypto_update + self._CRYPTO_INTERVAL < time.time():
            self.crypto_update = time.time()
            commands.ticker(self, "")

            # Hijacked for WoW
            # NOTE(review): nesting reconstructed from a de-indented source;
            # assumed to share the 30 minute timer -- confirm.
            commands.rwf(self)

    def _dispatch_event(self, event):
        """Log a received event and route it to the matching handler."""

        event_type = event.event
        self.pipeOut(f"Got Event | length={len(event[0])} | {event_type}")

        if event_type == "notifycliententerview":
            self._on_client_enter(event)
        elif event_type == "notifyclientleftview":
            self._on_client_left(event)
        elif event_type == "notifytextmessage":
            self._on_text_message(event)
        else:
            self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning")

    def _on_client_enter(self, event):
        """Handle a client connecting; greet it when it is an admin."""

        data = event[0]
        if 'client_nickname' in data:
            displayname = data['client_nickname']
        elif 'clid' in data:
            displayname = data['clid']
        else:
            # Can't resolve without a clid: log the raw payload and give up.
            self.pipeOut(data)
            return

        self.pipeOut(f"Client [{displayname}] connected.")

        # Check if the connector is a ServerQuery or not
        if self.isqueryclient(data["clid"]):
            return
        self.pipeOut(f"* {data}", lvl="debug")

        # Check if the connector is an Admin
        if self.isadmin(data["client_database_id"]):
            msg = "<Keep this chat open to use commands>"
            self.bot.sendtextmessage(targetmode=1, target=data["clid"], msg=msg)

    def _on_client_left(self, event):
        """Handle a client disconnecting (log only)."""

        data = event[0]
        if 'client_nickname' in data:
            displayname = data['client_nickname']
        else:
            displayname = data['clid']

        self.pipeOut(f"Clientid [{displayname}] disconnected.")

    def _on_text_message(self, event):
        """Handle a text message; messages starting with '.' are commands."""

        data = event[0]
        msg = data["msg"].replace("\n", " ")
        invkr_name = data["invokername"]
        invkr_id = data["invokerid"]
        invkr_uid = data["invokeruid"]
        invkr_dbid = self.bot.clientgetdbidfromuid(cluid=invkr_uid)[0]["cldbid"]
        permission_level = 1 if self.isadmin(invkr_dbid) else 0

        invoker = {'name': invkr_name,
                   'id': invkr_id,
                   'uid': invkr_uid,
                   'dbid': invkr_dbid}

        self.pipeOut(f'Message | From: "{invkr_name}" | '
                     f'Permissionlevel: {permission_level} | Content: "{msg}"')

        if msg.startswith("."):
            self.lookupcommand(msg, invoker, permission_level)

    # Class Utility --------------------------------------------------------------------------------------------------------

    def pipeOut(self, msg, lvl="INFO", reprint=True):
        """
        All output pipes through this function.
        It's possible to print the output to console [set reprint to True]
        or run in silent log mode. [set reprint to False]

        :param msg:     message to log; callers pass strings as well as
                        exception objects.
        :param lvl:     level name, case-insensitive; resolved as an attribute
                        of `util.logging`.
        :param reprint: also print to console when the level is enabled.

        Side effects: sleeps 30 seconds on the flood error when the bot is not
        whitelisted, and terminates the process when lvl is CRITICAL.
        """

        lvl = lvl.upper()
        lvln = int(getattr(util.logging, lvl))
        self.log.log(lvln, msg)

        if reprint and lvln >= self.log.getEffectiveLevel():
            tag = f"[{lvl}]"
            print(f"[{time.strftime('%H:%M:%S')}]{tag:10s} {msg}")

        # Compare the text form: callers hand in exception objects, which
        # never compare equal to a plain string.
        if not self.whitelisted and str(msg) == self._FLOOD_ERROR:
            time.sleep(self._FLOOD_BACKOFF)

        if lvl == "CRITICAL":
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to exist in every interpreter setup.
            sys.exit(1)

    def stop(self, invoker):
        """
        This stops the bot instance by invalidating the event loop.

        :param invoker: mapping describing the requesting client; its 'id'
                        receives the goodbye message.
        """

        msg = "Shutdown, bye bye!"
        self.bot.sendtextmessage(targetmode=1, target=invoker['id'], msg=msg)
        self.pipeOut(msg)
        self.running = False

    def _regular_client_ids(self):
        """Return the clids of all connected clients whose client_type is not "1"."""

        return [client["clid"] for client in self.bot.clientlist() if client["client_type"] != "1"]

    def kickall(self, msg):
        """
        TODO: not implemented yet. Currently only pokes every non-query
        client with msg; nobody is kicked.
        """

        for clid in self._regular_client_ids():
            self.bot.clientpoke(msg=msg, clid=clid)
            # TODO

    def poke(self, msg=None, n=1, delay=0.2, usr='all'):
        """
        ping a single or multiple users for n times including a msg.

        :param msg:   poke text; a placeholder is used when None.
        :param n:     number of rounds; -1 repeats forever.
        :param delay: seconds to sleep after each round.
        :param usr:   'all' for every non-query client, otherwise a pattern
                      handed to clientfind.
        """

        if msg is None:
            msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~"

        # Get the client ids
        if usr == 'all':
            clients = self._regular_client_ids()
        else:
            try:
                clients = [client["clid"] for client in self.bot.clientfind(pattern=usr)]
            except ts3.query.TS3QueryError as e:
                self.pipeOut(f"Couldnt execute poke, no client found using pattern {usr},"
                             f"returned error:\n{e}", lvl="ERROR")
                return

        if not clients:
            return

        for clid in clients:
            self.pipeOut(f"{self.printable_clientinfo(clid)}", lvl="DEBUG")

        # Ping them
        i = 0
        while n == -1 or i < n:
            for clid in clients:
                try:
                    self.bot.clientpoke(msg=msg, clid=clid)
                except Exception as e:
                    # Best effort: one failing poke must not stop the others.
                    self.pipeOut(e, lvl="WARNING")
            # NOTE(review): nesting reconstructed from a de-indented source;
            # the delay is assumed to apply once per round -- confirm.
            time.sleep(delay)
            i += 1

    def createChannel(self, name, permanent=False):
        """
        Create a teamspeak channel, the channel is non persistent by default.
        Set permanent to True if you want a persistent channel.

        :return: the 'cid' of the first entry in the channelcreate response.
        """

        options = {"channel_name": name}
        if permanent:
            options["channel_flag_permanent"] = "1"

        return self.bot.channelcreate(**options)[0]["cid"]

    def editChannelname(self, cid, name):
        """
        Using a channel-id you can set the name of the given channel.
        This will fail if the channel name is already in use.
        This will fail if the given channel name is > 40 bytes.

        Errors are logged, not raised.
        """

        # NOTE(review): len() counts characters, while the message below talks
        # about bytes; the two differ for non-ASCII names -- confirm which
        # limit the server enforces.
        n = len(name)
        if n > 40:
            self.pipeOut(f"Can't change channelname to {name} as [{n}] exceeds the max bytes of 40.")
            return

        try:
            self.bot.channeledit(cid=cid, channel_name=name)
        except Exception as e:
            self.pipeOut(e, lvl="error")

    def isqueryclient(self, clid):
        """
        Check if the given client-id (clid) is a query client.

        Returns True as well when the client can't be looked up or reports
        no client_type.
        """

        try:
            client = self.bot.clientinfo(clid=clid)
        except ts3.query.TS3QueryError as e:
            self.pipeOut(f"ISQUERY: given clid {clid} returned: {e}", lvl="WARNING")
            return True

        try:
            is_query = client[0]["client_type"] == "1"
        except KeyError as e:
            self.pipeOut(f"Error [{e}] on client [{client.__dict__}] ")
            return True  # Because it is probably a query client

        self.pipeOut(f"[{clid}] ISQUERY: {is_query}")
        return is_query

    def isadmin(self, cldbid):
        """
        Check if the given client-databaseid is an admin.
        This is done by resolving the clients groups and check
        if any of the groups sgid match a sgid from the sgids from the '.toml' config.

        Returns False when the groups can't be resolved.
        """

        try:
            groups = self.bot.servergroupsbyclientid(cldbid=cldbid)
        except ts3.query.TS3QueryError as e:
            self.pipeOut(e, lvl="ERROR")
            return False

        self.pipeOut(str(groups.__dict__), lvl="DEBUG")  # DEBUG

        for group in groups:
            if any(sgid == group['sgid'] for sgid in self.sgid_admin):
                self.pipeOut(f"[{cldbid}] ISADMIN: True")
                return True

        self.pipeOut(f"[{cldbid}] ISADMIN: False")
        return False

    def printable_clientinfo(self, clid):
        """
        Generate printable clientinfo from clid.

        :return: the attribute dict of the clientinfo response object.
        """

        return self.bot.clientinfo(clid=clid).__dict__

    def clid2cldbid(self, clid):
        """
        Convert a given clid to a cldbid.
        """

        cluid = self.bot.clientgetuidfromclid(clid=clid)[0]["cluid"]
        return self.bot.clientgetdbidfromuid(cluid=cluid)[0]["cldbid"]

    def identifytarget(self, target):
        """
        Try to find clid from nickname.

        :return: the clid when exactly one client matches, otherwise None.
        """

        try:
            clids = [client["clid"] for client in self.bot.clientfind(pattern=target)]
        except ts3.query.TS3QueryError as e:
            self.pipeOut(f"No client found using pattern [{target}], "
                         f"returned error:\n{e}", lvl="ERROR")
            return None

        if len(clids) == 1:
            return clids[0]

        self.pipeOut(f"Found multiple matching clients using pattern {target}:{clids}")
        return None

    # Commands -------------------------------------------------------------------------------------------------------------

    def lookupcommand(self, msg, invoker, permission_level):
        """
        Every message starting with '.' gets passed and evaluated in this function.
        Commands in this function are sorted alphabetically.
        Parameters in brackets are optional.

        Currently only 2 permission levels are implemented [0 = Everyone, 1 = Admins]

        Command                 Parameter 1                         Parameter 2   Permissionlevel
        --------------------------------------------------------------------------------------
        .admin / .notifyAdmin                                                     0
        .annoy                  target                              message       1
        .follow                 target                                            1
        .help / .h                                                                0
        .info                   clid                                              0
        .kickall                message                                           1
        .list                   channel/clients/commands/groups                   1
        .mute                   target                                            1
        .pingall                message                                           1
        .rename                 nickname                                          1
        .rmb                    amount                                            0
        .roll                                                                     0
        .rwf                                                                      0
        .stop / .quit / .q                                                        1
        .test                                                                     1
        .ticker                 (symbol)                                          0
        .unmute                 target                                            1

        A command that is unknown, or that the invoker lacks the permission
        for, is answered with an "Unknown Command" private message.
        """

        command, *parameter = msg.split(" ")
        self.pipeOut(f"command: {command} | parameter: {parameter} | "
                     f"from: {invoker['name']} | permissionlevel: {permission_level}")

        is_admin = permission_level > 0

        if command in (".admin", ".notifyAdmin"):
            commands.notifyAdmin(self)

        elif command == ".annoy" and is_admin:
            commands.annoy(self, invoker, parameter)

        elif command == ".follow" and is_admin:
            commands.follow(self, invoker, parameter)

        elif command in (".help", ".h"):
            commands.help(self, invoker)

        elif command == ".info":
            commands.info(self, invoker)

        elif command == ".kickall" and is_admin:
            self.kickall("test")  # TODO

        elif command == ".list" and is_admin:
            commands.list(self, invoker, parameter)

        elif command == ".mute" and is_admin:
            commands.mute(self, invoker, parameter)

        elif command == ".pingall" and is_admin:
            commands.pingall(self, invoker, parameter)

        elif command == ".rename" and is_admin:
            commands.rename(self, invoker, parameter)

        elif command == ".rmb":
            commands.rmb(self, invoker, parameter)

        elif command == ".roll":
            commands.roll(self, invoker)

        elif command == ".rwf":
            commands.rwf(self)

        # The permission check must cover every alias: with a bare
        # `a or b or c and admin` only the last alias would be protected.
        elif command in (".stop", ".quit", ".q") and is_admin:
            commands.quit(self, invoker)

        elif command == ".test" and is_admin:
            cid = self.createChannel("Test")
            self.bot.sendtextmessage(targetmode=1, target=invoker['id'], msg=cid)

        elif command == ".ticker":
            commands.ticker(self, parameter)

        elif command == ".unmute" and is_admin:
            commands.unmute(self, invoker, parameter)

        else:
            err = f"Unknown Command: [{command}]"
            self.bot.sendtextmessage(targetmode=1, target=invoker['id'], msg=err)
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------------------------
|
|
|
|
def main():
    """
    Entry point: set up logging next to this file, load the config and
    start the bot.
    """

    # Start logger; the log directory lives beside this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    logger = util.setupLogger(f"{script_dir}/log")

    # Log unhandled exception
    sys.excepthook = util.unhandledException

    # Load toml config
    settings = util.getConf("prod.toml")

    # Change loglevel from config
    util.changeLevel(logger, settings['Log']['level'])

    # Start the Bot Instance (the constructor runs the event loop).
    TSbot(settings, logger)


if __name__ == "__main__":
    main()
|