From 430229048275ec6a5b59f85d9d0ee856b8753e42 Mon Sep 17 00:00:00 2001 From: Von Random Date: Tue, 31 Oct 2023 01:45:36 +0200 Subject: [PATCH] initial commit --- README.md | 1 + pgbot | 62 +++++++++++++++++++++++ pgbotlib/api.py | 118 +++++++++++++++++++++++++++++++++++++++++++ pgbotlib/commands.py | 61 ++++++++++++++++++++++ pgbotlib/dbstuff.py | 22 ++++++++ pgbotlib/misc.py | 42 +++++++++++++++ pgbotlib/response.py | 105 ++++++++++++++++++++++++++++++++++++++ pgbotlib/sched.py | 70 +++++++++++++++++++++++++ 8 files changed, 481 insertions(+) create mode 100755 pgbot create mode 100644 pgbotlib/api.py create mode 100644 pgbotlib/commands.py create mode 100644 pgbotlib/dbstuff.py create mode 100644 pgbotlib/misc.py create mode 100644 pgbotlib/response.py create mode 100644 pgbotlib/sched.py diff --git a/README.md b/README.md index 918d5c6..20111a7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # pgbot +Use `config.yml` to set it up. It needs a list of regex with tokens and a database to match them. This doc is probably going to be abandoned right away, but at least I have added this line I dunno. diff --git a/pgbot b/pgbot new file mode 100755 index 0000000..40f9b82 --- /dev/null +++ b/pgbot @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 + +import sys +import threading + +import telethon +import yaml + +import pgbotlib.dbstuff +import pgbotlib.commands +import pgbotlib.misc +import pgbotlib.response +import pgbotlib.sched + + +def init(args: list) -> tuple: + conf_path = args[0] if args else 'config.yml' + try: + with open(conf_path, 'r', encoding='utf-8') as data: + config = yaml.safe_load(data.read()) + except FileNotFoundError as err: + sys.exit(err) + + client = telethon.TelegramClient( + 'bot_session', config['api_id'], + config['api_hash']).start(bot_token=config['bot_token']) + + # db_conn = pgbotlib.dbstuff.DBConn( + # f'dbname={config['db_name']} user={config['db_user']}') + db_conn = pgbotlib.dbstuff.DBConn(config['db_spec']) + + return config, db_conn, client + + +def main(): + config, db_conn, client = init(sys.argv[1:]) + + responder = pgbotlib.response.Responder(config, client, db_conn) + commander = pgbotlib.commands.Commander(config, client, config['admins'], + db_conn, responder) + + sched_thread = threading.Thread( + target=pgbotlib.sched.spawn_scheduler, + args=(config, client, responder), + daemon=True) + sched_thread.start() + + @client.on(telethon.events.NewMessage()) + async def handle_new_message(event): + chat = await event.get_chat() + result = await client.get_messages(chat.id, ids=[event.message.reply_to.reply_to_msg_id]) + print(result) + if event.message.text.startswith('/'): + await commander.action(event) + else: + await responder.respond(event) + + client.run_until_disconnected() + + +if __name__ == '__main__': + main() diff --git a/pgbotlib/api.py b/pgbotlib/api.py new file mode 100644 index 0000000..848f3c0 --- /dev/null +++ b/pgbotlib/api.py @@ -0,0 +1,118 @@ +""" Some functions for api calls """ + +import json +import random +import re + +import requests +import bs4 +import fake_headers + +import pgbotlib.dbstuff + + +class ApiWrapper: + FAILED = 'я обосрался :<' + GIF_REGEX = { + 'part': re.compile(r'(?<=\).*(?=\<\/center\>)'), + 'gif': re.compile(r'(?<=src=").*(?="\s)') + } + SEARCH_TOKENS = ['botname', '!find'] + + def __init__(self, tokens: dict, db_conn: pgbotlib.dbstuff.DBConn) -> None: + self.tokens = tokens + self.db_conn = db_conn + self.nonw = re.compile(r'\W') + self.headers = fake_headers.Headers(headers=True) + + # this is the entry point for the api calls + # if you add another api, make sure there is a match here + def call(self, api: str, data: str | None, message: str) -> str: + match api: + case 'img_url': return self.format_img(data) + case 'gif': return self.get_gif() + case 'kmp': return self.get_kmp() + case 'fga': return self.get_fga() + case 'fakenews': return self.get_fakenews() + case 'anek': return self.get_anek() + case 'y_search': return self.y_search(message) + case _: return self.FAILED + + def __sanitize_search(self, message: str) -> str: + """Removes one of each of the search tokens from the query + so that "bot find" phrase does not poison the search query + + It's not guaranteed it will delete the first match though, + and I see no point in implementing that""" + keywords = self.nonw.sub(' ', message) + for token_spec in self.tokens: + if token_spec[0] not in self.SEARCH_TOKENS: + continue + for regex in token_spec[1]: + sub_spec = regex.subn('', keywords, count=1) + if sub_spec[1]: + keywords = sub_spec[0] + break + return keywords + + def y_search(self, message: str) -> str: + """Pretty much copy & paste from the original bot + I have no fucking clue how this black magic works""" + query = self.__sanitize_search(message) + request = requests.get('https://yandex.ru/images/search', + timeout=30, + params={'text': query, + 'nomisspell': 1, + 'noreask': 1, + 'isize': 'medium'}, + headers=self.headers.generate()) + parser = bs4.BeautifulSoup(request.text, 'html.parser') + items_place = parser.find('div', {'class': 'serp-list'}) + items = items_place.find_all('div', {'class': 'serp-item'}) + images = [] + for item in items: + data = json.loads(item.get('data-bem')) + images.append(data['serp-item']['img_href']) + if not images: + return None + result = random.choice(images) + return f'[url]({result})' + + def get_gif(self) -> str: + resp = requests.get("http://xdgif.ru/random/", timeout=30) + part = self.GIF_REGEX['part'].search(resp.text).group(0) + gif = self.GIF_REGEX['gif'].search(part).group(0) + return gif + + @staticmethod + def get_kmp() -> str: + request = requests.get("https://killpls.me/random/", timeout=30) + parser = bs4.BeautifulSoup(request.text, features="html.parser") + result = parser.find("div", attrs={ + "style": "margin:0.5em 0;line-height:1.785em"}) + return result.text.strip() + + @staticmethod + def get_fga() -> str: + request = requests.get("http://fucking-great-advice.ru/api/random", + timeout=30) + return json.loads(request.text)["text"] + + @staticmethod + def get_fakenews() -> str: + request = requests.get("http://news.olegmakarenko.ru/news", timeout=30) + parser = bs4.BeautifulSoup(request.text, features="html.parser") + news = [item.text.strip() for item in parser.find_all( + "span", attrs={"class": "headlinetext"})] + return random.choice(news) + + @staticmethod + def get_anek() -> str: + request = requests.get("http://rzhunemogu.ru/Rand.aspx?CType=11", + timeout=30) + result = request.text.split('')[1].split('')[0] + return result.strip() + + @staticmethod + def format_img(data: str) -> str: + return f'[url]({data})' diff --git a/pgbotlib/commands.py b/pgbotlib/commands.py new file mode 100644 index 0000000..360b938 --- /dev/null +++ b/pgbotlib/commands.py @@ -0,0 +1,61 @@ +""" Respond to commands """ + +import telethon + +import pgbotlib.api +import pgbotlib.dbstuff +import pgbotlib.response + + +class Commander: + T_START = frozenset(['start_cmd']) + T_STOP = frozenset(['stop_cmd']) + + def __init__(self, config: dict, + client: telethon.TelegramClient, + admins: list, + db_conn: pgbotlib.dbstuff.DBConn, + responder: pgbotlib.response.Responder) -> None: + self.config = config + self.client = client + self.admins = admins + self.db_conn = db_conn + self.responder = responder + self.available_tokens = [ + str(token) for token, _ in self.responder.tokens] + + def __add_entry(self, caller: int, command: str) -> bool: + if caller not in self.admins: + print('fuck off!') + return None + input_tokens, phrase = command.strip().split(' ', 1) + input_tokenset = frozenset(input_tokens.split(',')) + for token in input_tokenset: + if token not in self.available_tokens: + return False + query = 'INSERT INTO responses (tokens, response) values (%s,%s)' + values = (','.join(sorted(input_tokenset)), phrase.strip()) + return self.db_conn.update(query, values) + + async def action(self, + event: telethon.events.common.EventBuilder) -> None: + command = event.message.text + sender = await event.get_sender() + response = None + match command: + case command if command.startswith('/add '): + if self.__add_entry(sender.id, command[5:]): + response = 'success' + else: + response = 'failure' + case '/list': + response = ', '.join(self.available_tokens) + case '/start': + self.responder.enable() + response = self.responder.get_response(self.T_START) + case '/stop': + self.responder.disable() + response = self.responder.get_response(self.T_STOP) + if response: + await self.client.send_message(event.message.peer_id, response) + return None diff --git a/pgbotlib/dbstuff.py b/pgbotlib/dbstuff.py new file mode 100644 index 0000000..e207fae --- /dev/null +++ b/pgbotlib/dbstuff.py @@ -0,0 +1,22 @@ +import random +import psycopg + + +class DBConn: + def __init__(self, *args, **kwargs) -> None: + self.connection = psycopg.connect(*args, **kwargs) + self.cursor = self.connection.cursor() + + def update(self, query: str, values: tuple) -> list: + self.cursor.execute(query, values) + return self.connection.commit() + + def query_raw(self, query: str, values: tuple) -> list: + self.cursor.execute(query, values) + return self.cursor.fetchall() + + def query_random(self, query: str, values: tuple) -> str: + result = self.query_raw(query, values) + if not result: + return None + return random.choice(result)[0] diff --git a/pgbotlib/misc.py b/pgbotlib/misc.py new file mode 100644 index 0000000..cc51a94 --- /dev/null +++ b/pgbotlib/misc.py @@ -0,0 +1,42 @@ +import telethon +import pgbotlib.dbstuff +import pgbotlib.response + + +class NameGenerator: + def __init__(self, config: dict, db: pgbotlib.dbstuff.DBConn) -> None: + self.db = db + + def get_name(self, sender: telethon.tl.types.User) -> str: + query = 'SELECT name FROM names WHERE tg_id = %s' + result = self.db.query_random(query, (sender.id,)) + if result: + return result + return self.get_tg_name(sender) + + @staticmethod + def get_tg_name(sender: telethon.tl.types.User) -> str: + result = [sender.first_name, sender.last_name] + while None in result: + result.remove(None) + if result: + return ' '.join(result) + if sender.username: + return sender.username + return sender.id + + +class MiscReactor: + def __init__(self, config: dict, db: pgbotlib.dbstuff.DBConn) -> None: + self.namegen = NameGenerator(config, db) + self.db = db + + def spawn_edited_handler(self, + client: telethon.TelegramClient, + trigger: telethon.events.common.EventBuilder) -> None: + @client.on(trigger) + async def handle_edited_message(event): + sender = await event.get_sender() + sender_name = self.namegen.get_name(sender) + chat_id = event.message.peer_id + await client.send_message(chat_id, f'Я всё видел! {sender_name} совсем охуел, сообщения правит!') diff --git a/pgbotlib/response.py b/pgbotlib/response.py new file mode 100644 index 0000000..a46559f --- /dev/null +++ b/pgbotlib/response.py @@ -0,0 +1,105 @@ +import re + +import telethon +import yaml +import pgbotlib.api +import pgbotlib.dbstuff + + +def get_token(token_name: str, token_regex: list) -> tuple: + regex = [] + for i in token_regex: + regex.append(re.compile(i)) + return token_name, regex + + +def get_tokens(path: str) -> list: + with open(path, 'r', encoding='utf-8') as data: + tokens = yaml.safe_load(data.read()) + return [get_token(i, tokens[i]) for i in tokens] + + +class Responder: + def __init__(self, config: dict, + client: telethon.TelegramClient, + db_connection: pgbotlib.dbstuff.DBConn) -> None: + # apiregex matches "{apiname}optional data" + # message itself is also passed to the api call method + self.started = True + self.apiregex = re.compile(r'^\{(\w+)\}(.+)?$') + self.namegen = pgbotlib.misc.NameGenerator(config, db_connection) + self.tokens = get_tokens(config['response_tokens']) + self.api = pgbotlib.api.ApiWrapper(self.tokens, db_connection) + self.db_connection = db_connection + self.client = client + + def __tokenize(self, message: str) -> frozenset: + tokens = set() + for token, regexi in self.tokens: + for regex in regexi: + if regex.search(message): + tokens.add(token) + break + return frozenset(tokens) + + def __get_keys(self) -> dict: + result = {} + query = 'SELECT DISTINCT tokens FROM responses' + for i in self.db_connection.query_raw(query, tuple()): + result[frozenset(i[0].split(','))] = i[0] + return result + + def __response_choice(self, key: str) -> str: + return self.db_connection.query_random( + "SELECT response FROM responses WHERE tokens = %s", (key,)) + + def enable(self) -> None: + self.started = True + + def disable(self) -> None: + self.started = False + + def get_response(self, tokens: frozenset) -> str: + counter = 0 + keys = self.__get_keys() + for items, string in keys.items(): + if items <= tokens: + # check for priority tokens + for token in items: + if token.startswith('!'): + return self.__response_choice(string) + match_length = len(items & tokens) + if match_length > counter: + counter = match_length + key = string + if not counter: + return None + return self.__response_choice(key) + + def api_match(self, response: str, message: str) -> str: + match = self.apiregex.search(response) + if not match: + return response + api_spec = match.groups() + return self.api.call(*api_spec, message) + + async def username(self, response: str, + event: telethon.events.common.EventBuilder) -> str: + if '' not in response: + return response + sender = await event.get_sender() + username = self.namegen.get_name(sender) + return response.replace('', username) + + async def respond(self, + event: telethon.events.common.EventBuilder) -> None: + if not self.started: + return None + message = event.message.text.lower() + tokens = self.__tokenize(message) + response = self.get_response(tokens) + if not response: + return None + response = self.api_match(response, message) + response = await self.username(response, event) + await self.client.send_message(event.message.peer_id, response) diff --git a/pgbotlib/sched.py b/pgbotlib/sched.py new file mode 100644 index 0000000..9b3ada2 --- /dev/null +++ b/pgbotlib/sched.py @@ -0,0 +1,70 @@ +import asyncio +import time +import random + +import yaml +import schedule +import telethon +import pgbotlib.response + + +class Scheduler: + def __init__(self, + config: dict, + client: telethon.TelegramClient, + responder: pgbotlib.response.Responder) -> None: + self.responder = responder + self.client = client + with open(config['schedule'], 'r', encoding='utf-8') as data: + self.sched = yaml.safe_load(data.read()) + self.days = ( + schedule.every().day, + schedule.every().monday, + schedule.every().tuesday, + schedule.every().wednesday, + schedule.every().thursday, + schedule.every().friday, + schedule.every().saturday, + schedule.every().sunday + ) + + def __get_job(self, tokens: frozenset, + chat_id: int, rand: int) -> callable: + async def send_message(): + if rand: + time.sleep(random.randint(0, rand) * 60) + message = self.responder.get_response(tokens) + message = self.responder.api_match(message, '') + await self.client.send_message(chat_id, message) + + def job(): + loop = asyncio.get_event_loop() + coroutine = send_message() + loop.run_until_complete(coroutine) + return job + + def __schedule_job(self, tokens: str, chat: int, + day: int, t: str, rand: int) -> None: + job_tokens = frozenset(tokens.split(',')) + job = self.__get_job(job_tokens, chat, rand) + self.days[day].at(t).do(job) + + def build(self) -> None: + for i in self.sched: + for day in i.get('days', [0]): + for timespec in i['time']: + self.__schedule_job(i['tokens'], i['chat'], + day, timespec, i.get('rand', 0)) + + def run(self) -> None: + while True: + schedule.run_pending() + time.sleep(1) + + +def spawn_scheduler(config: dict, client: telethon.TelegramClient, + responder: pgbotlib.response.Responder) -> Scheduler: + asyncio.set_event_loop(asyncio.new_event_loop()) + scheduler = Scheduler(config, client, responder) + scheduler.build() + scheduler.run()