import re
import shlex
import sys
import traceback
from functools import lru_cache

import httpx
from bs4 import BeautifulSoup
from nio import RoomMessageText

from modules.common.module import BotModule


class MatrixModule(BotModule):
    """
    Simple url fetch and spit out title module.

    Every time a url is seen in a message we do a http request to it and try
    to get the title tag contents (and/or the meta description) to spit out
    to the room.
    """

    # Substrings identifying urls that get the curl user agent and the
    # CONSENT cookie (see user_agent_for_url / cookies_for_url).
    _GOOGLE_DOMAINS = ('youtube.com', 'youtu.be', 'google.com')

    def __init__(self, name):
        super().__init__(name)

        self.bot = None
        self.status = dict()  # room_id -> what to do with urls
        self.type = "m.notice"  # notice or text
        # this will be extended when matrix_start is called
        self.useragent = "Mozilla/5.0 (compatible; Hemppa; +https://github.com/vranki/hemppa/)"

        # Valid per-room states and the human readable description of each.
        self.STATUSES = {
            "OFF": "Not spamming this channel",
            "TITLE": "Spamming this channel with titles",
            "DESCRIPTION": "Spamming this channel with descriptions",
            "BOTH": "Spamming this channel with both title and description",
        }
        # Substrings; a url containing any of them is never fetched.
        self.blacklist = []
        self.enabled = False

    def matrix_start(self, bot):
        """
        Register callback for all RoomMessageText events on startup
        """
        super().matrix_start(bot)
        self.bot = bot
        bot.client.add_event_callback(self.text_cb, RoomMessageText)
        # extend the useragent string to contain version and bot name
        self.useragent = f"Mozilla/5.0 (compatible; Hemppa/{self.bot.version}; {self.bot.client.user}; +https://github.com/vranki/hemppa/)"
        self.logger.debug(f"useragent: {self.useragent}")

    def matrix_stop(self, bot):
        """
        Unregister the text callback that matrix_start registered
        """
        super().matrix_stop(bot)
        bot.remove_callback(self.text_cb)

    def _is_google_url(self, url):
        """
        Return True if url contains one of the _GOOGLE_DOMAINS substrings
        """
        return any(domain in url for domain in self._GOOGLE_DOMAINS)

    def user_agent_for_url(self, url):
        """
        Return the user-agent header value to use when fetching url.

        Urls matching _GOOGLE_DOMAINS are fetched with a curl user agent,
        everything else with the bot's own user agent string.
        """
        if self._is_google_url(url):
            return 'curl/7.64.0'
        return self.useragent

    # Currently not used, but for future needs:
    def cookies_for_url(self, url):
        """
        Return cookies to send when fetching url.

        For urls matching _GOOGLE_DOMAINS this is a httpx.Cookies jar with
        CONSENT=YES set for .youtube.com, otherwise an empty dict.
        """
        if self._is_google_url(url):
            cookies = httpx.Cookies()
            cookies.set('CONSENT', 'YES', domain='.youtube.com')
            return cookies
            # Alternative that was tried earlier:
            # return {'CONSENT': 'YES', 'Domain': '.youtube.com',
            #         'Path': '/', 'SameSite' : 'None', 'Expires': 'Sun, 10 Jan 2038 07:59:59 GMT', 'Max-Age': '946080000'}
        return {}

    async def text_cb(self, room, event):
        """
        Handle client callbacks for all room text events.

        Finds urls in the message body, fetches each one that is not skipped
        (matrix.to links, blacklisted urls) and sends the title and/or
        description to the room according to the room's status.
        """
        if self.bot.should_ignore_event(event):
            return

        # no content at all?
        if not event.body:
            return

        if "content" in event.source:
            content = event.source["content"]
            # skip edited content to prevent spamming the same thing multiple times
            if "m.new_content" in content:
                self.logger.debug("Skipping edited event to prevent spam")
                return
            # skip reply messages to prevent spam
            if "m.relates_to" in content:
                self.logger.debug("Skipping reply message to prevent spam")
                return

        # are we on in this room? Unknown states are treated like OFF.
        status = self.status.get(room.room_id, "OFF")
        if status not in self.STATUSES or status == "OFF":
            return

        try:
            # extract possible urls from message
            urls = re.findall(r"(https?://\S+)", event.body)

            # fetch the urls and if we can see a title spit it out
            for url in urls:
                # fix for #98 a bit ugly, but skip all matrix.to urls
                # those are 99.99% pills and should not
                # spam the channel with matrix.to titles
                if url.startswith("https://matrix.to/#/"):
                    self.logger.debug(f"Skipping matrix.to url (#98): {url}")
                    continue

                # Empty entries are ignored: '' is a substring of every url
                # and would otherwise blacklist everything.
                if any(entry and entry in url for entry in self.blacklist):
                    self.logger.debug(f"Skipping blacklisted url {url}")
                    continue

                try:
                    # NOTE(review): this is a synchronous fetch called from a
                    # coroutine, so the callback does not yield while the
                    # request is in flight.
                    title, description = self.get_content_from_url(url)
                except Exception as e:
                    self.logger.warning(f"could not fetch url: {e}")
                    traceback.print_exc(file=sys.stderr)
                    # failed fetching, give up
                    continue

                # Collect the lines this room's status asks for.
                lines = []
                if status in ("TITLE", "BOTH") and title is not None:
                    lines.append(f"Title: {title}")
                if status in ("DESCRIPTION", "BOTH") and description is not None:
                    lines.append(f"Description: {description}")

                if lines:
                    await self.bot.send_text(room, "\n".join(lines), msgtype=self.type, bot_ignore=True)
        except Exception as e:
            self.logger.warning(f"Unexpected error in url module text_cb: {e}")
            traceback.print_exc(file=sys.stderr)

    # NOTE: lru_cache on a method keys on (self, url) and keeps a reference
    # to self for the cache's lifetime. Failed lookups that return
    # (None, None) are memoized too.
    @lru_cache(maxsize=128)
    def get_content_from_url(self, url):
        """
        Fetch url and try to get the title and description from the response.

        Returns a (title, description) tuple; either element is None when it
        could not be determined. Network errors, non-200 responses and parse
        errors are logged and yield (None, None).
        """
        title = None
        description = None
        # timeout will still handle network timeouts
        timeout = httpx.Timeout(10.0)
        chunks = []  # read our response here

        try:
            self.logger.debug(f"start streaming {url}")
            # stream the response so that we can set a upper limit on how much we want to fetch.
            # as we are using stream the r.text wont be available, save our read data ourself
            # maximum size to read of the response in characters (this prevents us from reading stream forever)
            remaining = 800000
            headers = {
                'user-agent': self.user_agent_for_url(url)
            }

            # Google may break things anytime so here are some things to try:
            # If needed some day.. 'Set-Cookie': "CONSENT=YES; Domain=.youtube.com; Path=/; SameSite=None; Secure; Expires=Sun, 10 Jan 2038 07:59:59 GMT; Max-Age=946080000"
            # cookies = self.cookies_for_url(url)

            with httpx.stream("GET", url, timeout=timeout, headers=headers) as r:
                for part in r.iter_text():
                    chunks.append(part)
                    remaining -= len(part)
                    if remaining < 0:
                        break
            self.logger.debug(f"end streaming {url}")
        except Exception as e:
            self.logger.warning(f"Failed fetching url {url}. Error: {e}")
            return (title, description)

        if r.status_code != 200:
            self.logger.warning(
                f"Failed fetching url {url}. Status code: {r.status_code}"
            )
            return (title, description)

        responsetext = "".join(chunks)

        # try parse and get the title
        try:
            soup = BeautifulSoup(responsetext, "html.parser")
            title_tag = soup.title
            # .string is None for an empty <title> or one with nested
            # markup, so test it for truthiness instead of calling len().
            if title_tag is not None and title_tag.string:
                title = title_tag.string
            else:
                meta_title = soup.find("meta", attrs={"name": "title"})
                og_title = soup.find("meta", property="og:title")
                if meta_title:
                    title = meta_title.get("content", None)
                elif og_title:
                    title = og_title.get("content", None)
                elif title_tag is not None:
                    # <title> exists but has no single string child; use
                    # its concatenated text, or None if that is empty.
                    title = title_tag.get_text().strip() or None
            descr_tag = soup.find("meta", attrs={"name": "description"})
            if descr_tag:
                description = descr_tag.get("content", None)
        except Exception as e:
            self.logger.warning(f"Failed parsing response from url {url}. Error: {e}")
            return (title, description)

        # Title should not contain newlines or tabs
        if title is not None:
            title = str(title).replace("\n", "").replace("\t", "")
        return (title, description)

    async def matrix_message(self, bot, room, event):
        """
        Commands for setting what to do in this channel.

        Supported arguments (after the command word itself):
          <OFF|TITLE|DESCRIPTION|BOTH>  set the status of this room
          status                        show the room status and blacklist
          notice | text                 set the message type used for output
          blacklist <a,b,...|clear>     set or clear the url blacklist

        Access checks are delegated to bot.must_be_admin / bot.must_be_owner.
        """
        bot.must_be_admin(room, event)

        # drop the command word; slicing tolerates an empty body
        args = shlex.split(event.body)[1:]

        if len(args) == 1:
            command = args[0]

            # save the new status
            if self.STATUSES.get(command.upper()) is not None:
                self.status[room.room_id] = command.upper()
                bot.save_settings()
                await bot.send_text(
                    room, f"Ok, {self.STATUSES.get(self.status[room.room_id])}"
                )
                return

            # show status
            if command == "status":
                # An unknown stored status is shown as OFF, matching how
                # text_cb treats it.
                description = self.STATUSES.get(
                    self.status.get(room.room_id, "OFF"), self.STATUSES["OFF"]
                )
                await bot.send_text(
                    room, description + f', URL blacklist: {self.blacklist}'
                )
                return

            # set type to notice
            if command == "notice":
                bot.must_be_owner(event)
                self.type = "m.notice"
                bot.save_settings()
                await bot.send_text(room, "Sending titles as notices from now on.")
                return

            # set type to text
            if command == "text":
                bot.must_be_owner(event)
                self.type = "m.text"
                bot.save_settings()
                await bot.send_text(room, "Sending titles as text from now on.")
                return

        # set blacklist
        if len(args) == 2 and args[0] == "blacklist":
            bot.must_be_owner(event)
            if args[1] == 'clear':
                self.blacklist = []
            else:
                # Drop empty entries ("a,,b" or a trailing comma): an empty
                # string is contained in every url.
                self.blacklist = [
                    entry.strip() for entry in args[1].split(',') if entry.strip()
                ]
            bot.save_settings()
            await bot.send_text(room, f"Blacklisted URLs set to {self.blacklist}")
            return

        # invalid command
        await bot.send_text(
            room,
            "Sorry, I did not understand. See README for command list.",
        )
        return

    def get_settings(self):
        """
        Return the settings of the base class extended with status, type and blacklist
        """
        data = super().get_settings()
        data["status"] = self.status
        data["type"] = self.type
        data["blacklist"] = self.blacklist
        return data

    def set_settings(self, data):
        """
        Restore status, type and blacklist from data when they are present
        """
        super().set_settings(data)
        # "is not None" so that a stored empty dict/list (e.g. a cleared
        # blacklist) still replaces the current value.
        if data.get("status") is not None:
            self.status = data["status"]
        if data.get("type"):
            self.type = data["type"]
        if data.get("blacklist") is not None:
            self.blacklist = data["blacklist"]

    def help(self):
        """
        Return the one-line help text of this module
        """
        return "If I see a url in a message I will try to get the title from the page and spit it out"