70 lines
2.3 KiB
Python
70 lines
2.3 KiB
Python
import asyncio
|
|
import time
|
|
import random
|
|
|
|
import yaml
|
|
import schedule
|
|
import telethon
|
|
import pgbotlib.response
|
|
|
|
|
|
class Scheduler:
    """Register timed chat messages from a YAML schedule and run them.

    The schedule file (its path is ``config['schedule']``) is iterated as a
    sequence of entries. Each entry is read with these keys:

    * ``tokens`` -- comma-separated string; split on ``','`` into a
      frozenset and passed to ``responder.get_response``.
    * ``chat`` -- target handed to ``client.send_message``.
    * ``time`` -- iterable of time strings handed to ``schedule``'s ``at()``.
    * ``days`` -- optional iterable of indexes into ``self.days``
      (0 = every day, 1 = Monday ... 7 = Sunday); defaults to ``[0]``.
    * ``rand`` -- optional upper bound, in minutes, of a random delay
      applied before sending; defaults to ``0`` (no delay).
    """

    def __init__(self,
                 config: dict,
                 client: telethon.TelegramClient,
                 responder: pgbotlib.response.Responder) -> None:
        """Store the collaborators and load the schedule file.

        Args:
            config: Mapping whose ``'schedule'`` key is the path of the
                YAML schedule file.
            client: Client used to send the scheduled messages.
            responder: Object providing ``get_response`` and ``api_match``,
                used to build the message text.

        Raises:
            KeyError: If ``config`` has no ``'schedule'`` key.
            OSError: If the schedule file cannot be opened.
        """
        self.responder = responder
        self.client = client
        with open(config['schedule'], 'r', encoding='utf-8') as data:
            # An empty file parses to None; treat it as "no entries" so
            # build() can iterate without a special case.
            self.sched = yaml.safe_load(data) or []
        # Names of the schedule.Job properties that select the recurrence,
        # indexed by the ``days`` values of the schedule file. Only names
        # are kept here: Job objects are mutable, so sharing one Job between
        # registrations would make each registration overwrite the last.
        self.days = (
            'day',
            'monday',
            'tuesday',
            'wednesday',
            'thursday',
            'friday',
            'saturday',
            'sunday',
        )

    def __get_job(self, tokens: frozenset,
                  chat_id: int, rand: int) -> callable:
        """Build the zero-argument callable that sends one message.

        Args:
            tokens: Tokens passed to ``responder.get_response``.
            chat_id: Target passed to ``client.send_message``.
            rand: Upper bound, in minutes, of a random delay before
                sending; a falsy value disables the delay.

        Returns:
            A synchronous function suitable for ``schedule``'s ``do()``.
        """
        async def send_message() -> None:
            if rand:
                # Await rather than time.sleep(): a blocking sleep inside a
                # coroutine freezes the whole event loop for the delay.
                await asyncio.sleep(random.randint(0, rand) * 60)
            message = self.responder.get_response(tokens)
            message = self.responder.api_match(message, '')
            await self.client.send_message(chat_id, message)

        def job() -> None:
            # Drives the coroutine on the current event loop and blocks the
            # caller until the message is sent (random delay included).
            loop = asyncio.get_event_loop()
            loop.run_until_complete(send_message())

        return job

    def __schedule_job(self, tokens: str, chat: int,
                       day: int, t: str, rand: int) -> None:
        """Register one job for a single day selector and time.

        Args:
            tokens: Comma-separated token string.
            chat: Target chat for the message.
            day: Index into ``self.days``.
            t: Time string passed to ``schedule``'s ``at()``.
            rand: Upper bound, in minutes, of the random send delay.

        Raises:
            IndexError: If ``day`` is outside ``self.days``.
        """
        job_tokens = frozenset(tokens.split(','))
        job = self.__get_job(job_tokens, chat, rand)
        # Call schedule.every() for each registration so every (day, time)
        # pair gets its own Job; at()/do() mutate the Job they are called on.
        getattr(schedule.every(), self.days[day]).at(t).do(job)

    def build(self) -> None:
        """Register a job for every (day, time) pair of every entry."""
        for entry in self.sched:
            rand = entry.get('rand', 0)
            for day in entry.get('days', [0]):
                for timespec in entry['time']:
                    self.__schedule_job(entry['tokens'], entry['chat'],
                                        day, timespec, rand)

    def run(self) -> None:
        """Run due jobs forever, polling once per second; never returns."""
        while True:
            schedule.run_pending()
            time.sleep(1)
|
|
|
|
|
|
def spawn_scheduler(config: dict, client: telethon.TelegramClient,
                    responder: pgbotlib.response.Responder) -> Scheduler:
    """Create a Scheduler, register its jobs and start its polling loop.

    A new asyncio event loop is installed as the current loop first; the
    scheduled jobs obtain their loop through ``asyncio.get_event_loop()``.

    Args:
        config: Mapping forwarded to ``Scheduler``.
        client: Telegram client forwarded to ``Scheduler``.
        responder: Responder forwarded to ``Scheduler``.

    Note:
        ``Scheduler.run()`` loops forever, so this call does not return in
        normal operation.
    """
    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)

    runner = Scheduler(config, client, responder)
    runner.build()
    runner.run()
|