initial commit

This commit is contained in:
Von Random 2023-10-31 01:45:36 +02:00
parent 1307dbcbee
commit 4302290482
8 changed files with 481 additions and 0 deletions

View file

@ -1,2 +1,3 @@
# pgbot
Use `config.yml` to set it up. It needs a list of regex with tokens and a database to match them. This doc is probably going to be abandoned right away, but at least I have added this line I dunno.

62
pgbot Executable file
View file

@ -0,0 +1,62 @@
#!/usr/bin/env python3
import sys
import threading
import telethon
import yaml
import pgbotlib.dbstuff
import pgbotlib.commands
import pgbotlib.misc
import pgbotlib.response
import pgbotlib.sched
def init(args: list) -> tuple:
conf_path = args[0] if args else 'config.yml'
try:
with open(conf_path, 'r', encoding='utf-8') as data:
config = yaml.safe_load(data.read())
except FileNotFoundError as err:
sys.exit(err)
client = telethon.TelegramClient(
'bot_session', config['api_id'],
config['api_hash']).start(bot_token=config['bot_token'])
# db_conn = pgbotlib.dbstuff.DBConn(
# f'dbname={config['db_name']} user={config['db_user']}')
db_conn = pgbotlib.dbstuff.DBConn(config['db_spec'])
return config, db_conn, client
def main():
config, db_conn, client = init(sys.argv[1:])
responder = pgbotlib.response.Responder(config, client, db_conn)
commander = pgbotlib.commands.Commander(config, client, config['admins'],
db_conn, responder)
sched_thread = threading.Thread(
target=pgbotlib.sched.spawn_scheduler,
args=(config, client, responder),
daemon=True)
sched_thread.start()
@client.on(telethon.events.NewMessage())
async def handle_new_message(event):
chat = await event.get_chat()
result = await client.get_messages(chat.id, ids=[event.message.reply_to.reply_to_msg_id])
print(result)
if event.message.text.startswith('/'):
await commander.action(event)
else:
await responder.respond(event)
client.run_until_disconnected()
if __name__ == '__main__':
main()

118
pgbotlib/api.py Normal file
View file

@ -0,0 +1,118 @@
""" Some functions for api calls """
import json
import random
import re
import requests
import bs4
import fake_headers
import pgbotlib.dbstuff
class ApiWrapper:
FAILED = 'я обосрался :<'
GIF_REGEX = {
'part': re.compile(r'(?<=\<center\>).*(?=\<\/center\>)'),
'gif': re.compile(r'(?<=src=").*(?="\s)')
}
SEARCH_TOKENS = ['botname', '!find']
def __init__(self, tokens: dict, db_conn: pgbotlib.dbstuff.DBConn) -> None:
self.tokens = tokens
self.db_conn = db_conn
self.nonw = re.compile(r'\W')
self.headers = fake_headers.Headers(headers=True)
# this is the entry point for the api calls
# if you add another api, make sure there is a match here
def call(self, api: str, data: str | None, message: str) -> str:
match api:
case 'img_url': return self.format_img(data)
case 'gif': return self.get_gif()
case 'kmp': return self.get_kmp()
case 'fga': return self.get_fga()
case 'fakenews': return self.get_fakenews()
case 'anek': return self.get_anek()
case 'y_search': return self.y_search(message)
case _: return self.FAILED
def __sanitize_search(self, message: str) -> str:
"""Removes one of each of the search tokens from the query
so that "bot find" phrase does not poison the search query
It's not guaranteed it will delete the first match though,
and I see no point in implementing that"""
keywords = self.nonw.sub(' ', message)
for token_spec in self.tokens:
if token_spec[0] not in self.SEARCH_TOKENS:
continue
for regex in token_spec[1]:
sub_spec = regex.subn('', keywords, count=1)
if sub_spec[1]:
keywords = sub_spec[0]
break
return keywords
def y_search(self, message: str) -> str:
"""Pretty much copy & paste from the original bot
I have no fucking clue how this black magic works"""
query = self.__sanitize_search(message)
request = requests.get('https://yandex.ru/images/search',
timeout=30,
params={'text': query,
'nomisspell': 1,
'noreask': 1,
'isize': 'medium'},
headers=self.headers.generate())
parser = bs4.BeautifulSoup(request.text, 'html.parser')
items_place = parser.find('div', {'class': 'serp-list'})
items = items_place.find_all('div', {'class': 'serp-item'})
images = []
for item in items:
data = json.loads(item.get('data-bem'))
images.append(data['serp-item']['img_href'])
if not images:
return None
result = random.choice(images)
return f'[url]({result})'
def get_gif(self) -> str:
resp = requests.get("http://xdgif.ru/random/", timeout=30)
part = self.GIF_REGEX['part'].search(resp.text).group(0)
gif = self.GIF_REGEX['gif'].search(part).group(0)
return gif
@staticmethod
def get_kmp() -> str:
request = requests.get("https://killpls.me/random/", timeout=30)
parser = bs4.BeautifulSoup(request.text, features="html.parser")
result = parser.find("div", attrs={
"style": "margin:0.5em 0;line-height:1.785em"})
return result.text.strip()
@staticmethod
def get_fga() -> str:
request = requests.get("http://fucking-great-advice.ru/api/random",
timeout=30)
return json.loads(request.text)["text"]
@staticmethod
def get_fakenews() -> str:
request = requests.get("http://news.olegmakarenko.ru/news", timeout=30)
parser = bs4.BeautifulSoup(request.text, features="html.parser")
news = [item.text.strip() for item in parser.find_all(
"span", attrs={"class": "headlinetext"})]
return random.choice(news)
@staticmethod
def get_anek() -> str:
request = requests.get("http://rzhunemogu.ru/Rand.aspx?CType=11",
timeout=30)
result = request.text.split('<content>')[1].split('</content>')[0]
return result.strip()
@staticmethod
def format_img(data: str) -> str:
return f'[url]({data})'

