510 lines
17 KiB
Python
510 lines
17 KiB
Python
"""
|
|
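TeamSpeak 3 ServerQuery bot.

Connects to a TeamSpeak 3 server via telnet, listens for server events
and reacts to chat commands that start with a '.'.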
|
|
"""
|
|
|
|
__author__ = "Lukas Mahler"
|
|
__version__ = "0.0.0"
|
|
__date__ = "27.09.2021"
|
|
__email__ = "m@hler.eu"
|
|
__status__ = "Development"
|
|
|
|
# Default
|
|
import sys
|
|
import time
|
|
import json
|
|
import os.path
|
|
|
|
# Custom
|
|
import ts3
|
|
|
|
# Self
|
|
from src import util, gecko
|
|
|
|
|
|
class TSbot:
|
|
|
|
def __init__(self, conf, log):
    """
    Set up the bot from the parsed config, connect to the server via
    telnet, log in and hand over to the event loop.

    conf is read as conf[section][key] with the sections "Connection"
    (host, port, sid), "Authentication" (user, pwd) and "Misc" (nickname,
    whitelisted). log is the logger object that pipeOut writes to.

    This call only returns once the event loop has ended.
    """

    conn_conf = conf["Connection"]
    self.host = conn_conf["host"]
    self.port = conn_conf["port"]
    self.sid = conn_conf["sid"]

    auth_conf = conf["Authentication"]
    self.user = auth_conf["user"]
    self.pwd = auth_conf["pwd"]

    misc_conf = conf["Misc"]
    self.nickname = misc_conf["nickname"]
    self.whitelisted = misc_conf["whitelisted"]

    self.gecko = gecko.GeckoAPI()
    self.log = log
    self.myid = None
    self.running = True
    self.intro = "<Keep this chat open to use commands>"
    # Backdate the last price update by 30 minutes so that the event loop
    # refreshes the crypto channels right after startup.
    self.last_crypto_update = time.time() - 1800

    self.pipeOut(f"Trying to connect to: {self.host}:{self.port}")

    try:
        with ts3.query.TS3Connection(self.host, self.port) as session:
            self.bot = session

            try:
                self.bot.login(client_login_name=self.user, client_login_password=self.pwd)
                self.bot.use(sid=self.sid)
            except ts3.query.TS3QueryError:
                # CRITICAL output terminates the program inside pipeOut
                self.pipeOut("Invalid login credentials, please check your '.toml' file.", lvl="CRITICAL")

            try:
                self.bot.clientupdate(client_nickname=self.nickname)
            except ts3.query.TS3QueryError:
                # Not fatal, the bot keeps running after this warning
                self.pipeOut("Nickname already in use", lvl="WARNING")

            self.pipeOut(f"Successfully connected as: {self.nickname}")

            # Start the Bot
            self.loop()

    except ts3.query.TS3QueryError as e:
        self.pipeOut(e, lvl="CRITICAL")
|
|
|
|
def loop(self):
    """
    Subscribe to event types, ping bot admins and start the event loop.
    Every time an event is triggered, the bot will identify its type and
    execute the appropriate function.

    The loop can be stopped by setting self.running to False; the flag is
    checked once per iteration, i.e. after the current event was handled
    or the wait timed out.
    """

    # Find my client id
    me = self.bot.clientfind(pattern=self.nickname)
    if len(me) != 1:
        self.pipeOut("Can't find my own client id. terminating...", lvl="critical")
        # pipeOut already terminates on CRITICAL, this is only a safety net
        sys.exit(1)
    self.myid = me[0]["clid"]

    # If you want to move the Bot to a certain channel (instead of the
    # default channel), you can do:
    # self.bot.clientmove(clid=self.myid,cid=129)

    # ----------- SUBSCRIBE TO EVENTS -------------
    # server      -> server movement events
    # textserver  -> server channel messages
    # textchannel -> channel messages
    # textprivate -> private chat messages
    for event_name in ("server", "textserver", "textchannel", "textprivate"):
        self.bot.servernotifyregister(event=event_name)

    # Subscribe to channel movement events
    # self.bot.servernotifyregister(event="channel", id_=0)

    if not self.whitelisted:
        time.sleep(5)

    # Notify connected admins
    self.notifyAdmin()

    # ----------- LOOP HERE -------------
    while self.running:
        # self.bot.send_keepalive()
        self.pipeOut("Waiting for a new Event...")
        # NOTE(review): presumably this query doubles as the keepalive - confirm
        self.bot.version()

        # Auto-update crypto price channels every 30 minutes
        if self.last_crypto_update + 1800 < time.time():
            self.last_crypto_update = time.time()

            self.lookupcommand(".btc", self.myid)
            self.lookupcommand(".dot", self.myid)

        try:
            # This method blocks, but we must send the keepalive message at
            # least once in 5 minutes to avoid the server side idle client
            # disconnect. So we set the timeout parameter simply to 1 minute.
            event = self.bot.wait_for_event(timeout=60)
        except ts3.query.TS3TimeoutError:
            # Nothing happened within the timeout, start the next round
            continue

        event_type = event.event
        data = event[0]
        self.pipeOut(f"Got Event | length={len(data)} | {event_type}")

        # Client connected
        if event_type == "notifycliententerview":

            if 'client_nickname' in data:
                displayname = data['client_nickname']
            elif 'clid' in data:
                displayname = data['clid']
            else:
                self.pipeOut(data)
                displayname = "Unresolved"

            self.pipeOut(f"Client [{displayname}] connected.")

            # Check if the connector is a ServerQuery or not.
            # isqueryclient looks the client up by its client id (clid),
            # so the clid has to be passed here, not the unique identifier.
            if not self.isqueryclient(data["clid"]):
                self.pipeOut(f"* {data}", lvl="debug")
                # Check if the connector is an Admin
                if self.isadmin(data["client_database_id"]):
                    self.bot.sendtextmessage(targetmode=1, target=data["clid"], msg=self.intro)

        # Client disconnected
        elif event_type == "notifyclientleftview":

            if 'client_nickname' in data:
                displayname = data['client_nickname']
            else:
                displayname = data['clid']

            self.pipeOut(f"Clientid [{displayname}] disconnected.")

        # New text message
        elif event_type == "notifytextmessage":
            msg = data["msg"]
            invkr = data["invokername"]
            invkr_id = data["invokerid"]
            self.pipeOut(f'Message | From: "{invkr}" | Content: "{msg}"')
            # Only messages starting with '.' are treated as commands
            if msg.startswith("."):
                self.lookupcommand(msg, invkr_id)

        # Unknown Event
        else:
            self.pipeOut(f"Unknown Event: {event.__dict__}", lvl="warning")
|
|
|
|
def pipeOut(self, msg, lvl="INFO", reprint=True):
    """
    Central output function, every message of the bot goes through here.

    The message is always written to the logger using the level named by
    lvl (case insensitive, resolved as an attribute of util.logging).
    With reprint set to True it is additionally printed to the console,
    with reprint set to False the bot runs in silent log mode.

    A message of level CRITICAL or ERROR terminates the program with
    exit code 1 after it has been written.
    """

    level_name = lvl.upper()
    level_number = int(getattr(util.logging, level_name))
    self.log.log(level_number, msg)

    if reprint:
        stamp = time.strftime('%H:%M:%S')
        print(f"[{stamp}][{level_name}] {msg}")

    if level_name in ("CRITICAL", "ERROR"):
        exit(1)
|
|
|
|
def stop(self, invkr_id):
    """
    Say goodbye to the invoker in a private message, write the same text
    to the output and clear self.running so that the event loop ends.
    """

    farewell = "Shutdown, bye bye!"
    self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=farewell)
    self.pipeOut(farewell)

    # The event loop checks this flag at the start of every iteration
    self.running = False
|
|
|
|
def notifyAdmin(self):
    """
    Send the intro text as a private message to every connected admin.

    Query clients (client_type "1") are skipped. Unless the bot is
    whitelisted, it pauses for one second after each checked client.
    """

    humans = [entry for entry in self.bot.clientlist() if entry["client_type"] != "1"]
    for entry in humans:
        cldbid = entry["client_database_id"]
        clid = entry["clid"]
        if self.isadmin(cldbid):
            self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro)

        # NOTE(review): presumably throttles the queries of a non-whitelisted
        # bot - confirm against the server's flood protection settings
        if not self.whitelisted:
            time.sleep(1)
|
|
|
|
def kickall(self, msg):
    """
    TODO: kicking is not implemented yet.

    For now every connected client that is not a query client
    (client_type "1") is poked once with msg. A failed poke is reported
    as a warning and does not stop the remaining pokes.
    """

    clients = self.bot.clientlist()
    targets = [client["clid"] for client in clients if client["client_type"] != "1"]
    for clid in targets:
        try:
            self.bot.clientpoke(msg=msg, clid=clid)
            # TODO
        except Exception as e:
            # Best effort: report the failure instead of hiding it, the
            # warning level keeps the bot running
            self.pipeOut(e, lvl="warning")
|
|
|
|
def poke(self, msg=None, n=10, delay=0.2, usr='all'):
    """
    Poke one or several users n times with the given msg.

    With usr set to 'all' every connected client that is not a query
    client is poked, otherwise every client found for the pattern usr.
    After each round over all targets the bot sleeps for delay seconds.
    Setting n to -1 keeps poking forever.
    """

    if msg is None:
        msg = "-~-~-~-~-~ Placeholder -~-~-~-~-~-~"

    # Get the client ids
    if usr == 'all':
        targets = [entry["clid"] for entry in self.bot.clientlist() if entry["client_type"] != "1"]
    else:
        targets = [entry["clid"] for entry in self.bot.clientfind(pattern=usr)]

    if not targets:
        return

    # Write down who is about to be poked
    for clid in targets:
        details = self.printable_clientinfo(clid)
        self.pipeOut(f"{details}", lvl="debug")

    # Ping them
    rounds = 0
    while n == -1 or rounds < n:
        for clid in targets:
            try:
                self.bot.clientpoke(msg=msg, clid=clid)
            except Exception as e:
                # A failed poke must not stop the remaining ones
                self.pipeOut(e, lvl="warning")
        time.sleep(delay)
        rounds += 1
|
|
|
|
def createChannel(self, name, permanent=False):
    """
    Create a teamspeak channel called name and return its channel-id.

    Without further arguments no permanent flag is sent to the server.
    Set permanent to True to request a persistent channel.
    """

    options = {"channel_name": name}
    if permanent:
        options["channel_flag_permanent"] = "1"

    created = self.bot.channelcreate(**options)
    return created[0]["cid"]
|
|
|
|
def editChannelname(self, cid, name):
    """
    Using a channel-id you can set the name of the given channel.

    Names longer than 40 are rejected up front without contacting the
    server. If the server refuses the change (e.g. because the channel
    name is already in use) the failure is reported as a warning, so a
    failed rename never terminates the bot.
    """

    # Check the name length before bothering the server
    # NOTE(review): len() counts characters while the message speaks of
    # bytes - confirm which unit the server limit uses
    n = len(name)
    if n > 40:
        self.pipeOut(f"Can't change channelname to {name} as [{n}] exceeds the max bytes of 40.")
        return

    try:
        self.bot.channeledit(cid=cid, channel_name=name)
    except Exception as e:
        # Deliberately not "error": pipeOut exits the whole program on
        # ERROR, while callers treat a failed rename as ignorable
        self.pipeOut(e, lvl="warning")
|
|
|
|
def list(self, what, invkr_id):
    """
    Message the invoker of the function either a list of channels or a list of clients.

    what == "channel" sends a JSON object that maps channel_order to
    [channel_name, cid], what == "clients" sends a JSON object that maps
    client_nickname to its clid and cldbid. Both are sorted by key.
    Any other value of what is answered with a usage hint.
    """

    if what == "channel":
        # NOTE: channels that share the same channel_order value overwrite
        # each other here, only the last one is kept
        listing = {
            channel["channel_order"]: [channel["channel_name"], channel["cid"]]
            for channel in self.bot.channellist()
        }

    elif what == "clients":
        listing = {
            client["client_nickname"]: {"clid": client["clid"], "cldbid": client["client_database_id"]}
            for client in self.bot.clientlist()
        }

    else:
        # Never send an empty (None) message, tell the invoker how to use it
        usage = "Please use the command like this: .list channel/clients"
        self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=usage)
        return

    msg = json.dumps(dict(sorted(listing.items())))
    self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg)
|
|
|
|
def isqueryclient(self, clid):
    """
    Check if the client with the given client-id (clid) is a query client.

    The client is looked up on the server with clientinfo; a client_type
    of "1" marks a query client. The verdict is written to the output
    and returned as a bool.
    """

    client = self.bot.clientinfo(clid=clid)
    # The answer is indexed like every other server response in this
    # file: first entry, then the property name
    is_query = client[0]["client_type"] == "1"

    self.pipeOut(f"[{clid}] ISQUERY: {is_query}")
    return is_query
|
|
|
|
def isadmin(self, cldbid):
    """
    Tell whether the client with the given client-databaseid is an admin.

    A client counts as admin when one of its server groups has the
    sgid "15". The verdict is written to the output and returned.
    """

    groups = self.bot.servergroupsbyclientid(cldbid=cldbid)

    # 6 Server Admin/ 13 Operator / 15 Root
    # Only Root (15) is accepted at the moment
    verdict = any(group["sgid"] == "15" for group in groups)

    self.pipeOut(f"[{cldbid}] ISADMIN: {verdict}")
    return verdict
|
|
|
|
def lookupcommand(self, msg, invkr_id):
    """
    Every message starting with '.' gets passed and evaluated in this function.

    Command             Parameter 1         Parameter 2
    ---------------------------------------------------------
    .annoy              target              message
    .kickall            message
    .test
    .btc / .eth
    .dot / .ada
    .info               clid
    .list               channel/clients
    .follow
    .rename             nickname
    .stop / .quit / .q
    .pingall            message

    .admin

    The message is split at single spaces. For .annoy and .pingall the
    message parameter is everything after the preceding parameter, so it
    may consist of several words. Missing parameters and unknown commands
    are answered with a private message to the invoker (invkr_id).
    """

    def reply(text):
        # Answer the invoker in a private chat
        self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=text)

    commandstring = msg.split(" ")
    command = commandstring[0]
    parameter = commandstring[1:]
    self.pipeOut(f"command: {command} | parameter: {parameter} | invkr_id: {invkr_id}")

    if command == ".annoy":
        target = parameter[0] if parameter else ""
        # Put the remaining words back together, the message may have spaces
        text = " ".join(parameter[1:])
        if target and text.strip():
            self.poke(msg=text, usr=target)
        else:
            reply("Please use the command like this: .annoy TARGET MESSAGE")

    elif command == ".kickall":
        self.kickall("test")  # TODO

    elif command == ".test":
        cid = self.createChannel("Test")
        reply(cid)

    elif command in (".btc", ".eth"):
        channelname = f"[cspacerC1]BTC: {self.gecko.getSymbol('Bitcoin', decimal=0)} | " \
                      f"ETH: {self.gecko.getSymbol('ethereum', decimal=0)}"
        try:
            self.editChannelname(200, channelname)
        except ts3.query.TS3QueryError:
            pass

    elif command in (".dot", ".ada"):
        channelname = f"[cspacerC2]DOT: {self.gecko.getSymbol('polkadot', decimal=2)} | " \
                      f"ADA: {self.gecko.getSymbol('cardano', decimal=2)}"
        try:
            self.editChannelname(201, channelname)
        except ts3.query.TS3QueryError:
            pass

    elif command == ".info":
        pass  # TODO

    elif command == ".list":
        if parameter:
            self.list(parameter[0], invkr_id)
        else:
            reply("Please use the command like this: .list channel/clients")

    elif command == ".follow":
        pass  # TODO

    elif command == ".rename":
        if parameter:
            # Only remember the new nickname once the server accepted it
            self.bot.clientupdate(client_nickname=parameter[0])
            self.nickname = parameter[0]
        else:
            reply("Please use the command like this: .rename NAME")

    elif command in (".stop", ".quit", ".q"):
        self.stop(invkr_id)

    elif command == ".pingall":
        text = " ".join(parameter)
        if text.strip():
            self.poke(msg=text)
        else:
            reply("Please use the command like this: .pingall MESSAGE")

    elif command == ".admin":
        self.notifyAdmin()

    else:
        reply(f"Unknown Command: [{command}]")
|
|
|
|
def printable_clientinfo(self, clid):
    """
    Look up the client with the given clid on the server and return the
    attribute dictionary (__dict__) of the answer, ready to be printed.
    """

    response = self.bot.clientinfo(clid=clid)
    return response.__dict__
|
|
|
|
def checkgrp(self):
    """
    Write the assigned server groups of every connected client to the
    output, query clients (client_type "1") are left out.
    """

    # The client list is requested including the group information
    groups_per_client = []
    for client in self.bot.clientlist(groups=True):
        if client["client_type"] != "1":
            groups_per_client.append(client["client_servergroups"])

    self.pipeOut(f"{groups_per_client}")
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------------------------
|
|
|
|
def main():
    """
    Entry point: set up the logger, register the hook for unhandled
    exceptions, load the config and start the bot.
    """

    # Start logger, the log directory sits next to this file
    here = os.path.dirname(os.path.abspath(__file__))
    log = util.setupLogger(f"{here}/log")

    # Log unhandled exception
    sys.excepthook = util.unhandledException

    # Load toml config
    conf = util.getconf("prod.toml")

    # Start the Bot Instance, creating it connects and runs the event loop
    TSbot(conf, log)


if __name__ == "__main__":
    main()
|