323 lines
9.6 KiB
Python
323 lines
9.6 KiB
Python
"""
|
|
TBD
|
|
"""
|
|
|
|
__author__ = "Lukas Mahler"
|
|
__version__ = "0.1.0"
|
|
__date__ = "19.09.2021"
|
|
__email__ = "m@hler.eu"
|
|
__status__ = "Development"
|
|
|
|
# Default
|
|
import os
|
|
from time import sleep
|
|
|
|
# Custom
|
|
import ts3
|
|
import dotenv # python-dotenv
|
|
|
|
|
|
class MyTeamspeakBot:
|
|
|
|
def __init__(self, conf):
|
|
"""
|
|
|
|
"""
|
|
|
|
self.nickname = conf["name"]
|
|
self.myid = None
|
|
self.running = True
|
|
self.intro = "__ Keep this Chat open to Use Admin Commands __"
|
|
|
|
print("Trying to connect to: {0}:{1}".format(conf["host"], conf["port"]))
|
|
|
|
with ts3.query.TS3Connection(conf["host"], conf["port"]) as self.con:
|
|
self.con.login(client_login_name=conf["user"], client_login_password=conf["pwd"])
|
|
self.con.use(sid=conf["sid"])
|
|
try:
|
|
self.con.clientupdate(client_nickname=self.nickname)
|
|
except ts3.query.TS3QueryError:
|
|
pass
|
|
print("Successfully connected as: {0}\n\n".format(self.nickname))
|
|
|
|
# Start the Bot
|
|
self.loop()
|
|
|
|
def loop(self):
|
|
"""
|
|
|
|
"""
|
|
|
|
# Finding myself
|
|
me = self.con.clientfind(pattern=self.nickname)
|
|
meid = [client["clid"] for client in me]
|
|
self.myid = meid[0]
|
|
# info = printable_clientinfo(self.myid)
|
|
|
|
# Positioning myself
|
|
'''ist standardmäßig im default channel (für meine Zwecke Richtig)'''
|
|
# ts3conn.clientmove(clid=selfid,cid=129)
|
|
|
|
# Subscribe myself to channel
|
|
self.con.servernotifyregister(event="server")
|
|
# Subscribe myself to privat chat messages
|
|
self.con.servernotifyregister(event="textprivate")
|
|
# Subscribe myself to channel movement events
|
|
# ts3conn.servernotifyregister(event="channel",id_=0)
|
|
|
|
# Notify every admin of my existance
|
|
self.notifyAdmin()
|
|
|
|
# ----------- LOOP HERE -------------
|
|
while self.running:
|
|
# ts3conn.send_keepalive()
|
|
print("Waiting for a new Event...")
|
|
self.con.version()
|
|
|
|
try:
|
|
# This method blocks, but we must sent the keepalive message at
|
|
# least once in 5 minutes to avoid the sever side idle client
|
|
# disconnect. So we set the timeout parameter simply to 1 minute.
|
|
event = self.con.wait_for_event(timeout=60)
|
|
|
|
except ts3.query.TS3TimeoutError:
|
|
pass
|
|
else:
|
|
print(100 * " " + "\nGot Event | length={0}".format(len(event[0])))
|
|
|
|
if len(event[0]) > 15:
|
|
if event[0]["reasonid"] == "0":
|
|
print("Client '{}' connected.".format(event[0]["client_nickname"]))
|
|
|
|
# Check if the connector is a ServerQuery or not
|
|
if not self.isqueryclient(event[0]["client_unique_identifier"]):
|
|
print(event[0])
|
|
# Check if the connector is an Admin
|
|
if self.isadmin(event[0]["client_database_id"]):
|
|
self.con.sendtextmessage(targetmode=1, target=event[0]["clid"], msg=self.intro)
|
|
else:
|
|
pass
|
|
else:
|
|
pass
|
|
|
|
# Message Event
|
|
elif len(event[0]) == 6:
|
|
msg = event[0]["msg"]
|
|
invkr = event[0]["invokername"]
|
|
print('From: "{1}"\nMessage: "{0}"'.format(msg, invkr))
|
|
self.lookupcommand(msg, invkr)
|
|
|
|
print((100 * " ") + "\n")
|
|
|
|
def stop(self, invkr):
|
|
"""
|
|
|
|
"""
|
|
msg = "I'm out, bye bye!"
|
|
self.con.sendtextmessage(targetmode=1, target=invkr, msg=msg)
|
|
self.running = False
|
|
|
|
def notifyAdmin(self):
|
|
clients = self.con.clientlist()
|
|
clients = [client for client in clients if client["client_type"] != "1"]
|
|
for client in clients:
|
|
cldbid = client["client_database_id"]
|
|
clid = client["clid"]
|
|
if self.isadmin(cldbid):
|
|
self.con.sendtextmessage(targetmode=1, target=clid, msg=self.intro)
|
|
sleep(1)
|
|
|
|
def kickall(self, msg):
|
|
"""
|
|
|
|
"""
|
|
|
|
clients = self.con.clientlist()
|
|
clients = [client["clid"] for client in clients if client["client_type"] != "1"]
|
|
for clid in clients:
|
|
try:
|
|
self.con.clientpoke(msg=msg, clid=clid)
|
|
except:
|
|
pass
|
|
|
|
def poke(self, msg=None, num=10, delay=0.2, usr='all'):
|
|
"""
|
|
|
|
"""
|
|
|
|
if msg is None:
|
|
msg = "-~-~-~-~-~-~-~-~-~-~-~"
|
|
|
|
# Get the client ids
|
|
if usr == 'all':
|
|
clients = self.con.clientlist()
|
|
clients = [client["clid"] for client in clients if client["client_type"] != "1"]
|
|
else:
|
|
clients = self.con.clientfind(pattern=usr)
|
|
clients = [client["clid"] for client in clients]
|
|
|
|
# Break, if there's no client.
|
|
if not clients:
|
|
return None
|
|
else:
|
|
for client in clients:
|
|
data = self.printable_clientinfo(client)
|
|
print(data)
|
|
|
|
# Nopokeatm
|
|
# return
|
|
|
|
# Poke them
|
|
i = 0
|
|
while num == -1 or i < num:
|
|
for clid in clients:
|
|
print(clid)
|
|
try:
|
|
self.con.clientpoke(msg=msg, clid=clid)
|
|
except:
|
|
pass
|
|
sleep(delay)
|
|
i += 1
|
|
return None
|
|
|
|
# def openmsg():
|
|
|
|
# def subscribemsg():
|
|
|
|
@staticmethod
|
|
def isqueryclient(cluid):
|
|
"""
|
|
Check if the given client-uid is a query client
|
|
"""
|
|
|
|
# client = ts3conn.clientlist(uid=uid)
|
|
# print(client[0])
|
|
# if client[0]["client_type"] == "1":
|
|
if cluid == "ServerQuery":
|
|
print("ISQUERY: True")
|
|
return True
|
|
else:
|
|
print("ISQUERY: False")
|
|
return False
|
|
|
|
def isadmin(self, cldbid):
|
|
"""
|
|
Check if the given client-databaseid is an admin
|
|
"""
|
|
|
|
groups = self.con.servergroupsbyclientid(cldbid=cldbid)
|
|
# [print(group["sgid"]) for group in groups]
|
|
for group in groups:
|
|
# 6 Server Admin/ 13 Operator / 15 Root
|
|
# if (group["sgid"] == "6") or (group["sgid"] == "13") or (group["sgid"] == "15"):
|
|
if group["sgid"] == "15":
|
|
print("ISADMIN: True")
|
|
return True
|
|
else:
|
|
continue
|
|
|
|
print("ISADMIN: False")
|
|
return False
|
|
|
|
def lookupcommand(self, msg, invkr):
|
|
"""
|
|
|
|
"""
|
|
|
|
if msg.startswith("!"):
|
|
commandstring = msg.split(" ")
|
|
command = commandstring[0]
|
|
parameter = commandstring[1:]
|
|
print(command)
|
|
print(parameter)
|
|
|
|
if command == "!annoy":
|
|
try:
|
|
target = parameter[0]
|
|
msg = parameter[1]
|
|
self.poke(msg=msg, usr=target)
|
|
except IndexError:
|
|
err = "Please use the command like this: !annoy TARGET MESSAGE"
|
|
self.con.sendtextmessage(targetmode=1, target=invkr, msg=err)
|
|
pass
|
|
|
|
elif command == "!kickall":
|
|
self.kickall("test")
|
|
|
|
elif command == "!stop" or command == "!quit" or command == "!q":
|
|
self.stop(invkr)
|
|
|
|
elif command == "!pingall":
|
|
try:
|
|
msg = parameter[0]
|
|
self.poke(msg=msg)
|
|
except IndexError:
|
|
err = "Please use the command like this: !pingall MESSAGE"
|
|
self.con.sendtextmessage(targetmode=1, target=invkr, msg=err)
|
|
pass
|
|
|
|
else:
|
|
err = "Unknown Command:[{0}]".format(command)
|
|
self.con.sendtextmessage(targetmode=1, target=invkr, msg=err)
|
|
|
|
def printable_clientinfo(self, client):
|
|
"""
|
|
|
|
"""
|
|
|
|
usrd = {}
|
|
info = self.con.clientinfo(clid=client)
|
|
temp = info._data[0].split()
|
|
for t1 in temp:
|
|
t2 = t1.decode("utf-8")
|
|
t3 = t2.split("=")
|
|
try:
|
|
t3[1]
|
|
except Exception:
|
|
t3.append("None")
|
|
pass
|
|
|
|
usrd[t3[0]] = t3[1]
|
|
|
|
return usrd
|
|
|
|
def checkgrp(self):
|
|
"""
|
|
|
|
"""
|
|
|
|
clients = self.con.clientlist()
|
|
clients_cldbid = [client["client_database_id"] for client in clients if client["client_type"] != "1"]
|
|
clients = self.con.clientlist(groups=True)
|
|
clients_groups = [client["client_servergroups"] for client in clients if client["client_type"] != "1"]
|
|
|
|
print(clients_groups)
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------------------------
|
|
|
|
def main():
|
|
# Load Dotenv
|
|
dotenv_file = dotenv.find_dotenv()
|
|
if not dotenv_file:
|
|
raise FileNotFoundError("could not locate .env file")
|
|
dotenv.load_dotenv(dotenv_file)
|
|
dotend_keys = dotenv.dotenv_values()
|
|
if not {"HOST", "PORT", "USER", "PWD", "SID"} <= dotend_keys.keys():
|
|
raise ValueError("missing keys in your .env file.")
|
|
|
|
# Config
|
|
conf = dict(host=os.getenv("HOST"),
|
|
port=os.getenv("PORT"),
|
|
user=os.getenv("USER"),
|
|
pwd=os.getenv("PWD"),
|
|
sid=os.getenv("SID"),
|
|
name=os.getenv("NAME"))
|
|
|
|
# Start the Bot Instance
|
|
abot = MyTeamspeakBot(conf)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|