From 8613c57eec622c4e3f699b1e22975ed6f96b6f5f Mon Sep 17 00:00:00 2001 From: Jarno Rankinen Date: Sun, 20 Nov 2022 12:25:01 +0200 Subject: [PATCH] Removed unused modules --- modules/cron.py | 65 --------- modules/googlecal.py | 182 ------------------------- modules/url.py | 307 ------------------------------------------- 3 files changed, 554 deletions(-) delete mode 100644 modules/cron.py delete mode 100644 modules/googlecal.py delete mode 100644 modules/url.py diff --git a/modules/cron.py b/modules/cron.py deleted file mode 100644 index 3564327..0000000 --- a/modules/cron.py +++ /dev/null @@ -1,65 +0,0 @@ -import shlex -import os -from datetime import datetime - -from .common.module import BotModule - - -class MatrixModule(BotModule): - daily_commands = dict() # room_id -> command json - last_hour = datetime.now().hour - - async def matrix_message(self, bot, room, event): - bot.must_be_admin(room, event) - - args = shlex.split(event.body) - args.pop(0) - if len(args) == 3: - if args[0] == 'daily': - dailytime = int(args[1]) - dailycmd = args[2] - if not self.daily_commands.get(room.room_id): - self.daily_commands[room.room_id] = [] - self.daily_commands[room.room_id].append( - {'time': dailytime, 'command': dailycmd}) - bot.save_settings() - await bot.send_text(room, 'Daily command added.') - elif len(args) == 1: - if args[0] == 'list': - await bot.send_text(room, 'Daily commands on this room: ' + str(self.daily_commands.get(room.room_id))) - elif args[0] == 'clear': - self.daily_commands.pop(room.room_id, None) - bot.save_settings() - await bot.send_text(room, 'Cleared commands on this room.') - elif args[0] == 'time': - await bot.send_text(room, '{datetime} {timezone}'.format(datetime=datetime.now(), timezone=os.environ.get('TZ'))) - - def help(self): - return ('Runs scheduled commands') - - def get_settings(self): - data = super().get_settings() - data['daily_commands'] = self.daily_commands - return data - - def set_settings(self, data): - super().set_settings(data) - if data.get('daily_commands'): - self.daily_commands = data['daily_commands'] - - async def matrix_poll(self, bot, pollcount): - delete_rooms = [] - if self.last_hour != datetime.now().hour: - self.last_hour = datetime.now().hour - - for room_id in self.daily_commands: - if room_id in bot.client.rooms: - commands = self.daily_commands[room_id] - for command in commands: - if int(command['time']) == self.last_hour: - await bot.send_text(bot.get_room_by_id(room_id), command['command'], 'm.text') - else: - delete_rooms.append(room_id) - - for roomid in delete_rooms: - self.daily_commands.pop(roomid, None) diff --git a/modules/googlecal.py b/modules/googlecal.py deleted file mode 100644 index 588f506..0000000 --- a/modules/googlecal.py +++ /dev/null @@ -1,182 +0,0 @@ -from __future__ import print_function - -import os -import os.path -import pickle -from datetime import datetime, timedelta - -from google.auth.transport.requests import Request -from google_auth_oauthlib.flow import InstalledAppFlow -from googleapiclient.discovery import build - -# -# Google calendar notifications -# -# Note: Provide a token.pickle file for the service. -# It's created on first run (run from console!) and -# can be copied to another computer. -# -from modules.common.module import BotModule - - -class MatrixModule(BotModule): - def __init__(self, name): - super().__init__(name) - self.credentials_file = "credentials.json" - self.SCOPES = ['https://www.googleapis.com/auth/calendar.readonly'] - self.bot = None - self.service = None - self.calendar_rooms = dict() # Contains room_id -> [calid, calid] .. - self.enabled = False - - def matrix_start(self, bot): - super().matrix_start(bot) - self.bot = bot - creds = None - - if not os.path.exists(self.credentials_file) or os.path.getsize(self.credentials_file) == 0: - return # No-op if not set up - - if os.path.exists('token.pickle'): - with open('token.pickle', 'rb') as token: - creds = pickle.load(token) - self.logger.info('Loaded existing pickle file') - # If there are no (valid) credentials available, let the user log in. - if not creds or not creds.valid: - self.logger.warn('No credentials or credentials not valid!') - if creds and creds.expired and creds.refresh_token: - creds.refresh(Request()) - else: - flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, self.SCOPES) - # urn:ietf:wg:oauth:2.0:oob - creds = flow.run_local_server(port=0) - # Save the credentials for the next run - with open('token.pickle', 'wb') as token: - pickle.dump(creds, token) - self.logger.info('Pickle saved') - - self.service = build('calendar', 'v3', credentials=creds) - - try: - calendar_list = self.service.calendarList().list().execute()['items'] - self.logger.info(f'Google calendar set up successfully with access to {len(calendar_list)} calendars:\n') - for calendar in calendar_list: - self.logger.info(f"{calendar['summary']} - + {calendar['id']}") - except Exception: - self.logger.error('Getting calendar list failed!') - - async def matrix_message(self, bot, room, event): - if not self.service: - await bot.send_text(room, 'Google calendar not set up for this bot.') - return - args = event.body.split() - events = [] - calendars = self.calendar_rooms.get(room.room_id) or [] - - if len(args) == 2: - if args[1] == 'today': - for calid in calendars: - self.logger.info(f'Listing events in cal {calid}') - events = events + self.list_today(calid) - if args[1] == 'list': - await bot.send_text(room, 'Calendars in this room: ' + str(self.calendar_rooms.get(room.room_id))) - return - - elif len(args) == 3: - if args[1] == 'add': - bot.must_be_admin(room, event) - - calid = args[2] - self.logger.info(f'Adding calendar {calid} to room id {room.room_id}') - - if self.calendar_rooms.get(room.room_id): - if calid not in self.calendar_rooms[room.room_id]: - self.calendar_rooms[room.room_id].append(calid) - else: - await bot.send_text(room, 'This google calendar already added in this room!') - return - else: - self.calendar_rooms[room.room_id] = [calid] - - self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}') - - bot.save_settings() - - await bot.send_text(room, 'Added new google calendar to this room') - return - - if args[1] == 'del': - bot.must_be_admin(room, event) - - calid = args[2] - self.logger.info(f'Removing calendar {calid} from room id {room.room_id}') - - if self.calendar_rooms.get(room.room_id): - self.calendar_rooms[room.room_id].remove(calid) - - self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}') - - bot.save_settings() - - await bot.send_text(room, 'Removed google calendar from this room') - return - - else: - for calid in calendars: - self.logger.info(f'Listing events in cal {calid}') - events = events + self.list_upcoming(calid) - - if len(events) > 0: - self.logger.info(f'Found {len(events)} events') - await self.send_events(bot, events, room) - else: - self.logger.info(f'No events found') - await bot.send_text(room, 'No events found, try again later :)') - - async def send_events(self, bot, events, room): - for event in events: - start = event['start'].get('dateTime', event['start'].get('date')) - await bot.send_html(room, f'{self.parse_date(start)} {event["summary"]}', - f'{self.parse_date(start)} {event["summary"]}') - - def list_upcoming(self, calid): - startTime = datetime.utcnow() - now = startTime.isoformat() + 'Z' - events_result = self.service.events().list(calendarId=calid, timeMin=now, - maxResults=10, singleEvents=True, - orderBy='startTime').execute() - events = events_result.get('items', []) - return events - - def list_today(self, calid): - startTime = datetime.utcnow() - startTime = startTime.replace(hour=0, minute=0, second=0, microsecond=0) - endTime = startTime + timedelta(hours=24) - now = startTime.isoformat() + 'Z' - end = endTime.isoformat() + 'Z' - self.logger.info(f'Looking for events between {now} {end}') - events_result = self.service.events().list(calendarId=calid, timeMin=now, - timeMax=end, maxResults=10, singleEvents=True, - orderBy='startTime').execute() - return events_result.get('items', []) - - def help(self): - return 'Google calendar. Lists 10 next events by default. today = list today\'s events.' - - def get_settings(self): - data = super().get_settings() - data['calendar_rooms'] = self.calendar_rooms - return data - - def set_settings(self, data): - super().set_settings(data) - if data.get('calendar_rooms'): - self.calendar_rooms = data['calendar_rooms'] - - def parse_date(self, start): - try: - dt = datetime.strptime(start, '%Y-%m-%dT%H:%M:%S%z') - return dt.strftime("%d.%m %H:%M") - except ValueError: - dt = datetime.strptime(start, '%Y-%m-%d') - return dt.strftime("%d.%m") diff --git a/modules/url.py b/modules/url.py deleted file mode 100644 index 954890b..0000000 --- a/modules/url.py +++ /dev/null @@ -1,307 +0,0 @@ -import re -import shlex -from functools import lru_cache - -import httpx -import sys -import traceback -from bs4 import BeautifulSoup -from nio import RoomMessageText - -from modules.common.module import BotModule - - -class MatrixModule(BotModule): - """ - Simple url fetch and spit out title module. - - Everytime a url is seen in a message we do http request to it and try to get a title tag contents to spit out to the room. - """ - - def __init__(self, name): - super().__init__(name) - - self.bot = None - self.status = dict() # room_id -> what to do with urls - self.type = "m.notice" # notice or text - # this will be extended when matrix_start is called - self.useragent = "Mozilla/5.0 (compatible; Hemppa; +https://github.com/vranki/hemppa/)" - - self.STATUSES = { - "OFF": "Not spamming this channel", - "TITLE": "Spamming this channel with titles", - "DESCRIPTION": "Spamming this channel with descriptions", - "BOTH": "Spamming this channel with both title and description", - } - self.blacklist = [ ] - self.enabled = False - - def matrix_start(self, bot): - """ - Register callback for all RoomMessageText events on startup - """ - super().matrix_start(bot) - self.bot = bot - bot.client.add_event_callback(self.text_cb, RoomMessageText) - # extend the useragent string to contain version and bot name - self.useragent = f"Mozilla/5.0 (compatible; Hemppa/{self.bot.version}; {self.bot.client.user}; +https://github.com/vranki/hemppa/)" - self.logger.debug(f"useragent: {self.useragent}") - - - def matrix_stop(self, bot): - super().matrix_stop(bot) - bot.remove_callback(self.text_cb) - - def user_agent_for_url(self, url): - if ('youtube.com' in url) or ('youtu.be' in url) or ('google.com' in url): - return 'curl/7.64.0' - return self.useragent - - # Currently not used, but for future needs: - def cookies_for_url(self, url): - if ('youtube.com' in url) or ('youtu.be' in url) or ('google.com' in url): - cookies = httpx.Cookies() - cookies.set('CONSENT', 'YES', domain='.youtube.com') - return cookies -# return {'CONSENT': 'YES', 'Domain': '.youtube.com', -# 'Path': '/', 'SameSite' : 'None', 'Expires': 'Sun, 10 Jan 2038 07:59:59 GMT', 'Max-Age': '946080000'} - return {} - - async def text_cb(self, room, event): - """ - Handle client callbacks for all room text events - """ - if self.bot.should_ignore_event(event): - return - - # no content at all? - if len(event.body) < 1: - return - - if "content" in event.source: - # skip edited content to prevent spamming the same thing multiple times - if "m.new_content" in event.source["content"]: - self.logger.debug("Skipping edited event to prevent spam") - return - # skip reply messages to prevent spam - if "m.relates_to" in event.source["content"]: - self.logger.debug("Skipping reply message to prevent spam") - return - - # are we on in this room? - status = self.status.get(room.room_id, "OFF") - if status not in self.STATUSES: - return - if status == "OFF": - return - - try: - # extract possible urls from message - urls = re.findall(r"(https?://\S+)", event.body) - - # no urls, nothing to do - if len(urls) == 0: - return - - # fetch the urls and if we can see a title spit it out - for url in urls: - # fix for #98 a bit ugly, but skip all matrix.to urls - # those are 99.99% pills and should not - # spam the channel with matrix.to titles - if url.startswith("https://matrix.to/#/"): - self.logger.debug(f"Skipping matrix.to url (#98): {url}") - continue - - url_blacklisted = False - for blacklisted in self.blacklist: - if blacklisted in url: - url_blacklisted = True - if url_blacklisted: - self.logger.debug(f"Skipping blacklisted url {url}") - continue - - try: - title, description = self.get_content_from_url(url) - except Exception as e: - self.logger.warning(f"could not fetch url: {e}") - traceback.print_exc(file=sys.stderr) - # failed fetching, give up - continue - - msg = "" - - if status == "TITLE" and title is not None: - msg = f"Title: {title}" - elif status == "DESCRIPTION" and description is not None: - msg = f"Description: {description}" - - elif status == "BOTH" and title is not None and description is not None: - msg = f"Title: {title}\nDescription: {description}" - - elif status == "BOTH" and title is not None: - msg = f"Title: {title}" - elif status == "BOTH" and description is not None: - msg = f"Description: {description}" - - if msg.strip(): # Evaluates to true on non-empty strings - await self.bot.send_text(room, msg, msgtype=self.type, bot_ignore=True) - except Exception as e: - self.logger.warning(f"Unexpected error in url module text_cb: {e}") - traceback.print_exc(file=sys.stderr) - - @lru_cache(maxsize=128) - def get_content_from_url(self, url): - """ - Fetch url and try to get the title and description from the response - """ - title = None - description = None - # timeout will still handle network timeouts - timeout = httpx.Timeout(10.0) - responsetext = "" # read our response here - try: - self.logger.debug(f"start streaming {url}") - # stream the response so that we can set a upper limit on how much we want to fetch. - # as we are using stream the r.text wont be available, save our read data ourself - - # maximum size to read of the response in characters (this prevents us from reading stream forever) - maxsize = 800000 - headers = { - 'user-agent': self.user_agent_for_url(url) - } - # Google may break things anytime so here are some things to try: - # If needed some day.. 'Set-Cookie': "CONSENT=YES; Domain=.youtube.com; Path=/; SameSite=None; Secure; Expires=Sun, 10 Jan 2038 07:59:59 GMT; Max-Age=946080000" - # cookies = self.cookies_for_url(url) - # print('cookies', url, cookies) - # print('headers', headers) - with httpx.stream("GET", url, timeout=timeout, headers=headers) as r: - for part in r.iter_text(): - # self.logger.debug( - # f"reading response stream, limiting in {maxsize} bytes" - # ) - - responsetext += part - maxsize -= len(part) - - if maxsize < 0: - break - - self.logger.debug(f"end streaming {url}") - except Exception as e: - self.logger.warning(f"Failed fetching url {url}. Error: {e}") - return (title, description) - - if r.status_code != 200: - self.logger.warning( - f"Failed fetching url {url}. Status code: {r.status_code}" - ) - return (title, description) - - # try parse and get the title - try: - soup = BeautifulSoup(responsetext, "html.parser") - - if soup.title and len(soup.title.string) > 0: - title = soup.title.string - else: - title_tag = soup.find("meta", attrs={"name": "title"}) - ogtitle = soup.find("meta", property="og:title") - if title_tag: - title = title_tag.get("content", None) - elif ogtitle: - title = ogtitle["content"] - elif soup.head and soup.head.title: - title = soup.head.title.string.strip() - descr_tag = soup.find("meta", attrs={"name": "description"}) - if descr_tag: - description = descr_tag.get("content", None) - except Exception as e: - self.logger.warning(f"Failed parsing response from url {url}. Error: {e}") - return (title, description) - - # Title should not contain newlines or tabs - if title is not None: - assert isinstance(title, str) - title = title.replace("\n", "") - title = title.replace("\t", "") - return (title, description) - - async def matrix_message(self, bot, room, event): - """ - commands for setting what to do in this channel - """ - bot.must_be_admin(room, event) - - args = shlex.split(event.body) - args.pop(0) - - # save the new status - if len(args) == 1 and self.STATUSES.get(args[0].upper()) is not None: - self.status[room.room_id] = args[0].upper() - bot.save_settings() - await bot.send_text( - room, f"Ok, {self.STATUSES.get(self.status[room.room_id])}" - ) - return - - # show status - elif len(args) == 1 and args[0] == "status": - status = self.STATUSES.get(self.status.get(room.room_id, "OFF")) + f', URL blacklist: {self.blacklist}' - await bot.send_text( - room, status - ) - return - - # set type to notice - elif len(args) == 1 and args[0] == "notice": - bot.must_be_owner(event) - self.type = "m.notice" - bot.save_settings() - await bot.send_text(room, "Sending titles as notices from now on.") - return - - # show status - elif len(args) == 1 and args[0] == "text": - bot.must_be_owner(event) - self.type = "m.text" - bot.save_settings() - await bot.send_text(room, "Sending titles as text from now on.") - return - - # set blacklist - elif len(args) == 2 and args[0] == "blacklist": - bot.must_be_owner(event) - if args[1] == 'clear': - self.blacklist = [] - else: - self.blacklist = args[1].split(',') - bot.save_settings() - await bot.send_text(room, f"Blacklisted URLs set to {self.blacklist}") - return - - # invalid command - await bot.send_text( - room, - "Sorry, I did not understand. See README for command list.", - ) - - return - - def get_settings(self): - data = super().get_settings() - data["status"] = self.status - data["type"] = self.type - data["blacklist"] = self.blacklist - return data - - def set_settings(self, data): - super().set_settings(data) - if data.get("status"): - self.status = data["status"] - if data.get("type"): - self.type = data["type"] - if data.get("blacklist"): - self.blacklist = data["blacklist"] - - def help(self): - return "If I see a url in a message I will try to get the title from the page and spit it out"