61
pgbotlib/commands.py Normal file
View file

@ -0,0 +1,61 @@
""" Respond to commands """
import telethon
import pgbotlib.api
import pgbotlib.dbstuff
import pgbotlib.response
class Commander:
T_START = frozenset(['start_cmd'])
T_STOP = frozenset(['stop_cmd'])
def __init__(self, config: dict,
client: telethon.TelegramClient,
admins: list,
db_conn: pgbotlib.dbstuff.DBConn,
responder: pgbotlib.response.Responder) -> None:
self.config = config
self.client = client
self.admins = admins
self.db_conn = db_conn
self.responder = responder
self.available_tokens = [
str(token) for token, _ in self.responder.tokens]
def __add_entry(self, caller: int, command: str) -> bool:
if caller not in self.admins:
print('fuck off!')
return None
input_tokens, phrase = command.strip().split(' ', 1)
input_tokenset = frozenset(input_tokens.split(','))
for token in input_tokenset:
if token not in self.available_tokens:
return False
query = 'INSERT INTO responses (tokens, response) values (%s,%s)'
values = (','.join(sorted(input_tokenset)), phrase.strip())
return self.db_conn.update(query, values)
async def action(self,
event: telethon.events.common.EventBuilder) -> None:
command = event.message.text
sender = await event.get_sender()
response = None
match command:
case command if command.startswith('/add '):
if self.__add_entry(sender.id, command[5:]):
response = 'success'
else:
response = 'failure'
case '/list':
response = ', '.join(self.available_tokens)
case '/start':
self.responder.enable()
response = self.responder.get_response(self.T_START)
case '/stop':
self.responder.disable()
response = self.responder.get_response(self.T_STOP)
if response:
await self.client.send_message(event.message.peer_id, response)
return None

22
pgbotlib/dbstuff.py Normal file
View file

@ -0,0 +1,22 @@
import random
import psycopg
class DBConn:
def __init__(self, *args, **kwargs) -> None:
self.connection = psycopg.connect(*args, **kwargs)
self.cursor = self.connection.cursor()
def update(self, query: str, values: tuple) -> list:
self.cursor.execute(query, values)
return self.connection.commit()
def query_raw(self, query: str, values: tuple) -> list:
self.cursor.execute(query, values)
return self.cursor.fetchall()
def query_random(self, query: str, values: tuple) -> str:
result = self.query_raw(query, values)
if not result:
return None
return random.choice(result)[0]

42
pgbotlib/misc.py Normal file
View file

@ -0,0 +1,42 @@
import telethon
import pgbotlib.dbstuff
import pgbotlib.response
class NameGenerator:
def __init__(self, config: dict, db: pgbotlib.dbstuff.DBConn) -> None:
self.db = db
def get_name(self, sender: telethon.tl.types.User) -> str:
query = 'SELECT name FROM names WHERE tg_id = %s'
result = self.db.query_random(query, (sender.id,))
if result:
return result
return self.get_tg_name(sender)
@staticmethod
def get_tg_name(sender: telethon.tl.types.User) -> str:
result = [sender.first_name, sender.last_name]
while None in result:
result.remove(None)
if result:
return ' '.join(result)
if sender.username:
return sender.username
return sender.id
class MiscReactor:
def __init__(self, config: dict, db: pgbotlib.dbstuff.DBConn) -> None:
self.namegen = NameGenerator(config, db)
self.db = db
def spawn_edited_handler(self,
client: telethon.TelegramClient,
trigger: telethon.events.common.EventBuilder) -> None:
@client.on(trigger)
async def handle_edited_message(event):
sender = await event.get_sender()
sender_name = self.namegen.get_name(sender)
chat_id = event.message.peer_id
await client.send_message(chat_id, f'Я всё видел! {sender_name} совсем охуел, сообщения правит!')

