404 lines
12 KiB
Python
404 lines
12 KiB
Python
"""
|
|
TBD
|
|
"""
|
|
|
|
__author__ = "Lukas Mahler"
|
|
__version__ = "0.2.0"
|
|
__date__ = "23.09.2021"
|
|
__email__ = "m@hler.eu"
|
|
__status__ = "Development"
|
|
|
|
# Default
|
|
import os
|
|
import json
|
|
from time import sleep
|
|
|
|
# Custom
|
|
import ts3
|
|
import dotenv # python-dotenv
|
|
|
|
|
|
class MyTeamspeakBot:
|
|
|
|
def __init__(self, conf):
|
|
"""
|
|
|
|
"""
|
|
|
|
self.host = conf["host"]
|
|
self.port = conf["port"]
|
|
self.user = conf["user"]
|
|
self.pwd = conf["pwd"]
|
|
self.sid = conf["sid"]
|
|
self.nickname = conf["name"]
|
|
|
|
self.myid = None
|
|
self.running = True
|
|
self.intro = "<Keep this chat open to use commands>"
|
|
|
|
print(f"* Trying to connect to: {self.host}:{self.port}")
|
|
|
|
with ts3.query.TS3Connection(self.host, self.port) as self.bot:
|
|
self.bot.login(client_login_name=self.user, client_login_password=self.pwd)
|
|
self.bot.use(sid=self.sid)
|
|
try:
|
|
self.bot.clientupdate(client_nickname=self.nickname)
|
|
except ts3.query.TS3QueryError:
|
|
pass
|
|
print(f"* Successfully connected as: {self.nickname}")
|
|
|
|
# Start the Bot
|
|
self.loop()
|
|
|
|
def loop(self):
|
|
"""
|
|
|
|
"""
|
|
|
|
# Find my client id
|
|
me = self.bot.clientfind(pattern=self.nickname)
|
|
if len(me) == 1:
|
|
self.myid = me[0]["clid"]
|
|
else:
|
|
raise ValueError("x Can't find my own client id.")
|
|
|
|
''' if you want to move the Bot to a certain channel (instead of the defualt channel, you can do: '''
|
|
# self.bot.clientmove(clid=self.myid,cid=129)
|
|
|
|
# Subscribe to a server movement events
|
|
self.bot.servernotifyregister(event="server")
|
|
|
|
# Subscribe to privat chat messages
|
|
self.bot.servernotifyregister(event="textprivate")
|
|
|
|
# Subscribe to chat channel messages
|
|
self.bot.servernotifyregister(event="textchannel")
|
|
|
|
# Subscribe to channel movement events
|
|
# self.bot.servernotifyregister(event="channel", id_=0)
|
|
|
|
# Notify connected admins
|
|
sleep(5)
|
|
self.notifyAdmin()
|
|
|
|
# ----------- LOOP HERE -------------
|
|
while self.running:
|
|
# self.bot.send_keepalive()
|
|
print("* Waiting for a new Event...")
|
|
self.bot.version()
|
|
|
|
try:
|
|
# This method blocks, but we must sent the keepalive message at
|
|
# least once in 5 minutes to avoid the sever side idle client
|
|
# disconnect. So we set the timeout parameter simply to 1 minute.
|
|
event = self.bot.wait_for_event(timeout=60)
|
|
|
|
except ts3.query.TS3TimeoutError:
|
|
pass
|
|
else:
|
|
event_type = event.event
|
|
print(f"* Got Event | length={len(event[0])} | {event_type}")
|
|
|
|
# Client Connect
|
|
if event_type == "notifycliententerview":
|
|
print(f"* Client [{event[0]['client_nickname']}] connected.")
|
|
|
|
# Check if the connector is a ServerQuery or not
|
|
if not self.isqueryclient(event[0]["client_unique_identifier"]):
|
|
print(f"* {event[0]}")
|
|
# Check if the connector is an Admin
|
|
if self.isadmin(event[0]["client_database_id"]):
|
|
self.bot.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro)
|
|
else:
|
|
pass
|
|
else:
|
|
pass
|
|
|
|
# Client disconnected
|
|
elif event_type == "notifyclientleftview":
|
|
print(f"* Clientid [{event[0]['clid']}] disconnected.")
|
|
pass
|
|
|
|
# Text Message
|
|
elif event_type == "notifytextmessage":
|
|
msg = event[0]["msg"]
|
|
invkr = event[0]["invokername"]
|
|
invkr_id = event[0]["invokerid"]
|
|
print(f'* From: "{invkr}" | Message: "{msg}"')
|
|
self.lookupcommand(msg, invkr_id)
|
|
|
|
else:
|
|
print(f"* Unknown Event: {event.__dict__}")
|
|
|
|
def stop(self, invkr_id):
|
|
"""
|
|
|
|
"""
|
|
msg = "I'm out, bye bye!"
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg)
|
|
self.running = False
|
|
|
|
def notifyAdmin(self):
|
|
clients = self.bot.clientlist()
|
|
clients = [client for client in clients if client["client_type"] != "1"]
|
|
for client in clients:
|
|
cldbid = client["client_database_id"]
|
|
clid = client["clid"]
|
|
if self.isadmin(cldbid):
|
|
self.bot.sendtextmessage(targetmode=1, target=clid, msg=self.intro)
|
|
sleep(1) # This can be removed if the Query Client is Whitelisted
|
|
|
|
def kickall(self, msg):
|
|
"""
|
|
|
|
"""
|
|
|
|
clients = self.bot.clientlist()
|
|
clients = [client["clid"] for client in clients if client["client_type"] != "1"]
|
|
for clid in clients:
|
|
try:
|
|
self.bot.clientpoke(msg=msg, clid=clid)
|
|
# TODO
|
|
except:
|
|
pass
|
|
|
|
def poke(self, msg=None, num=10, delay=0.2, usr='all'):
|
|
"""
|
|
|
|
"""
|
|
|
|
if msg is None:
|
|
msg = "-~-~-~-~-~-~-~-~-~-~-~"
|
|
|
|
# Get the client ids
|
|
if usr == 'all':
|
|
clients = self.bot.clientlist()
|
|
clients = [client["clid"] for client in clients if client["client_type"] != "1"]
|
|
else:
|
|
clients = self.bot.clientfind(pattern=usr)
|
|
clients = [client["clid"] for client in clients]
|
|
|
|
# Break, if there's no client.
|
|
if not clients:
|
|
return None
|
|
else:
|
|
for client in clients:
|
|
data = self.printable_clientinfo(client)
|
|
print(f"* {data}")
|
|
|
|
# Nopokeatm
|
|
# return
|
|
|
|
# Poke them
|
|
i = 0
|
|
while num == -1 or i < num:
|
|
for clid in clients:
|
|
print(f"* {clid}")
|
|
try:
|
|
self.bot.clientpoke(msg=msg, clid=clid)
|
|
except:
|
|
pass
|
|
sleep(delay)
|
|
i += 1
|
|
return None
|
|
|
|
def createChannel(self, name, permanent=False):
|
|
"""
|
|
|
|
"""
|
|
if permanent:
|
|
new = self.bot.channelcreate(channel_name=name, channel_flag_permanent="1")
|
|
else:
|
|
new = self.bot.channelcreate(channel_name=name)
|
|
|
|
return new[0]["cid"]
|
|
|
|
def editChannelname(self, cid, name):
|
|
"""
|
|
|
|
"""
|
|
self.bot.channeledit(cid=cid, channel_name=name)
|
|
|
|
def list(self, what, invkr_id):
|
|
"""
|
|
|
|
"""
|
|
if what == "channel":
|
|
mydict = {}
|
|
channels = self.bot.channellist()
|
|
for channel in channels:
|
|
order = channel["channel_order"]
|
|
mydict[order] = [channel["channel_name"], channel["cid"]]
|
|
|
|
mydict = dict(sorted(mydict.items()))
|
|
msg = json.dumps(mydict)
|
|
|
|
elif what == "clients":
|
|
msg = self.bot.clientlist()
|
|
# TODO
|
|
else:
|
|
msg = None
|
|
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=msg)
|
|
|
|
@staticmethod
|
|
def isqueryclient(cluid):
|
|
"""
|
|
Check if the given client-uid is a query client
|
|
"""
|
|
|
|
# client = self.bot.clientlist(uid=uid)
|
|
# print(client[0])
|
|
# if client[0]["client_type"] == "1":
|
|
if cluid == "ServerQuery":
|
|
print("* ISQUERY: True")
|
|
return True
|
|
else:
|
|
print("* ISQUERY: False")
|
|
return False
|
|
|
|
def isadmin(self, cldbid):
|
|
"""
|
|
Check if the given client-databaseid is an admin
|
|
"""
|
|
|
|
groups = self.bot.servergroupsbyclientid(cldbid=cldbid)
|
|
# [print(group["sgid"]) for group in groups]
|
|
for group in groups:
|
|
# 6 Server Admin/ 13 Operator / 15 Root
|
|
# if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"):
|
|
if group["sgid"] == "15":
|
|
print("* ISADMIN: True")
|
|
return True
|
|
else:
|
|
continue
|
|
|
|
print("* ISADMIN: False")
|
|
return False
|
|
|
|
def lookupcommand(self, msg, invkr_id):
|
|
"""
|
|
|
|
"""
|
|
|
|
if msg.startswith("!"):
|
|
commandstring = msg.split(" ")
|
|
command = commandstring[0]
|
|
parameter = commandstring[1:]
|
|
print(f"* command: {command} / parameter: {parameter} / invkr_id: {invkr_id}")
|
|
|
|
if command == "!annoy":
|
|
try:
|
|
target = parameter[0]
|
|
msg = parameter[1]
|
|
self.poke(msg=msg, usr=target)
|
|
except IndexError:
|
|
err = "Please use the command like this: !annoy TARGET MESSAGE"
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
|
|
pass
|
|
|
|
elif command == "!kickall":
|
|
self.kickall("test") # TODO
|
|
|
|
elif command == "!test":
|
|
cid = self.createChannel("Test")
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=cid)
|
|
|
|
elif command == "!btc":
|
|
self.editChannelname(200, f"[cspacerBTC]Bitcoin: {50000}€") # TODO
|
|
|
|
elif command == "!eth":
|
|
self.editChannelname(201, f"[cspacerETH]Ethereum: {3000}€") # TODO
|
|
|
|
elif command == "!list":
|
|
try:
|
|
self.list(parameter[0], invkr_id)
|
|
except IndexError:
|
|
err = "Please use the command like this: !list channel/clients"
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
|
|
pass
|
|
|
|
elif command == "!follow":
|
|
pass # TODO
|
|
|
|
elif command == "!rename":
|
|
pass # TODO
|
|
|
|
elif command == "!stop" or command == "!quit" or command == "!q":
|
|
self.stop(invkr_id)
|
|
|
|
elif command == "!pingall":
|
|
try:
|
|
msg = parameter[0]
|
|
self.poke(msg=msg)
|
|
except IndexError:
|
|
err = "Please use the command like this: !pingall MESSAGE"
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
|
|
pass
|
|
|
|
else:
|
|
err = f"Unknown Command: [{command}]"
|
|
self.bot.sendtextmessage(targetmode=1, target=invkr_id, msg=err)
|
|
|
|
def printable_clientinfo(self, client):
|
|
"""
|
|
|
|
"""
|
|
|
|
usrd = {}
|
|
info = self.bot.clientinfo(clid=client)
|
|
temp = info._data[0].split()
|
|
for t1 in temp:
|
|
t2 = t1.decode("utf-8")
|
|
t3 = t2.split("=")
|
|
try:
|
|
t3[1]
|
|
except Exception:
|
|
t3.append("None")
|
|
pass
|
|
|
|
usrd[t3[0]] = t3[1]
|
|
|
|
return usrd
|
|
|
|
def checkgrp(self):
|
|
"""
|
|
|
|
"""
|
|
|
|
clients = self.bot.clientlist()
|
|
clients_cldbid = [client["client_database_id"] for client in clients if client["client_type"] != "1"]
|
|
clients = self.bot.clientlist(groups=True)
|
|
clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"]
|
|
|
|
print(f"* {clients_groups}")
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------------------------
|
|
|
|
def main():
|
|
# Load Dotenv
|
|
dotenv_file = dotenv.find_dotenv()
|
|
if not dotenv_file:
|
|
raise FileNotFoundError("could not locate .env file")
|
|
dotenv.load_dotenv(dotenv_file)
|
|
dotend_keys = dotenv.dotenv_values()
|
|
|
|
if not {"_HOST", "_PORT", "_USER", "_PWD", "_SID"} <= dotend_keys.keys():
|
|
raise ValueError("missing keys in your .env file.")
|
|
|
|
# Config
|
|
conf = dict(host=os.getenv("_HOST"),
|
|
port=os.getenv("_PORT"),
|
|
user=os.getenv("_USER"),
|
|
pwd=os.getenv("_PWD"),
|
|
sid=os.getenv("_SID"),
|
|
name=os.getenv("_NAME"))
|
|
|
|
# Start the Bot Instance
|
|
abot = MyTeamspeakBot(conf)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|