Compare commits

...

12 Commits

25 changed files with 151 additions and 2100 deletions

View File

@ -12,8 +12,11 @@ RUN pip install -r requirements.txt
COPY bot.py *.json *.pickle /bot/
COPY config config
COPY modules modules
VOLUME /bot/config
RUN useradd -m HomeBot && chown HomeBot -R /bot && apt install curl jq -y
USER HomeBot
WORKDIR /bot
COPY modules modules
CMD [ "python", "-u", "./bot.py" ]

127
modules/cam.py Normal file
View File

@ -0,0 +1,127 @@
import re
from modules.common.module import BotModule
import requests
class MatrixModule(BotModule):
def __init__(self,name):
super().__init__(name)
self.motionurl = 'http://localhost:8080'
self.cameras = []
self.allowed_cmds = {
'config': ['list','set','get','write'],
'detection': ['status','connection','start','pause'],
'action': ['eventstart','eventend','snapshot','restart','quit','end']
}
self.restricted_cmds = ['list','set','get','write','start','pause','restart','quit','end']
self.helptext = """Control the motion daemon.
Available commands:
- config list|set|get|write
- detection status|connection|start|pause
- action eventstart|eventend|snapshot|restart|quit|end
- url get|set <motionurl>
Usage: '!cam <id> category command'
<id> is the numerical id of the camera. Use 0 for all cameras.
If <id> is omitted, 0 is assumed."""
def get_settings(self):
data = super().get_settings()
data['motionurl'] = self.motionurl
data['cameras'] = self.cameras
return data
def set_settings(self, data):
super().set_settings(data)
if data.get('motionurl'):
self.motionurl = data['motionurl']
if data.get('cameras'):
self.cameras = data['cameras']
async def matrix_message(self, bot, room, event):
args = event.body.split()
args.pop(0)
if args[0] == 'help':
await bot.send_text(room, self.helptext, event)
return
elif args[0] == 'url':
if args[1] == 'set':
newurl = args[2]
bot.must_be_owner(event)
self.motionurl = newurl
bot.save_settings()
await bot.send_text(room, f"Motion API URL set to {self.motionurl}", event)
elif args[1] == 'get':
await bot.send_text(room, f"Motion URL is currently {self.motionurl}", event)
elif args[0] == 'cameras':
if args[1] == 'set':
bot.must_be_owner(event)
self.cameras = args[2:]
bot.save_settings()
await bot.send_text(room, "Updated camera id list", event)
elif args[1] == 'get':
camstr = ''
if len(self.cameras) == 0:
await bot.send_text(room, "No camera ids configured", event)
else:
for n, cam in enumerate(self.cameras):
camstr = camstr + cam
if n < len(self.cameras) - 1:
camstr = camstr + ","
await bot.send_text(room, f"Following camera ids are configured:\n{camstr}", event)
else:
cmdindex = 1
try:
# Check if first argument is numeric (camera id)
camid = int(args[0])
camid = str(camid)
except ValueError:
cmdindex = 0
camid = '0'
category = args[cmdindex]
## Quick commands start
if category == 'now':
if camid != '0':
await self.get_snapshot(camid, bot, room, event)
elif camid == '0' and len(self.cameras) > 0:
for cam in self.cameras:
await self.get_snapshot(cam, bot, room, event)
else:
self.logger.info("User requested snapshots with id 0, but no camera id list configured")
await bot.send_text(room, "No camera ids configured", event)
return
## Quick commands end
if category not in self.allowed_cmds:
await bot.send_text(room, f'Unknown category: "{args[1]}"', event)
return
cmdindex = cmdindex + 1
if args[cmdindex] not in self.allowed_cmds[category]:
await bot.send_text(room, f'Unknown command: "{args[cmdindex]}"', event)
return
command = args[cmdindex]
req_url = f'{self.motionurl}/{camid}/{category}/{command}'
if command in self.restricted_cmds:
bot.must_be_owner(event)
if category == 'config' and command == 'get':
queryparam = args[cmdindex + 1]
req_url = f'{req_url}?query={queryparam}'
elif category == 'config' and command == 'set':
param = args[cmdindex + 1]
value = args[cmdindex + 2]
req_url = f'{req_url}?{param}={value}'
if camid != 0 and command == 'snapshot':
await self.get_snapshot(camid, bot, room, event)
resp = requests.get(req_url).text
await bot.send_text(room, resp, event)
async def get_snapshot(self, camid, bot, room, event):
imgurl = f"{self.motionurl.replace(':8080',':8081')}/{camid}/current"
self.logger.info(f"Fetching image from {imgurl}")
await bot.upload_and_send_image(room, imgurl, event, no_cache=True)
def help(self):
return self.helptext.splitlines()[0]

View File