105
pgbotlib/response.py Normal file
View file

@ -0,0 +1,105 @@
import re
import telethon
import yaml
import pgbotlib.api
import pgbotlib.dbstuff
def get_token(token_name: str, token_regex: list) -> tuple:
regex = []
for i in token_regex:
regex.append(re.compile(i))
return token_name, regex
def get_tokens(path: str) -> list:
with open(path, 'r', encoding='utf-8') as data:
tokens = yaml.safe_load(data.read())
return [get_token(i, tokens[i]) for i in tokens]
class Responder:
def __init__(self, config: dict,
client: telethon.TelegramClient,
db_connection: pgbotlib.dbstuff.DBConn) -> None:
# apiregex matches "{apiname}optional data"
# message itself is also passed to the api call method
self.started = True
self.apiregex = re.compile(r'^\{(\w+)\}(.+)?$')
self.namegen = pgbotlib.misc.NameGenerator(config, db_connection)
self.tokens = get_tokens(config['response_tokens'])
self.api = pgbotlib.api.ApiWrapper(self.tokens, db_connection)
self.db_connection = db_connection
self.client = client
def __tokenize(self, message: str) -> frozenset:
tokens = set()
for token, regexi in self.tokens:
for regex in regexi:
if regex.search(message):
tokens.add(token)
break
return frozenset(tokens)
def __get_keys(self) -> dict:
result = {}
query = 'SELECT DISTINCT tokens FROM responses'
for i in self.db_connection.query_raw(query, tuple()):
result[frozenset(i[0].split(','))] = i[0]
return result
def __response_choice(self, key: str) -> str:
return self.db_connection.query_random(
"SELECT response FROM responses WHERE tokens = %s", (key,))
def enable(self) -> None:
self.started = True
def disable(self) -> None:
self.started = False
def get_response(self, tokens: frozenset) -> str:
counter = 0
keys = self.__get_keys()
for items, string in keys.items():
if items <= tokens:
# check for priority tokens
for token in items:
if token.startswith('!'):
return self.__response_choice(string)
match_length = len(items & tokens)
if match_length > counter:
counter = match_length
key = string
if not counter:
return None
return self.__response_choice(key)
def api_match(self, response: str, message: str) -> str:
match = self.apiregex.search(response)
if not match:
return response
api_spec = match.groups()
return self.api.call(*api_spec, message)
async def username(self, response: str,
event: telethon.events.common.EventBuilder) -> str:
if '<username>' not in response:
return response
sender = await event.get_sender()
username = self.namegen.get_name(sender)
return response.replace('<username>', username)
async def respond(self,
event: telethon.events.common.EventBuilder) -> None:
if not self.started:
return None
message = event.message.text.lower()
tokens = self.__tokenize(message)
response = self.get_response(tokens)
if not response:
return None
response = self.api_match(response, message)
response = await self.username(response, event)
await self.client.send_message(event.message.peer_id, response)

70
pgbotlib/sched.py Normal file
View file

@ -0,0 +1,70 @@
import asyncio
import time
import random
import yaml
import schedule
import telethon
import pgbotlib.response
class Scheduler:
def __init__(self,
config: dict,
client: telethon.TelegramClient,
responder: pgbotlib.response.Responder) -> None:
self.responder = responder
self.client = client
with open(config['schedule'], 'r', encoding='utf-8') as data:
self.sched = yaml.safe_load(data.read())
self.days = (
schedule.every().day,
schedule.every().monday,
schedule.every().tuesday,
schedule.every().wednesday,
schedule.every().thursday,
schedule.every().friday,
schedule.every().saturday,
schedule.every().sunday
)
def __get_job(self, tokens: frozenset,
chat_id: int, rand: int) -> callable:
async def send_message():
if rand:
time.sleep(random.randint(0, rand) * 60)
message = self.responder.get_response(tokens)
message = self.responder.api_match(message, '')
await self.client.send_message(chat_id, message)
def job():
loop = asyncio.get_event_loop()
coroutine = send_message()
loop.run_until_complete(coroutine)
return job
def __schedule_job(self, tokens: str, chat: int,
day: int, t: str, rand: int) -> None:
job_tokens = frozenset(tokens.split(','))
job = self.__get_job(job_tokens, chat, rand)
self.days[day].at(t).do(job)
def build(self) -> None:
for i in self.sched:
for day in i.get('days', [0]):
for timespec in i['time']:
self.__schedule_job(i['tokens'], i['chat'],
day, timespec, i.get('rand', 0))
def run(self) -> None:
while True:
schedule.run_pending()
time.sleep(1)
def spawn_scheduler(config: dict, client: telethon.TelegramClient,
responder: pgbotlib.response.Responder) -> Scheduler:
asyncio.set_event_loop(asyncio.new_event_loop())
scheduler = Scheduler(config, client, responder)
scheduler.build()
scheduler.run()