Removed unused modules

This commit is contained in:
Jarno Rankinen 2022-11-20 12:25:01 +02:00
parent bb1057800b
commit 8613c57eec
3 changed files with 0 additions and 554 deletions

@@ -1,65 +0,0 @@
import shlex
import os
from datetime import datetime

from .common.module import BotModule


class MatrixModule(BotModule):
    daily_commands = dict()  # room_id -> command json
    last_hour = datetime.now().hour

    async def matrix_message(self, bot, room, event):
        bot.must_be_admin(room, event)
        args = shlex.split(event.body)
        args.pop(0)

        if len(args) == 3:
            if args[0] == 'daily':
                dailytime = int(args[1])
                dailycmd = args[2]
                if not self.daily_commands.get(room.room_id):
                    self.daily_commands[room.room_id] = []
                self.daily_commands[room.room_id].append(
                    {'time': dailytime, 'command': dailycmd})
                bot.save_settings()
                await bot.send_text(room, 'Daily command added.')
        elif len(args) == 1:
            if args[0] == 'list':
                await bot.send_text(room, 'Daily commands on this room: ' + str(self.daily_commands.get(room.room_id)))
            elif args[0] == 'clear':
                self.daily_commands.pop(room.room_id, None)
                bot.save_settings()
                await bot.send_text(room, 'Cleared commands on this room.')
            elif args[0] == 'time':
                await bot.send_text(room, '{datetime} {timezone}'.format(datetime=datetime.now(), timezone=os.environ.get('TZ')))

    def help(self):
        return 'Runs scheduled commands'

    def get_settings(self):
        data = super().get_settings()
        data['daily_commands'] = self.daily_commands
        return data

    def set_settings(self, data):
        super().set_settings(data)
        if data.get('daily_commands'):
            self.daily_commands = data['daily_commands']

    async def matrix_poll(self, bot, pollcount):
        delete_rooms = []
        if self.last_hour != datetime.now().hour:
            self.last_hour = datetime.now().hour
            for room_id in self.daily_commands:
                if room_id in bot.client.rooms:
                    commands = self.daily_commands[room_id]
                    for command in commands:
                        if int(command['time']) == self.last_hour:
                            await bot.send_text(bot.get_room_by_id(room_id), command['command'], 'm.text')
                else:
                    delete_rooms.append(room_id)
            for roomid in delete_rooms:
                self.daily_commands.pop(roomid, None)

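The trigger logic above fires each room's stored commands once, when the wall-clock hour first matches the configured time. A minimal standalone sketch of that hour-rollover check (hypothetical names, outside the bot framework):

    from datetime import datetime

    # room_id -> list of {'time': hour, 'command': text}, as stored in the module's settings
    daily_commands = {'!room:example.org': [{'time': 9, 'command': '!echo good morning'}]}
    last_hour = datetime.now().hour

    def due_commands(now=None):
        """Return (room_id, command) pairs whose hour was just reached, at most once per hour."""
        global last_hour
        now = now or datetime.now()
        if last_hour == now.hour:
            return []  # this hour was already handled
        last_hour = now.hour
        return [(room_id, cmd['command'])
                for room_id, cmds in daily_commands.items()
                for cmd in cmds
                if int(cmd['time']) == last_hour]

Polling this from a frequent loop fires each command at most once per hour, which is exactly how matrix_poll uses last_hour above.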
@@ -1,182 +0,0 @@
from __future__ import print_function

import os
import os.path
import pickle
from datetime import datetime, timedelta

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

#
# Google calendar notifications
#
# Note: Provide a token.pickle file for the service.
# It's created on first run (run from console!) and
# can be copied to another computer.
#
from modules.common.module import BotModule


class MatrixModule(BotModule):
    def __init__(self, name):
        super().__init__(name)
        self.credentials_file = "credentials.json"
        self.SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
        self.bot = None
        self.service = None
        self.calendar_rooms = dict()  # Contains room_id -> [calid, calid] ..
        self.enabled = False

    def matrix_start(self, bot):
        super().matrix_start(bot)
        self.bot = bot
        creds = None

        if not os.path.exists(self.credentials_file) or os.path.getsize(self.credentials_file) == 0:
            return  # No-op if not set up

        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
                self.logger.info('Loaded existing pickle file')

        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            self.logger.warning('No credentials or credentials not valid!')
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, self.SCOPES)
                # urn:ietf:wg:oauth:2.0:oob
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)
                self.logger.info('Pickle saved')

        self.service = build('calendar', 'v3', credentials=creds)

        try:
            calendar_list = self.service.calendarList().list().execute()['items']
            self.logger.info(f'Google calendar set up successfully with access to {len(calendar_list)} calendars:\n')
            for calendar in calendar_list:
                self.logger.info(f"{calendar['summary']} - {calendar['id']}")
        except Exception:
            self.logger.error('Getting calendar list failed!')

    async def matrix_message(self, bot, room, event):
        if not self.service:
            await bot.send_text(room, 'Google calendar not set up for this bot.')
            return

        args = event.body.split()
        events = []
        calendars = self.calendar_rooms.get(room.room_id) or []

        if len(args) == 2:
            if args[1] == 'today':
                for calid in calendars:
                    self.logger.info(f'Listing events in cal {calid}')
                    events = events + self.list_today(calid)
            if args[1] == 'list':
                await bot.send_text(room, 'Calendars in this room: ' + str(self.calendar_rooms.get(room.room_id)))
                return
        elif len(args) == 3:
            if args[1] == 'add':
                bot.must_be_admin(room, event)
                calid = args[2]
                self.logger.info(f'Adding calendar {calid} to room id {room.room_id}')

                if self.calendar_rooms.get(room.room_id):
                    if calid not in self.calendar_rooms[room.room_id]:
                        self.calendar_rooms[room.room_id].append(calid)
                    else:
                        await bot.send_text(room, 'This google calendar is already added in this room!')
                        return
                else:
                    self.calendar_rooms[room.room_id] = [calid]

                self.logger.info(f'Calendars now for this room: {self.calendar_rooms.get(room.room_id)}')
                bot.save_settings()
                await bot.send_text(room, 'Added new google calendar to this room')
                return
            if args[1] == 'del':
                bot.must_be_admin(room, event)
                calid = args[2]
                self.logger.info(f'Removing calendar {calid} from room id {room.room_id}')

                if self.calendar_rooms.get(room.room_id):
                    self.calendar_rooms[room.room_id].remove(calid)

                self.logger.info(f'Calendars now for this room: {self.calendar_rooms.get(room.room_id)}')
                bot.save_settings()
                await bot.send_text(room, 'Removed google calendar from this room')
                return
        else:
            for calid in calendars:
                self.logger.info(f'Listing events in cal {calid}')
                events = events + self.list_upcoming(calid)

        if len(events) > 0:
            self.logger.info(f'Found {len(events)} events')
            await self.send_events(bot, events, room)
        else:
            self.logger.info('No events found')
            await bot.send_text(room, 'No events found, try again later :)')

    async def send_events(self, bot, events, room):
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            await bot.send_html(room, f'{self.parse_date(start)} <a href="{event["htmlLink"]}">{event["summary"]}</a>',
                                f'{self.parse_date(start)} {event["summary"]}')

    def list_upcoming(self, calid):
        startTime = datetime.utcnow()
        now = startTime.isoformat() + 'Z'
        events_result = self.service.events().list(calendarId=calid, timeMin=now,
                                                   maxResults=10, singleEvents=True,
                                                   orderBy='startTime').execute()
        events = events_result.get('items', [])
        return events

    def list_today(self, calid):
        startTime = datetime.utcnow()
        startTime = startTime.replace(hour=0, minute=0, second=0, microsecond=0)
        endTime = startTime + timedelta(hours=24)
        now = startTime.isoformat() + 'Z'
        end = endTime.isoformat() + 'Z'
        self.logger.info(f'Looking for events between {now} and {end}')
        events_result = self.service.events().list(calendarId=calid, timeMin=now,
                                                   timeMax=end, maxResults=10, singleEvents=True,
                                                   orderBy='startTime').execute()
        return events_result.get('items', [])

    def help(self):
        return "Google calendar. Lists the next 10 events by default. 'today' lists today's events."

    def get_settings(self):
        data = super().get_settings()
        data['calendar_rooms'] = self.calendar_rooms
        return data

    def set_settings(self, data):
        super().set_settings(data)
        if data.get('calendar_rooms'):
            self.calendar_rooms = data['calendar_rooms']

    def parse_date(self, start):
        try:
            dt = datetime.strptime(start, '%Y-%m-%dT%H:%M:%S%z')
            return dt.strftime("%d.%m %H:%M")
        except ValueError:
            dt = datetime.strptime(start, '%Y-%m-%d')
            return dt.strftime("%d.%m")

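For reference, list_today above queries a UTC midnight-to-midnight window. A minimal standalone sketch of the same query (assumes google-api-python-client and already-obtained credentials; events_today is a hypothetical helper name):

    from datetime import datetime, timedelta
    from googleapiclient.discovery import build

    def events_today(creds, calendar_id):
        # Build the same midnight-to-midnight UTC window the module uses
        service = build('calendar', 'v3', credentials=creds)
        start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        end = start + timedelta(hours=24)
        result = service.events().list(
            calendarId=calendar_id,
            timeMin=start.isoformat() + 'Z',  # RFC 3339 timestamp, 'Z' marks UTC
            timeMax=end.isoformat() + 'Z',
            maxResults=10,
            singleEvents=True,
            orderBy='startTime').execute()
        return result.get('items', [])

singleEvents=True expands recurring events into individual instances, which is what makes orderBy='startTime' valid for the query.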
@@ -1,307 +0,0 @@
import re
import shlex
import sys
import traceback
from functools import lru_cache

import httpx
from bs4 import BeautifulSoup
from nio import RoomMessageText

from modules.common.module import BotModule


class MatrixModule(BotModule):
    """
    Simple url-fetching module that spits out page titles.

    Every time a url is seen in a message, we make an HTTP request to it and
    try to get the contents of its title tag to spit out to the room.
    """

    def __init__(self, name):
        super().__init__(name)
        self.bot = None
        self.status = dict()  # room_id -> what to do with urls
        self.type = "m.notice"  # notice or text
        # this will be extended when matrix_start is called
        self.useragent = "Mozilla/5.0 (compatible; Hemppa; +https://github.com/vranki/hemppa/)"
        self.STATUSES = {
            "OFF": "Not spamming this channel",
            "TITLE": "Spamming this channel with titles",
            "DESCRIPTION": "Spamming this channel with descriptions",
            "BOTH": "Spamming this channel with both title and description",
        }
        self.blacklist = []
        self.enabled = False

    def matrix_start(self, bot):
        """
        Register callback for all RoomMessageText events on startup
        """
        super().matrix_start(bot)
        self.bot = bot
        bot.client.add_event_callback(self.text_cb, RoomMessageText)
        # extend the useragent string to contain version and bot name
        self.useragent = f"Mozilla/5.0 (compatible; Hemppa/{self.bot.version}; {self.bot.client.user}; +https://github.com/vranki/hemppa/)"
        self.logger.debug(f"useragent: {self.useragent}")

    def matrix_stop(self, bot):
        super().matrix_stop(bot)
        bot.remove_callback(self.text_cb)

    def user_agent_for_url(self, url):
        if ('youtube.com' in url) or ('youtu.be' in url) or ('google.com' in url):
            return 'curl/7.64.0'
        return self.useragent

    # Currently not used, but for future needs:
    def cookies_for_url(self, url):
        if ('youtube.com' in url) or ('youtu.be' in url) or ('google.com' in url):
            cookies = httpx.Cookies()
            cookies.set('CONSENT', 'YES', domain='.youtube.com')
            return cookies
        # return {'CONSENT': 'YES', 'Domain': '.youtube.com',
        #         'Path': '/', 'SameSite': 'None', 'Expires': 'Sun, 10 Jan 2038 07:59:59 GMT', 'Max-Age': '946080000'}
        return {}

    async def text_cb(self, room, event):
        """
        Handle client callbacks for all room text events
        """
        if self.bot.should_ignore_event(event):
            return

        # no content at all?
        if len(event.body) < 1:
            return

        if "content" in event.source:
            # skip edited content to prevent spamming the same thing multiple times
            if "m.new_content" in event.source["content"]:
                self.logger.debug("Skipping edited event to prevent spam")
                return
            # skip reply messages to prevent spam
            if "m.relates_to" in event.source["content"]:
                self.logger.debug("Skipping reply message to prevent spam")
                return

        # are we on in this room?
        status = self.status.get(room.room_id, "OFF")
        if status not in self.STATUSES:
            return
        if status == "OFF":
            return

        try:
            # extract possible urls from message
            urls = re.findall(r"(https?://\S+)", event.body)

            # no urls, nothing to do
            if len(urls) == 0:
                return

            # fetch the urls and if we can see a title spit it out
            for url in urls:
                # fix for #98: a bit ugly, but skip all matrix.to urls.
                # Those are 99.99% pills and should not
                # spam the channel with matrix.to titles
                if url.startswith("https://matrix.to/#/"):
                    self.logger.debug(f"Skipping matrix.to url (#98): {url}")
                    continue

                url_blacklisted = False
                for blacklisted in self.blacklist:
                    if blacklisted in url:
                        url_blacklisted = True
                if url_blacklisted:
                    self.logger.debug(f"Skipping blacklisted url {url}")
                    continue

                try:
                    title, description = self.get_content_from_url(url)
                except Exception as e:
                    self.logger.warning(f"could not fetch url: {e}")
                    traceback.print_exc(file=sys.stderr)
                    # failed fetching, give up
                    continue

                msg = ""
                if status == "TITLE" and title is not None:
                    msg = f"Title: {title}"
                elif status == "DESCRIPTION" and description is not None:
                    msg = f"Description: {description}"
                elif status == "BOTH" and title is not None and description is not None:
                    msg = f"Title: {title}\nDescription: {description}"
                elif status == "BOTH" and title is not None:
                    msg = f"Title: {title}"
                elif status == "BOTH" and description is not None:
                    msg = f"Description: {description}"

                if msg.strip():  # evaluates to True on non-empty strings
                    await self.bot.send_text(room, msg, msgtype=self.type, bot_ignore=True)
        except Exception as e:
            self.logger.warning(f"Unexpected error in url module text_cb: {e}")
            traceback.print_exc(file=sys.stderr)

    @lru_cache(maxsize=128)
    def get_content_from_url(self, url):
        """
        Fetch url and try to get the title and description from the response
        """
        title = None
        description = None

        # timeout will still handle network timeouts
        timeout = httpx.Timeout(10.0)
        responsetext = ""  # read our response here
        try:
            self.logger.debug(f"start streaming {url}")
            # Stream the response so that we can set an upper limit on how much we want to fetch.
            # As we are using stream, r.text won't be available, so save the read data ourselves.
            # maxsize is the maximum size of the response to read, in characters
            # (this prevents us from reading the stream forever)
            maxsize = 800000
            headers = {
                'user-agent': self.user_agent_for_url(url)
            }
            # Google may break things anytime so here are some things to try:
            # If needed some day.. 'Set-Cookie': "CONSENT=YES; Domain=.youtube.com; Path=/; SameSite=None; Secure; Expires=Sun, 10 Jan 2038 07:59:59 GMT; Max-Age=946080000"
            # cookies = self.cookies_for_url(url)
            # print('cookies', url, cookies)
            # print('headers', headers)
            with httpx.stream("GET", url, timeout=timeout, headers=headers) as r:
                for part in r.iter_text():
                    # self.logger.debug(
                    #     f"reading response stream, limiting in {maxsize} bytes"
                    # )
                    responsetext += part
                    maxsize -= len(part)
                    if maxsize < 0:
                        break
            self.logger.debug(f"end streaming {url}")
        except Exception as e:
            self.logger.warning(f"Failed fetching url {url}. Error: {e}")
            return (title, description)

        if r.status_code != 200:
            self.logger.warning(
                f"Failed fetching url {url}. Status code: {r.status_code}"
            )
            return (title, description)

        # try to parse the response and get the title and description
        try:
            soup = BeautifulSoup(responsetext, "html.parser")
            if soup.title and soup.title.string and len(soup.title.string) > 0:
                title = soup.title.string
            else:
                title_tag = soup.find("meta", attrs={"name": "title"})
                ogtitle = soup.find("meta", property="og:title")
                if title_tag:
                    title = title_tag.get("content", None)
                elif ogtitle:
                    title = ogtitle["content"]
                elif soup.head and soup.head.title:
                    title = soup.head.title.string.strip()
            descr_tag = soup.find("meta", attrs={"name": "description"})
            if descr_tag:
                description = descr_tag.get("content", None)
        except Exception as e:
            self.logger.warning(f"Failed parsing response from url {url}. Error: {e}")
            return (title, description)

        # Title should not contain newlines or tabs
        if title is not None:
            assert isinstance(title, str)
            title = title.replace("\n", "")
            title = title.replace("\t", "")

        return (title, description)

    async def matrix_message(self, bot, room, event):
        """
        commands for setting what to do in this channel
        """
        bot.must_be_admin(room, event)
        args = shlex.split(event.body)
        args.pop(0)

        # save the new status
        if len(args) == 1 and self.STATUSES.get(args[0].upper()) is not None:
            self.status[room.room_id] = args[0].upper()
            bot.save_settings()
            await bot.send_text(
                room, f"Ok, {self.STATUSES.get(self.status[room.room_id])}"
            )
            return
        # show status
        elif len(args) == 1 and args[0] == "status":
            status = self.STATUSES.get(self.status.get(room.room_id, "OFF")) + f', URL blacklist: {self.blacklist}'
            await bot.send_text(
                room, status
            )
            return
        # set type to notice
        elif len(args) == 1 and args[0] == "notice":
            bot.must_be_owner(event)
            self.type = "m.notice"
            bot.save_settings()
            await bot.send_text(room, "Sending titles as notices from now on.")
            return
        # set type to text
        elif len(args) == 1 and args[0] == "text":
            bot.must_be_owner(event)
            self.type = "m.text"
            bot.save_settings()
            await bot.send_text(room, "Sending titles as text from now on.")
            return
        # set blacklist
        elif len(args) == 2 and args[0] == "blacklist":
            bot.must_be_owner(event)
            if args[1] == 'clear':
                self.blacklist = []
            else:
                self.blacklist = args[1].split(',')
            bot.save_settings()
            await bot.send_text(room, f"Blacklisted URLs set to {self.blacklist}")
            return

        # invalid command
        await bot.send_text(
            room,
            "Sorry, I did not understand. See README for command list.",
        )
        return

    def get_settings(self):
        data = super().get_settings()
        data["status"] = self.status
        data["type"] = self.type
        data["blacklist"] = self.blacklist
        return data

    def set_settings(self, data):
        super().set_settings(data)
        if data.get("status"):
            self.status = data["status"]
        if data.get("type"):
            self.type = data["type"]
        if data.get("blacklist"):
            self.blacklist = data["blacklist"]

    def help(self):
        return "If I see a url in a message I will try to get the title from the page and spit it out"
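
The core of get_content_from_url above is a size-capped streaming fetch followed by a title parse. A condensed standalone sketch (assumes httpx and beautifulsoup4; fetch_title is a hypothetical helper name):

    import httpx
    from bs4 import BeautifulSoup

    def fetch_title(url, maxsize=800000):
        # Stream the body so we can stop after roughly maxsize characters
        text = ""
        with httpx.stream("GET", url, timeout=httpx.Timeout(10.0)) as r:
            for part in r.iter_text():
                text += part
                maxsize -= len(part)
                if maxsize < 0:
                    break
        if r.status_code != 200:
            return None
        soup = BeautifulSoup(text, "html.parser")
        if soup.title and soup.title.string:
            return soup.title.string.strip()
        return None

Capping by characters actually read, rather than trusting Content-Length, is what protects the bot from endless or mislabeled response streams.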