@ -1,270 +0,0 @@
from logging import log
import sys
import traceback
import json
import time
import datetime
import requests
import urllib3
from datetime import datetime, timedelta
from random import randrange
from modules.common.module import BotModule
urllib3.disable_warnings()
# API docs at: https://gitlab.com/lemoidului/ogn-flightbook/-/blob/master/doc/API.md
class FlightBook:
def __init__(self):
self.base_url = 'https://flightbook.glidernet.org/api'
self.AC_TYPES = [ '?', 'Glider', 'Towplane', \
'Helicopter', 'Parachute', 'Drop plane', 'Hang glider', \
'Paraglider', 'Powered', 'Jet', 'UFO', 'Balloon', \
'Airship', 'UAV', '?', 'Static object' ]
self.logged_flights = dict() # station -> [index of flight]
self.device_cache = dict() # Registration -> [address, CN]
def get_flights(self, icao):
log_url = f'{self.base_url}/logbook/{icao}'
data = None
with requests.Session() as session:
response = session.get(log_url, headers={'Connection': 'close'}, verify=False)
data = response.json()
# print(json.dumps(data, sort_keys=True, indent=4))
self.update_device_cache(data)
return data
def update_device_cache(self, data):
devices = data['devices']
for device in devices:
if device["address"] and device["registration"]:
cache_entry = [device["address"], device["competition"]]
self.device_cache[device["registration"]] = cache_entry
def address_for_registration(self, registration):
for reg in self.device_cache.keys():
if reg.lower() == registration.lower():
return self.device_cache[reg][0]
return None
def address_for_cn(self, cn):
for reg in self.device_cache.keys():
if self.device_cache[reg][1] == cn.upper():
return self.device_cache[reg][0]
return None
def format_time(self, time):
if not time:
return '··:··'
time = time.replace('h', ':')
return time
def flight2string(self, flight, data):
devices = data['devices']
device = devices[flight['device']]
start = self.format_time(flight["start"])
end = self.format_time(flight["stop"])
duration = ' '
if flight["duration"]:
duration = time.strftime('%H:%M', time.gmtime(flight["duration"]))
maxalt = ''
if flight["max_alt"]:
maxalt = str(flight["max_alt"]) + 'm'
identity = f'{device.get("registration") or ""} {device.get("aircraft") or ""} {device.get("competition") or ""} {maxalt}'
identity = ' '.join(identity.split())
return f'{start} - {end} {duration} {identity}'
def print_flights(self, data, showtow=False):
print(f'✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:')
flights = data['flights']
for flight in flights:
if not showtow and flight["towing"]:
continue
print(self.flight2string(flight, data))
def test():
fb = FlightBook()
data = fb.get_flights('LFMX')
fb.print_flights(data)
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.service_name = 'FLOG'
self.station_rooms = dict() # Roomid -> ogn station
self.live_rooms = [] # Roomid's with live enabled
self.logged_flights = dict() # Station -> number of flights
self.first_poll = True
self.enabled = False
self.fb = FlightBook()
def matrix_start(self, bot):
super().matrix_start(bot)
self.add_module_aliases(bot, ['sar'])
async def matrix_poll(self, bot, pollcount):
if pollcount % (6 * 5) == 0: # Poll every 5 min
await self.poll_implementation(bot)
async def poll_implementation(self, bot):
for roomid in self.live_rooms:
station = self.station_rooms[roomid]
data = self.fb.get_flights(station)
if not data:
self.logger.warning(f"FLOG: Failed to get flights at {station}!")
return
flights = data['flights']
if len(flights) == 0 or (not station in self.logged_flights):
self.logged_flights[station] = []
#print('Reset flight count for station ' + station)
# else:
# print(f'Got {len(flights)} flights at {station}')
flightindex = 0
for flight in flights:
if flight["towing"]:
continue
if flight["stop"]:
if not flightindex in self.logged_flights[station]:
if not self.first_poll:
await bot.send_text(bot.get_room_by_id(roomid), self.fb.flight2string(flight, data))
self.logged_flights[station].append(flightindex)
# print(f'Logged flights at {station} now {self.logged_flights[station]}')
flightindex = flightindex + 1
self.first_poll = False
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 1 and args[0] == "!flog":
if room.room_id in self.station_rooms:
station = self.station_rooms[room.room_id]
await self.show_flog(bot, room, station)
else:
await bot.send_text(room, 'No OGN station set for this room - set it first.')
elif len(args) == 2 and args[0] == "!flog":
if args[1] == 'rmstation':
bot.must_be_admin(room, event)
del self.station_rooms[room.room_id]
self.live_rooms.remove(room.room_id)
await bot.send_text(room, f'Cleared OGN station for this room')
elif args[1] == 'status':
print(self.logged_flights)
print(self.fb.device_cache)
bot.must_be_admin(room, event)
await bot.send_text(room, f'OGN station for this room: {self.station_rooms.get(room.room_id)}, live updates enabled: {room.room_id in self.live_rooms}')
elif args[1] == 'poll':
bot.must_be_admin(room, event)
await self.poll_implementation(bot)
elif args[1] == 'live':
bot.must_be_admin(room, event)
self.live_rooms.append(room.room_id)
bot.save_settings()
await bot.send_text(room, f'Sending live updates for station {self.station_rooms.get(room.room_id)} to this room')
elif args[1] == 'rmlive':
bot.must_be_admin(room, event)
self.live_rooms.remove(room.room_id)
bot.save_settings()
await bot.send_text(room, f'Not sending live updates for station {self.station_rooms.get(room.room_id)} to this room anymore')
else:
# Assume parameter is a station name
station = args[1]
await self.show_flog(bot, room, station)
elif len(args) == 2 and args[0] == "!sar":
registration = args[1]
address = self.fb.address_for_registration(registration)
if not address:
cn = args[1]
address = self.fb.address_for_cn(cn)
coords = None
if address:
coords = self.get_coords_for_address(address)
if coords:
await bot.send_location(room, f'{registration} ({coords["utc"]})', coords["lat"], coords["lng"])
else:
await bot.send_text(room, f'No Flarm ID found for {registration}!')
elif len(args) == 3 and args[0] == "!flog":
if args[1] == 'station':
bot.must_be_admin(room, event)
station = args[2]
self.station_rooms[room.room_id] = station
self.logger.info(f'Station now for this room {self.station_rooms.get(room.room_id)}')
bot.save_settings()
await bot.send_text(room, f'Set OGN station {station} to this room')
def get_coords_for_address(self, address):
# https://flightbook.glidernet.org/api/live/address/~91DADF5B86
url = f'{self.fb.base_url}/live/address/{address}'
data = None
with requests.Session() as session:
response = session.get(url, headers={'Connection': 'close'}, verify=False)
data = response.json()
# print(json.dumps(data, sort_keys=True, indent=4))
return data
def text_flog(self, data, showtow):
out = ""
if len(data["flights"]) == 0:
out = f'No known flights today at {data["airfield"]["name"]}'
else:
out = f'Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "\n"
flights = data['flights']
for flight in flights:
if not showtow and flight["towing"]:
continue
out = out + self.fb.flight2string(flight, data) + "\n"
return out
def html_flog(self, data, showtow):
out = ""
if len(data["flights"]) == 0:
out = f'No known flights today at {data["airfield"]["name"]}'
else:
out = f'<b>✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "</b>\n"
flights = data['flights']
out = out + "<ul>"
for flight in flights:
if not showtow and flight["towing"]:
continue
out = out + "<li>" + self.fb.flight2string(flight, data) + "</li>\n"
out = out + "</ul>"
return out
async def show_flog(self, bot, room, station):
data = self.fb.get_flights(station)
if data:
await bot.send_html(room, self.html_flog(data, False), self.text_flog(data, False))
else:
await bot.send_text(room, f"Failed to get flight log for {station}")
def get_settings(self):
data = super().get_settings()
data['station_rooms'] = self.station_rooms
data['live_rooms'] = self.live_rooms
return data
def set_settings(self, data):
super().set_settings(data)
if data.get('station_rooms'):
self.station_rooms = data['station_rooms']
if data.get('live_rooms'):
self.live_rooms = data['live_rooms']
def help(self):
return ('Open Glider Network Field Log')

View File

@ -1,100 +0,0 @@
import urllib.request
import urllib.parse
import urllib.error
import requests
from nio import AsyncClient, UploadError
from nio import UploadResponse
from collections import namedtuple
from modules.common.module import BotModule
class gfycat(object):
"""
A very simple module that allows you to
1. search a gif on gfycat from a remote location
"""
# Urls
url = "https://api.gfycat.com"
def __init__(self):
super(gfycat, self).__init__()
def __fetch(self, url, param):
import json
try:
# added simple User-Ajent string to avoid CloudFlare block this request
headers = {'User-Agent': 'Mozilla/5.0'}
req = urllib.request.Request(url+param, headers=headers)
connection = urllib.request.urlopen(req).read()
except urllib.error.HTTPError as err:
raise ValueError(err.read())
result = namedtuple("result", "raw json")
return result(raw=connection, json=json.loads(connection))
def search(self, param):
result = self.__fetch(self.url, "/v1/gfycats/search?search_text=%s" % urllib.parse.quote_plus(param))
if "errorMessage" in result.json:
raise ValueError("%s" % self.json["errorMessage"])
return _gfycatSearch(result)
class _gfycatUtils(object):
"""
A utility class that provides the necessary common
for all the other classes
"""
def __init__(self, param, json):
super(_gfycatUtils, self).__init__()
# This can be used for other functions related to this class
self.res = param
self.js = json
def raw(self):
return self.res.raw
def json(self):
return self.js
def __len__(self):
return len(self.js)
def get(self, what):
try:
return self.js[what]
except KeyError as error:
return ("Sorry, can't find %s" % error)
class _gfycatSearch(_gfycatUtils):
"""
This class will provide more information for an existing url
"""
def __init__(self, param):
super(_gfycatSearch, self).__init__(param, param.json["gfycats"])
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) > 1:
gif_url = "No image found"
query = event.body[len(args[0])+1:]
try:
gifs = gfycat().search(query)
if len(gifs) < 1:
await bot.send_text(room, gif_url)
return
gif_url = gifs.get(0)["content_urls"]["largeGif"]["url"]
await bot.upload_and_send_image(room, gif_url)
except Exception as exc:
gif_url = str(exc)
await bot.send_text(room, gif_url)
else:
await bot.send_text(room, 'Usage: !gfycat <query>')
def help(self):
return ('Gfycat bot')

View File

