Compare commits
10 Commits
master
...
cam_module
Author | SHA1 | Date |
---|---|---|
Jarno Rankinen | 5a6c560086 | |
Jarno Rankinen | 9aedbef3a8 | |
Jarno Rankinen | 3783313af0 | |
Jarno Rankinen | bdf46124bd | |
Jarno Rankinen | a6715d8dcf | |
Jarno Rankinen | 14428d7382 | |
Jarno Rankinen | 0953457b96 | |
Jarno Rankinen | e9a2f9dca0 | |
Jarno Rankinen | d844e8ccb5 | |
Jarno Rankinen | b2215b5426 |
|
@ -12,8 +12,11 @@ RUN pip install -r requirements.txt
|
|||
|
||||
COPY bot.py *.json *.pickle /bot/
|
||||
COPY config config
|
||||
COPY modules modules
|
||||
|
||||
VOLUME /bot/config
|
||||
RUN useradd -m HomeBot && chown HomeBot -R /bot && apt install curl jq -y
|
||||
USER HomeBot
|
||||
WORKDIR /bot
|
||||
COPY modules modules
|
||||
|
||||
CMD [ "python", "-u", "./bot.py" ]
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
import re
|
||||
from modules.common.module import BotModule
|
||||
import requests
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self,name):
|
||||
super().__init__(name)
|
||||
self.motionurl = 'http://localhost:8080'
|
||||
self.cameras = []
|
||||
self.allowed_cmds = {
|
||||
'config': ['list','set','get','write'],
|
||||
'detection': ['status','connection','start','pause'],
|
||||
'action': ['eventstart','eventend','snapshot','restart','quit','end']
|
||||
}
|
||||
self.restricted_cmds = ['list','set','get','write','start','pause','restart','quit','end']
|
||||
self.helptext = """Control the motion daemon.
|
||||
Available commands:
|
||||
- config list|set|get|write
|
||||
- detection status|connection|start|pause
|
||||
- action eventstart|eventend|snapshot|restart|quit|end
|
||||
- url get|set <motionurl>
|
||||
|
||||
Usage: '!cam <id> category command'
|
||||
|
||||
<id> is the numerical id of the camera. Use 0 for all cameras.
|
||||
If <id> is omitted, 0 is assumed."""
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['motionurl'] = self.motionurl
|
||||
data['cameras'] = self.cameras
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('motionurl'):
|
||||
self.motionurl = data['motionurl']
|
||||
if data.get('cameras'):
|
||||
self.cameras = data['cameras']
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if args[0] == 'help':
|
||||
await bot.send_text(room, self.helptext, event)
|
||||
return
|
||||
|
||||
elif args[0] == 'url':
|
||||
if args[1] == 'set':
|
||||
newurl = args[2]
|
||||
bot.must_be_owner(event)
|
||||
self.motionurl = newurl
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f"Motion API URL set to {self.motionurl}", event)
|
||||
elif args[1] == 'get':
|
||||
await bot.send_text(room, f"Motion URL is currently {self.motionurl}", event)
|
||||
|
||||
elif args[0] == 'cameras':
|
||||
if args[1] == 'set':
|
||||
bot.must_be_owner(event)
|
||||
self.cameras = args[2:]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, "Updated camera id list", event)
|
||||
elif args[1] == 'get':
|
||||
camstr = ''
|
||||
if len(self.cameras) == 0:
|
||||
await bot.send_text(room, "No camera ids configured", event)
|
||||
else:
|
||||
for n, cam in enumerate(self.cameras):
|
||||
camstr = camstr + cam
|
||||
if n < len(self.cameras) - 1:
|
||||
camstr = camstr + ","
|
||||
await bot.send_text(room, f"Following camera ids are configured:\n{camstr}", event)
|
||||
|
||||
else:
|
||||
cmdindex = 1
|
||||
try:
|
||||
# Check if first argument is numeric (camera id)
|
||||
camid = int(args[0])
|
||||
camid = str(camid)
|
||||
except ValueError:
|
||||
cmdindex = 0
|
||||
camid = '0'
|
||||
category = args[cmdindex]
|
||||
## Quick commands start
|
||||
if category == 'now':
|
||||
if camid != '0':
|
||||
await self.get_snapshot(camid, bot, room, event)
|
||||
elif camid == '0' and len(self.cameras) > 0:
|
||||
for cam in self.cameras:
|
||||
await self.get_snapshot(cam, bot, room, event)
|
||||
else:
|
||||
self.logger.info("User requested snapshots with id 0, but no camera id list configured")
|
||||
await bot.send_text(room, "No camera ids configured", event)
|
||||
return
|
||||
## Quick commands end
|
||||
if category not in self.allowed_cmds:
|
||||
await bot.send_text(room, f'Unknown category: "{args[1]}"', event)
|
||||
return
|
||||
cmdindex = cmdindex + 1
|
||||
if args[cmdindex] not in self.allowed_cmds[category]:
|
||||
await bot.send_text(room, f'Unknown command: "{args[cmdindex]}"', event)
|
||||
return
|
||||
command = args[cmdindex]
|
||||
req_url = f'{self.motionurl}/{camid}/{category}/{command}'
|
||||
if command in self.restricted_cmds:
|
||||
bot.must_be_owner(event)
|
||||
if category == 'config' and command == 'get':
|
||||
queryparam = args[cmdindex + 1]
|
||||
req_url = f'{req_url}?query={queryparam}'
|
||||
elif category == 'config' and command == 'set':
|
||||
param = args[cmdindex + 1]
|
||||
value = args[cmdindex + 2]
|
||||
req_url = f'{req_url}?{param}={value}'
|
||||
if camid != 0 and command == 'snapshot':
|
||||
await self.get_snapshot(camid, bot, room, event)
|
||||
resp = requests.get(req_url).text
|
||||
await bot.send_text(room, resp, event)
|
||||
|
||||
async def get_snapshot(self, camid, bot, room, event):
|
||||
imgurl = f"{self.motionurl.replace(':8080',':8081')}/{camid}/current"
|
||||
self.logger.info(f"Fetching image from {imgurl}")
|
||||
await bot.upload_and_send_image(room, imgurl, event, no_cache=True)
|
||||
|
||||
def help(self):
|
||||
return self.helptext.splitlines()[0]
|
270
modules/flog.py
270
modules/flog.py
|
@ -1,270 +0,0 @@
|
|||
from logging import log
|
||||
import sys
|
||||
import traceback
|
||||
import json
|
||||
import time
|
||||
import datetime
|
||||
import requests
|
||||
import urllib3
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from random import randrange
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
urllib3.disable_warnings()
|
||||
|
||||
# API docs at: https://gitlab.com/lemoidului/ogn-flightbook/-/blob/master/doc/API.md
|
||||
class FlightBook:
|
||||
def __init__(self):
|
||||
self.base_url = 'https://flightbook.glidernet.org/api'
|
||||
self.AC_TYPES = [ '?', 'Glider', 'Towplane', \
|
||||
'Helicopter', 'Parachute', 'Drop plane', 'Hang glider', \
|
||||
'Paraglider', 'Powered', 'Jet', 'UFO', 'Balloon', \
|
||||
'Airship', 'UAV', '?', 'Static object' ]
|
||||
self.logged_flights = dict() # station -> [index of flight]
|
||||
self.device_cache = dict() # Registration -> [address, CN]
|
||||
|
||||
def get_flights(self, icao):
|
||||
log_url = f'{self.base_url}/logbook/{icao}'
|
||||
data = None
|
||||
with requests.Session() as session:
|
||||
response = session.get(log_url, headers={'Connection': 'close'}, verify=False)
|
||||
data = response.json()
|
||||
|
||||
# print(json.dumps(data, sort_keys=True, indent=4))
|
||||
self.update_device_cache(data)
|
||||
return data
|
||||
|
||||
def update_device_cache(self, data):
|
||||
devices = data['devices']
|
||||
for device in devices:
|
||||
if device["address"] and device["registration"]:
|
||||
cache_entry = [device["address"], device["competition"]]
|
||||
self.device_cache[device["registration"]] = cache_entry
|
||||
|
||||
def address_for_registration(self, registration):
|
||||
for reg in self.device_cache.keys():
|
||||
if reg.lower() == registration.lower():
|
||||
return self.device_cache[reg][0]
|
||||
return None
|
||||
|
||||
def address_for_cn(self, cn):
|
||||
for reg in self.device_cache.keys():
|
||||
if self.device_cache[reg][1] == cn.upper():
|
||||
return self.device_cache[reg][0]
|
||||
return None
|
||||
|
||||
def format_time(self, time):
|
||||
if not time:
|
||||
return '··:··'
|
||||
time = time.replace('h', ':')
|
||||
return time
|
||||
|
||||
def flight2string(self, flight, data):
|
||||
devices = data['devices']
|
||||
device = devices[flight['device']]
|
||||
start = self.format_time(flight["start"])
|
||||
end = self.format_time(flight["stop"])
|
||||
duration = ' '
|
||||
if flight["duration"]:
|
||||
duration = time.strftime('%H:%M', time.gmtime(flight["duration"]))
|
||||
maxalt = ''
|
||||
if flight["max_alt"]:
|
||||
maxalt = str(flight["max_alt"]) + 'm'
|
||||
|
||||
identity = f'{device.get("registration") or ""} {device.get("aircraft") or ""} {device.get("competition") or ""} {maxalt}'
|
||||
identity = ' '.join(identity.split())
|
||||
return f'{start} - {end} {duration} {identity}'
|
||||
|
||||
def print_flights(self, data, showtow=False):
|
||||
print(f'✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:')
|
||||
flights = data['flights']
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
print(self.flight2string(flight, data))
|
||||
|
||||
def test():
|
||||
fb = FlightBook()
|
||||
data = fb.get_flights('LFMX')
|
||||
fb.print_flights(data)
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.service_name = 'FLOG'
|
||||
self.station_rooms = dict() # Roomid -> ogn station
|
||||
self.live_rooms = [] # Roomid's with live enabled
|
||||
self.logged_flights = dict() # Station -> number of flights
|
||||
self.first_poll = True
|
||||
self.enabled = False
|
||||
self.fb = FlightBook()
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['sar'])
|
||||
|
||||
async def matrix_poll(self, bot, pollcount):
|
||||
if pollcount % (6 * 5) == 0: # Poll every 5 min
|
||||
await self.poll_implementation(bot)
|
||||
|
||||
async def poll_implementation(self, bot):
|
||||
for roomid in self.live_rooms:
|
||||
station = self.station_rooms[roomid]
|
||||
data = self.fb.get_flights(station)
|
||||
if not data:
|
||||
self.logger.warning(f"FLOG: Failed to get flights at {station}!")
|
||||
return
|
||||
flights = data['flights']
|
||||
|
||||
if len(flights) == 0 or (not station in self.logged_flights):
|
||||
self.logged_flights[station] = []
|
||||
#print('Reset flight count for station ' + station)
|
||||
# else:
|
||||
# print(f'Got {len(flights)} flights at {station}')
|
||||
|
||||
flightindex = 0
|
||||
for flight in flights:
|
||||
if flight["towing"]:
|
||||
continue
|
||||
if flight["stop"]:
|
||||
if not flightindex in self.logged_flights[station]:
|
||||
if not self.first_poll:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), self.fb.flight2string(flight, data))
|
||||
self.logged_flights[station].append(flightindex)
|
||||
# print(f'Logged flights at {station} now {self.logged_flights[station]}')
|
||||
flightindex = flightindex + 1
|
||||
self.first_poll = False
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 1 and args[0] == "!flog":
|
||||
if room.room_id in self.station_rooms:
|
||||
station = self.station_rooms[room.room_id]
|
||||
await self.show_flog(bot, room, station)
|
||||
else:
|
||||
await bot.send_text(room, 'No OGN station set for this room - set it first.')
|
||||
|
||||
elif len(args) == 2 and args[0] == "!flog":
|
||||
if args[1] == 'rmstation':
|
||||
bot.must_be_admin(room, event)
|
||||
del self.station_rooms[room.room_id]
|
||||
self.live_rooms.remove(room.room_id)
|
||||
await bot.send_text(room, f'Cleared OGN station for this room')
|
||||
|
||||
elif args[1] == 'status':
|
||||
print(self.logged_flights)
|
||||
print(self.fb.device_cache)
|
||||
bot.must_be_admin(room, event)
|
||||
await bot.send_text(room, f'OGN station for this room: {self.station_rooms.get(room.room_id)}, live updates enabled: {room.room_id in self.live_rooms}')
|
||||
|
||||
elif args[1] == 'poll':
|
||||
bot.must_be_admin(room, event)
|
||||
await self.poll_implementation(bot)
|
||||
|
||||
elif args[1] == 'live':
|
||||
bot.must_be_admin(room, event)
|
||||
self.live_rooms.append(room.room_id)
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Sending live updates for station {self.station_rooms.get(room.room_id)} to this room')
|
||||
|
||||
elif args[1] == 'rmlive':
|
||||
bot.must_be_admin(room, event)
|
||||
self.live_rooms.remove(room.room_id)
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Not sending live updates for station {self.station_rooms.get(room.room_id)} to this room anymore')
|
||||
|
||||
else:
|
||||
# Assume parameter is a station name
|
||||
station = args[1]
|
||||
await self.show_flog(bot, room, station)
|
||||
elif len(args) == 2 and args[0] == "!sar":
|
||||
registration = args[1]
|
||||
address = self.fb.address_for_registration(registration)
|
||||
if not address:
|
||||
cn = args[1]
|
||||
address = self.fb.address_for_cn(cn)
|
||||
|
||||
coords = None
|
||||
if address:
|
||||
coords = self.get_coords_for_address(address)
|
||||
if coords:
|
||||
await bot.send_location(room, f'{registration} ({coords["utc"]})', coords["lat"], coords["lng"])
|
||||
else:
|
||||
await bot.send_text(room, f'No Flarm ID found for {registration}!')
|
||||
|
||||
elif len(args) == 3 and args[0] == "!flog":
|
||||
if args[1] == 'station':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
station = args[2]
|
||||
self.station_rooms[room.room_id] = station
|
||||
self.logger.info(f'Station now for this room {self.station_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Set OGN station {station} to this room')
|
||||
|
||||
|
||||
def get_coords_for_address(self, address):
|
||||
# https://flightbook.glidernet.org/api/live/address/~91DADF5B86
|
||||
url = f'{self.fb.base_url}/live/address/{address}'
|
||||
data = None
|
||||
with requests.Session() as session:
|
||||
response = session.get(url, headers={'Connection': 'close'}, verify=False)
|
||||
data = response.json()
|
||||
|
||||
# print(json.dumps(data, sort_keys=True, indent=4))
|
||||
return data
|
||||
|
||||
|
||||
def text_flog(self, data, showtow):
|
||||
out = ""
|
||||
if len(data["flights"]) == 0:
|
||||
out = f'No known flights today at {data["airfield"]["name"]}'
|
||||
else:
|
||||
out = f'Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "\n"
|
||||
flights = data['flights']
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
out = out + self.fb.flight2string(flight, data) + "\n"
|
||||
return out
|
||||
|
||||
def html_flog(self, data, showtow):
|
||||
out = ""
|
||||
if len(data["flights"]) == 0:
|
||||
out = f'No known flights today at {data["airfield"]["name"]}'
|
||||
else:
|
||||
out = f'<b>✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "</b>\n"
|
||||
flights = data['flights']
|
||||
out = out + "<ul>"
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
out = out + "<li>" + self.fb.flight2string(flight, data) + "</li>\n"
|
||||
out = out + "</ul>"
|
||||
return out
|
||||
|
||||
async def show_flog(self, bot, room, station):
|
||||
data = self.fb.get_flights(station)
|
||||
if data:
|
||||
await bot.send_html(room, self.html_flog(data, False), self.text_flog(data, False))
|
||||
else:
|
||||
await bot.send_text(room, f"Failed to get flight log for {station}")
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['station_rooms'] = self.station_rooms
|
||||
data['live_rooms'] = self.live_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('station_rooms'):
|
||||
self.station_rooms = data['station_rooms']
|
||||
if data.get('live_rooms'):
|
||||
self.live_rooms = data['live_rooms']
|
||||
|
||||
def help(self):
|
||||
return ('Open Glider Network Field Log')
|
|
@ -1,100 +0,0 @@
|
|||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import requests
|
||||
from nio import AsyncClient, UploadError
|
||||
from nio import UploadResponse
|
||||
|
||||
from collections import namedtuple
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class gfycat(object):
|
||||
"""
|
||||
A very simple module that allows you to
|
||||
1. search a gif on gfycat from a remote location
|
||||
"""
|
||||
|
||||
# Urls
|
||||
url = "https://api.gfycat.com"
|
||||
|
||||
def __init__(self):
|
||||
super(gfycat, self).__init__()
|
||||
|
||||
def __fetch(self, url, param):
|
||||
import json
|
||||
try:
|
||||
# added simple User-Ajent string to avoid CloudFlare block this request
|
||||
headers = {'User-Agent': 'Mozilla/5.0'}
|
||||
req = urllib.request.Request(url+param, headers=headers)
|
||||
connection = urllib.request.urlopen(req).read()
|
||||
except urllib.error.HTTPError as err:
|
||||
raise ValueError(err.read())
|
||||
result = namedtuple("result", "raw json")
|
||||
return result(raw=connection, json=json.loads(connection))
|
||||
|
||||
def search(self, param):
|
||||
result = self.__fetch(self.url, "/v1/gfycats/search?search_text=%s" % urllib.parse.quote_plus(param))
|
||||
if "errorMessage" in result.json:
|
||||
raise ValueError("%s" % self.json["errorMessage"])
|
||||
return _gfycatSearch(result)
|
||||
|
||||
class _gfycatUtils(object):
|
||||
|
||||
"""
|
||||
A utility class that provides the necessary common
|
||||
for all the other classes
|
||||
"""
|
||||
|
||||
def __init__(self, param, json):
|
||||
super(_gfycatUtils, self).__init__()
|
||||
# This can be used for other functions related to this class
|
||||
self.res = param
|
||||
self.js = json
|
||||
|
||||
def raw(self):
|
||||
return self.res.raw
|
||||
|
||||
def json(self):
|
||||
return self.js
|
||||
|
||||
def __len__(self):
|
||||
return len(self.js)
|
||||
|
||||
def get(self, what):
|
||||
try:
|
||||
return self.js[what]
|
||||
except KeyError as error:
|
||||
return ("Sorry, can't find %s" % error)
|
||||
|
||||
class _gfycatSearch(_gfycatUtils):
|
||||
|
||||
"""
|
||||
This class will provide more information for an existing url
|
||||
"""
|
||||
|
||||
def __init__(self, param):
|
||||
super(_gfycatSearch, self).__init__(param, param.json["gfycats"])
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) > 1:
|
||||
gif_url = "No image found"
|
||||
query = event.body[len(args[0])+1:]
|
||||
try:
|
||||
gifs = gfycat().search(query)
|
||||
if len(gifs) < 1:
|
||||
await bot.send_text(room, gif_url)
|
||||
return
|
||||
|
||||
gif_url = gifs.get(0)["content_urls"]["largeGif"]["url"]
|
||||
await bot.upload_and_send_image(room, gif_url)
|
||||
except Exception as exc:
|
||||
gif_url = str(exc)
|
||||
await bot.send_text(room, gif_url)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !gfycat <query>')
|
||||
|
||||
def help(self):
|
||||
return ('Gfycat bot')
|
|
@ -1,134 +0,0 @@
|
|||
from github import Github
|
||||
import re
|
||||
import json
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
# Helper class with reusable code for github project stuff
|
||||
class GithubProject:
|
||||
# New format to support array of colors: domains={"koneet":["#BFDADC","#0CBBF0","#0CBBF0","#E15D19","#ED49CF"],"tilat":["#0E8A16","#1E8A16"]}
|
||||
def get_domains(description):
|
||||
p = re.compile('domains=\{.*\}')
|
||||
matches = json.loads(p.findall(description)[0][8:])
|
||||
return matches
|
||||
|
||||
def get_domain(reponame, domain):
|
||||
g = Github()
|
||||
repo = g.get_repo(reponame)
|
||||
domains = GithubProject.get_domains(repo.description)
|
||||
if(not len(domains)):
|
||||
return None, None
|
||||
domain_colors = domains.get(domain, None)
|
||||
if not domain_colors:
|
||||
return None, None
|
||||
|
||||
open_issues = repo.get_issues(state='open')
|
||||
domain_labels = []
|
||||
labels = repo.get_labels()
|
||||
for label in labels:
|
||||
for domain_color in domain_colors:
|
||||
if label.color == domain_color[1:]:
|
||||
domain_labels.append(label)
|
||||
|
||||
domain_issues = dict()
|
||||
domain_ok = []
|
||||
for label in domain_labels:
|
||||
label_issues = []
|
||||
for issue in open_issues:
|
||||
if label in issue.labels:
|
||||
label_issues.append(issue)
|
||||
if len(label_issues):
|
||||
domain_issues[label.name] = label_issues
|
||||
else:
|
||||
domain_ok.append(label.name)
|
||||
|
||||
return domain_issues, domain_ok
|
||||
|
||||
def domain_to_string(reponame, issues, ok):
|
||||
text_out = reponame + ":\n"
|
||||
for label in issues.keys():
|
||||
text_out = text_out + f'{label}: '
|
||||
for issue in issues[label]:
|
||||
# todo: add {issue.html_url} when URL previews can be disabled
|
||||
text_out = text_out + f'[{issue.title}] '
|
||||
text_out = text_out + f'\n'
|
||||
|
||||
text_out = text_out + " OK : " + ', '.join(ok)
|
||||
return text_out
|
||||
|
||||
def domain_to_html(reponame, issues, ok):
|
||||
html_out = f'<b>{reponame}:</b> <br/>'
|
||||
for label in issues.keys():
|
||||
html_out = html_out + f'🚧 {label}: '
|
||||
for issue in issues[label]:
|
||||
# todo: add {issue.html_url} when URL previews can be disabled
|
||||
html_out = html_out + f'[{issue.title}] '
|
||||
html_out = html_out + f'<br/>'
|
||||
|
||||
html_out = html_out + " OK ☑️ " + ', '.join(ok)
|
||||
return html_out
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.repo_rooms = dict()
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
|
||||
if len(args) == 1:
|
||||
if args[0] == 'rmrepo':
|
||||
bot.must_be_admin(room, event)
|
||||
del self.repo_rooms[room.room_id]
|
||||
await bot.send_text(room, 'Github repo removed from this room.')
|
||||
bot.save_settings()
|
||||
return
|
||||
if args[0] == 'repo':
|
||||
await bot.send_text(room, f'Github repo for this room is {self.repo_rooms.get(room.room_id, "not set")}.')
|
||||
return
|
||||
|
||||
domain = args[0]
|
||||
reponame = self.repo_rooms.get(room.room_id, None)
|
||||
if reponame:
|
||||
issues, ok = GithubProject.get_domain(reponame, domain)
|
||||
if issues or ok:
|
||||
await self.send_domain_status(bot, room, reponame, issues, ok)
|
||||
else:
|
||||
await bot.send_text(room, f'No labels with domain {domain} found.')
|
||||
else:
|
||||
await bot.send_text(room, f'No github repo set for this room. Use setrepo to set it.')
|
||||
return
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'setrepo':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
reponame = args[1]
|
||||
self.logger.info(f'Adding repo {reponame} to room id {room.room_id}')
|
||||
|
||||
self.repo_rooms[room.room_id] = reponame
|
||||
await bot.send_text(room, f'Github repo {reponame} set to this room.')
|
||||
bot.save_settings()
|
||||
return
|
||||
|
||||
await bot.send_text(room, 'Unknown command')
|
||||
|
||||
async def send_domain_status(self, bot, room, reponame, issues, ok):
|
||||
text_out = GithubProject.domain_to_string(reponame, issues, ok)
|
||||
html_out = GithubProject.domain_to_html(reponame, issues, ok)
|
||||
await bot.send_html(room, html_out, text_out)
|
||||
|
||||
|
||||
def help(self):
|
||||
return 'Github asset management'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["repo_rooms"] = self.repo_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("repo_rooms"):
|
||||
self.repo_rooms = data["repo_rooms"]
|
|
@ -1,60 +0,0 @@
|
|||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import os
|
||||
import giphypop
|
||||
import requests
|
||||
from nio import AsyncClient, UploadError
|
||||
from nio import UploadResponse
|
||||
|
||||
from collections import namedtuple
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
api_key = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3 and args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
elif len(args) > 1:
|
||||
gif_url = "No image found"
|
||||
query = event.body[len(args[0])+1:]
|
||||
try:
|
||||
g = giphypop.Giphy(api_key=self.api_key)
|
||||
gifs = []
|
||||
try:
|
||||
for x in g.search(phrase=query, limit=1):
|
||||
gifs.append(x)
|
||||
except Exception:
|
||||
pass
|
||||
if len(gifs) < 1:
|
||||
await bot.send_text(room, gif_url)
|
||||
return
|
||||
|
||||
gif_url = gifs[0].media_url
|
||||
await bot.upload_and_send_image(room, gif_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
gif_url = str(exc)
|
||||
await bot.send_text(room, gif_url)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !giphy <query>')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["api_key"] = self.api_key
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("api_key"):
|
||||
self.api_key = data["api_key"]
|
||||
|
||||
def help(self):
|
||||
return ('Giphy bot')
|
|
@ -6,6 +6,7 @@ class MatrixModule(BotModule):
|
|||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.msg_users = False
|
||||
self.info = "More information at https://github.com/vranki/hemppa"
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
import sys
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
from random import randrange
|
||||
|
||||
from igramscraper.exception.instagram_not_found_exception import \
|
||||
InstagramNotFoundException
|
||||
from igramscraper.instagram import Instagram
|
||||
|
||||
from modules.common.pollingservice import PollingService
|
||||
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.instagram = Instagram()
|
||||
self.service_name = 'Instagram'
|
||||
self.enabled = False
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
try:
|
||||
medias = self.instagram.get_medias(account, 5)
|
||||
self.logger.info(f'Polling instagram account {account} for room {roomid} - got {len(medias)} posts.')
|
||||
for media in medias:
|
||||
if send_messages:
|
||||
if media.identifier not in self.known_ids:
|
||||
await bot.send_html(bot.get_room_by_id(roomid),
|
||||
f'<a href="{media.link}">Instagram {account}:</a> {media.caption}',
|
||||
f'{account}: {media.caption} {media.link}')
|
||||
self.known_ids.add(media.identifier)
|
||||
|
||||
except InstagramNotFoundException:
|
||||
self.logger.error(f"{account} does not exist - deleting from room")
|
||||
self.account_rooms[roomid].remove(account)
|
||||
bot.save_settings()
|
||||
except Exception:
|
||||
self.logger.error('Polling instagram account failed:')
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
polldelay = timedelta(minutes=30 + randrange(30))
|
||||
self.next_poll_time[roomid] = datetime.now() + polldelay
|
|
@ -1,45 +0,0 @@
|
|||
from nio import RoomMessageUnknown, UnknownEvent
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
bot = None
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.bot = bot
|
||||
bot.client.add_event_callback(self.unknownevent_cb, (UnknownEvent,))
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.unknownevent_cb)
|
||||
|
||||
async def unknownevent_cb(self, room, event):
|
||||
try:
|
||||
if 'type' in event.source and event.source['type'] == 'im.vector.modular.widgets' and event.source['content']['type'] == 'jitsi':
|
||||
# Todo: Domain not found in Element Android events!
|
||||
domain = event.source['content']['data']['domain']
|
||||
conferenceId = event.source['content']['data']['conferenceId']
|
||||
isAudioOnly = event.source['content']['data']['isAudioOnly']
|
||||
sender = event.source['sender']
|
||||
sender_response = await self.bot.client.get_displayname(event.sender)
|
||||
sender = sender_response.displayname
|
||||
# This is just a guess - is this the proper way to generate URL? Probably not.
|
||||
jitsiUrl = f'https://{domain}/{conferenceId}'
|
||||
|
||||
calltype = 'video call'
|
||||
if isAudioOnly:
|
||||
calltype = 'audio call'
|
||||
|
||||
plainMessage = f'{sender} started a {calltype}: {jitsiUrl}'
|
||||
htmlMessage = f'{sender} started a <a href="{jitsiUrl}">{calltype}</a>'
|
||||
await self.bot.send_html(room, htmlMessage, plainMessage)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed parsing Jitsi event. Error: {e}")
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
pass
|
||||
|
||||
def help(self):
|
||||
return 'Sends text links when user starts a Jitsi video or audio call in room'
|
148
modules/loc.py
148
modules/loc.py
|
@ -1,148 +0,0 @@
|
|||
from geopy.geocoders import Nominatim
|
||||
from nio import RoomMessageUnknown
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.bot = None
|
||||
self.enabled_rooms = []
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.bot = bot
|
||||
bot.client.add_event_callback(self.unknown_cb, RoomMessageUnknown)
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.unknown_cb)
|
||||
|
||||
'''
|
||||
Location events are like: https://spec.matrix.org/v1.2/client-server-api/#mlocation
|
||||
{
|
||||
"content": {
|
||||
"body": "geo:61.49342512194717,23.765914658307736",
|
||||
"geo_uri": "geo:61.49342512194717,23.765914658307736",
|
||||
"msgtype": "m.location",
|
||||
"org.matrix.msc1767.text": "geo:61.49342512194717,23.765914658307736",
|
||||
"org.matrix.msc3488.asset": {
|
||||
"type": "m.pin"
|
||||
},
|
||||
"org.matrix.msc3488.location": {
|
||||
"description": "geo:61.49342512194717,23.765914658307736",
|
||||
"uri": "geo:61.49342512194717,23.765914658307736"
|
||||
},
|
||||
"org.matrix.msc3488.ts": 1653837929839
|
||||
},
|
||||
"room_id": "!xsBGdLYGrfYhGfLtHG:hacklab.fi",
|
||||
"type": "m.room.message"
|
||||
}
|
||||
|
||||
BUT sometimes there's ; separating altitude??
|
||||
{
|
||||
"content": {
|
||||
"body": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"geo_uri": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"msgtype": "m.location",
|
||||
"org.matrix.msc1767.text": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"org.matrix.msc3488.asset": {
|
||||
"type": "m.self"
|
||||
},
|
||||
"org.matrix.msc3488.location": {
|
||||
"description": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"uri": "geo:61.4704211,23.4864855;36.900001525878906"
|
||||
},
|
||||
"org.matrix.msc3488.ts": 1653931683087
|
||||
},
|
||||
"origin_server_ts": 1653931683998,
|
||||
"sender": "@cos:hacklab.fi",
|
||||
"type": "m.room.message",
|
||||
"unsigned": {
|
||||
"age": 70
|
||||
},
|
||||
"event_id": "$6xXutKF9EppPMMdc4aQLZjHyd8My0rIZuNZEcuSIPws",
|
||||
"room_id": "!CLofqdurVWZCMpFnqM:hacklab.fi"
|
||||
}
|
||||
'''
|
||||
|
||||
async def unknown_cb(self, room, event):
|
||||
if event.msgtype != 'm.location':
|
||||
return
|
||||
if room.room_id not in self.enabled_rooms:
|
||||
return
|
||||
location_text = event.content['body']
|
||||
|
||||
# Fallback if body is empty
|
||||
if (len(location_text) == 0) or ('geo:' in location_text):
|
||||
location_text = 'location'
|
||||
|
||||
sender_response = await self.bot.client.get_displayname(event.sender)
|
||||
sender = sender_response.displayname
|
||||
|
||||
geo_uri = event.content['geo_uri']
|
||||
try:
|
||||
geo_uri = geo_uri[4:] # Strip geo:
|
||||
|
||||
if ';' in geo_uri: # Strip altitude, if present
|
||||
geo_uri = geo_uri.split(';')[0]
|
||||
latlon = geo_uri.split(',')
|
||||
|
||||
# Sanity checks to avoid url manipulation
|
||||
float(latlon[0])
|
||||
float(latlon[1])
|
||||
except Exception:
|
||||
self.bot.send_text(room, "Error: Invalid location " + geo_uri)
|
||||
return
|
||||
|
||||
osm_link = f"https://www.openstreetmap.org/?mlat={latlon[0]}&mlon={latlon[1]}"
|
||||
|
||||
plain = f'{sender} sent {location_text} {osm_link} 🚩'
|
||||
html = f'{sender} sent <a href="{osm_link}">{location_text}</a> 🚩'
|
||||
|
||||
await self.bot.send_html(room, html, plain)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) == 0:
|
||||
await bot.send_text(room, 'Usage: !loc <location name>')
|
||||
return
|
||||
elif len(args) == 1:
|
||||
if args[0] == 'enable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.append(room.room_id)
|
||||
self.enabled_rooms = list(dict.fromkeys(self.enabled_rooms)) # Deduplicate
|
||||
await bot.send_text(room, "Ok, sending locations events here as text versions")
|
||||
bot.save_settings()
|
||||
return
|
||||
if args[0] == 'disable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.remove(room.room_id)
|
||||
await bot.send_text(room, "Ok, disabled here")
|
||||
bot.save_settings()
|
||||
return
|
||||
|
||||
query = event.body[4:]
|
||||
geolocator = Nominatim(user_agent=bot.appid)
|
||||
self.logger.info('loc: looking up %s ..', query)
|
||||
location = geolocator.geocode(query)
|
||||
self.logger.info('loc rx %s', location)
|
||||
if location:
|
||||
await bot.send_location(room, location.address, location.latitude, location.longitude, "m.pin")
|
||||
else:
|
||||
await bot.send_text(room, "Can't find " + query + " on map!")
|
||||
|
||||
def help(self):
|
||||
return 'Search for locations and display Matrix location events as OSM links'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["enabled_rooms"] = self.enabled_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("enabled_rooms"):
|
||||
self.enabled_rooms = data["enabled_rooms"]
|
149
modules/md.py
149
modules/md.py
|
@ -1,149 +0,0 @@
|
|||
from mastodon import Mastodon
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
apps = dict() # instance url <-> [app_id, app_secret]
|
||||
logins = dict() # mxid <-> [username, accesstoken, instanceurl]
|
||||
roomlogins = dict() # roomid <-> [username, accesstoken, instanceurl]
|
||||
public = False
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) >= 1:
|
||||
if args[0] == "toot":
|
||||
toot_body = " ".join(args[1:])
|
||||
accesstoken = None
|
||||
if room.room_id in self.roomlogins.keys():
|
||||
bot.must_be_admin(room, event)
|
||||
username = self.roomlogins[room.room_id][0]
|
||||
accesstoken = self.roomlogins[room.room_id][1]
|
||||
instanceurl = self.roomlogins[room.room_id][2]
|
||||
elif event.sender in self.logins.keys():
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
username = self.logins[event.sender][0]
|
||||
accesstoken = self.logins[event.sender][1]
|
||||
instanceurl = self.logins[event.sender][2]
|
||||
if accesstoken:
|
||||
toottodon = Mastodon(
|
||||
access_token = accesstoken,
|
||||
api_base_url = instanceurl
|
||||
)
|
||||
tootdict = toottodon.toot(toot_body)
|
||||
await bot.send_text(room, tootdict['url'])
|
||||
else:
|
||||
await bot.send_text(room, f'{event.sender} has not logged in yet with the bot. Please do so.')
|
||||
return
|
||||
|
||||
if len(args) == 4:
|
||||
if args[0] == "login":
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
mxid = event.sender
|
||||
instanceurl = args[1]
|
||||
username = args[2]
|
||||
password = args[3]
|
||||
await self.register_app_if_necessary(bot, room, instanceurl)
|
||||
await self.login_to_account(bot, room, mxid, None, instanceurl, username, password)
|
||||
return
|
||||
if len(args) == 5:
|
||||
if args[0] == "roomlogin":
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
roomalias = args[1]
|
||||
instanceurl = args[2]
|
||||
username = args[3]
|
||||
password = args[4]
|
||||
roomid = await bot.get_room_by_alias(roomalias)
|
||||
if roomid:
|
||||
await self.register_app_if_necessary(bot, room, instanceurl)
|
||||
await self.login_to_account(bot, room, None, roomid, instanceurl, username, password)
|
||||
else:
|
||||
await bot.send_text(room, f'Unknown room alias {roomalias} - invite bot to the room first.')
|
||||
return
|
||||
if len(args) == 1:
|
||||
if args[0] == "status":
|
||||
out = f'App registered on {len(self.apps)} instances, public use enabled: {self.public}\n'
|
||||
out = out + f'{len(self.logins)} users logged in:\n'
|
||||
for login in self.logins.keys():
|
||||
out = out + f' - {login} as {self.logins[login][0]} on {self.logins[login][2]}\n'
|
||||
out = out + f'{len(self.roomlogins)} per-room logins:\n'
|
||||
for roomlogin in self.roomlogins:
|
||||
out = out + f' - {roomlogin} as {self.roomlogins[roomlogin][0]} on {self.roomlogins[roomlogin][2]}\n'
|
||||
|
||||
await bot.send_text(room, out)
|
||||
if args[0] == "logout":
|
||||
if event.sender in self.logins.keys():
|
||||
# TODO: Is there a way to invalidate the access token with API?
|
||||
del self.logins[event.sender]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'{event.sender} login data removed from the bot.')
|
||||
if args[0] == "roomlogout":
|
||||
bot.must_be_admin(room, event)
|
||||
if room.room_id in self.roomlogins.keys():
|
||||
del self.roomlogins[room.room_id]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Login data for this room removed from the bot.')
|
||||
else:
|
||||
await bot.send_text(room, f'No login found for room id {room.room_id}.')
|
||||
if args[0] == "clear":
|
||||
bot.must_be_owner(event)
|
||||
self.logins = dict()
|
||||
self.roomlogins = dict()
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'All Mastodon logins cleared')
|
||||
if args[0] == "setpublic":
|
||||
bot.must_be_owner(event)
|
||||
self.public = True
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Mastodon usage is now public use')
|
||||
if args[0] == "setprivate":
|
||||
bot.must_be_owner(event)
|
||||
self.public = False
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Mastodon usage is now restricted to bot owners')
|
||||
|
||||
async def register_app_if_necessary(self, bot, room, instanceurl):
|
||||
if not instanceurl in self.apps.keys():
|
||||
app = Mastodon.create_app(f'Hemppa The Bot - {bot.client.user}', api_base_url = instanceurl)
|
||||
self.apps[instanceurl] = [app[0], app[1]]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Registered Mastodon app on {instanceurl}')
|
||||
|
||||
async def login_to_account(self, bot, room, mxid, roomid, instanceurl, username, password):
|
||||
mastodon = Mastodon(client_id = self.apps[instanceurl][0], client_secret = self.apps[instanceurl][1], api_base_url = instanceurl)
|
||||
access_token = mastodon.log_in(username, password)
|
||||
print('login_To_account', mxid, roomid)
|
||||
if mxid:
|
||||
self.logins[mxid] = [username, access_token, instanceurl]
|
||||
await bot.send_text(room, f'Logged Matrix user {mxid} into {instanceurl} as {username}')
|
||||
elif roomid:
|
||||
self.roomlogins[roomid] = [username, access_token, instanceurl]
|
||||
await bot.send_text(room, f'Set room {roomid} Mastodon user to {username} on {instanceurl}')
|
||||
|
||||
bot.save_settings()
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['apps'] = self.apps
|
||||
data['logins'] = self.logins
|
||||
data['roomlogins'] = self.roomlogins
|
||||
data['public'] = self.public
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("apps"):
|
||||
self.apps = data["apps"]
|
||||
if data.get("logins"):
|
||||
self.logins = data["logins"]
|
||||
if data.get("roomlogins"):
|
||||
self.roomlogins = data["roomlogins"]
|
||||
if data.get("public"):
|
||||
self.public = data["public"]
|
||||
|
||||
def help(self):
|
||||
return ('Mastodon')
|
|
@ -1,20 +0,0 @@
|
|||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2:
|
||||
icao = args[1]
|
||||
metar_url = "https://tgftp.nws.noaa.gov/data/observations/metar/stations/" + \
|
||||
icao.upper() + ".TXT"
|
||||
response = urllib.request.urlopen(metar_url)
|
||||
lines = response.readlines()
|
||||
await bot.send_text(room, lines[1].decode("utf-8").strip())
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !metar <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('Metar data access (usage: !metar <icao code>)')
|
|
@ -1,96 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import random
|
||||
import socket
|
||||
from struct import pack, unpack
|
||||
import time
|
||||
|
||||
# Modified from https://gist.github.com/azlux/315c924af4800ffbc2c91db3ab8a59bc
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.host = None
|
||||
self.port = 64738
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('host'):
|
||||
self.host = data['host']
|
||||
if data.get('port'):
|
||||
self.port = data['port']
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['host'] = self.host
|
||||
data['port'] = self.port
|
||||
return data
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
|
||||
if len(args) > 1 and args[1] in ['set', 'setserver']:
|
||||
bot.must_be_owner(event)
|
||||
self.logger.info(f"room: {room.name} sender: {event.sender} is setting the server settings")
|
||||
if len(args) < 3:
|
||||
self.host = None
|
||||
return await bot.send_text(room, f'Usage: !{args[0]} {args[1]} [host] ([port])')
|
||||
self.host = args[2]
|
||||
if len(args) > 3:
|
||||
self.port = int(args[3])
|
||||
if not self.port:
|
||||
self.port = 64738
|
||||
bot.save_settings()
|
||||
return await bot.send_text(room, f'Set server settings: host: {self.host} port: {self.port}')
|
||||
|
||||
self.logger.info(f"room: {room.name} sender: {event.sender} wants mumble info")
|
||||
if not self.host:
|
||||
return await bot.send_text(room, f'No mumble host info set!')
|
||||
|
||||
try:
|
||||
ret = self.mumble_ping()
|
||||
# https://wiki.mumble.info/wiki/Protocol
|
||||
# [0,1,2,3] = version
|
||||
version = '.'.join(map(str, ret[1:4]))
|
||||
# [4] = identifier passed to the server (used here to get ping time)
|
||||
ping = int(time.time() * 1000) - ret[4]
|
||||
# [7] = bandwidth
|
||||
# [5] = users
|
||||
# [6] = max users
|
||||
await bot.send_text(room, f'{self.host}:{self.port} (v{version}): {ret[5]} / {ret[6]} (ping: {ping}ms)')
|
||||
except socket.gaierror as e:
|
||||
self.logger.error(f"room: {room.name}: mumble_ping failed: {e}")
|
||||
await bot.send_text(room, f'Could not get get mumble server info: {e}')
|
||||
|
||||
def mumble_ping(self):
|
||||
addrinfo = socket.getaddrinfo(self.host, self.port, 0, 0, socket.SOL_UDP)
|
||||
|
||||
for (family, socktype, proto, canonname, sockaddr) in addrinfo:
|
||||
s = socket.socket(family, socktype, proto=proto)
|
||||
s.settimeout(2)
|
||||
|
||||
buf = pack(">iQ", 0, int(time.time() * 1000))
|
||||
try:
|
||||
s.sendto(buf, sockaddr)
|
||||
except (socket.gaierror, socket.timeout) as e:
|
||||
continue
|
||||
|
||||
try:
|
||||
data, addr = s.recvfrom(1024)
|
||||
except socket.timeout:
|
||||
continue
|
||||
|
||||
return unpack(">bbbbQiii", data)
|
||||
|
||||
def help(self):
|
||||
return 'Show info about a mumble server'
|
||||
|
||||
def long_help(self):
|
||||
text = self.help() + (
|
||||
'\n- "!mumble": Get the status of the configured mumble server')
|
||||
|
||||
if bot and event and bot.is_owner(event):
|
||||
text += (
|
||||
'\nOwner commands:'
|
||||
'\n- "!mumble set [host] ([port])": Set use the following host and port'
|
||||
'\n- If no port is given, defaults to 64738')
|
||||
return text
|
|
@ -1,30 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import requests, json
|
||||
import traceback
|
||||
|
||||
from modules.common.pollingservice import PollingService
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.service_name = 'MXMA'
|
||||
self.poll_interval_min = 5
|
||||
self.poll_interval_random = 2
|
||||
self.owner_only = True
|
||||
self.send_all = True
|
||||
self.enabled = False
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
try:
|
||||
response = requests.get(url=account, timeout=5)
|
||||
if response.status_code == 200:
|
||||
if 'messages' in response.json():
|
||||
messages = response.json()['messages']
|
||||
for message in messages:
|
||||
success = await bot.send_msg(message['to'], message['title'], message['message'])
|
||||
except Exception:
|
||||
self.logger.error('Polling MXMA failed:')
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
def help(self):
|
||||
return 'Matrix messaging API'
|
|
@ -1,47 +0,0 @@
|
|||
import re
|
||||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2 and len(args[1]) == 4:
|
||||
icao = args[1].upper()
|
||||
notam = self.get_notam(icao)
|
||||
await bot.send_text(room, notam)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !notam <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('NOTAM data access (usage: !notam <icao code>) - Currently Finnish airports only')
|
||||
|
||||
# TODO: This handles only finnish airports. Implement support for other countries.
|
||||
def get_notam(self, icao):
|
||||
if not icao.startswith('EF'):
|
||||
return ('Only Finnish airports supported currently, sorry.')
|
||||
|
||||
icao_first_letter = icao[2]
|
||||
if icao_first_letter < 'M':
|
||||
notam_url = "https://www.ais.fi/ais/bulletins/envfra.htm"
|
||||
else:
|
||||
notam_url = "https://www.ais.fi/ais/bulletins/envfrm.htm"
|
||||
|
||||
response = urllib.request.urlopen(notam_url)
|
||||
lines = response.readlines()
|
||||
lines = b''.join(lines)
|
||||
lines = lines.decode("ISO-8859-1")
|
||||
# Strip EN-ROUTE from end
|
||||
lines = lines[0:lines.find('<a name="EN-ROUTE">')]
|
||||
|
||||
startpos = lines.find('<a name="' + icao + '">')
|
||||
if startpos > -1:
|
||||
endpos = lines.find('<h3>', startpos)
|
||||
if endpos == -1:
|
||||
endpos = len(lines)
|
||||
notam = lines[startpos:endpos]
|
||||
notam = re.sub('<[^<]+?>', ' ', notam)
|
||||
if len(notam) > 4:
|
||||
return notam
|
||||
return f'Cannot parse notam for {icao} at {notam_url}'
|
|
@ -1,116 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
from nio import RoomMessageMedia
|
||||
from typing import Optional
|
||||
import sys
|
||||
import traceback
|
||||
import cups
|
||||
import httpx
|
||||
import aiofiles
|
||||
import os
|
||||
|
||||
# Credit: https://medium.com/swlh/how-to-boost-your-python-apps-using-httpx-and-asynchronous-calls-9cfe6f63d6ad
|
||||
async def download_file(url: str, filename: Optional[str] = None) -> str:
|
||||
filename = filename or url.split("/")[-1]
|
||||
filename = f"/tmp/{filename}"
|
||||
client = httpx.AsyncClient()
|
||||
async with client.stream("GET", url) as resp:
|
||||
resp.raise_for_status()
|
||||
async with aiofiles.open(filename, "wb") as f:
|
||||
async for data in resp.aiter_bytes():
|
||||
if data:
|
||||
await f.write(data)
|
||||
await client.aclose()
|
||||
return filename
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.printers = dict() # roomid <-> printername
|
||||
self.bot = None
|
||||
self.paper_size = 'A4' # Todo: configurable
|
||||
self.enabled = False
|
||||
|
||||
async def file_cb(self, room, event):
|
||||
try:
|
||||
if self.bot.should_ignore_event(event):
|
||||
return
|
||||
if room.room_id in self.printers:
|
||||
printer = self.printers[room.room_id]
|
||||
self.logger.debug(f'RX file - MXC {event.url} - from {event.sender}')
|
||||
https_url = await self.bot.client.mxc_to_http(event.url)
|
||||
self.logger.debug(f'HTTPS URL {https_url}')
|
||||
filename = await download_file(https_url)
|
||||
self.logger.debug(f'RX filename {filename}')
|
||||
conn = cups.Connection ()
|
||||
conn.printFile(printer, filename, f"Printed from Matrix - {filename}", {'fit-to-page': 'TRUE', 'PageSize': self.paper_size})
|
||||
await self.bot.send_text(room, f'Printing file on {printer}..')
|
||||
os.remove(filename) # Not sure if we should wait first?
|
||||
else:
|
||||
self.logger.debug(f'No printer configured for room {room.room_id}')
|
||||
except:
|
||||
self.logger.warning(f"File callback failure")
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
await self.bot.send_text(room, f'Printing failed, sorry. See log for details.')
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
bot.client.add_event_callback(self.file_cb, RoomMessageMedia)
|
||||
self.bot = bot
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.file_cb)
|
||||
self.bot = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
bot.must_be_owner(event)
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
conn = cups.Connection ()
|
||||
printers = conn.getPrinters ()
|
||||
|
||||
if len(args) == 1:
|
||||
if args[0] == 'list':
|
||||
msg = f"Available printers:\n"
|
||||
for printer in printers:
|
||||
print(printer, printers[printer]["device-uri"])
|
||||
msg += f' - {printer} / {printers[printer]["device-uri"]}'
|
||||
for roomid, printerid in self.printers.items():
|
||||
if printerid == printer:
|
||||
msg += f' <- room {roomid}'
|
||||
msg += '\n'
|
||||
await bot.send_text(room, msg)
|
||||
elif args[0] == 'rmroomprinter':
|
||||
del self.printers[room.room_id]
|
||||
await bot.send_text(room, f'Deleted printer from this room.')
|
||||
bot.save_settings()
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'setroomprinter':
|
||||
printer = args[1]
|
||||
if printer in printers:
|
||||
await bot.send_text(room, f'Printing with {printer} here.')
|
||||
self.printers[room.room_id] = printer
|
||||
bot.save_settings()
|
||||
else:
|
||||
await bot.send_text(room, f'No printer called {printer} in your CUPS.')
|
||||
if args[0] == 'setpapersize':
|
||||
self.paper_size = args[1]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Paper size set to {self.paper_size}.')
|
||||
|
||||
def help(self):
|
||||
return 'Print files from Matrix'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["printers"] = self.printers
|
||||
data["paper_size"] = self.paper_size
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("printers"):
|
||||
self.printers = data["printers"]
|
||||
if data.get("paper_size"):
|
||||
self.paper_size = data["paper_size"]
|
|
@ -1,80 +0,0 @@
|
|||
from typing import Text
|
||||
import urllib
|
||||
import urllib.request
|
||||
from urllib.parse import urlencode, quote_plus
|
||||
import json
|
||||
import time
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class PeerTubeClient:
|
||||
def __init__(self):
|
||||
self.instance_url = 'https://sepiasearch.org/'
|
||||
|
||||
def search(self, search_string, count=0):
|
||||
if count == 0:
|
||||
count = 15 # Pt default, could also remove from params..
|
||||
params = urlencode({'search': search_string, 'count': count}, quote_via=quote_plus)
|
||||
search_url = self.instance_url + 'api/v1/search/videos?' + params
|
||||
response = urllib.request.urlopen(search_url)
|
||||
data = json.loads(response.read().decode("utf-8"))
|
||||
return data
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.instance_url = 'https://sepiasearch.org/'
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['ptall'])
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3:
|
||||
if args[1] == "setinstance":
|
||||
bot.must_be_owner(event)
|
||||
self.instance_url = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Instance url set to ' + self.instance_url, bot_ignore=True)
|
||||
return
|
||||
|
||||
if len(args) == 2:
|
||||
if args[1] == "showinstance":
|
||||
await bot.send_text(room, 'Using instance at ' + self.instance_url, bot_ignore=True)
|
||||
return
|
||||
|
||||
if len(args) > 1:
|
||||
query = event.body[len(args[0])+1:]
|
||||
p = PeerTubeClient()
|
||||
p.instance_url = self.instance_url
|
||||
count = 1
|
||||
if args[0] == '!ptall':
|
||||
count = 0
|
||||
data = p.search(query, count)
|
||||
if len(data['data']) > 0:
|
||||
for video in data['data']:
|
||||
video_url = video.get("url") or self.instance_url + 'videos/watch/' + video["uuid"]
|
||||
duration = time.strftime('%H:%M:%S', time.gmtime(video["duration"]))
|
||||
instancedata = video["account"]["host"]
|
||||
html = f'<a href="{video_url}">{video["name"]}</a> {video["description"] or ""} [{duration}] @ {instancedata}'
|
||||
text = f'{video_url} : {video["name"]} {video.get("description") or ""} [{duration}]'
|
||||
await bot.send_html(room, html, text, bot_ignore=True)
|
||||
else:
|
||||
await bot.send_text(room, 'Sorry, no videos found found.', bot_ignore=True)
|
||||
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !pt <query> or !ptall <query> to return all results')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['instance_url'] = self.instance_url
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("instance_url"):
|
||||
self.instance_url = data["instance_url"]
|
||||
|
||||
def help(self):
|
||||
return ('PeerTube search')
|
|
@ -1,19 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import urllib.request
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
day = 0
|
||||
hour = 12
|
||||
if len(args) >= 1:
|
||||
day = int(args[0]) - 1
|
||||
if len(args) == 2:
|
||||
hour = int(args[1])
|
||||
|
||||
imgurl = 'http://ennuste.ilmailuliitto.fi/' + str(day) + '/wstar_bsratio.curr.' + str(hour) + '00lst.d2.png'
|
||||
await bot.upload_and_send_image(room, imgurl, f"RASP Day {day+1} at {hour}:00", no_cache=True)
|
||||
|
||||
def help(self):
|
||||
return 'RASP Gliding Weather forecast, Finland only'
|
109
modules/relay.py
109
modules/relay.py
|
@ -1,109 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
from nio import RoomMessageText
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.bridges = dict()
|
||||
self.bot = None
|
||||
self.enabled = False
|
||||
|
||||
async def message_cb(self, room, event):
|
||||
if self.bot.should_ignore_event(event):
|
||||
return
|
||||
|
||||
if event.body.startswith('!'):
|
||||
return
|
||||
|
||||
source_id = None
|
||||
target_id = None
|
||||
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
if room.room_id == src_id:
|
||||
source_id = src_id
|
||||
target_id = tgt_id
|
||||
elif room.room_id == tgt_id:
|
||||
source_id = tgt_id
|
||||
target_id = src_id
|
||||
|
||||
if not source_id or not target_id:
|
||||
return
|
||||
|
||||
target_room = self.bot.get_room_by_id(target_id)
|
||||
if(target_room):
|
||||
sendernick = target_room.user_name(event.sender)
|
||||
if not sendernick:
|
||||
sendernick = event.sender
|
||||
await self.bot.send_text(target_room, f'<{sendernick}> {event.body}', msgtype="m.text", bot_ignore=True)
|
||||
else:
|
||||
self.logger.warning(f"Bot doesn't seem to be in bridged room {target_id}")
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
bot.client.add_event_callback(self.message_cb, RoomMessageText)
|
||||
self.bot = bot
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.message_cb)
|
||||
self.bot = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
bot.must_be_admin(room, event)
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) == 1:
|
||||
if args[0] == 'list':
|
||||
i = 1
|
||||
msg = f"Active relay bridges ({len(self.bridges)}):\n"
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
srcroom = self.bot.get_room_by_id(src_id)
|
||||
tgtroom = self.bot.get_room_by_id(tgt_id)
|
||||
|
||||
if srcroom:
|
||||
srcroom = srcroom.display_name
|
||||
else:
|
||||
srcroom = f'??? {src_id}'
|
||||
|
||||
if tgtroom:
|
||||
tgtroom = tgtroom.display_name
|
||||
else:
|
||||
tgtroom = f'??? {tgt_id}'
|
||||
|
||||
msg += f'{i}: {srcroom} <-> {tgtroom}'
|
||||
i = i + 1
|
||||
await bot.send_text(room, msg)
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'bridge':
|
||||
roomid = args[1]
|
||||
room_to_bridge = bot.get_room_by_id(roomid)
|
||||
if room_to_bridge:
|
||||
await bot.send_text(room, f'Bridging {room_to_bridge.display_name} here.')
|
||||
self.bridges[room.room_id] = roomid
|
||||
bot.save_settings()
|
||||
else:
|
||||
await bot.send_text(room, f'I am not on room with id {roomid} (note: use id, not alias)!')
|
||||
elif args[0] == 'unbridge':
|
||||
idx = int(args[1]) - 1
|
||||
i = 0
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
if i == idx:
|
||||
del self.bridges[src_id]
|
||||
await bot.send_text(room, f'Unbridged {src_id} and {tgt_id}.')
|
||||
bot.save_settings()
|
||||
return
|
||||
i = i + 1
|
||||
|
||||
def help(self):
|
||||
return 'Simple relaybot between two Matrix rooms'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["bridges"] = self.bridges
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("bridges"):
|
||||
self.bridges = data["bridges"]
|
|
@ -1,66 +0,0 @@
|
|||
from modules.common.pollingservice import PollingService
|
||||
from urllib.request import urlopen
|
||||
import json
|
||||
import time
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.accountroomid_laststatus = {}
|
||||
self.template = '{spacename} is now {open_closed}'
|
||||
self.i18n = {'open': 'open 🔓', 'closed': 'closed 🔒'}
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
self.logger.debug(f'polling space api {account}.')
|
||||
spacename, is_open = MatrixModule.open_status(account)
|
||||
|
||||
open_str = self.i18n['open'] if is_open else self.i18n['closed']
|
||||
text = self.template.format(spacename=spacename, open_closed=open_str)
|
||||
self.logger.debug(text)
|
||||
|
||||
last_status = self.accountroomid_laststatus.get(account+roomid, False)
|
||||
if send_messages and last_status != is_open:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), text)
|
||||
self.accountroomid_laststatus[account+roomid] = is_open
|
||||
bot.save_settings()
|
||||
|
||||
@staticmethod
|
||||
def open_status(spaceurl):
|
||||
with urlopen(spaceurl, timeout=5) as response:
|
||||
js = json.load(response)
|
||||
|
||||
return js['space'], js['state']['open']
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['laststatus'] = self.accountroomid_laststatus
|
||||
data['template'] = self.template
|
||||
data['i18n'] = self.i18n
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('laststatus'):
|
||||
self.accountroomid_laststatus = data['laststatus']
|
||||
if data.get('template'):
|
||||
self.template = data['template']
|
||||
if data.get('i18n'):
|
||||
self.i18n = data['i18n']
|
||||
|
||||
def help(self):
|
||||
return "Notify about Space-API status changes (open or closed)."
|
||||
|
||||
def long_help(self, bot, event, **kwargs):
|
||||
text = self.help() + \
|
||||
' This is a polling service. Therefore there are additional ' + \
|
||||
'commands: list, debug, poll, clear, add URL, del URL\n' + \
|
||||
'!spaceapi add URL: to add a space-api endpoint\n' + \
|
||||
'!spaceapi list: to list the endpoint configured for this room.\n' + \
|
||||
f'I will look for changes roughly every {self.poll_interval_min} ' + \
|
||||
'minutes. Find out more about Space-API at https://spaceapi.io/.'
|
||||
if bot.is_owner(event):
|
||||
text += '\nA template and I18N can be configured via settings of ' + \
|
||||
'the module. Use "!bot export spacepi", then change the ' + \
|
||||
'settings and import again with "!bot import spacepi SETTINGS".'
|
||||
|
||||
return text
|
|
@ -1,23 +0,0 @@
|
|||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2:
|
||||
icao = args[1]
|
||||
taf_url = "https://aviationweather.gov/adds/dataserver_current/httpparam?dataSource=tafs&requestType=retrieve&format=csv&hoursBeforeNow=3&timeType=issue&mostRecent=true&stationString=" + icao.upper()
|
||||
response = urllib.request.urlopen(taf_url)
|
||||
lines = response.readlines()
|
||||
if len(lines) > 6:
|
||||
taf = lines[6].decode("utf-8").split(',')[0]
|
||||
await bot.send_text(room, taf.strip())
|
||||
else:
|
||||
await bot.send_text(room, 'Cannot find taf for ' + icao)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !taf <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('Taf data access (usage: !taf <icao code>)')
|
|
@ -1,247 +0,0 @@
|
|||
import time
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import aiohttp.web
|
||||
import requests
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from aiohttp import web
|
||||
from future.moves.urllib.parse import urlencode
|
||||
from nio import MatrixRoom
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
import nest_asyncio
|
||||
nest_asyncio.apply()
|
||||
|
||||
rooms = dict()
|
||||
global_bot = None
|
||||
|
||||
send_entry_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def send_entry(blob, content_type, fmt_params, rooms):
|
||||
async with send_entry_lock:
|
||||
for room_id in rooms:
|
||||
room = MatrixRoom(room_id=room_id, own_user_id=os.getenv("BOT_OWNERS"),
|
||||
encrypted=rooms[room_id])
|
||||
if blob and content_type:
|
||||
await global_bot.upload_and_send_image(room, blob, text="", blob=True, blob_content_type=content_type)
|
||||
|
||||
await global_bot.send_html(room, msg_template_html.format(**fmt_params),
|
||||
msg_template_plain.format(**fmt_params))
|
||||
|
||||
|
||||
def get_image(img=None, width=1000, height=1500):
|
||||
"""
|
||||
Return image data as array.
|
||||
Array contains the image content type and image binary
|
||||
|
||||
Parameters required: img { Plex image location }
|
||||
Optional parameters: width { the image width }
|
||||
height { the image height }
|
||||
Output: array
|
||||
"""
|
||||
|
||||
pms_url = os.getenv("PLEX_MEDIA_SERVER_URL")
|
||||
pms_token = os.getenv("PLEX_MEDIA_SERVER_TOKEN")
|
||||
if not pms_url or not pms_token:
|
||||
return None
|
||||
|
||||
width = width or 1000
|
||||
height = height or 1500
|
||||
|
||||
if img:
|
||||
params = {'url': 'http://127.0.0.1:32400%s' % (img), 'width': width, 'height': height, 'format': "png"}
|
||||
|
||||
uri = pms_url + '/photo/:/transcode?%s' % urlencode(params)
|
||||
|
||||
headers = {'X-Plex-Token': pms_token}
|
||||
|
||||
session = requests.Session()
|
||||
try:
|
||||
r = session.request("GET", uri, headers=headers)
|
||||
r.raise_for_status()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
response_status = r.status_code
|
||||
response_content = r.content
|
||||
response_headers = r.headers
|
||||
if response_status in (200, 201):
|
||||
return response_content, response_headers['Content-Type']
|
||||
|
||||
|
||||
def get_from_entry(entry):
|
||||
blob = None
|
||||
content_type = ""
|
||||
if "art" in entry:
|
||||
pms_image = get_image(entry["art"], 600, 300)
|
||||
if pms_image:
|
||||
(blob, content_type) = pms_image
|
||||
|
||||
fmt_params = {
|
||||
"title": entry["title"],
|
||||
"year": entry["year"],
|
||||
"audience_rating": entry["audience_rating"],
|
||||
"directors": ", ".join(entry["directors"]),
|
||||
"actors": ", ".join(entry["actors"]),
|
||||
"summary": entry["summary"],
|
||||
"tagline": entry["tagline"],
|
||||
"genres": ", ".join(entry["genres"])
|
||||
}
|
||||
|
||||
return (blob, content_type, fmt_params)
|
||||
|
||||
|
||||
msg_template_html = """
|
||||
<b>{title} -({year})- Rating: {audience_rating}</b><br>
|
||||
Director(s): {directors}<br>
|
||||
Actors: {actors}<br>
|
||||
<I>{summary}</I><br>
|
||||
{tagline}<br>
|
||||
Genre(s): {genres}<br><br>"""
|
||||
|
||||
msg_template_plain = """*{title} -({year})- Rating: {audience_rating}*
|
||||
Director(s): {directors}
|
||||
Actors: {actors}
|
||||
{summary}
|
||||
{tagline}
|
||||
Genre(s): {genres}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class WebServer:
|
||||
def __init__(self, host, port):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.app = web.Application()
|
||||
self.app.router.add_post('/notify', self.notify)
|
||||
|
||||
async def run(self):
|
||||
if not self.host or not self.port:
|
||||
return
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
runner = web.AppRunner(self.app)
|
||||
loop.run_until_complete(runner.setup())
|
||||
site = web.TCPSite(runner, host=self.host, port=self.port)
|
||||
loop.run_until_complete(site.start())
|
||||
|
||||
async def notify(self, request: web.Request) -> web.Response:
|
||||
try:
|
||||
data = await request.json()
|
||||
if "genres" in data:
|
||||
data["genres"] = data["genres"].split(",")
|
||||
|
||||
if "actors" in data:
|
||||
data["actors"] = data["actors"].split(",")
|
||||
|
||||
if "directors" in data:
|
||||
data["directors"] = data["directors"].split(",")
|
||||
|
||||
global rooms
|
||||
(blob, content_type, fmt_params) = get_from_entry(data)
|
||||
await send_entry(blob, content_type, fmt_params, rooms)
|
||||
|
||||
except Exception as exc:
|
||||
message = str(exc)
|
||||
return web.HTTPBadRequest(body=message)
|
||||
|
||||
return web.Response()
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
httpd = None
|
||||
rooms = dict()
|
||||
api_key = None
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.httpd = WebServer(os.getenv("TAUTULLI_NOTIFIER_ADDR"), os.getenv("TAUTULLI_NOTIFIER_PORT"))
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
global global_bot
|
||||
global_bot = bot
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(self.httpd.run())
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3 and args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
elif len(args) == 2:
|
||||
media_type = args[1]
|
||||
if media_type != "movie" and media_type != "show" and media_type != "artist":
|
||||
await bot.send_text(room, "media type '%s' provided not valid" % media_type)
|
||||
return
|
||||
|
||||
try:
|
||||
url = "{}/api/v2?apikey={}&cmd=get_recently_added&count=10".format(os.getenv("TAUTULLI_URL"), self.api_key)
|
||||
req = urllib.request.Request(url + "&media_type=" + media_type)
|
||||
connection = urllib.request.urlopen(req).read()
|
||||
entries = json.loads(connection)
|
||||
if "response" not in entries and "data" not in entries["response"] and "recently_added" not in entries["response"]["data"]:
|
||||
await bot.send_text(room, "no recently added for %s" % media_type)
|
||||
return
|
||||
|
||||
for entry in entries["response"]["data"]["recently_added"]:
|
||||
(blob, content_type, fmt_params) = get_from_entry(entry)
|
||||
await send_entry(blob, content_type, fmt_params, {room.room_id: room})
|
||||
|
||||
except urllib.error.HTTPError as err:
|
||||
raise ValueError(err.read())
|
||||
except Exception as exc:
|
||||
message = str(exc)
|
||||
await bot.send_text(room, message)
|
||||
elif len(args) == 4:
|
||||
if args[1] == "add" or args[1] == "remove":
|
||||
room_id = args[2]
|
||||
encrypted = args[3]
|
||||
if args[1] == "add":
|
||||
self.rooms[room_id] = encrypted == "encrypted"
|
||||
await bot.send_text(room, f"Added {room_id} to rooms notification list")
|
||||
else:
|
||||
del self.rooms[room_id]
|
||||
await bot.send_text(room, f"Removed {room_id} to rooms notification list")
|
||||
|
||||
bot.save_settings()
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["api_key"] = self.api_key
|
||||
data["rooms"] = self.rooms
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("rooms"):
|
||||
self.rooms = data["rooms"]
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
if data.get("api_key"):
|
||||
self.api_key = data["api_key"]
|
||||
|
||||
def help(self):
|
||||
return ('Tautulli recently added bot')
|
||||
|
|
@ -1,165 +0,0 @@
|
|||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from pyteamup import Calendar
|
||||
|
||||
#
|
||||
# TeamUp calendar notifications
|
||||
#
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.api_key = None
|
||||
self.calendar_rooms = dict() # Roomid -> [calid, calid..]
|
||||
self.calendars = dict() # calid -> Calendar
|
||||
self.enabled = False
|
||||
|
||||
async def matrix_poll(self, bot, pollcount):
|
||||
if self.api_key:
|
||||
if pollcount % (6 * 5) == 0: # Poll every 5 min
|
||||
await self.poll_all_calendars(bot)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 1:
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
for calendarid in self.calendar_rooms.get(room.room_id):
|
||||
calendar = self.calendars[calendarid]
|
||||
events = calendar.get_event_collection()
|
||||
for event in events:
|
||||
s = '<b>' + str(event.start_dt.day) + \
|
||||
'.' + str(event.start_dt.month)
|
||||
if not event.all_day:
|
||||
s = s + ' ' + \
|
||||
event.start_dt.strftime(
|
||||
"%H:%M") + ' (' + str(event.duration) + ' min)'
|
||||
s = s + '</b> ' + event.title + \
|
||||
" " + (event.notes or '')
|
||||
await bot.send_html(room, s, s)
|
||||
elif len(args) == 2:
|
||||
if args[1] == 'list':
|
||||
await bot.send_text(room, f'Calendars in this room: {self.calendar_rooms.get(room.room_id) or []}')
|
||||
elif args[1] == 'poll':
|
||||
bot.must_be_owner(event)
|
||||
await self.poll_all_calendars(bot)
|
||||
elif len(args) == 3:
|
||||
if args[1] == 'add':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
calid = args[2]
|
||||
self.logger.info(f'Adding calendar {calid} to room id {room.room_id}')
|
||||
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
if calid not in self.calendar_rooms[room.room_id]:
|
||||
self.calendar_rooms[room.room_id].append(calid)
|
||||
else:
|
||||
await bot.send_text(room, 'This teamup calendar already added in this room!')
|
||||
return
|
||||
else:
|
||||
self.calendar_rooms[room.room_id] = [calid]
|
||||
|
||||
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Added new teamup calendar to this room')
|
||||
if args[1] == 'del':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
calid = args[2]
|
||||
self.logger.info(f'Removing calendar {calid} from room id {room.room_id}')
|
||||
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
self.calendar_rooms[room.room_id].remove(calid)
|
||||
|
||||
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Removed teamup calendar from this room')
|
||||
if args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
|
||||
def help(self):
|
||||
return ('Polls teamup calendar.')
|
||||
|
||||
async def poll_all_calendars(self, bot):
|
||||
delete_rooms = []
|
||||
for roomid in self.calendar_rooms:
|
||||
if roomid in bot.client.rooms:
|
||||
calendars = self.calendar_rooms[roomid]
|
||||
for calendarid in calendars:
|
||||
events, timestamp = self.poll_server(
|
||||
self.calendars[calendarid])
|
||||
self.calendars[calendarid].timestamp = timestamp
|
||||
for event in events:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), 'Calendar: ' + self.eventToString(event))
|
||||
else:
|
||||
delete_rooms.append(roomid)
|
||||
|
||||
for roomid in delete_rooms:
|
||||
self.calendar_rooms.pop(roomid, None)
|
||||
|
||||
def poll_server(self, calendar):
|
||||
events, timestamp = calendar.get_changed_events(calendar.timestamp)
|
||||
return events, timestamp
|
||||
|
||||
def to_datetime(self, dts):
|
||||
try:
|
||||
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S')
|
||||
except ValueError:
|
||||
pos = len(dts) - 3
|
||||
dts = dts[:pos] + dts[pos + 1:]
|
||||
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S%z')
|
||||
|
||||
def eventToString(self, event):
|
||||
startdt = self.to_datetime(event['start_dt'])
|
||||
if len(event['title']) == 0:
|
||||
event['title'] = '(empty name)'
|
||||
|
||||
if (event['delete_dt']):
|
||||
s = event['title'] + ' deleted.'
|
||||
else:
|
||||
s = event['title'] + " " + (event['notes'] or '') + \
|
||||
' ' + str(startdt.day) + '.' + str(startdt.month)
|
||||
if not event['all_day']:
|
||||
s = s + ' ' + \
|
||||
startdt.strftime("%H:%M") + \
|
||||
' (' + str(event['duration']) + ' min)'
|
||||
# todo: proper html stripper..
|
||||
s = s.replace('<p>', '')
|
||||
s = s.replace('</p>', '\n')
|
||||
return s
|
||||
|
||||
def setup_calendars(self):
|
||||
self.calendars = dict()
|
||||
if self.api_key:
|
||||
for roomid in self.calendar_rooms:
|
||||
calendars = self.calendar_rooms[roomid]
|
||||
for calid in calendars:
|
||||
self.calendars[calid] = Calendar(calid, self.api_key)
|
||||
self.calendars[calid].timestamp = int(time.time())
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['apikey'] = self.api_key
|
||||
data['calendar_rooms'] = self.calendar_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('calendar_rooms'):
|
||||
self.calendar_rooms = data['calendar_rooms']
|
||||
if data.get('apikey'):
|
||||
self.api_key = data['apikey']
|
||||
if self.api_key and len(self.api_key) == 0:
|
||||
self.api_key = None
|
||||
self.setup_calendars()
|
134
modules/wa.py
134
modules/wa.py
|
@ -1,134 +0,0 @@
|
|||
import urllib.request
|
||||
import wolframalpha
|
||||
from html import escape
|
||||
import json
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
app_id = ''
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['wafull'])
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3:
|
||||
if args[1] == "appid":
|
||||
bot.must_be_owner(event)
|
||||
self.app_id = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'App id set')
|
||||
return
|
||||
|
||||
if len(args) > 1:
|
||||
if self.app_id == '':
|
||||
await bot.send_text(room, 'Please get and set a appid: https://products.wolframalpha.com/simple-api/documentation/')
|
||||
return
|
||||
|
||||
query = event.body[len(args[0])+1:]
|
||||
client = wolframalpha.Client(self.app_id)
|
||||
res = client.query(query)
|
||||
result = "?SYNTAX ERROR"
|
||||
if res['@success']:
|
||||
self.logger.debug(f"room: {room.name} sender: {event.sender} sent a valid query to wa")
|
||||
else:
|
||||
self.logger.info(f"wa error: {res['@error']}")
|
||||
short, full = self.parse_api_response(res)
|
||||
if full[0] and 'full' in args[0]:
|
||||
html, plain = full
|
||||
elif short[0]:
|
||||
html, plain = short
|
||||
else:
|
||||
plain = 'Could not find response for ' + query
|
||||
html = plain
|
||||
await bot.send_html(room, html, plain)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !wa <query>')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['app_id'] = self.app_id
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("app_id"):
|
||||
self.app_id = data["app_id"]
|
||||
|
||||
def parse_api_response(self, res):
|
||||
"""Parses the pods from wa and prepares texts to send to matrix
|
||||
|
||||
:param res: the result from wolframalpha.Client
|
||||
:type res: dict
|
||||
:return: a tuple of tuples: ((primary_html, primary_plaintext), (full_html, full_plaintext))
|
||||
:rtype: tuple
|
||||
"""
|
||||
htmls = []
|
||||
texts = []
|
||||
primary = None
|
||||
fallback = None
|
||||
|
||||
pods = res.get('pod')
|
||||
if not pods:
|
||||
return (('<em>(data not available)</em>', '(data not available)'), ) * 2
|
||||
|
||||
# workaround for bug(?) in upstream wa package
|
||||
if hasattr(pods, 'get'):
|
||||
pods = [pods]
|
||||
for pod in res['pod']:
|
||||
pod_htmls = []
|
||||
pod_texts = []
|
||||
spods = pod.get('subpod')
|
||||
if not spods:
|
||||
continue
|
||||
|
||||
# workaround for bug(?) in upstream wa package
|
||||
if hasattr(spods, 'get'):
|
||||
spods = [spods]
|
||||
for spod in spods:
|
||||
title = spod.get('@title')
|
||||
text = spod.get('plaintext')
|
||||
if not text:
|
||||
continue
|
||||
|
||||
if title:
|
||||
html = f'<strong>{escape(title)}</strong>: {escape(text)}'
|
||||
text = f'{title}: {text}'
|
||||
else:
|
||||
html = escape(text)
|
||||
pod_htmls += html.split('\n')
|
||||
pod_texts += text.split('\n')
|
||||
|
||||
if pod_texts:
|
||||
title = pod.get('@title')
|
||||
pod_html = '\n'.join([f'<p><strong>{escape(title)}</strong>\n<ul>']
|
||||
+ [f'<li>{s}</li>' for s in pod_htmls]
|
||||
+ ['</ul></p>'])
|
||||
pod_text = '\n'.join([title]
|
||||
+ [f'- {s}' for s in pod_texts])
|
||||
htmls.append(pod_html)
|
||||
texts.append(pod_text)
|
||||
if not primary and self.is_primary(pod):
|
||||
primary = (f'<strong>{escape(title)}</strong>: ' + ' | '.join(pod_htmls),
|
||||
f'{title}: ' + ' | '.join(pod_texts))
|
||||
else:
|
||||
fallback = fallback or (' | '.join(pod_htmls), ' | '.join(pod_texts))
|
||||
|
||||
return (primary or fallback, ('\n'.join(htmls), '\n'.join(texts)))
|
||||
|
||||
def is_primary(self, pod):
|
||||
return pod.get('@primary') or 'Definition' in pod.get('@title') or 'Result' in pod.get('@title')
|
||||
|
||||
def help(self):
|
||||
return ('Wolfram Alpha search')
|
||||
|
||||
def long_help(self, bot=None, event=None, **kwargs):
|
||||
text = self.help() + (
|
||||
'\n- "!wa [query]": Query WolframAlpha and return the primary pod'
|
||||
'\n- "!wafull [query]": Query WolframAlpha and return all pods'
|
||||
)
|
||||
if bot and event and bot.is_owner(event):
|
||||
text += '\n- "!wa appid [appid]": Set appid'
|
||||
return text
|
Loading…
Reference in New Issue