@ -1,134 +0,0 @@
from github import Github
import re
import json
from modules.common.module import BotModule
# Helper class with reusable code for github project stuff
class GithubProject:
# New format to support array of colors: domains={"koneet":["#BFDADC","#0CBBF0","#0CBBF0","#E15D19","#ED49CF"],"tilat":["#0E8A16","#1E8A16"]}
def get_domains(description):
p = re.compile('domains=\{.*\}')
matches = json.loads(p.findall(description)[0][8:])
return matches
def get_domain(reponame, domain):
g = Github()
repo = g.get_repo(reponame)
domains = GithubProject.get_domains(repo.description)
if(not len(domains)):
return None, None
domain_colors = domains.get(domain, None)
if not domain_colors:
return None, None
open_issues = repo.get_issues(state='open')
domain_labels = []
labels = repo.get_labels()
for label in labels:
for domain_color in domain_colors:
if label.color == domain_color[1:]:
domain_labels.append(label)
domain_issues = dict()
domain_ok = []
for label in domain_labels:
label_issues = []
for issue in open_issues:
if label in issue.labels:
label_issues.append(issue)
if len(label_issues):
domain_issues[label.name] = label_issues
else:
domain_ok.append(label.name)
return domain_issues, domain_ok
def domain_to_string(reponame, issues, ok):
text_out = reponame + ":\n"
for label in issues.keys():
text_out = text_out + f'{label}: '
for issue in issues[label]:
# todo: add {issue.html_url} when URL previews can be disabled
text_out = text_out + f'[{issue.title}] '
text_out = text_out + f'\n'
text_out = text_out + " OK : " + ', '.join(ok)
return text_out
def domain_to_html(reponame, issues, ok):
html_out = f'<b>{reponame}:</b> <br/>'
for label in issues.keys():
html_out = html_out + f'🚧 {label}: '
for issue in issues[label]:
# todo: add {issue.html_url} when URL previews can be disabled
html_out = html_out + f'[{issue.title}] '
html_out = html_out + f'<br/>'
html_out = html_out + " OK ☑️ " + ', '.join(ok)
return html_out
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.repo_rooms = dict()
async def matrix_message(self, bot, room, event):
args = event.body.split()
args.pop(0)
if len(args) == 1:
if args[0] == 'rmrepo':
bot.must_be_admin(room, event)
del self.repo_rooms[room.room_id]
await bot.send_text(room, 'Github repo removed from this room.')
bot.save_settings()
return
if args[0] == 'repo':
await bot.send_text(room, f'Github repo for this room is {self.repo_rooms.get(room.room_id, "not set")}.')
return
domain = args[0]
reponame = self.repo_rooms.get(room.room_id, None)
if reponame:
issues, ok = GithubProject.get_domain(reponame, domain)
if issues or ok:
await self.send_domain_status(bot, room, reponame, issues, ok)
else:
await bot.send_text(room, f'No labels with domain {domain} found.')
else:
await bot.send_text(room, f'No github repo set for this room. Use setrepo to set it.')
return
if len(args) == 2:
if args[0] == 'setrepo':
bot.must_be_admin(room, event)
reponame = args[1]
self.logger.info(f'Adding repo {reponame} to room id {room.room_id}')
self.repo_rooms[room.room_id] = reponame
await bot.send_text(room, f'Github repo {reponame} set to this room.')
bot.save_settings()
return
await bot.send_text(room, 'Unknown command')
async def send_domain_status(self, bot, room, reponame, issues, ok):
text_out = GithubProject.domain_to_string(reponame, issues, ok)
html_out = GithubProject.domain_to_html(reponame, issues, ok)
await bot.send_html(room, html_out, text_out)
def help(self):
return 'Github asset management'
def get_settings(self):
data = super().get_settings()
data["repo_rooms"] = self.repo_rooms
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("repo_rooms"):
self.repo_rooms = data["repo_rooms"]

View File

@ -1,60 +0,0 @@
import urllib.request
import urllib.parse
import urllib.error
import os
import giphypop
import requests
from nio import AsyncClient, UploadError
from nio import UploadResponse
from collections import namedtuple
from modules.common.module import BotModule
class MatrixModule(BotModule):
api_key = None
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 3 and args[1] == 'apikey':
bot.must_be_owner(event)
self.api_key = args[2]
bot.save_settings()
await bot.send_text(room, 'Api key set')
elif len(args) > 1:
gif_url = "No image found"
query = event.body[len(args[0])+1:]
try:
g = giphypop.Giphy(api_key=self.api_key)
gifs = []
try:
for x in g.search(phrase=query, limit=1):
gifs.append(x)
except Exception:
pass
if len(gifs) < 1:
await bot.send_text(room, gif_url)
return
gif_url = gifs[0].media_url
await bot.upload_and_send_image(room, gif_url)
return
except Exception as exc:
gif_url = str(exc)
await bot.send_text(room, gif_url)
else:
await bot.send_text(room, 'Usage: !giphy <query>')
def get_settings(self):
data = super().get_settings()
data["api_key"] = self.api_key
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("api_key"):
self.api_key = data["api_key"]
def help(self):
return ('Giphy bot')

View File

@ -6,6 +6,7 @@ class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.msg_users = False
self.info = "More information at https://github.com/vranki/hemppa"
def get_settings(self):
data = super().get_settings()

View File

@ -1,41 +0,0 @@
import sys
import traceback
from datetime import datetime, timedelta
from random import randrange
from igramscraper.exception.instagram_not_found_exception import \
InstagramNotFoundException
from igramscraper.instagram import Instagram
from modules.common.pollingservice import PollingService
class MatrixModule(PollingService):
def __init__(self, name):
super().__init__(name)
self.instagram = Instagram()
self.service_name = 'Instagram'
self.enabled = False
async def poll_implementation(self, bot, account, roomid, send_messages):
try:
medias = self.instagram.get_medias(account, 5)
self.logger.info(f'Polling instagram account {account} for room {roomid} - got {len(medias)} posts.')
for media in medias:
if send_messages:
if media.identifier not in self.known_ids:
await bot.send_html(bot.get_room_by_id(roomid),
f'<a href="{media.link}">Instagram {account}:</a> {media.caption}',
f'{account}: {media.caption} {media.link}')
self.known_ids.add(media.identifier)
except InstagramNotFoundException:
self.logger.error(f"{account} does not exist - deleting from room")
self.account_rooms[roomid].remove(account)
bot.save_settings()
except Exception:
self.logger.error('Polling instagram account failed:')
traceback.print_exc(file=sys.stderr)
polldelay = timedelta(minutes=30 + randrange(30))
self.next_poll_time[roomid] = datetime.now() + polldelay

View File

@ -1,45 +0,0 @@
from nio import RoomMessageUnknown, UnknownEvent
from modules.common.module import BotModule
class MatrixModule(BotModule):
bot = None
def matrix_start(self, bot):
super().matrix_start(bot)
self.bot = bot
bot.client.add_event_callback(self.unknownevent_cb, (UnknownEvent,))
def matrix_stop(self, bot):
super().matrix_stop(bot)
bot.remove_callback(self.unknownevent_cb)
async def unknownevent_cb(self, room, event):
try:
if 'type' in event.source and event.source['type'] == 'im.vector.modular.widgets' and event.source['content']['type'] == 'jitsi':
# Todo: Domain not found in Element Android events!
domain = event.source['content']['data']['domain']
conferenceId = event.source['content']['data']['conferenceId']
isAudioOnly = event.source['content']['data']['isAudioOnly']
sender = event.source['sender']
sender_response = await self.bot.client.get_displayname(event.sender)
sender = sender_response.displayname
# This is just a guess - is this the proper way to generate URL? Probably not.
jitsiUrl = f'https://{domain}/{conferenceId}'
calltype = 'video call'
if isAudioOnly:
calltype = 'audio call'
plainMessage = f'{sender} started a {calltype}: {jitsiUrl}'
htmlMessage = f'{sender} started a <a href="{jitsiUrl}">{calltype}</a>'
await self.bot.send_html(room, htmlMessage, plainMessage)
except Exception as e:
self.logger.error(f"Failed parsing Jitsi event. Error: {e}")
async def matrix_message(self, bot, room, event):
pass
def help(self):
return 'Sends text links when user starts a Jitsi video or audio call in room'

19
modules/kauppa.py Normal file
View File

@ -0,0 +1,19 @@
from modules.common.module import BotModule
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
rawlist = event.body.split(' ', 1)
rawlist.pop(0)
mdlist = list()
rawlist = rawlist[0].splitlines()
for item in rawlist:
mdlist.append('- [ ] ' + item)
self.logger.debug(mdlist)
await bot.send_text(room, "\n".join(mdlist), event)
def help(self):
return 'Formats the list given as arguments to a Markdown checkbox list'

View File

@ -1,148 +0,0 @@
from geopy.geocoders import Nominatim
from nio import RoomMessageUnknown
from modules.common.module import BotModule
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.bot = None
self.enabled_rooms = []
def matrix_start(self, bot):
super().matrix_start(bot)
self.bot = bot
bot.client.add_event_callback(self.unknown_cb, RoomMessageUnknown)
def matrix_stop(self, bot):
super().matrix_stop(bot)
bot.remove_callback(self.unknown_cb)
'''
Location events are like: https://spec.matrix.org/v1.2/client-server-api/#mlocation
{
"content": {
"body": "geo:61.49342512194717,23.765914658307736",
"geo_uri": "geo:61.49342512194717,23.765914658307736",
"msgtype": "m.location",
"org.matrix.msc1767.text": "geo:61.49342512194717,23.765914658307736",
"org.matrix.msc3488.asset": {
"type": "m.pin"
},
"org.matrix.msc3488.location": {
"description": "geo:61.49342512194717,23.765914658307736",
"uri": "geo:61.49342512194717,23.765914658307736"
},
"org.matrix.msc3488.ts": 1653837929839
},
"room_id": "!xsBGdLYGrfYhGfLtHG:hacklab.fi",
"type": "m.room.message"
}
BUT sometimes there's ; separating altitude??
{
"content": {
"body": "geo:61.4704211,23.4864855;36.900001525878906",
"geo_uri": "geo:61.4704211,23.4864855;36.900001525878906",
"msgtype": "m.location",
"org.matrix.msc1767.text": "geo:61.4704211,23.4864855;36.900001525878906",
"org.matrix.msc3488.asset": {
"type": "m.self"
},
"org.matrix.msc3488.location": {
"description": "geo:61.4704211,23.4864855;36.900001525878906",
"uri": "geo:61.4704211,23.4864855;36.900001525878906"
},
"org.matrix.msc3488.ts": 1653931683087
},
"origin_server_ts": 1653931683998,
"sender": "@cos:hacklab.fi",
"type": "m.room.message",
"unsigned": {
"age": 70
},
"event_id": "$6xXutKF9EppPMMdc4aQLZjHyd8My0rIZuNZEcuSIPws",
"room_id": "!CLofqdurVWZCMpFnqM:hacklab.fi"
}
'''
async def unknown_cb(self, room, event):
if event.msgtype != 'm.location':
return
if room.room_id not in self.enabled_rooms:
return
location_text = event.content['body']
# Fallback if body is empty
if (len(location_text) == 0) or ('geo:' in location_text):
location_text = 'location'
sender_response = await self.bot.client.get_displayname(event.sender)
sender = sender_response.displayname
geo_uri = event.content['geo_uri']
try:
geo_uri = geo_uri[4:] # Strip geo:
if ';' in geo_uri: # Strip altitude, if present
geo_uri = geo_uri.split(';')[0]
latlon = geo_uri.split(',')
# Sanity checks to avoid url manipulation
float(latlon[0])
float(latlon[1])
except Exception:
self.bot.send_text(room, "Error: Invalid location " + geo_uri)
return
osm_link = f"https://www.openstreetmap.org/?mlat={latlon[0]}&mlon={latlon[1]}"
plain = f'{sender} sent {location_text} {osm_link} 🚩'
html = f'{sender} sent <a href="{osm_link}">{location_text}</a> 🚩'
await self.bot.send_html(room, html, plain)
async def matrix_message(self, bot, room, event):
args = event.body.split()
args.pop(0)
if len(args) == 0:
await bot.send_text(room, 'Usage: !loc <location name>')
return
elif len(args) == 1:
if args[0] == 'enable':
bot.must_be_admin(room, event)
self.enabled_rooms.append(room.room_id)
self.enabled_rooms = list(dict.fromkeys(self.enabled_rooms)) # Deduplicate
await bot.send_text(room, "Ok, sending locations events here as text versions")
bot.save_settings()
return
if args[0] == 'disable':
bot.must_be_admin(room, event)
self.enabled_rooms.remove(room.room_id)
await bot.send_text(room, "Ok, disabled here")
bot.save_settings()
return
query = event.body[4:]
geolocator = Nominatim(user_agent=bot.appid)
self.logger.info('loc: looking up %s ..', query)
location = geolocator.geocode(query)
self.logger.info('loc rx %s', location)
if location:
await bot.send_location(room, location.address, location.latitude, location.longitude, "m.pin")
else:
await bot.send_text(room, "Can't find " + query + " on map!")
def help(self):
return 'Search for locations and display Matrix location events as OSM links'
def get_settings(self):
data = super().get_settings()
data["enabled_rooms"] = self.enabled_rooms
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("enabled_rooms"):
self.enabled_rooms = data["enabled_rooms"]

View File

@ -1,149 +0,0 @@
from mastodon import Mastodon
from modules.common.module import BotModule
class MatrixModule(BotModule):
apps = dict() # instance url <-> [app_id, app_secret]
logins = dict() # mxid <-> [username, accesstoken, instanceurl]
roomlogins = dict() # roomid <-> [username, accesstoken, instanceurl]
public = False
async def matrix_message(self, bot, room, event):
args = event.body.split()
args.pop(0)
if len(args) >= 1:
if args[0] == "toot":
toot_body = " ".join(args[1:])
accesstoken = None
if room.room_id in self.roomlogins.keys():
bot.must_be_admin(room, event)
username = self.roomlogins[room.room_id][0]
accesstoken = self.roomlogins[room.room_id][1]
instanceurl = self.roomlogins[room.room_id][2]
elif event.sender in self.logins.keys():
if not self.public:
bot.must_be_owner(event)
username = self.logins[event.sender][0]
accesstoken = self.logins[event.sender][1]
instanceurl = self.logins[event.sender][2]
if accesstoken:
toottodon = Mastodon(
access_token = accesstoken,
api_base_url = instanceurl
)
tootdict = toottodon.toot(toot_body)
await bot.send_text(room, tootdict['url'])
else:
await bot.send_text(room, f'{event.sender} has not logged in yet with the bot. Please do so.')
return
if len(args) == 4:
if args[0] == "login":
if not self.public:
bot.must_be_owner(event)
mxid = event.sender
instanceurl = args[1]
username = args[2]
password = args[3]
await self.register_app_if_necessary(bot, room, instanceurl)
await self.login_to_account(bot, room, mxid, None, instanceurl, username, password)
return
if len(args) == 5:
if args[0] == "roomlogin":
if not self.public:
bot.must_be_owner(event)
roomalias = args[1]
instanceurl = args[2]
username = args[3]
password = args[4]
roomid = await bot.get_room_by_alias(roomalias)
if roomid:
await self.register_app_if_necessary(bot, room, instanceurl)
await self.login_to_account(bot, room, None, roomid, instanceurl, username, password)
else:
await bot.send_text(room, f'Unknown room alias {roomalias} - invite bot to the room first.')
return
if len(args) == 1:
if args[0] == "status":
out = f'App registered on {len(self.apps)} instances, public use enabled: {self.public}\n'
out = out + f'{len(self.logins)} users logged in:\n'
for login in self.logins.keys():
out = out + f' - {login} as {self.logins[login][0]} on {self.logins[login][2]}\n'
out = out + f'{len(self.roomlogins)} per-room logins:\n'
for roomlogin in self.roomlogins:
out = out + f' - {roomlogin} as {self.roomlogins[roomlogin][0]} on {self.roomlogins[roomlogin][2]}\n'
await bot.send_text(room, out)
if args[0] == "logout":
if event.sender in self.logins.keys():
# TODO: Is there a way to invalidate the access token with API?
del self.logins[event.sender]
bot.save_settings()
await bot.send_text(room, f'{event.sender} login data removed from the bot.')
if args[0] == "roomlogout":
bot.must_be_admin(room, event)
if room.room_id in self.roomlogins.keys():
del self.roomlogins[room.room_id]
bot.save_settings()
await bot.send_text(room, f'Login data for this room removed from the bot.')
else:
await bot.send_text(room, f'No login found for room id {room.room_id}.')
if args[0] == "clear":
bot.must_be_owner(event)
self.logins = dict()
self.roomlogins = dict()
bot.save_settings()
await bot.send_text(room, f'All Mastodon logins cleared')
if args[0] == "setpublic":
bot.must_be_owner(event)
self.public = True
bot.save_settings()
await bot.send_text(room, f'Mastodon usage is now public use')
if args[0] == "setprivate":
bot.must_be_owner(event)
self.public = False
bot.save_settings()
await bot.send_text(room, f'Mastodon usage is now restricted to bot owners')
async def register_app_if_necessary(self, bot, room, instanceurl):
if not instanceurl in self.apps.keys():
app = Mastodon.create_app(f'Hemppa The Bot - {bot.client.user}', api_base_url = instanceurl)
self.apps[instanceurl] = [app[0], app[1]]
bot.save_settings()
await bot.send_text(room, f'Registered Mastodon app on {instanceurl}')
async def login_to_account(self, bot, room, mxid, roomid, instanceurl, username, password):
mastodon = Mastodon(client_id = self.apps[instanceurl][0], client_secret = self.apps[instanceurl][1], api_base_url = instanceurl)
access_token = mastodon.log_in(username, password)
print('login_To_account', mxid, roomid)
if mxid:
self.logins[mxid] = [username, access_token, instanceurl]
await bot.send_text(room, f'Logged Matrix user {mxid} into {instanceurl} as {username}')
elif roomid:
self.roomlogins[roomid] = [username, access_token, instanceurl]
await bot.send_text(room, f'Set room {roomid} Mastodon user to {username} on {instanceurl}')
bot.save_settings()
def get_settings(self):
data = super().get_settings()
data['apps'] = self.apps
data['logins'] = self.logins
data['roomlogins'] = self.roomlogins
data['public'] = self.public
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("apps"):
self.apps = data["apps"]
if data.get("logins"):
self.logins = data["logins"]
if data.get("roomlogins"):
self.roomlogins = data["roomlogins"]
if data.get("public"):
self.public = data["public"]
def help(self):
return ('Mastodon')

View File

@ -1,20 +0,0 @@
import urllib.request
from modules.common.module import BotModule
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 2:
icao = args[1]
metar_url = "https://tgftp.nws.noaa.gov/data/observations/metar/stations/" + \
icao.upper() + ".TXT"
response = urllib.request.urlopen(metar_url)
lines = response.readlines()
await bot.send_text(room, lines[1].decode("utf-8").strip())
else:
await bot.send_text(room, 'Usage: !metar <icao code>')
def help(self):
return ('Metar data access (usage: !metar <icao code>)')

View File

@ -1,96 +0,0 @@
from modules.common.module import BotModule
import random
import socket
from struct import pack, unpack
import time
# Modified from https://gist.github.com/azlux/315c924af4800ffbc2c91db3ab8a59bc
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.host = None
self.port = 64738
def set_settings(self, data):
super().set_settings(data)
if data.get('host'):
self.host = data['host']
if data.get('port'):
self.port = data['port']
def get_settings(self):
data = super().get_settings()
data['host'] = self.host
data['port'] = self.port
return data
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) > 1 and args[1] in ['set', 'setserver']:
bot.must_be_owner(event)
self.logger.info(f"room: {room.name} sender: {event.sender} is setting the server settings")
if len(args) < 3:
self.host = None
return await bot.send_text(room, f'Usage: !{args[0]} {args[1]} [host] ([port])')
self.host = args[2]
if len(args) > 3:
self.port = int(args[3])
if not self.port:
self.port = 64738
bot.save_settings()
return await bot.send_text(room, f'Set server settings: host: {self.host} port: {self.port}')
self.logger.info(f"room: {room.name} sender: {event.sender} wants mumble info")
if not self.host:
return await bot.send_text(room, f'No mumble host info set!')
try:
ret = self.mumble_ping()
# https://wiki.mumble.info/wiki/Protocol
# [0,1,2,3] = version
version = '.'.join(map(str, ret[1:4]))
# [4] = identifier passed to the server (used here to get ping time)
ping = int(time.time() * 1000) - ret[4]
# [7] = bandwidth
# [5] = users
# [6] = max users
await bot.send_text(room, f'{self.host}:{self.port} (v{version}): {ret[5]} / {ret[6]} (ping: {ping}ms)')
except socket.gaierror as e:
self.logger.error(f"room: {room.name}: mumble_ping failed: {e}")
await bot.send_text(room, f'Could not get get mumble server info: {e}')
def mumble_ping(self):
addrinfo = socket.getaddrinfo(self.host, self.port, 0, 0, socket.SOL_UDP)
for (family, socktype, proto, canonname, sockaddr) in addrinfo:
s = socket.socket(family, socktype, proto=proto)
s.settimeout(2)
buf = pack(">iQ", 0, int(time.time() * 1000))
try:
s.sendto(buf, sockaddr)
except (socket.gaierror, socket.timeout) as e:
continue
try:
data, addr = s.recvfrom(1024)
except socket.timeout:
continue
return unpack(">bbbbQiii", data)
def help(self):
return 'Show info about a mumble server'
def long_help(self):
text = self.help() + (
'\n- "!mumble": Get the status of the configured mumble server')
if bot and event and bot.is_owner(event):
text += (
'\nOwner commands:'
'\n- "!mumble set [host] ([port])": Set use the following host and port'
'\n- If no port is given, defaults to 64738')
return text

View File

@ -1,30 +0,0 @@
from modules.common.module import BotModule
import requests, json
import traceback
from modules.common.pollingservice import PollingService
class MatrixModule(PollingService):
def __init__(self, name):
super().__init__(name)
self.service_name = 'MXMA'
self.poll_interval_min = 5
self.poll_interval_random = 2
self.owner_only = True
self.send_all = True
self.enabled = False
async def poll_implementation(self, bot, account, roomid, send_messages):
try:
response = requests.get(url=account, timeout=5)
if response.status_code == 200:
if 'messages' in response.json():
messages = response.json()['messages']
for message in messages:
success = await bot.send_msg(message['to'], message['title'], message['message'])
except Exception:
self.logger.error('Polling MXMA failed:')
traceback.print_exc(file=sys.stderr)
def help(self):
return 'Matrix messaging API'

View File

@ -1,47 +0,0 @@
import re
import urllib.request
from modules.common.module import BotModule
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 2 and len(args[1]) == 4:
icao = args[1].upper()
notam = self.get_notam(icao)
await bot.send_text(room, notam)
else:
await bot.send_text(room, 'Usage: !notam <icao code>')
def help(self):
return ('NOTAM data access (usage: !notam <icao code>) - Currently Finnish airports only')
# TODO: This handles only finnish airports. Implement support for other countries.
def get_notam(self, icao):
if not icao.startswith('EF'):
return ('Only Finnish airports supported currently, sorry.')
icao_first_letter = icao[2]
if icao_first_letter < 'M':
notam_url = "https://www.ais.fi/ais/bulletins/envfra.htm"
else:
notam_url = "https://www.ais.fi/ais/bulletins/envfrm.htm"
response = urllib.request.urlopen(notam_url)
lines = response.readlines()
lines = b''.join(lines)
lines = lines.decode("ISO-8859-1")
# Strip EN-ROUTE from end
lines = lines[0:lines.find('<a name="EN-ROUTE">')]
startpos = lines.find('<a name="' + icao + '">')
if startpos > -1:
endpos = lines.find('<h3>', startpos)
if endpos == -1:
endpos = len(lines)
notam = lines[startpos:endpos]
notam = re.sub('<[^<]+?>', ' ', notam)
if len(notam) > 4:
return notam
return f'Cannot parse notam for {icao} at {notam_url}'

View File

@ -1,116 +0,0 @@
from modules.common.module import BotModule
from nio import RoomMessageMedia
from typing import Optional
import sys
import traceback
import cups
import httpx
import aiofiles
import os
# Credit: https://medium.com/swlh/how-to-boost-your-python-apps-using-httpx-and-asynchronous-calls-9cfe6f63d6ad
async def download_file(url: str, filename: Optional[str] = None) -> str:
filename = filename or url.split("/")[-1]
filename = f"/tmp/{filename}"
client = httpx.AsyncClient()
async with client.stream("GET", url) as resp:
resp.raise_for_status()
async with aiofiles.open(filename, "wb") as f:
async for data in resp.aiter_bytes():
if data:
await f.write(data)
await client.aclose()
return filename
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.printers = dict() # roomid <-> printername
self.bot = None
self.paper_size = 'A4' # Todo: configurable
self.enabled = False
async def file_cb(self, room, event):
try:
if self.bot.should_ignore_event(event):
return
if room.room_id in self.printers:
printer = self.printers[room.room_id]
self.logger.debug(f'RX file - MXC {event.url} - from {event.sender}')
https_url = await self.bot.client.mxc_to_http(event.url)
self.logger.debug(f'HTTPS URL {https_url}')
filename = await download_file(https_url)
self.logger.debug(f'RX filename {filename}')
conn = cups.Connection ()
conn.printFile(printer, filename, f"Printed from Matrix - {filename}", {'fit-to-page': 'TRUE', 'PageSize': self.paper_size})
await self.bot.send_text(room, f'Printing file on {printer}..')
os.remove(filename) # Not sure if we should wait first?
else:
self.logger.debug(f'No printer configured for room {room.room_id}')
except:
self.logger.warning(f"File callback failure")
traceback.print_exc(file=sys.stderr)
await self.bot.send_text(room, f'Printing failed, sorry. See log for details.')
def matrix_start(self, bot):
super().matrix_start(bot)
bot.client.add_event_callback(self.file_cb, RoomMessageMedia)
self.bot = bot
def matrix_stop(self, bot):
super().matrix_stop(bot)
bot.remove_callback(self.file_cb)
self.bot = None
async def matrix_message(self, bot, room, event):
bot.must_be_owner(event)
args = event.body.split()
args.pop(0)
conn = cups.Connection ()
printers = conn.getPrinters ()
if len(args) == 1:
if args[0] == 'list':
msg = f"Available printers:\n"
for printer in printers:
print(printer, printers[printer]["device-uri"])
msg += f' - {printer} / {printers[printer]["device-uri"]}'
for roomid, printerid in self.printers.items():
if printerid == printer:
msg += f' <- room {roomid}'
msg += '\n'
await bot.send_text(room, msg)
elif args[0] == 'rmroomprinter':
del self.printers[room.room_id]
await bot.send_text(room, f'Deleted printer from this room.')
bot.save_settings()
if len(args) == 2:
if args[0] == 'setroomprinter':
printer = args[1]
if printer in printers:
await bot.send_text(room, f'Printing with {printer} here.')
self.printers[room.room_id] = printer
bot.save_settings()
else:
await bot.send_text(room, f'No printer called {printer} in your CUPS.')
if args[0] == 'setpapersize':
self.paper_size = args[1]
bot.save_settings()
await bot.send_text(room, f'Paper size set to {self.paper_size}.')
def help(self):
return 'Print files from Matrix'
def get_settings(self):
data = super().get_settings()
data["printers"] = self.printers
data["paper_size"] = self.paper_size
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("printers"):
self.printers = data["printers"]
if data.get("paper_size"):
self.paper_size = data["paper_size"]

View File

@ -1,80 +0,0 @@
from typing import Text
import urllib
import urllib.request
from urllib.parse import urlencode, quote_plus
import json
import time
from modules.common.module import BotModule
class PeerTubeClient:
def __init__(self):
self.instance_url = 'https://sepiasearch.org/'
def search(self, search_string, count=0):
if count == 0:
count = 15 # Pt default, could also remove from params..
params = urlencode({'search': search_string, 'count': count}, quote_via=quote_plus)
search_url = self.instance_url + 'api/v1/search/videos?' + params
response = urllib.request.urlopen(search_url)
data = json.loads(response.read().decode("utf-8"))
return data
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.instance_url = 'https://sepiasearch.org/'
def matrix_start(self, bot):
super().matrix_start(bot)
self.add_module_aliases(bot, ['ptall'])
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 3:
if args[1] == "setinstance":
bot.must_be_owner(event)
self.instance_url = args[2]
bot.save_settings()
await bot.send_text(room, 'Instance url set to ' + self.instance_url, bot_ignore=True)
return
if len(args) == 2:
if args[1] == "showinstance":
await bot.send_text(room, 'Using instance at ' + self.instance_url, bot_ignore=True)
return
if len(args) > 1:
query = event.body[len(args[0])+1:]
p = PeerTubeClient()
p.instance_url = self.instance_url
count = 1
if args[0] == '!ptall':
count = 0
data = p.search(query, count)
if len(data['data']) > 0:
for video in data['data']:
video_url = video.get("url") or self.instance_url + 'videos/watch/' + video["uuid"]
duration = time.strftime('%H:%M:%S', time.gmtime(video["duration"]))
instancedata = video["account"]["host"]
html = f'<a href="{video_url}">{video["name"]}</a> {video["description"] or ""} [{duration}] @ {instancedata}'
text = f'{video_url} : {video["name"]} {video.get("description") or ""} [{duration}]'
await bot.send_html(room, html, text, bot_ignore=True)
else:
await bot.send_text(room, 'Sorry, no videos found found.', bot_ignore=True)
else:
await bot.send_text(room, 'Usage: !pt <query> or !ptall <query> to return all results')
def get_settings(self):
data = super().get_settings()
data['instance_url'] = self.instance_url
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("instance_url"):
self.instance_url = data["instance_url"]
def help(self):
return ('PeerTube search')

View File

@ -1,19 +0,0 @@
from modules.common.module import BotModule
import urllib.request
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
args = event.body.split()
args.pop(0)
day = 0
hour = 12
if len(args) >= 1:
day = int(args[0]) - 1
if len(args) == 2:
hour = int(args[1])
imgurl = 'http://ennuste.ilmailuliitto.fi/' + str(day) + '/wstar_bsratio.curr.' + str(hour) + '00lst.d2.png'
await bot.upload_and_send_image(room, imgurl, f"RASP Day {day+1} at {hour}:00", no_cache=True)
def help(self):
return 'RASP Gliding Weather forecast, Finland only'

View File

@ -1,109 +0,0 @@
from modules.common.module import BotModule
from nio import RoomMessageText
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.bridges = dict()
self.bot = None
self.enabled = False
async def message_cb(self, room, event):
if self.bot.should_ignore_event(event):
return
if event.body.startswith('!'):
return
source_id = None
target_id = None
for src_id, tgt_id in self.bridges.items():
if room.room_id == src_id:
source_id = src_id
target_id = tgt_id
elif room.room_id == tgt_id:
source_id = tgt_id
target_id = src_id
if not source_id or not target_id:
return
target_room = self.bot.get_room_by_id(target_id)
if(target_room):
sendernick = target_room.user_name(event.sender)
if not sendernick:
sendernick = event.sender
await self.bot.send_text(target_room, f'<{sendernick}> {event.body}', msgtype="m.text", bot_ignore=True)
else:
self.logger.warning(f"Bot doesn't seem to be in bridged room {target_id}")
def matrix_start(self, bot):
super().matrix_start(bot)
bot.client.add_event_callback(self.message_cb, RoomMessageText)
self.bot = bot
def matrix_stop(self, bot):
super().matrix_stop(bot)
bot.remove_callback(self.message_cb)
self.bot = None
async def matrix_message(self, bot, room, event):
bot.must_be_admin(room, event)
args = event.body.split()
args.pop(0)
if len(args) == 1:
if args[0] == 'list':
i = 1
msg = f"Active relay bridges ({len(self.bridges)}):\n"
for src_id, tgt_id in self.bridges.items():
srcroom = self.bot.get_room_by_id(src_id)
tgtroom = self.bot.get_room_by_id(tgt_id)
if srcroom:
srcroom = srcroom.display_name
else:
srcroom = f'??? {src_id}'
if tgtroom:
tgtroom = tgtroom.display_name
else:
tgtroom = f'??? {tgt_id}'
msg += f'{i}: {srcroom} <-> {tgtroom}'
i = i + 1
await bot.send_text(room, msg)
if len(args) == 2:
if args[0] == 'bridge':
roomid = args[1]
room_to_bridge = bot.get_room_by_id(roomid)
if room_to_bridge:
await bot.send_text(room, f'Bridging {room_to_bridge.display_name} here.')
self.bridges[room.room_id] = roomid
bot.save_settings()
else:
await bot.send_text(room, f'I am not on room with id {roomid} (note: use id, not alias)!')
elif args[0] == 'unbridge':
idx = int(args[1]) - 1
i = 0
for src_id, tgt_id in self.bridges.items():
if i == idx:
del self.bridges[src_id]
await bot.send_text(room, f'Unbridged {src_id} and {tgt_id}.')
bot.save_settings()
return
i = i + 1
def help(self):
return 'Simple relaybot between two Matrix rooms'
def get_settings(self):
data = super().get_settings()
data["bridges"] = self.bridges
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("bridges"):
self.bridges = data["bridges"]

View File

@ -1,66 +0,0 @@
from modules.common.pollingservice import PollingService
from urllib.request import urlopen
import json
import time
class MatrixModule(PollingService):
def __init__(self, name):
super().__init__(name)
self.accountroomid_laststatus = {}
self.template = '{spacename} is now {open_closed}'
self.i18n = {'open': 'open 🔓', 'closed': 'closed 🔒'}
async def poll_implementation(self, bot, account, roomid, send_messages):
self.logger.debug(f'polling space api {account}.')
spacename, is_open = MatrixModule.open_status(account)
open_str = self.i18n['open'] if is_open else self.i18n['closed']
text = self.template.format(spacename=spacename, open_closed=open_str)
self.logger.debug(text)
last_status = self.accountroomid_laststatus.get(account+roomid, False)
if send_messages and last_status != is_open:
await bot.send_text(bot.get_room_by_id(roomid), text)
self.accountroomid_laststatus[account+roomid] = is_open
bot.save_settings()
@staticmethod
def open_status(spaceurl):
with urlopen(spaceurl, timeout=5) as response:
js = json.load(response)
return js['space'], js['state']['open']
def get_settings(self):
data = super().get_settings()
data['laststatus'] = self.accountroomid_laststatus
data['template'] = self.template
data['i18n'] = self.i18n
return data
def set_settings(self, data):
super().set_settings(data)
if data.get('laststatus'):
self.accountroomid_laststatus = data['laststatus']
if data.get('template'):
self.template = data['template']
if data.get('i18n'):
self.i18n = data['i18n']
def help(self):
return "Notify about Space-API status changes (open or closed)."
def long_help(self, bot, event, **kwargs):
text = self.help() + \
' This is a polling service. Therefore there are additional ' + \
'commands: list, debug, poll, clear, add URL, del URL\n' + \
'!spaceapi add URL: to add a space-api endpoint\n' + \
'!spaceapi list: to list the endpoint configured for this room.\n' + \
f'I will look for changes roughly every {self.poll_interval_min} ' + \
'minutes. Find out more about Space-API at https://spaceapi.io/.'
if bot.is_owner(event):
text += '\nA template and I18N can be configured via settings of ' + \
'the module. Use "!bot export spacepi", then change the ' + \
'settings and import again with "!bot import spacepi SETTINGS".'
return text

View File

@ -1,23 +0,0 @@
import urllib.request
from modules.common.module import BotModule
class MatrixModule(BotModule):
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 2:
icao = args[1]
taf_url = "https://aviationweather.gov/adds/dataserver_current/httpparam?dataSource=tafs&requestType=retrieve&format=csv&hoursBeforeNow=3&timeType=issue&mostRecent=true&stationString=" + icao.upper()
response = urllib.request.urlopen(taf_url)
lines = response.readlines()
if len(lines) > 6:
taf = lines[6].decode("utf-8").split(',')[0]
await bot.send_text(room, taf.strip())
else:
await bot.send_text(room, 'Cannot find taf for ' + icao)
else:
await bot.send_text(room, 'Usage: !taf <icao code>')
def help(self):
return ('Taf data access (usage: !taf <icao code>)')

View File

@ -1,247 +0,0 @@
import time
import urllib.request
import urllib.parse
import urllib.error
import aiohttp.web
import requests
import os
import json
import asyncio
from aiohttp import web
from future.moves.urllib.parse import urlencode
from nio import MatrixRoom
from modules.common.module import BotModule
import nest_asyncio
nest_asyncio.apply()
rooms = dict()
global_bot = None
send_entry_lock = asyncio.Lock()
async def send_entry(blob, content_type, fmt_params, rooms):
async with send_entry_lock:
for room_id in rooms:
room = MatrixRoom(room_id=room_id, own_user_id=os.getenv("BOT_OWNERS"),
encrypted=rooms[room_id])
if blob and content_type:
await global_bot.upload_and_send_image(room, blob, text="", blob=True, blob_content_type=content_type)
await global_bot.send_html(room, msg_template_html.format(**fmt_params),
msg_template_plain.format(**fmt_params))
def get_image(img=None, width=1000, height=1500):
"""
Return image data as array.
Array contains the image content type and image binary
Parameters required: img { Plex image location }
Optional parameters: width { the image width }
height { the image height }
Output: array
"""
pms_url = os.getenv("PLEX_MEDIA_SERVER_URL")
pms_token = os.getenv("PLEX_MEDIA_SERVER_TOKEN")
if not pms_url or not pms_token:
return None
width = width or 1000
height = height or 1500
if img:
params = {'url': 'http://127.0.0.1:32400%s' % (img), 'width': width, 'height': height, 'format': "png"}
uri = pms_url + '/photo/:/transcode?%s' % urlencode(params)
headers = {'X-Plex-Token': pms_token}
session = requests.Session()
try:
r = session.request("GET", uri, headers=headers)
r.raise_for_status()
except Exception:
return None
response_status = r.status_code
response_content = r.content
response_headers = r.headers
if response_status in (200, 201):
return response_content, response_headers['Content-Type']
def get_from_entry(entry):
blob = None
content_type = ""
if "art" in entry:
pms_image = get_image(entry["art"], 600, 300)
if pms_image:
(blob, content_type) = pms_image
fmt_params = {
"title": entry["title"],
"year": entry["year"],
"audience_rating": entry["audience_rating"],
"directors": ", ".join(entry["directors"]),
"actors": ", ".join(entry["actors"]),
"summary": entry["summary"],
"tagline": entry["tagline"],
"genres": ", ".join(entry["genres"])
}
return (blob, content_type, fmt_params)
msg_template_html = """
<b>{title} -({year})- Rating: {audience_rating}</b><br>
Director(s): {directors}<br>
Actors: {actors}<br>
<I>{summary}</I><br>
{tagline}<br>
Genre(s): {genres}<br><br>"""
msg_template_plain = """*{title} -({year})- Rating: {audience_rating}*
Director(s): {directors}
Actors: {actors}
{summary}
{tagline}
Genre(s): {genres}
"""
class WebServer:
def __init__(self, host, port):
self.host = host
self.port = port
self.app = web.Application()
self.app.router.add_post('/notify', self.notify)
async def run(self):
if not self.host or not self.port:
return
loop = asyncio.get_event_loop()
runner = web.AppRunner(self.app)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, host=self.host, port=self.port)
loop.run_until_complete(site.start())
async def notify(self, request: web.Request) -> web.Response:
try:
data = await request.json()
if "genres" in data:
data["genres"] = data["genres"].split(",")
if "actors" in data:
data["actors"] = data["actors"].split(",")
if "directors" in data:
data["directors"] = data["directors"].split(",")
global rooms
(blob, content_type, fmt_params) = get_from_entry(data)
await send_entry(blob, content_type, fmt_params, rooms)
except Exception as exc:
message = str(exc)
return web.HTTPBadRequest(body=message)
return web.Response()
class MatrixModule(BotModule):
httpd = None
rooms = dict()
api_key = None
def __init__(self, name):
super().__init__(name)
self.httpd = WebServer(os.getenv("TAUTULLI_NOTIFIER_ADDR"), os.getenv("TAUTULLI_NOTIFIER_PORT"))
def matrix_start(self, bot):
super().matrix_start(bot)
global global_bot
global_bot = bot
loop = asyncio.get_event_loop()
loop.run_until_complete(self.httpd.run())
def matrix_stop(self, bot):
super().matrix_stop(bot)
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 3 and args[1] == 'apikey':
bot.must_be_owner(event)
self.api_key = args[2]
bot.save_settings()
await bot.send_text(room, 'Api key set')
elif len(args) == 2:
media_type = args[1]
if media_type != "movie" and media_type != "show" and media_type != "artist":
await bot.send_text(room, "media type '%s' provided not valid" % media_type)
return
try:
url = "{}/api/v2?apikey={}&cmd=get_recently_added&count=10".format(os.getenv("TAUTULLI_URL"), self.api_key)
req = urllib.request.Request(url + "&media_type=" + media_type)
connection = urllib.request.urlopen(req).read()
entries = json.loads(connection)
if "response" not in entries and "data" not in entries["response"] and "recently_added" not in entries["response"]["data"]:
await bot.send_text(room, "no recently added for %s" % media_type)
return
for entry in entries["response"]["data"]["recently_added"]:
(blob, content_type, fmt_params) = get_from_entry(entry)
await send_entry(blob, content_type, fmt_params, {room.room_id: room})
except urllib.error.HTTPError as err:
raise ValueError(err.read())
except Exception as exc:
message = str(exc)
await bot.send_text(room, message)
elif len(args) == 4:
if args[1] == "add" or args[1] == "remove":
room_id = args[2]
encrypted = args[3]
if args[1] == "add":
self.rooms[room_id] = encrypted == "encrypted"
await bot.send_text(room, f"Added {room_id} to rooms notification list")
else:
del self.rooms[room_id]
await bot.send_text(room, f"Removed {room_id} to rooms notification list")
bot.save_settings()
global rooms
rooms = self.rooms
else:
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
else:
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
def get_settings(self):
data = super().get_settings()
data["api_key"] = self.api_key
data["rooms"] = self.rooms
global rooms
rooms = self.rooms
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("rooms"):
self.rooms = data["rooms"]
global rooms
rooms = self.rooms
if data.get("api_key"):
self.api_key = data["api_key"]
def help(self):
return ('Tautulli recently added bot')

View File

@ -1,165 +0,0 @@
import time
from datetime import datetime
from pyteamup import Calendar
#
# TeamUp calendar notifications
#
from modules.common.module import BotModule
class MatrixModule(BotModule):
def __init__(self, name):
super().__init__(name)
self.api_key = None
self.calendar_rooms = dict() # Roomid -> [calid, calid..]
self.calendars = dict() # calid -> Calendar
self.enabled = False
async def matrix_poll(self, bot, pollcount):
if self.api_key:
if pollcount % (6 * 5) == 0: # Poll every 5 min
await self.poll_all_calendars(bot)
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 1:
if self.calendar_rooms.get(room.room_id):
for calendarid in self.calendar_rooms.get(room.room_id):
calendar = self.calendars[calendarid]
events = calendar.get_event_collection()
for event in events:
s = '<b>' + str(event.start_dt.day) + \
'.' + str(event.start_dt.month)
if not event.all_day:
s = s + ' ' + \
event.start_dt.strftime(
"%H:%M") + ' (' + str(event.duration) + ' min)'
s = s + '</b> ' + event.title + \
" " + (event.notes or '')
await bot.send_html(room, s, s)
elif len(args) == 2:
if args[1] == 'list':
await bot.send_text(room, f'Calendars in this room: {self.calendar_rooms.get(room.room_id) or []}')
elif args[1] == 'poll':
bot.must_be_owner(event)
await self.poll_all_calendars(bot)
elif len(args) == 3:
if args[1] == 'add':
bot.must_be_admin(room, event)
calid = args[2]
self.logger.info(f'Adding calendar {calid} to room id {room.room_id}')
if self.calendar_rooms.get(room.room_id):
if calid not in self.calendar_rooms[room.room_id]:
self.calendar_rooms[room.room_id].append(calid)
else:
await bot.send_text(room, 'This teamup calendar already added in this room!')
return
else:
self.calendar_rooms[room.room_id] = [calid]
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
bot.save_settings()
self.setup_calendars()
await bot.send_text(room, 'Added new teamup calendar to this room')
if args[1] == 'del':
bot.must_be_admin(room, event)
calid = args[2]
self.logger.info(f'Removing calendar {calid} from room id {room.room_id}')
if self.calendar_rooms.get(room.room_id):
self.calendar_rooms[room.room_id].remove(calid)
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
bot.save_settings()
self.setup_calendars()
await bot.send_text(room, 'Removed teamup calendar from this room')
if args[1] == 'apikey':
bot.must_be_owner(event)
self.api_key = args[2]
bot.save_settings()
self.setup_calendars()
await bot.send_text(room, 'Api key set')
def help(self):
return ('Polls teamup calendar.')
async def poll_all_calendars(self, bot):
delete_rooms = []
for roomid in self.calendar_rooms:
if roomid in bot.client.rooms:
calendars = self.calendar_rooms[roomid]
for calendarid in calendars:
events, timestamp = self.poll_server(
self.calendars[calendarid])
self.calendars[calendarid].timestamp = timestamp
for event in events:
await bot.send_text(bot.get_room_by_id(roomid), 'Calendar: ' + self.eventToString(event))
else:
delete_rooms.append(roomid)
for roomid in delete_rooms:
self.calendar_rooms.pop(roomid, None)
def poll_server(self, calendar):
events, timestamp = calendar.get_changed_events(calendar.timestamp)
return events, timestamp
def to_datetime(self, dts):
try:
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S')
except ValueError:
pos = len(dts) - 3
dts = dts[:pos] + dts[pos + 1:]
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S%z')
def eventToString(self, event):
startdt = self.to_datetime(event['start_dt'])
if len(event['title']) == 0:
event['title'] = '(empty name)'
if (event['delete_dt']):
s = event['title'] + ' deleted.'
else:
s = event['title'] + " " + (event['notes'] or '') + \
' ' + str(startdt.day) + '.' + str(startdt.month)
if not event['all_day']:
s = s + ' ' + \
startdt.strftime("%H:%M") + \
' (' + str(event['duration']) + ' min)'
# todo: proper html stripper..
s = s.replace('<p>', '')
s = s.replace('</p>', '\n')
return s
def setup_calendars(self):
self.calendars = dict()
if self.api_key:
for roomid in self.calendar_rooms:
calendars = self.calendar_rooms[roomid]
for calid in calendars:
self.calendars[calid] = Calendar(calid, self.api_key)
self.calendars[calid].timestamp = int(time.time())
def get_settings(self):
data = super().get_settings()
data['apikey'] = self.api_key
data['calendar_rooms'] = self.calendar_rooms
return data
def set_settings(self, data):
super().set_settings(data)
if data.get('calendar_rooms'):
self.calendar_rooms = data['calendar_rooms']
if data.get('apikey'):
self.api_key = data['apikey']
if self.api_key and len(self.api_key) == 0:
self.api_key = None
self.setup_calendars()

View File

@ -1,134 +0,0 @@
import urllib.request
import wolframalpha
from html import escape
import json
from modules.common.module import BotModule
class MatrixModule(BotModule):
app_id = ''
def matrix_start(self, bot):
super().matrix_start(bot)
self.add_module_aliases(bot, ['wafull'])
async def matrix_message(self, bot, room, event):
args = event.body.split()
if len(args) == 3:
if args[1] == "appid":
bot.must_be_owner(event)
self.app_id = args[2]
bot.save_settings()
await bot.send_text(room, 'App id set')
return
if len(args) > 1:
if self.app_id == '':
await bot.send_text(room, 'Please get and set a appid: https://products.wolframalpha.com/simple-api/documentation/')
return
query = event.body[len(args[0])+1:]
client = wolframalpha.Client(self.app_id)
res = client.query(query)
result = "?SYNTAX ERROR"
if res['@success']:
self.logger.debug(f"room: {room.name} sender: {event.sender} sent a valid query to wa")
else:
self.logger.info(f"wa error: {res['@error']}")
short, full = self.parse_api_response(res)
if full[0] and 'full' in args[0]:
html, plain = full
elif short[0]:
html, plain = short
else:
plain = 'Could not find response for ' + query
html = plain
await bot.send_html(room, html, plain)
else:
await bot.send_text(room, 'Usage: !wa <query>')
def get_settings(self):
data = super().get_settings()
data['app_id'] = self.app_id
return data
def set_settings(self, data):
super().set_settings(data)
if data.get("app_id"):
self.app_id = data["app_id"]
def parse_api_response(self, res):
"""Parses the pods from wa and prepares texts to send to matrix
:param res: the result from wolframalpha.Client
:type res: dict
:return: a tuple of tuples: ((primary_html, primary_plaintext), (full_html, full_plaintext))
:rtype: tuple
"""
htmls = []
texts = []
primary = None
fallback = None
pods = res.get('pod')
if not pods:
return (('<em>(data not available)</em>', '(data not available)'), ) * 2
# workaround for bug(?) in upstream wa package
if hasattr(pods, 'get'):
pods = [pods]
for pod in res['pod']:
pod_htmls = []
pod_texts = []
spods = pod.get('subpod')
if not spods:
continue
# workaround for bug(?) in upstream wa package
if hasattr(spods, 'get'):
spods = [spods]
for spod in spods:
title = spod.get('@title')
text = spod.get('plaintext')
if not text:
continue
if title:
html = f'<strong>{escape(title)}</strong>: {escape(text)}'
text = f'{title}: {text}'
else:
html = escape(text)
pod_htmls += html.split('\n')
pod_texts += text.split('\n')
if pod_texts:
title = pod.get('@title')
pod_html = '\n'.join([f'<p><strong>{escape(title)}</strong>\n<ul>']
+ [f'<li>{s}</li>' for s in pod_htmls]
+ ['</ul></p>'])
pod_text = '\n'.join([title]
+ [f'- {s}' for s in pod_texts])
htmls.append(pod_html)
texts.append(pod_text)
if not primary and self.is_primary(pod):
primary = (f'<strong>{escape(title)}</strong>: ' + ' | '.join(pod_htmls),
f'{title}: ' + ' | '.join(pod_texts))
else:
fallback = fallback or (' | '.join(pod_htmls), ' | '.join(pod_texts))
return (primary or fallback, ('\n'.join(htmls), '\n'.join(texts)))
def is_primary(self, pod):
return pod.get('@primary') or 'Definition' in pod.get('@title') or 'Result' in pod.get('@title')
def help(self):
return ('Wolfram Alpha search')
def long_help(self, bot=None, event=None, **kwargs):
text = self.help() + (
'\n- "!wa [query]": Query WolframAlpha and return the primary pod'
'\n- "!wafull [query]": Query WolframAlpha and return all pods'
)
if bot and event and bot.is_owner(event):
text += '\n- "!wa appid [appid]": Set appid'
return text