Compare commits
12 Commits
master
...
kauppalist
Author | SHA1 | Date |
---|---|---|
Jarno Rankinen | 7300b69273 | |
Jarno Rankinen | 2f5225d8ac | |
Jarno Rankinen | 5a6c560086 | |
Jarno Rankinen | 9aedbef3a8 | |
Jarno Rankinen | 3783313af0 | |
Jarno Rankinen | bdf46124bd | |
Jarno Rankinen | a6715d8dcf | |
Jarno Rankinen | 14428d7382 | |
Jarno Rankinen | 0953457b96 | |
Jarno Rankinen | e9a2f9dca0 | |
Jarno Rankinen | d844e8ccb5 | |
Jarno Rankinen | b2215b5426 |
|
@ -37,7 +37,7 @@ jobs:
|
|||
# https://github.com/docker/login-action
|
||||
- name: Log into registry ${{ env.REGISTRY }}
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a
|
||||
uses: docker/login-action@49ed152c8eca782a232dede0303416e8f356c37b
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.actor }}
|
||||
|
@ -47,7 +47,7 @@ jobs:
|
|||
# https://github.com/docker/metadata-action
|
||||
- name: Extract Docker metadata
|
||||
id: meta
|
||||
uses: docker/metadata-action@507c2f2dc502c992ad446e3d7a5dfbe311567a96
|
||||
uses: docker/metadata-action@69f6fc9d46f2f8bf0d5491e4aabe0bb8c6a4678a
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
flavor: latest=true
|
||||
|
@ -55,7 +55,7 @@ jobs:
|
|||
# Build and push Docker image with Buildx (don't push on PR)
|
||||
# https://github.com/docker/build-push-action
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671
|
||||
uses: docker/build-push-action@c84f38281176d4c9cdb1626ffafcd6b3911b5d94
|
||||
with:
|
||||
context: .
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
|
|
|
@ -12,8 +12,11 @@ RUN pip install -r requirements.txt
|
|||
|
||||
COPY bot.py *.json *.pickle /bot/
|
||||
COPY config config
|
||||
COPY modules modules
|
||||
|
||||
VOLUME /bot/config
|
||||
RUN useradd -m HomeBot && chown HomeBot -R /bot && apt install curl jq -y
|
||||
USER HomeBot
|
||||
WORKDIR /bot
|
||||
COPY modules modules
|
||||
|
||||
CMD [ "python", "-u", "./bot.py" ]
|
||||
|
|
1
Pipfile
1
Pipfile
|
@ -24,7 +24,6 @@ pillow = "*"
|
|||
giphypop = "*"
|
||||
tzlocal = "*"
|
||||
nest_asyncio = "*"
|
||||
d20 = "*"
|
||||
|
||||
[dev-packages]
|
||||
pylint = "*"
|
||||
|
|
99
README.md
99
README.md
|
@ -279,14 +279,14 @@ mxma requires all commands to be run as bot owner.
|
|||
#### SpaceAPI
|
||||
|
||||
Polls the status of Hack- and Makerspaces that provide an endpoint
|
||||
that conforms to the [SpaceAPI](https://spaceapi.io/) protocol and notifies
|
||||
that conforms to the [SpaceAPI](https://spaceapi.io/) protocol and notifies
|
||||
about changes of the opening status.
|
||||
|
||||
To add a new endpoint simply use
|
||||
To add a new endpoint simply use
|
||||
`!spaceapi add https://hackspace.example.org/status`
|
||||
|
||||
For Admins: A template and I18N can be configured via settings of
|
||||
the module. Use `!bot export spacepi`, then change the
|
||||
For Admins: A template and I18N can be configured via settings of
|
||||
the module. Use `!bot export spacepi`, then change the
|
||||
settings and import again with `!bot import spacepi SETTINGS`.
|
||||
|
||||
### Url
|
||||
|
@ -371,40 +371,6 @@ The module uses a demo API Key which can be replaced by your own api key by sett
|
|||
|
||||
You can create one at https://api.nasa.gov/#signUp
|
||||
|
||||
### XKCD
|
||||
|
||||
Fetch comic strips from [XKCD](https://xkcd.com) (A webcomic of romance,
|
||||
sarcasm, math, and language) and post them to the room.
|
||||
|
||||
Command:
|
||||
|
||||
* !xkcd - Sends latest XKCD comic strip to the room
|
||||
* !xkcd ID - Sends xkcd number ID to the room
|
||||
* !xkcd help - show command help
|
||||
|
||||
|
||||
### Inspirobot
|
||||
|
||||
Query a randomly generated inspirational poster from https://inspirobot.me/ upload and send to the room.
|
||||
|
||||
Command:
|
||||
|
||||
* !inspire - Generate inspiration and post to the room
|
||||
* !inspire help - Show this help
|
||||
|
||||
### User Status
|
||||
|
||||
This is a substitute for matrix' (element's?) missing user status feature.
|
||||
Save a custom (status) message for users and allows to query them.
|
||||
|
||||
Commands:
|
||||
|
||||
* !status clear - clear my status
|
||||
* !status show [user] - show the status of the given user. If no user is given, show all status messages
|
||||
* !status help - show this text
|
||||
* !status [status] - set your status (your status must not begin with another command word.)
|
||||
|
||||
|
||||
### Wolfram Alpha
|
||||
|
||||
Make queries to Wolfram Alpha
|
||||
|
@ -552,8 +518,8 @@ Read the documentation to create one at https://developers.giphy.com/docs/api
|
|||
Commands:
|
||||
|
||||
* !giphy apikey [apikey] - Set api key (Must be done as bot owner)
|
||||
* !giphy [query] - Post the first image result from giphy for the given [query]
|
||||
|
||||
* !giphy [query] - Post the first image result from giphy for the given [query]
|
||||
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -567,7 +533,7 @@ Can be used to post a picture from Gfycat given a query string.
|
|||
|
||||
Commands:
|
||||
|
||||
* !gfycat [query] - Post the first image result from gfycat for the given [query]
|
||||
* !gfycat [query] - Post the first image result from gfycat for the given [query]
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -606,7 +572,7 @@ Environ variables seen by command:
|
|||
Docker environment:
|
||||
|
||||
Since the module needs access to the source of the running Tautulli instance volumes on both Docker (hemppa and Tautulli) should be defined and being visible each other.
|
||||
When running on Docker the env variables seen by command should be defined for the bot instance.
|
||||
When running on Docker the env variables seen by command should be defined for the bot instance.
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -624,8 +590,8 @@ environment but can be extended to any purpose.
|
|||
|
||||
* Create labels to github that represent for example different machines and spaces.
|
||||
You can create any number of them.
|
||||
* Define label colors for each type of asset. These are called domains in this module.
|
||||
For example set all machine labels to be #B60205 and space labels to be #0E8A16. These
|
||||
* Define label colors for each type of asset. These are called domains in this module.
|
||||
For example set all machine labels to be #B60205 and space labels to be #0E8A16. These
|
||||
can be easily picked from color chooser.
|
||||
* Edit the repository description and add a json block describing the
|
||||
label domains and their colors (array format supports multiple colors per domain). For example:
|
||||
|
@ -708,35 +674,6 @@ including version, how many users are connected, and ping time.
|
|||
* !mumble - Show info about the configured mumble server
|
||||
- !mumble (set|setserver) [host] ([port]) - Set the configured mumble server
|
||||
|
||||
### Nitter
|
||||
|
||||
Reads Twitter links from room, replaces domain with nitter, removes query parameters and posts link to room.
|
||||
|
||||
#### Usage
|
||||
|
||||
* !nitter enable - enable converting twitter links to nitter links in this room (must be done as room admin)
|
||||
* !nitter disable - disable converting twitter links to nitter links in this room (must be done as room admin)
|
||||
|
||||
### Wikipedia
|
||||
|
||||
Searches Wikipedia for a given query and returns the first result summary and link.
|
||||
|
||||
#### Usage
|
||||
|
||||
* !wikipedia [query] - Search Wikipedia for query
|
||||
|
||||
### Dice Roll
|
||||
|
||||
Rolls dice in XdY format.
|
||||
|
||||
* !roll 1d20 - roll a single d20
|
||||
* !roll 1d20+4 - a skill check or attack roll
|
||||
* !roll 1d20+1 adv - a skill check or attack roll with advantage
|
||||
* !roll 1d20-1 dis - a skill check or attack roll with disadvantage
|
||||
* !roll help - show help info
|
||||
|
||||
For more syntax help, see <https://d20.readthedocs.io/en/latest/start.html#dice-syntax>.
|
||||
|
||||
## Bot setup
|
||||
|
||||
* Create a Matrix user
|
||||
|
@ -804,7 +741,7 @@ docker-compose up
|
|||
## Env variables
|
||||
|
||||
`MATRIX_USER` is the full MXID (not just username) of the Matrix user. `MATRIX_ACCESS_TOKEN`
|
||||
and `MATRIX_SERVER` should be url to the user's server (non-delegated server). Set `JOIN_ON_INVITE` (default true)
|
||||
and `MATRIX_SERVER` should be url to the user's server (non-delegated server). Set `JOIN_ON_INVITE` (default true)
|
||||
to false if you don't want the bot automatically joining rooms.
|
||||
|
||||
You can get access token by logging in with Element Android and looking from Settings / Help & About.
|
||||
|
@ -815,8 +752,6 @@ Typically set your own id into it.
|
|||
|
||||
`OWNERS_ONLY` is an optional variable once defined only the owners can operate the bot (this is a form of whitelisting)
|
||||
|
||||
`INVITE_WHITELIST` (default empty) is an optional comma-separated list of matrix id's to restrict the acceptance of invites into rooms of the bot to users or servers. It supports wild cards: Example value: `@user:matrix.org,@*:myserver.org`
|
||||
|
||||
`LEAVE_EMPTY_ROOMS` (default true) if this is set to false, the bot will stay in empty rooms
|
||||
|
||||
__*ATTENTION:*__ Don't include bot itself in `BOT_OWNERS` if cron or any other module that can cause bot to send custom commands is used, as it could potentially be used to run owner commands as the bot itself.
|
||||
|
@ -894,7 +829,7 @@ class Bot:
|
|||
:param bot_ignore: Flag to mark the message to be ignored by the bot
|
||||
:return:
|
||||
"""
|
||||
|
||||
|
||||
async def send_image(self, room, url, body, event=None, mimetype=None, width=None, height=None, size=None):
|
||||
"""
|
||||
|
||||
|
@ -908,17 +843,17 @@ class Bot:
|
|||
:param size: Size in bytes of the image
|
||||
:return:
|
||||
"""
|
||||
|
||||
|
||||
async def upload_image(self, url, blob=False, blob_content_type="image/png"):
|
||||
"""
|
||||
|
||||
:param url: Url of binary content of the image to upload
|
||||
:param blob: Flag to indicate if the first param is an url or a binary content
|
||||
:param blob: Flag to indicate if the first param is an url or a binary content
|
||||
:param blob_content_type: Content type of the image in case of binary content
|
||||
:return: A MXC-Uri https://matrix.org/docs/spec/client_server/r0.6.0#mxc-uri, Content type, Width, Height, Image size in bytes
|
||||
"""
|
||||
|
||||
|
||||
|
||||
|
||||
async def upload_and_send_image(self, room, url, event=None, text=None, blob=False, blob_content_type="image/png"):
|
||||
"""
|
||||
|
||||
|
@ -926,7 +861,7 @@ class Bot:
|
|||
:param url: Url of binary content of the image to upload
|
||||
:param event: The event to reply to
|
||||
:param text: A textual representation of the image
|
||||
:param blob: Flag to indicate if the second param is an url or a binary content
|
||||
:param blob: Flag to indicate if the second param is an url or a binary content
|
||||
:param blob_content_type: Content type of the image in case of binary content
|
||||
:return:
|
||||
"""
|
||||
|
|
19
bot.py
19
bot.py
|
@ -35,7 +35,6 @@ class Bot:
|
|||
self.version = '1.5'
|
||||
self.client = None
|
||||
self.join_on_invite = False
|
||||
self.invite_whitelist = []
|
||||
self.modules = dict()
|
||||
self.module_aliases = dict()
|
||||
self.leave_empty_rooms = True
|
||||
|
@ -459,25 +458,11 @@ class Bot:
|
|||
def starts_with_command(body):
|
||||
"""Checks if body starts with ! and has one or more letters after it"""
|
||||
return re.match(r"^!\w.*", body) is not None
|
||||
|
||||
def on_invite_whitelist(self, sender):
|
||||
for entry in self.invite_whitelist:
|
||||
if entry == sender:
|
||||
return True
|
||||
controll_value = entry.split(':')
|
||||
if controll_value[0] == '@*' and controll_value[1] == sender.split(':')[1]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
async def invite_cb(self, room, event):
|
||||
room: MatrixRoom
|
||||
event: InviteEvent
|
||||
|
||||
if len(self.invite_whitelist) > 0 and not self.on_invite_whitelist(event.sender):
|
||||
self.logger.error(f'Cannot join room {room.display_name}, as {event.sender} is not whitelisted for invites!')
|
||||
return
|
||||
|
||||
if self.join_on_invite or self.is_owner(event):
|
||||
for attempt in range(3):
|
||||
self.jointime = datetime.datetime.now()
|
||||
|
@ -573,7 +558,6 @@ class Bot:
|
|||
bot_owners = os.getenv('BOT_OWNERS')
|
||||
access_token = os.getenv('MATRIX_ACCESS_TOKEN')
|
||||
join_on_invite = os.getenv('JOIN_ON_INVITE')
|
||||
invite_whitelist = os.getenv('INVITE_WHITELIST')
|
||||
owners_only = os.getenv('OWNERS_ONLY') is not None
|
||||
leave_empty_rooms = os.getenv('LEAVE_EMPTY_ROOMS')
|
||||
|
||||
|
@ -581,7 +565,6 @@ class Bot:
|
|||
self.client = AsyncClient(matrix_server, self.matrix_user, ssl = matrix_server.startswith("https://"))
|
||||
self.client.access_token = access_token
|
||||
self.join_on_invite = (join_on_invite or '').lower() == 'true'
|
||||
self.invite_whitelist = invite_whitelist.split(',') if invite_whitelist is not None else []
|
||||
self.leave_empty_rooms = (leave_empty_rooms or 'true').lower() == 'true'
|
||||
self.owners = bot_owners.split(',')
|
||||
self.owners_only = owners_only
|
||||
|
@ -633,8 +616,6 @@ class Bot:
|
|||
|
||||
if self.join_on_invite:
|
||||
self.logger.info('Note: Bot will join rooms if invited')
|
||||
if len(self.invite_whitelist) > 0:
|
||||
self.logger.info(f'Note: Bot will only join rooms when the inviting user is contained in {self.invite_whitelist}')
|
||||
self.logger.info('Bot running as %s, owners %s', self.client.user, self.owners)
|
||||
self.bot_task = asyncio.create_task(self.client.sync_forever(timeout=30000))
|
||||
await self.bot_task
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
import re
|
||||
from modules.common.module import BotModule
|
||||
import requests
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self,name):
|
||||
super().__init__(name)
|
||||
self.motionurl = 'http://localhost:8080'
|
||||
self.cameras = []
|
||||
self.allowed_cmds = {
|
||||
'config': ['list','set','get','write'],
|
||||
'detection': ['status','connection','start','pause'],
|
||||
'action': ['eventstart','eventend','snapshot','restart','quit','end']
|
||||
}
|
||||
self.restricted_cmds = ['list','set','get','write','start','pause','restart','quit','end']
|
||||
self.helptext = """Control the motion daemon.
|
||||
Available commands:
|
||||
- config list|set|get|write
|
||||
- detection status|connection|start|pause
|
||||
- action eventstart|eventend|snapshot|restart|quit|end
|
||||
- url get|set <motionurl>
|
||||
|
||||
Usage: '!cam <id> category command'
|
||||
|
||||
<id> is the numerical id of the camera. Use 0 for all cameras.
|
||||
If <id> is omitted, 0 is assumed."""
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['motionurl'] = self.motionurl
|
||||
data['cameras'] = self.cameras
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('motionurl'):
|
||||
self.motionurl = data['motionurl']
|
||||
if data.get('cameras'):
|
||||
self.cameras = data['cameras']
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if args[0] == 'help':
|
||||
await bot.send_text(room, self.helptext, event)
|
||||
return
|
||||
|
||||
elif args[0] == 'url':
|
||||
if args[1] == 'set':
|
||||
newurl = args[2]
|
||||
bot.must_be_owner(event)
|
||||
self.motionurl = newurl
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f"Motion API URL set to {self.motionurl}", event)
|
||||
elif args[1] == 'get':
|
||||
await bot.send_text(room, f"Motion URL is currently {self.motionurl}", event)
|
||||
|
||||
elif args[0] == 'cameras':
|
||||
if args[1] == 'set':
|
||||
bot.must_be_owner(event)
|
||||
self.cameras = args[2:]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, "Updated camera id list", event)
|
||||
elif args[1] == 'get':
|
||||
camstr = ''
|
||||
if len(self.cameras) == 0:
|
||||
await bot.send_text(room, "No camera ids configured", event)
|
||||
else:
|
||||
for n, cam in enumerate(self.cameras):
|
||||
camstr = camstr + cam
|
||||
if n < len(self.cameras) - 1:
|
||||
camstr = camstr + ","
|
||||
await bot.send_text(room, f"Following camera ids are configured:\n{camstr}", event)
|
||||
|
||||
else:
|
||||
cmdindex = 1
|
||||
try:
|
||||
# Check if first argument is numeric (camera id)
|
||||
camid = int(args[0])
|
||||
camid = str(camid)
|
||||
except ValueError:
|
||||
cmdindex = 0
|
||||
camid = '0'
|
||||
category = args[cmdindex]
|
||||
## Quick commands start
|
||||
if category == 'now':
|
||||
if camid != '0':
|
||||
await self.get_snapshot(camid, bot, room, event)
|
||||
elif camid == '0' and len(self.cameras) > 0:
|
||||
for cam in self.cameras:
|
||||
await self.get_snapshot(cam, bot, room, event)
|
||||
else:
|
||||
self.logger.info("User requested snapshots with id 0, but no camera id list configured")
|
||||
await bot.send_text(room, "No camera ids configured", event)
|
||||
return
|
||||
## Quick commands end
|
||||
if category not in self.allowed_cmds:
|
||||
await bot.send_text(room, f'Unknown category: "{args[1]}"', event)
|
||||
return
|
||||
cmdindex = cmdindex + 1
|
||||
if args[cmdindex] not in self.allowed_cmds[category]:
|
||||
await bot.send_text(room, f'Unknown command: "{args[cmdindex]}"', event)
|
||||
return
|
||||
command = args[cmdindex]
|
||||
req_url = f'{self.motionurl}/{camid}/{category}/{command}'
|
||||
if command in self.restricted_cmds:
|
||||
bot.must_be_owner(event)
|
||||
if category == 'config' and command == 'get':
|
||||
queryparam = args[cmdindex + 1]
|
||||
req_url = f'{req_url}?query={queryparam}'
|
||||
elif category == 'config' and command == 'set':
|
||||
param = args[cmdindex + 1]
|
||||
value = args[cmdindex + 2]
|
||||
req_url = f'{req_url}?{param}={value}'
|
||||
if camid != 0 and command == 'snapshot':
|
||||
await self.get_snapshot(camid, bot, room, event)
|
||||
resp = requests.get(req_url).text
|
||||
await bot.send_text(room, resp, event)
|
||||
|
||||
async def get_snapshot(self, camid, bot, room, event):
|
||||
imgurl = f"{self.motionurl.replace(':8080',':8081')}/{camid}/current"
|
||||
self.logger.info(f"Fetching image from {imgurl}")
|
||||
await bot.upload_and_send_image(room, imgurl, event, no_cache=True)
|
||||
|
||||
def help(self):
|
||||
return self.helptext.splitlines()[0]
|
270
modules/flog.py
270
modules/flog.py
|
@ -1,270 +0,0 @@
|
|||
from logging import log
|
||||
import sys
|
||||
import traceback
|
||||
import json
|
||||
import time
|
||||
import datetime
|
||||
import requests
|
||||
import urllib3
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from random import randrange
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
urllib3.disable_warnings()
|
||||
|
||||
# API docs at: https://gitlab.com/lemoidului/ogn-flightbook/-/blob/master/doc/API.md
|
||||
class FlightBook:
|
||||
def __init__(self):
|
||||
self.base_url = 'https://flightbook.glidernet.org/api'
|
||||
self.AC_TYPES = [ '?', 'Glider', 'Towplane', \
|
||||
'Helicopter', 'Parachute', 'Drop plane', 'Hang glider', \
|
||||
'Paraglider', 'Powered', 'Jet', 'UFO', 'Balloon', \
|
||||
'Airship', 'UAV', '?', 'Static object' ]
|
||||
self.logged_flights = dict() # station -> [index of flight]
|
||||
self.device_cache = dict() # Registration -> [address, CN]
|
||||
|
||||
def get_flights(self, icao):
|
||||
log_url = f'{self.base_url}/logbook/{icao}'
|
||||
data = None
|
||||
with requests.Session() as session:
|
||||
response = session.get(log_url, headers={'Connection': 'close'}, verify=False)
|
||||
data = response.json()
|
||||
|
||||
# print(json.dumps(data, sort_keys=True, indent=4))
|
||||
self.update_device_cache(data)
|
||||
return data
|
||||
|
||||
def update_device_cache(self, data):
|
||||
devices = data['devices']
|
||||
for device in devices:
|
||||
if device["address"] and device["registration"]:
|
||||
cache_entry = [device["address"], device["competition"]]
|
||||
self.device_cache[device["registration"]] = cache_entry
|
||||
|
||||
def address_for_registration(self, registration):
|
||||
for reg in self.device_cache.keys():
|
||||
if reg.lower() == registration.lower():
|
||||
return self.device_cache[reg][0]
|
||||
return None
|
||||
|
||||
def address_for_cn(self, cn):
|
||||
for reg in self.device_cache.keys():
|
||||
if self.device_cache[reg][1] == cn.upper():
|
||||
return self.device_cache[reg][0]
|
||||
return None
|
||||
|
||||
def format_time(self, time):
|
||||
if not time:
|
||||
return '··:··'
|
||||
time = time.replace('h', ':')
|
||||
return time
|
||||
|
||||
def flight2string(self, flight, data):
|
||||
devices = data['devices']
|
||||
device = devices[flight['device']]
|
||||
start = self.format_time(flight["start"])
|
||||
end = self.format_time(flight["stop"])
|
||||
duration = ' '
|
||||
if flight["duration"]:
|
||||
duration = time.strftime('%H:%M', time.gmtime(flight["duration"]))
|
||||
maxalt = ''
|
||||
if flight["max_alt"]:
|
||||
maxalt = str(flight["max_alt"]) + 'm'
|
||||
|
||||
identity = f'{device.get("registration") or ""} {device.get("aircraft") or ""} {device.get("competition") or ""} {maxalt}'
|
||||
identity = ' '.join(identity.split())
|
||||
return f'{start} - {end} {duration} {identity}'
|
||||
|
||||
def print_flights(self, data, showtow=False):
|
||||
print(f'✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:')
|
||||
flights = data['flights']
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
print(self.flight2string(flight, data))
|
||||
|
||||
def test():
|
||||
fb = FlightBook()
|
||||
data = fb.get_flights('LFMX')
|
||||
fb.print_flights(data)
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.service_name = 'FLOG'
|
||||
self.station_rooms = dict() # Roomid -> ogn station
|
||||
self.live_rooms = [] # Roomid's with live enabled
|
||||
self.logged_flights = dict() # Station -> number of flights
|
||||
self.first_poll = True
|
||||
self.enabled = False
|
||||
self.fb = FlightBook()
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['sar'])
|
||||
|
||||
async def matrix_poll(self, bot, pollcount):
|
||||
if pollcount % (6 * 5) == 0: # Poll every 5 min
|
||||
await self.poll_implementation(bot)
|
||||
|
||||
async def poll_implementation(self, bot):
|
||||
for roomid in self.live_rooms:
|
||||
station = self.station_rooms[roomid]
|
||||
data = self.fb.get_flights(station)
|
||||
if not data:
|
||||
self.logger.warning(f"FLOG: Failed to get flights at {station}!")
|
||||
return
|
||||
flights = data['flights']
|
||||
|
||||
if len(flights) == 0 or (not station in self.logged_flights):
|
||||
self.logged_flights[station] = []
|
||||
#print('Reset flight count for station ' + station)
|
||||
# else:
|
||||
# print(f'Got {len(flights)} flights at {station}')
|
||||
|
||||
flightindex = 0
|
||||
for flight in flights:
|
||||
if flight["towing"]:
|
||||
continue
|
||||
if flight["stop"]:
|
||||
if not flightindex in self.logged_flights[station]:
|
||||
if not self.first_poll:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), self.fb.flight2string(flight, data))
|
||||
self.logged_flights[station].append(flightindex)
|
||||
# print(f'Logged flights at {station} now {self.logged_flights[station]}')
|
||||
flightindex = flightindex + 1
|
||||
self.first_poll = False
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 1 and args[0] == "!flog":
|
||||
if room.room_id in self.station_rooms:
|
||||
station = self.station_rooms[room.room_id]
|
||||
await self.show_flog(bot, room, station)
|
||||
else:
|
||||
await bot.send_text(room, 'No OGN station set for this room - set it first.')
|
||||
|
||||
elif len(args) == 2 and args[0] == "!flog":
|
||||
if args[1] == 'rmstation':
|
||||
bot.must_be_admin(room, event)
|
||||
del self.station_rooms[room.room_id]
|
||||
self.live_rooms.remove(room.room_id)
|
||||
await bot.send_text(room, f'Cleared OGN station for this room')
|
||||
|
||||
elif args[1] == 'status':
|
||||
print(self.logged_flights)
|
||||
print(self.fb.device_cache)
|
||||
bot.must_be_admin(room, event)
|
||||
await bot.send_text(room, f'OGN station for this room: {self.station_rooms.get(room.room_id)}, live updates enabled: {room.room_id in self.live_rooms}')
|
||||
|
||||
elif args[1] == 'poll':
|
||||
bot.must_be_admin(room, event)
|
||||
await self.poll_implementation(bot)
|
||||
|
||||
elif args[1] == 'live':
|
||||
bot.must_be_admin(room, event)
|
||||
self.live_rooms.append(room.room_id)
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Sending live updates for station {self.station_rooms.get(room.room_id)} to this room')
|
||||
|
||||
elif args[1] == 'rmlive':
|
||||
bot.must_be_admin(room, event)
|
||||
self.live_rooms.remove(room.room_id)
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Not sending live updates for station {self.station_rooms.get(room.room_id)} to this room anymore')
|
||||
|
||||
else:
|
||||
# Assume parameter is a station name
|
||||
station = args[1]
|
||||
await self.show_flog(bot, room, station)
|
||||
elif len(args) == 2 and args[0] == "!sar":
|
||||
registration = args[1]
|
||||
address = self.fb.address_for_registration(registration)
|
||||
if not address:
|
||||
cn = args[1]
|
||||
address = self.fb.address_for_cn(cn)
|
||||
|
||||
coords = None
|
||||
if address:
|
||||
coords = self.get_coords_for_address(address)
|
||||
if coords:
|
||||
await bot.send_location(room, f'{registration} ({coords["utc"]})', coords["lat"], coords["lng"])
|
||||
else:
|
||||
await bot.send_text(room, f'No Flarm ID found for {registration}!')
|
||||
|
||||
elif len(args) == 3 and args[0] == "!flog":
|
||||
if args[1] == 'station':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
station = args[2]
|
||||
self.station_rooms[room.room_id] = station
|
||||
self.logger.info(f'Station now for this room {self.station_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Set OGN station {station} to this room')
|
||||
|
||||
|
||||
def get_coords_for_address(self, address):
|
||||
# https://flightbook.glidernet.org/api/live/address/~91DADF5B86
|
||||
url = f'{self.fb.base_url}/live/address/{address}'
|
||||
data = None
|
||||
with requests.Session() as session:
|
||||
response = session.get(url, headers={'Connection': 'close'}, verify=False)
|
||||
data = response.json()
|
||||
|
||||
# print(json.dumps(data, sort_keys=True, indent=4))
|
||||
return data
|
||||
|
||||
|
||||
def text_flog(self, data, showtow):
|
||||
out = ""
|
||||
if len(data["flights"]) == 0:
|
||||
out = f'No known flights today at {data["airfield"]["name"]}'
|
||||
else:
|
||||
out = f'Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "\n"
|
||||
flights = data['flights']
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
out = out + self.fb.flight2string(flight, data) + "\n"
|
||||
return out
|
||||
|
||||
def html_flog(self, data, showtow):
|
||||
out = ""
|
||||
if len(data["flights"]) == 0:
|
||||
out = f'No known flights today at {data["airfield"]["name"]}'
|
||||
else:
|
||||
out = f'<b>✈ Flights at {data["airfield"]["name"]} ({data["airfield"]["code"]}) {data["date"]}:' + "</b>\n"
|
||||
flights = data['flights']
|
||||
out = out + "<ul>"
|
||||
for flight in flights:
|
||||
if not showtow and flight["towing"]:
|
||||
continue
|
||||
out = out + "<li>" + self.fb.flight2string(flight, data) + "</li>\n"
|
||||
out = out + "</ul>"
|
||||
return out
|
||||
|
||||
async def show_flog(self, bot, room, station):
|
||||
data = self.fb.get_flights(station)
|
||||
if data:
|
||||
await bot.send_html(room, self.html_flog(data, False), self.text_flog(data, False))
|
||||
else:
|
||||
await bot.send_text(room, f"Failed to get flight log for {station}")
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['station_rooms'] = self.station_rooms
|
||||
data['live_rooms'] = self.live_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('station_rooms'):
|
||||
self.station_rooms = data['station_rooms']
|
||||
if data.get('live_rooms'):
|
||||
self.live_rooms = data['live_rooms']
|
||||
|
||||
def help(self):
|
||||
return ('Open Glider Network Field Log')
|
|
@ -1,100 +0,0 @@
|
|||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import requests
|
||||
from nio import AsyncClient, UploadError
|
||||
from nio import UploadResponse
|
||||
|
||||
from collections import namedtuple
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class gfycat(object):
|
||||
"""
|
||||
A very simple module that allows you to
|
||||
1. search a gif on gfycat from a remote location
|
||||
"""
|
||||
|
||||
# Urls
|
||||
url = "https://api.gfycat.com"
|
||||
|
||||
def __init__(self):
|
||||
super(gfycat, self).__init__()
|
||||
|
||||
def __fetch(self, url, param):
|
||||
import json
|
||||
try:
|
||||
# added simple User-Ajent string to avoid CloudFlare block this request
|
||||
headers = {'User-Agent': 'Mozilla/5.0'}
|
||||
req = urllib.request.Request(url+param, headers=headers)
|
||||
connection = urllib.request.urlopen(req).read()
|
||||
except urllib.error.HTTPError as err:
|
||||
raise ValueError(err.read())
|
||||
result = namedtuple("result", "raw json")
|
||||
return result(raw=connection, json=json.loads(connection))
|
||||
|
||||
def search(self, param):
|
||||
result = self.__fetch(self.url, "/v1/gfycats/search?search_text=%s" % urllib.parse.quote_plus(param))
|
||||
if "errorMessage" in result.json:
|
||||
raise ValueError("%s" % self.json["errorMessage"])
|
||||
return _gfycatSearch(result)
|
||||
|
||||
class _gfycatUtils(object):
|
||||
|
||||
"""
|
||||
A utility class that provides the necessary common
|
||||
for all the other classes
|
||||
"""
|
||||
|
||||
def __init__(self, param, json):
|
||||
super(_gfycatUtils, self).__init__()
|
||||
# This can be used for other functions related to this class
|
||||
self.res = param
|
||||
self.js = json
|
||||
|
||||
def raw(self):
|
||||
return self.res.raw
|
||||
|
||||
def json(self):
|
||||
return self.js
|
||||
|
||||
def __len__(self):
|
||||
return len(self.js)
|
||||
|
||||
def get(self, what):
|
||||
try:
|
||||
return self.js[what]
|
||||
except KeyError as error:
|
||||
return ("Sorry, can't find %s" % error)
|
||||
|
||||
class _gfycatSearch(_gfycatUtils):
|
||||
|
||||
"""
|
||||
This class will provide more information for an existing url
|
||||
"""
|
||||
|
||||
def __init__(self, param):
|
||||
super(_gfycatSearch, self).__init__(param, param.json["gfycats"])
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) > 1:
|
||||
gif_url = "No image found"
|
||||
query = event.body[len(args[0])+1:]
|
||||
try:
|
||||
gifs = gfycat().search(query)
|
||||
if len(gifs) < 1:
|
||||
await bot.send_text(room, gif_url)
|
||||
return
|
||||
|
||||
gif_url = gifs.get(0)["content_urls"]["largeGif"]["url"]
|
||||
await bot.upload_and_send_image(room, gif_url)
|
||||
except Exception as exc:
|
||||
gif_url = str(exc)
|
||||
await bot.send_text(room, gif_url)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !gfycat <query>')
|
||||
|
||||
def help(self):
|
||||
return ('Gfycat bot')
|
|
@ -1,134 +0,0 @@
|
|||
from github import Github
|
||||
import re
|
||||
import json
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
# Helper class with reusable code for github project stuff
|
||||
class GithubProject:
|
||||
# New format to support array of colors: domains={"koneet":["#BFDADC","#0CBBF0","#0CBBF0","#E15D19","#ED49CF"],"tilat":["#0E8A16","#1E8A16"]}
|
||||
def get_domains(description):
|
||||
p = re.compile('domains=\{.*\}')
|
||||
matches = json.loads(p.findall(description)[0][8:])
|
||||
return matches
|
||||
|
||||
def get_domain(reponame, domain):
|
||||
g = Github()
|
||||
repo = g.get_repo(reponame)
|
||||
domains = GithubProject.get_domains(repo.description)
|
||||
if(not len(domains)):
|
||||
return None, None
|
||||
domain_colors = domains.get(domain, None)
|
||||
if not domain_colors:
|
||||
return None, None
|
||||
|
||||
open_issues = repo.get_issues(state='open')
|
||||
domain_labels = []
|
||||
labels = repo.get_labels()
|
||||
for label in labels:
|
||||
for domain_color in domain_colors:
|
||||
if label.color == domain_color[1:]:
|
||||
domain_labels.append(label)
|
||||
|
||||
domain_issues = dict()
|
||||
domain_ok = []
|
||||
for label in domain_labels:
|
||||
label_issues = []
|
||||
for issue in open_issues:
|
||||
if label in issue.labels:
|
||||
label_issues.append(issue)
|
||||
if len(label_issues):
|
||||
domain_issues[label.name] = label_issues
|
||||
else:
|
||||
domain_ok.append(label.name)
|
||||
|
||||
return domain_issues, domain_ok
|
||||
|
||||
def domain_to_string(reponame, issues, ok):
|
||||
text_out = reponame + ":\n"
|
||||
for label in issues.keys():
|
||||
text_out = text_out + f'{label}: '
|
||||
for issue in issues[label]:
|
||||
# todo: add {issue.html_url} when URL previews can be disabled
|
||||
text_out = text_out + f'[{issue.title}] '
|
||||
text_out = text_out + f'\n'
|
||||
|
||||
text_out = text_out + " OK : " + ', '.join(ok)
|
||||
return text_out
|
||||
|
||||
def domain_to_html(reponame, issues, ok):
|
||||
html_out = f'<b>{reponame}:</b> <br/>'
|
||||
for label in issues.keys():
|
||||
html_out = html_out + f'🚧 {label}: '
|
||||
for issue in issues[label]:
|
||||
# todo: add {issue.html_url} when URL previews can be disabled
|
||||
html_out = html_out + f'[{issue.title}] '
|
||||
html_out = html_out + f'<br/>'
|
||||
|
||||
html_out = html_out + " OK ☑️ " + ', '.join(ok)
|
||||
return html_out
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.repo_rooms = dict()
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
|
||||
if len(args) == 1:
|
||||
if args[0] == 'rmrepo':
|
||||
bot.must_be_admin(room, event)
|
||||
del self.repo_rooms[room.room_id]
|
||||
await bot.send_text(room, 'Github repo removed from this room.')
|
||||
bot.save_settings()
|
||||
return
|
||||
if args[0] == 'repo':
|
||||
await bot.send_text(room, f'Github repo for this room is {self.repo_rooms.get(room.room_id, "not set")}.')
|
||||
return
|
||||
|
||||
domain = args[0]
|
||||
reponame = self.repo_rooms.get(room.room_id, None)
|
||||
if reponame:
|
||||
issues, ok = GithubProject.get_domain(reponame, domain)
|
||||
if issues or ok:
|
||||
await self.send_domain_status(bot, room, reponame, issues, ok)
|
||||
else:
|
||||
await bot.send_text(room, f'No labels with domain {domain} found.')
|
||||
else:
|
||||
await bot.send_text(room, f'No github repo set for this room. Use setrepo to set it.')
|
||||
return
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'setrepo':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
reponame = args[1]
|
||||
self.logger.info(f'Adding repo {reponame} to room id {room.room_id}')
|
||||
|
||||
self.repo_rooms[room.room_id] = reponame
|
||||
await bot.send_text(room, f'Github repo {reponame} set to this room.')
|
||||
bot.save_settings()
|
||||
return
|
||||
|
||||
await bot.send_text(room, 'Unknown command')
|
||||
|
||||
async def send_domain_status(self, bot, room, reponame, issues, ok):
|
||||
text_out = GithubProject.domain_to_string(reponame, issues, ok)
|
||||
html_out = GithubProject.domain_to_html(reponame, issues, ok)
|
||||
await bot.send_html(room, html_out, text_out)
|
||||
|
||||
|
||||
def help(self):
|
||||
return 'Github asset management'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["repo_rooms"] = self.repo_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("repo_rooms"):
|
||||
self.repo_rooms = data["repo_rooms"]
|
|
@ -1,60 +0,0 @@
|
|||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import os
|
||||
import giphypop
|
||||
import requests
|
||||
from nio import AsyncClient, UploadError
|
||||
from nio import UploadResponse
|
||||
|
||||
from collections import namedtuple
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
api_key = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3 and args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
elif len(args) > 1:
|
||||
gif_url = "No image found"
|
||||
query = event.body[len(args[0])+1:]
|
||||
try:
|
||||
g = giphypop.Giphy(api_key=self.api_key)
|
||||
gifs = []
|
||||
try:
|
||||
for x in g.search(phrase=query, limit=1):
|
||||
gifs.append(x)
|
||||
except Exception:
|
||||
pass
|
||||
if len(gifs) < 1:
|
||||
await bot.send_text(room, gif_url)
|
||||
return
|
||||
|
||||
gif_url = gifs[0].media_url
|
||||
await bot.upload_and_send_image(room, gif_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
gif_url = str(exc)
|
||||
await bot.send_text(room, gif_url)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !giphy <query>')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["api_key"] = self.api_key
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("api_key"):
|
||||
self.api_key = data["api_key"]
|
||||
|
||||
def help(self):
|
||||
return ('Giphy bot')
|
|
@ -6,6 +6,7 @@ class MatrixModule(BotModule):
|
|||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.msg_users = False
|
||||
self.info = "More information at https://github.com/vranki/hemppa"
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
import sys
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
from random import randrange
|
||||
|
||||
from igramscraper.exception.instagram_not_found_exception import \
|
||||
InstagramNotFoundException
|
||||
from igramscraper.instagram import Instagram
|
||||
|
||||
from modules.common.pollingservice import PollingService
|
||||
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.instagram = Instagram()
|
||||
self.service_name = 'Instagram'
|
||||
self.enabled = False
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
try:
|
||||
medias = self.instagram.get_medias(account, 5)
|
||||
self.logger.info(f'Polling instagram account {account} for room {roomid} - got {len(medias)} posts.')
|
||||
for media in medias:
|
||||
if send_messages:
|
||||
if media.identifier not in self.known_ids:
|
||||
await bot.send_html(bot.get_room_by_id(roomid),
|
||||
f'<a href="{media.link}">Instagram {account}:</a> {media.caption}',
|
||||
f'{account}: {media.caption} {media.link}')
|
||||
self.known_ids.add(media.identifier)
|
||||
|
||||
except InstagramNotFoundException:
|
||||
self.logger.error(f"{account} does not exist - deleting from room")
|
||||
self.account_rooms[roomid].remove(account)
|
||||
bot.save_settings()
|
||||
except Exception:
|
||||
self.logger.error('Polling instagram account failed:')
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
polldelay = timedelta(minutes=30 + randrange(30))
|
||||
self.next_poll_time[roomid] = datetime.now() + polldelay
|
|
@ -1,54 +0,0 @@
|
|||
import html
|
||||
import requests
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.url_generator_url = "https://inspirobot.me/api?generate=true"
|
||||
self.matrix_uri_cache = dict()
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
self.logger.debug(f"room: {room.name} sender: {event.sender} wants to be inspired!")
|
||||
|
||||
args = event.body.split()
|
||||
|
||||
if len(args) == 1:
|
||||
await self.send_inspiration(bot, room, self.url_generator_url)
|
||||
return
|
||||
elif len(args) == 2:
|
||||
if args[1] == "help":
|
||||
await self.command_help(bot, room)
|
||||
return
|
||||
await bot.send_text(room, f"unknown command: {args}")
|
||||
await self.command_help(bot, room)
|
||||
|
||||
async def send_inspiration(self, bot, room, url_generator_url):
|
||||
self.logger.debug(f"Asking inspirobot for pic url at {url_generator_url}")
|
||||
response = requests.get(url_generator_url)
|
||||
|
||||
if response.status_code != 200:
|
||||
self.logger.error("unable to request inspirobot api. response: [status: %d text: %s]", response.status_code, response.text)
|
||||
return await bot.send_text(room, f"sorry. something went wrong accessing inspirobot: {response.status_code}: {response.text}")
|
||||
|
||||
pic_url = response.text
|
||||
self.logger.debug("Sending image with src='%s'", pic_url)
|
||||
await bot.upload_and_send_image(room, pic_url)
|
||||
|
||||
def help(self):
|
||||
return """I'm InspiroBot.
|
||||
I am an artificial intelligence dedicated to generating unlimited amounts of unique inspirational quotes
|
||||
for endless enrichment of pointless human existence.
|
||||
https://inspirobot.me/
|
||||
"""
|
||||
|
||||
async def command_help(self, bot, room):
|
||||
msg = """usage: !inspire [command]
|
||||
No command to generate an inspirational poster just for you!
|
||||
- help - show this. Useful, isn't it?
|
||||
"""
|
||||
await bot.send_html(room, f"<b>{html.escape(self.help())}</b>", self.help())
|
||||
await bot.send_text(room, msg)
|
|
@ -1,45 +0,0 @@
|
|||
from nio import RoomMessageUnknown, UnknownEvent
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
bot = None
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.bot = bot
|
||||
bot.client.add_event_callback(self.unknownevent_cb, (UnknownEvent,))
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.unknownevent_cb)
|
||||
|
||||
async def unknownevent_cb(self, room, event):
|
||||
try:
|
||||
if 'type' in event.source and event.source['type'] == 'im.vector.modular.widgets' and event.source['content']['type'] == 'jitsi':
|
||||
# Todo: Domain not found in Element Android events!
|
||||
domain = event.source['content']['data']['domain']
|
||||
conferenceId = event.source['content']['data']['conferenceId']
|
||||
isAudioOnly = event.source['content']['data']['isAudioOnly']
|
||||
sender = event.source['sender']
|
||||
sender_response = await self.bot.client.get_displayname(event.sender)
|
||||
sender = sender_response.displayname
|
||||
# This is just a guess - is this the proper way to generate URL? Probably not.
|
||||
jitsiUrl = f'https://{domain}/{conferenceId}'
|
||||
|
||||
calltype = 'video call'
|
||||
if isAudioOnly:
|
||||
calltype = 'audio call'
|
||||
|
||||
plainMessage = f'{sender} started a {calltype}: {jitsiUrl}'
|
||||
htmlMessage = f'{sender} started a <a href="{jitsiUrl}">{calltype}</a>'
|
||||
await self.bot.send_html(room, htmlMessage, plainMessage)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed parsing Jitsi event. Error: {e}")
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
pass
|
||||
|
||||
def help(self):
|
||||
return 'Sends text links when user starts a Jitsi video or audio call in room'
|
|
@ -0,0 +1,19 @@
|
|||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
rawlist = event.body.split(' ', 1)
|
||||
rawlist.pop(0)
|
||||
|
||||
mdlist = list()
|
||||
rawlist = rawlist[0].splitlines()
|
||||
for item in rawlist:
|
||||
mdlist.append('- [ ] ' + item)
|
||||
|
||||
self.logger.debug(mdlist)
|
||||
|
||||
await bot.send_text(room, "\n".join(mdlist), event)
|
||||
|
||||
def help(self):
|
||||
return 'Formats the list given as arguments to a Markdown checkbox list'
|
148
modules/loc.py
148
modules/loc.py
|
@ -1,148 +0,0 @@
|
|||
from geopy.geocoders import Nominatim
|
||||
from nio import RoomMessageUnknown
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.bot = None
|
||||
self.enabled_rooms = []
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.bot = bot
|
||||
bot.client.add_event_callback(self.unknown_cb, RoomMessageUnknown)
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.unknown_cb)
|
||||
|
||||
'''
|
||||
Location events are like: https://spec.matrix.org/v1.2/client-server-api/#mlocation
|
||||
{
|
||||
"content": {
|
||||
"body": "geo:61.49342512194717,23.765914658307736",
|
||||
"geo_uri": "geo:61.49342512194717,23.765914658307736",
|
||||
"msgtype": "m.location",
|
||||
"org.matrix.msc1767.text": "geo:61.49342512194717,23.765914658307736",
|
||||
"org.matrix.msc3488.asset": {
|
||||
"type": "m.pin"
|
||||
},
|
||||
"org.matrix.msc3488.location": {
|
||||
"description": "geo:61.49342512194717,23.765914658307736",
|
||||
"uri": "geo:61.49342512194717,23.765914658307736"
|
||||
},
|
||||
"org.matrix.msc3488.ts": 1653837929839
|
||||
},
|
||||
"room_id": "!xsBGdLYGrfYhGfLtHG:hacklab.fi",
|
||||
"type": "m.room.message"
|
||||
}
|
||||
|
||||
BUT sometimes there's ; separating altitude??
|
||||
{
|
||||
"content": {
|
||||
"body": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"geo_uri": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"msgtype": "m.location",
|
||||
"org.matrix.msc1767.text": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"org.matrix.msc3488.asset": {
|
||||
"type": "m.self"
|
||||
},
|
||||
"org.matrix.msc3488.location": {
|
||||
"description": "geo:61.4704211,23.4864855;36.900001525878906",
|
||||
"uri": "geo:61.4704211,23.4864855;36.900001525878906"
|
||||
},
|
||||
"org.matrix.msc3488.ts": 1653931683087
|
||||
},
|
||||
"origin_server_ts": 1653931683998,
|
||||
"sender": "@cos:hacklab.fi",
|
||||
"type": "m.room.message",
|
||||
"unsigned": {
|
||||
"age": 70
|
||||
},
|
||||
"event_id": "$6xXutKF9EppPMMdc4aQLZjHyd8My0rIZuNZEcuSIPws",
|
||||
"room_id": "!CLofqdurVWZCMpFnqM:hacklab.fi"
|
||||
}
|
||||
'''
|
||||
|
||||
async def unknown_cb(self, room, event):
|
||||
if event.msgtype != 'm.location':
|
||||
return
|
||||
if room.room_id not in self.enabled_rooms:
|
||||
return
|
||||
location_text = event.content['body']
|
||||
|
||||
# Fallback if body is empty
|
||||
if (len(location_text) == 0) or ('geo:' in location_text):
|
||||
location_text = 'location'
|
||||
|
||||
sender_response = await self.bot.client.get_displayname(event.sender)
|
||||
sender = sender_response.displayname
|
||||
|
||||
geo_uri = event.content['geo_uri']
|
||||
try:
|
||||
geo_uri = geo_uri[4:] # Strip geo:
|
||||
|
||||
if ';' in geo_uri: # Strip altitude, if present
|
||||
geo_uri = geo_uri.split(';')[0]
|
||||
latlon = geo_uri.split(',')
|
||||
|
||||
# Sanity checks to avoid url manipulation
|
||||
float(latlon[0])
|
||||
float(latlon[1])
|
||||
except Exception:
|
||||
self.bot.send_text(room, "Error: Invalid location " + geo_uri)
|
||||
return
|
||||
|
||||
osm_link = f"https://www.openstreetmap.org/?mlat={latlon[0]}&mlon={latlon[1]}"
|
||||
|
||||
plain = f'{sender} sent {location_text} {osm_link} 🚩'
|
||||
html = f'{sender} sent <a href="{osm_link}">{location_text}</a> 🚩'
|
||||
|
||||
await self.bot.send_html(room, html, plain)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) == 0:
|
||||
await bot.send_text(room, 'Usage: !loc <location name>')
|
||||
return
|
||||
elif len(args) == 1:
|
||||
if args[0] == 'enable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.append(room.room_id)
|
||||
self.enabled_rooms = list(dict.fromkeys(self.enabled_rooms)) # Deduplicate
|
||||
await bot.send_text(room, "Ok, sending locations events here as text versions")
|
||||
bot.save_settings()
|
||||
return
|
||||
if args[0] == 'disable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.remove(room.room_id)
|
||||
await bot.send_text(room, "Ok, disabled here")
|
||||
bot.save_settings()
|
||||
return
|
||||
|
||||
query = event.body[4:]
|
||||
geolocator = Nominatim(user_agent=bot.appid)
|
||||
self.logger.info('loc: looking up %s ..', query)
|
||||
location = geolocator.geocode(query)
|
||||
self.logger.info('loc rx %s', location)
|
||||
if location:
|
||||
await bot.send_location(room, location.address, location.latitude, location.longitude, "m.pin")
|
||||
else:
|
||||
await bot.send_text(room, "Can't find " + query + " on map!")
|
||||
|
||||
def help(self):
|
||||
return 'Search for locations and display Matrix location events as OSM links'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["enabled_rooms"] = self.enabled_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("enabled_rooms"):
|
||||
self.enabled_rooms = data["enabled_rooms"]
|
149
modules/md.py
149
modules/md.py
|
@ -1,149 +0,0 @@
|
|||
from mastodon import Mastodon
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
apps = dict() # instance url <-> [app_id, app_secret]
|
||||
logins = dict() # mxid <-> [username, accesstoken, instanceurl]
|
||||
roomlogins = dict() # roomid <-> [username, accesstoken, instanceurl]
|
||||
public = False
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) >= 1:
|
||||
if args[0] == "toot":
|
||||
toot_body = " ".join(args[1:])
|
||||
accesstoken = None
|
||||
if room.room_id in self.roomlogins.keys():
|
||||
bot.must_be_admin(room, event)
|
||||
username = self.roomlogins[room.room_id][0]
|
||||
accesstoken = self.roomlogins[room.room_id][1]
|
||||
instanceurl = self.roomlogins[room.room_id][2]
|
||||
elif event.sender in self.logins.keys():
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
username = self.logins[event.sender][0]
|
||||
accesstoken = self.logins[event.sender][1]
|
||||
instanceurl = self.logins[event.sender][2]
|
||||
if accesstoken:
|
||||
toottodon = Mastodon(
|
||||
access_token = accesstoken,
|
||||
api_base_url = instanceurl
|
||||
)
|
||||
tootdict = toottodon.toot(toot_body)
|
||||
await bot.send_text(room, tootdict['url'])
|
||||
else:
|
||||
await bot.send_text(room, f'{event.sender} has not logged in yet with the bot. Please do so.')
|
||||
return
|
||||
|
||||
if len(args) == 4:
|
||||
if args[0] == "login":
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
mxid = event.sender
|
||||
instanceurl = args[1]
|
||||
username = args[2]
|
||||
password = args[3]
|
||||
await self.register_app_if_necessary(bot, room, instanceurl)
|
||||
await self.login_to_account(bot, room, mxid, None, instanceurl, username, password)
|
||||
return
|
||||
if len(args) == 5:
|
||||
if args[0] == "roomlogin":
|
||||
if not self.public:
|
||||
bot.must_be_owner(event)
|
||||
roomalias = args[1]
|
||||
instanceurl = args[2]
|
||||
username = args[3]
|
||||
password = args[4]
|
||||
roomid = await bot.get_room_by_alias(roomalias)
|
||||
if roomid:
|
||||
await self.register_app_if_necessary(bot, room, instanceurl)
|
||||
await self.login_to_account(bot, room, None, roomid, instanceurl, username, password)
|
||||
else:
|
||||
await bot.send_text(room, f'Unknown room alias {roomalias} - invite bot to the room first.')
|
||||
return
|
||||
if len(args) == 1:
|
||||
if args[0] == "status":
|
||||
out = f'App registered on {len(self.apps)} instances, public use enabled: {self.public}\n'
|
||||
out = out + f'{len(self.logins)} users logged in:\n'
|
||||
for login in self.logins.keys():
|
||||
out = out + f' - {login} as {self.logins[login][0]} on {self.logins[login][2]}\n'
|
||||
out = out + f'{len(self.roomlogins)} per-room logins:\n'
|
||||
for roomlogin in self.roomlogins:
|
||||
out = out + f' - {roomlogin} as {self.roomlogins[roomlogin][0]} on {self.roomlogins[roomlogin][2]}\n'
|
||||
|
||||
await bot.send_text(room, out)
|
||||
if args[0] == "logout":
|
||||
if event.sender in self.logins.keys():
|
||||
# TODO: Is there a way to invalidate the access token with API?
|
||||
del self.logins[event.sender]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'{event.sender} login data removed from the bot.')
|
||||
if args[0] == "roomlogout":
|
||||
bot.must_be_admin(room, event)
|
||||
if room.room_id in self.roomlogins.keys():
|
||||
del self.roomlogins[room.room_id]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Login data for this room removed from the bot.')
|
||||
else:
|
||||
await bot.send_text(room, f'No login found for room id {room.room_id}.')
|
||||
if args[0] == "clear":
|
||||
bot.must_be_owner(event)
|
||||
self.logins = dict()
|
||||
self.roomlogins = dict()
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'All Mastodon logins cleared')
|
||||
if args[0] == "setpublic":
|
||||
bot.must_be_owner(event)
|
||||
self.public = True
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Mastodon usage is now public use')
|
||||
if args[0] == "setprivate":
|
||||
bot.must_be_owner(event)
|
||||
self.public = False
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Mastodon usage is now restricted to bot owners')
|
||||
|
||||
async def register_app_if_necessary(self, bot, room, instanceurl):
|
||||
if not instanceurl in self.apps.keys():
|
||||
app = Mastodon.create_app(f'Hemppa The Bot - {bot.client.user}', api_base_url = instanceurl)
|
||||
self.apps[instanceurl] = [app[0], app[1]]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Registered Mastodon app on {instanceurl}')
|
||||
|
||||
async def login_to_account(self, bot, room, mxid, roomid, instanceurl, username, password):
|
||||
mastodon = Mastodon(client_id = self.apps[instanceurl][0], client_secret = self.apps[instanceurl][1], api_base_url = instanceurl)
|
||||
access_token = mastodon.log_in(username, password)
|
||||
print('login_To_account', mxid, roomid)
|
||||
if mxid:
|
||||
self.logins[mxid] = [username, access_token, instanceurl]
|
||||
await bot.send_text(room, f'Logged Matrix user {mxid} into {instanceurl} as {username}')
|
||||
elif roomid:
|
||||
self.roomlogins[roomid] = [username, access_token, instanceurl]
|
||||
await bot.send_text(room, f'Set room {roomid} Mastodon user to {username} on {instanceurl}')
|
||||
|
||||
bot.save_settings()
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['apps'] = self.apps
|
||||
data['logins'] = self.logins
|
||||
data['roomlogins'] = self.roomlogins
|
||||
data['public'] = self.public
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("apps"):
|
||||
self.apps = data["apps"]
|
||||
if data.get("logins"):
|
||||
self.logins = data["logins"]
|
||||
if data.get("roomlogins"):
|
||||
self.roomlogins = data["roomlogins"]
|
||||
if data.get("public"):
|
||||
self.public = data["public"]
|
||||
|
||||
def help(self):
|
||||
return ('Mastodon')
|
|
@ -1,20 +0,0 @@
|
|||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2:
|
||||
icao = args[1]
|
||||
metar_url = "https://tgftp.nws.noaa.gov/data/observations/metar/stations/" + \
|
||||
icao.upper() + ".TXT"
|
||||
response = urllib.request.urlopen(metar_url)
|
||||
lines = response.readlines()
|
||||
await bot.send_text(room, lines[1].decode("utf-8").strip())
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !metar <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('Metar data access (usage: !metar <icao code>)')
|
|
@ -1,96 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import random
|
||||
import socket
|
||||
from struct import pack, unpack
|
||||
import time
|
||||
|
||||
# Modified from https://gist.github.com/azlux/315c924af4800ffbc2c91db3ab8a59bc
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.host = None
|
||||
self.port = 64738
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('host'):
|
||||
self.host = data['host']
|
||||
if data.get('port'):
|
||||
self.port = data['port']
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['host'] = self.host
|
||||
data['port'] = self.port
|
||||
return data
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
|
||||
if len(args) > 1 and args[1] in ['set', 'setserver']:
|
||||
bot.must_be_owner(event)
|
||||
self.logger.info(f"room: {room.name} sender: {event.sender} is setting the server settings")
|
||||
if len(args) < 3:
|
||||
self.host = None
|
||||
return await bot.send_text(room, f'Usage: !{args[0]} {args[1]} [host] ([port])')
|
||||
self.host = args[2]
|
||||
if len(args) > 3:
|
||||
self.port = int(args[3])
|
||||
if not self.port:
|
||||
self.port = 64738
|
||||
bot.save_settings()
|
||||
return await bot.send_text(room, f'Set server settings: host: {self.host} port: {self.port}')
|
||||
|
||||
self.logger.info(f"room: {room.name} sender: {event.sender} wants mumble info")
|
||||
if not self.host:
|
||||
return await bot.send_text(room, f'No mumble host info set!')
|
||||
|
||||
try:
|
||||
ret = self.mumble_ping()
|
||||
# https://wiki.mumble.info/wiki/Protocol
|
||||
# [0,1,2,3] = version
|
||||
version = '.'.join(map(str, ret[1:4]))
|
||||
# [4] = identifier passed to the server (used here to get ping time)
|
||||
ping = int(time.time() * 1000) - ret[4]
|
||||
# [7] = bandwidth
|
||||
# [5] = users
|
||||
# [6] = max users
|
||||
await bot.send_text(room, f'{self.host}:{self.port} (v{version}): {ret[5]} / {ret[6]} (ping: {ping}ms)')
|
||||
except socket.gaierror as e:
|
||||
self.logger.error(f"room: {room.name}: mumble_ping failed: {e}")
|
||||
await bot.send_text(room, f'Could not get get mumble server info: {e}')
|
||||
|
||||
def mumble_ping(self):
|
||||
addrinfo = socket.getaddrinfo(self.host, self.port, 0, 0, socket.SOL_UDP)
|
||||
|
||||
for (family, socktype, proto, canonname, sockaddr) in addrinfo:
|
||||
s = socket.socket(family, socktype, proto=proto)
|
||||
s.settimeout(2)
|
||||
|
||||
buf = pack(">iQ", 0, int(time.time() * 1000))
|
||||
try:
|
||||
s.sendto(buf, sockaddr)
|
||||
except (socket.gaierror, socket.timeout) as e:
|
||||
continue
|
||||
|
||||
try:
|
||||
data, addr = s.recvfrom(1024)
|
||||
except socket.timeout:
|
||||
continue
|
||||
|
||||
return unpack(">bbbbQiii", data)
|
||||
|
||||
def help(self):
|
||||
return 'Show info about a mumble server'
|
||||
|
||||
def long_help(self):
|
||||
text = self.help() + (
|
||||
'\n- "!mumble": Get the status of the configured mumble server')
|
||||
|
||||
if bot and event and bot.is_owner(event):
|
||||
text += (
|
||||
'\nOwner commands:'
|
||||
'\n- "!mumble set [host] ([port])": Set use the following host and port'
|
||||
'\n- If no port is given, defaults to 64738')
|
||||
return text
|
|
@ -1,30 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import requests, json
|
||||
import traceback
|
||||
|
||||
from modules.common.pollingservice import PollingService
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.service_name = 'MXMA'
|
||||
self.poll_interval_min = 5
|
||||
self.poll_interval_random = 2
|
||||
self.owner_only = True
|
||||
self.send_all = True
|
||||
self.enabled = False
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
try:
|
||||
response = requests.get(url=account, timeout=5)
|
||||
if response.status_code == 200:
|
||||
if 'messages' in response.json():
|
||||
messages = response.json()['messages']
|
||||
for message in messages:
|
||||
success = await bot.send_msg(message['to'], message['title'], message['message'])
|
||||
except Exception:
|
||||
self.logger.error('Polling MXMA failed:')
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
def help(self):
|
||||
return 'Matrix messaging API'
|
|
@ -1,87 +0,0 @@
|
|||
import re
|
||||
|
||||
from modules.common.module import BotModule
|
||||
from nio import RoomMessageText
|
||||
|
||||
|
||||
# This module reads matrix messages and converts twitter.com links to nitter.nl
|
||||
# Module will only target messages that contain only the twitter link
|
||||
# Additionally module will target only profile links or post links, query parameters are removed
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.regex = re.compile(r'https://twitter.com/([^?]*)')
|
||||
self.bot = None
|
||||
self.enabled_rooms = []
|
||||
|
||||
def matrix_start(self, bot):
|
||||
"""
|
||||
Register callback for all RoomMessageText events on startup
|
||||
"""
|
||||
super().matrix_start(bot)
|
||||
self.bot = bot
|
||||
bot.client.add_event_callback(self.text_cb, RoomMessageText)
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.text_cb)
|
||||
|
||||
async def text_cb(self, room, event):
|
||||
"""
|
||||
Handle client callbacks for all room text events
|
||||
"""
|
||||
if room.room_id not in self.enabled_rooms:
|
||||
return
|
||||
|
||||
if self.bot.should_ignore_event(event):
|
||||
return
|
||||
|
||||
# no content at all?
|
||||
if len(event.body) < 1:
|
||||
return
|
||||
|
||||
if event.body.startswith('!'):
|
||||
return
|
||||
|
||||
if len(event.body.split()) <= 1:
|
||||
if event.body.startswith('https://twitter.com/'):
|
||||
for link in self.regex.findall(event.body):
|
||||
await self.bot.send_text(room, f'https://nitter.net/{link}')
|
||||
return
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
"""
|
||||
Required for initialization of the module
|
||||
"""
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) == 0:
|
||||
await bot.send_text(room, 'Usage: !nitter <enable|disable>')
|
||||
return
|
||||
if len(args) == 1:
|
||||
if args[0] == 'enable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.append(room.room_id)
|
||||
self.enabled_rooms = list(dict.fromkeys(self.enabled_rooms)) # Deduplicate
|
||||
await bot.send_text(room, "Ok, enabling conversion of twitter links to nitter links here")
|
||||
bot.save_settings()
|
||||
return
|
||||
if args[0] == 'disable':
|
||||
bot.must_be_admin(room, event)
|
||||
self.enabled_rooms.remove(room.room_id)
|
||||
await bot.send_text(room, "Ok, disabling conversion of twitter links to nitter links here")
|
||||
bot.save_settings()
|
||||
return
|
||||
|
||||
def help(self):
|
||||
return 'Converts Twitter links to nitter links.'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["enabled_rooms"] = self.enabled_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("enabled_rooms"):
|
||||
self.enabled_rooms = data["enabled_rooms"]
|
|
@ -1,47 +0,0 @@
|
|||
import re
|
||||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2 and len(args[1]) == 4:
|
||||
icao = args[1].upper()
|
||||
notam = self.get_notam(icao)
|
||||
await bot.send_text(room, notam)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !notam <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('NOTAM data access (usage: !notam <icao code>) - Currently Finnish airports only')
|
||||
|
||||
# TODO: This handles only finnish airports. Implement support for other countries.
|
||||
def get_notam(self, icao):
|
||||
if not icao.startswith('EF'):
|
||||
return ('Only Finnish airports supported currently, sorry.')
|
||||
|
||||
icao_first_letter = icao[2]
|
||||
if icao_first_letter < 'M':
|
||||
notam_url = "https://www.ais.fi/ais/bulletins/envfra.htm"
|
||||
else:
|
||||
notam_url = "https://www.ais.fi/ais/bulletins/envfrm.htm"
|
||||
|
||||
response = urllib.request.urlopen(notam_url)
|
||||
lines = response.readlines()
|
||||
lines = b''.join(lines)
|
||||
lines = lines.decode("ISO-8859-1")
|
||||
# Strip EN-ROUTE from end
|
||||
lines = lines[0:lines.find('<a name="EN-ROUTE">')]
|
||||
|
||||
startpos = lines.find('<a name="' + icao + '">')
|
||||
if startpos > -1:
|
||||
endpos = lines.find('<h3>', startpos)
|
||||
if endpos == -1:
|
||||
endpos = len(lines)
|
||||
notam = lines[startpos:endpos]
|
||||
notam = re.sub('<[^<]+?>', ' ', notam)
|
||||
if len(notam) > 4:
|
||||
return notam
|
||||
return f'Cannot parse notam for {icao} at {notam_url}'
|
|
@ -1,116 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
from nio import RoomMessageMedia
|
||||
from typing import Optional
|
||||
import sys
|
||||
import traceback
|
||||
import cups
|
||||
import httpx
|
||||
import aiofiles
|
||||
import os
|
||||
|
||||
# Credit: https://medium.com/swlh/how-to-boost-your-python-apps-using-httpx-and-asynchronous-calls-9cfe6f63d6ad
|
||||
async def download_file(url: str, filename: Optional[str] = None) -> str:
|
||||
filename = filename or url.split("/")[-1]
|
||||
filename = f"/tmp/{filename}"
|
||||
client = httpx.AsyncClient()
|
||||
async with client.stream("GET", url) as resp:
|
||||
resp.raise_for_status()
|
||||
async with aiofiles.open(filename, "wb") as f:
|
||||
async for data in resp.aiter_bytes():
|
||||
if data:
|
||||
await f.write(data)
|
||||
await client.aclose()
|
||||
return filename
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.printers = dict() # roomid <-> printername
|
||||
self.bot = None
|
||||
self.paper_size = 'A4' # Todo: configurable
|
||||
self.enabled = False
|
||||
|
||||
async def file_cb(self, room, event):
|
||||
try:
|
||||
if self.bot.should_ignore_event(event):
|
||||
return
|
||||
if room.room_id in self.printers:
|
||||
printer = self.printers[room.room_id]
|
||||
self.logger.debug(f'RX file - MXC {event.url} - from {event.sender}')
|
||||
https_url = await self.bot.client.mxc_to_http(event.url)
|
||||
self.logger.debug(f'HTTPS URL {https_url}')
|
||||
filename = await download_file(https_url)
|
||||
self.logger.debug(f'RX filename {filename}')
|
||||
conn = cups.Connection ()
|
||||
conn.printFile(printer, filename, f"Printed from Matrix - {filename}", {'fit-to-page': 'TRUE', 'PageSize': self.paper_size})
|
||||
await self.bot.send_text(room, f'Printing file on {printer}..')
|
||||
os.remove(filename) # Not sure if we should wait first?
|
||||
else:
|
||||
self.logger.debug(f'No printer configured for room {room.room_id}')
|
||||
except:
|
||||
self.logger.warning(f"File callback failure")
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
await self.bot.send_text(room, f'Printing failed, sorry. See log for details.')
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
bot.client.add_event_callback(self.file_cb, RoomMessageMedia)
|
||||
self.bot = bot
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.file_cb)
|
||||
self.bot = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
bot.must_be_owner(event)
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
conn = cups.Connection ()
|
||||
printers = conn.getPrinters ()
|
||||
|
||||
if len(args) == 1:
|
||||
if args[0] == 'list':
|
||||
msg = f"Available printers:\n"
|
||||
for printer in printers:
|
||||
print(printer, printers[printer]["device-uri"])
|
||||
msg += f' - {printer} / {printers[printer]["device-uri"]}'
|
||||
for roomid, printerid in self.printers.items():
|
||||
if printerid == printer:
|
||||
msg += f' <- room {roomid}'
|
||||
msg += '\n'
|
||||
await bot.send_text(room, msg)
|
||||
elif args[0] == 'rmroomprinter':
|
||||
del self.printers[room.room_id]
|
||||
await bot.send_text(room, f'Deleted printer from this room.')
|
||||
bot.save_settings()
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'setroomprinter':
|
||||
printer = args[1]
|
||||
if printer in printers:
|
||||
await bot.send_text(room, f'Printing with {printer} here.')
|
||||
self.printers[room.room_id] = printer
|
||||
bot.save_settings()
|
||||
else:
|
||||
await bot.send_text(room, f'No printer called {printer} in your CUPS.')
|
||||
if args[0] == 'setpapersize':
|
||||
self.paper_size = args[1]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f'Paper size set to {self.paper_size}.')
|
||||
|
||||
def help(self):
|
||||
return 'Print files from Matrix'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["printers"] = self.printers
|
||||
data["paper_size"] = self.paper_size
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("printers"):
|
||||
self.printers = data["printers"]
|
||||
if data.get("paper_size"):
|
||||
self.paper_size = data["paper_size"]
|
|
@ -1,80 +0,0 @@
|
|||
from typing import Text
|
||||
import urllib
|
||||
import urllib.request
|
||||
from urllib.parse import urlencode, quote_plus
|
||||
import json
|
||||
import time
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
class PeerTubeClient:
|
||||
def __init__(self):
|
||||
self.instance_url = 'https://sepiasearch.org/'
|
||||
|
||||
def search(self, search_string, count=0):
|
||||
if count == 0:
|
||||
count = 15 # Pt default, could also remove from params..
|
||||
params = urlencode({'search': search_string, 'count': count}, quote_via=quote_plus)
|
||||
search_url = self.instance_url + 'api/v1/search/videos?' + params
|
||||
response = urllib.request.urlopen(search_url)
|
||||
data = json.loads(response.read().decode("utf-8"))
|
||||
return data
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.instance_url = 'https://sepiasearch.org/'
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['ptall'])
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3:
|
||||
if args[1] == "setinstance":
|
||||
bot.must_be_owner(event)
|
||||
self.instance_url = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Instance url set to ' + self.instance_url, bot_ignore=True)
|
||||
return
|
||||
|
||||
if len(args) == 2:
|
||||
if args[1] == "showinstance":
|
||||
await bot.send_text(room, 'Using instance at ' + self.instance_url, bot_ignore=True)
|
||||
return
|
||||
|
||||
if len(args) > 1:
|
||||
query = event.body[len(args[0])+1:]
|
||||
p = PeerTubeClient()
|
||||
p.instance_url = self.instance_url
|
||||
count = 1
|
||||
if args[0] == '!ptall':
|
||||
count = 0
|
||||
data = p.search(query, count)
|
||||
if len(data['data']) > 0:
|
||||
for video in data['data']:
|
||||
video_url = video.get("url") or self.instance_url + 'videos/watch/' + video["uuid"]
|
||||
duration = time.strftime('%H:%M:%S', time.gmtime(video["duration"]))
|
||||
instancedata = video["account"]["host"]
|
||||
html = f'<a href="{video_url}">{video["name"]}</a> {video["description"] or ""} [{duration}] @ {instancedata}'
|
||||
text = f'{video_url} : {video["name"]} {video.get("description") or ""} [{duration}]'
|
||||
await bot.send_html(room, html, text, bot_ignore=True)
|
||||
else:
|
||||
await bot.send_text(room, 'Sorry, no videos found found.', bot_ignore=True)
|
||||
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !pt <query> or !ptall <query> to return all results')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['instance_url'] = self.instance_url
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("instance_url"):
|
||||
self.instance_url = data["instance_url"]
|
||||
|
||||
def help(self):
|
||||
return ('PeerTube search')
|
|
@ -1,19 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import urllib.request
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
day = 0
|
||||
hour = 12
|
||||
if len(args) >= 1:
|
||||
day = int(args[0]) - 1
|
||||
if len(args) == 2:
|
||||
hour = int(args[1])
|
||||
|
||||
imgurl = 'http://ennuste.ilmailuliitto.fi/' + str(day) + '/wstar_bsratio.curr.' + str(hour) + '00lst.d2.png'
|
||||
await bot.upload_and_send_image(room, imgurl, f"RASP Day {day+1} at {hour}:00", no_cache=True)
|
||||
|
||||
def help(self):
|
||||
return 'RASP Gliding Weather forecast, Finland only'
|
109
modules/relay.py
109
modules/relay.py
|
@ -1,109 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
from nio import RoomMessageText
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.bridges = dict()
|
||||
self.bot = None
|
||||
self.enabled = False
|
||||
|
||||
async def message_cb(self, room, event):
|
||||
if self.bot.should_ignore_event(event):
|
||||
return
|
||||
|
||||
if event.body.startswith('!'):
|
||||
return
|
||||
|
||||
source_id = None
|
||||
target_id = None
|
||||
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
if room.room_id == src_id:
|
||||
source_id = src_id
|
||||
target_id = tgt_id
|
||||
elif room.room_id == tgt_id:
|
||||
source_id = tgt_id
|
||||
target_id = src_id
|
||||
|
||||
if not source_id or not target_id:
|
||||
return
|
||||
|
||||
target_room = self.bot.get_room_by_id(target_id)
|
||||
if(target_room):
|
||||
sendernick = target_room.user_name(event.sender)
|
||||
if not sendernick:
|
||||
sendernick = event.sender
|
||||
await self.bot.send_text(target_room, f'<{sendernick}> {event.body}', msgtype="m.text", bot_ignore=True)
|
||||
else:
|
||||
self.logger.warning(f"Bot doesn't seem to be in bridged room {target_id}")
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
bot.client.add_event_callback(self.message_cb, RoomMessageText)
|
||||
self.bot = bot
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
bot.remove_callback(self.message_cb)
|
||||
self.bot = None
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
bot.must_be_admin(room, event)
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) == 1:
|
||||
if args[0] == 'list':
|
||||
i = 1
|
||||
msg = f"Active relay bridges ({len(self.bridges)}):\n"
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
srcroom = self.bot.get_room_by_id(src_id)
|
||||
tgtroom = self.bot.get_room_by_id(tgt_id)
|
||||
|
||||
if srcroom:
|
||||
srcroom = srcroom.display_name
|
||||
else:
|
||||
srcroom = f'??? {src_id}'
|
||||
|
||||
if tgtroom:
|
||||
tgtroom = tgtroom.display_name
|
||||
else:
|
||||
tgtroom = f'??? {tgt_id}'
|
||||
|
||||
msg += f'{i}: {srcroom} <-> {tgtroom}'
|
||||
i = i + 1
|
||||
await bot.send_text(room, msg)
|
||||
|
||||
if len(args) == 2:
|
||||
if args[0] == 'bridge':
|
||||
roomid = args[1]
|
||||
room_to_bridge = bot.get_room_by_id(roomid)
|
||||
if room_to_bridge:
|
||||
await bot.send_text(room, f'Bridging {room_to_bridge.display_name} here.')
|
||||
self.bridges[room.room_id] = roomid
|
||||
bot.save_settings()
|
||||
else:
|
||||
await bot.send_text(room, f'I am not on room with id {roomid} (note: use id, not alias)!')
|
||||
elif args[0] == 'unbridge':
|
||||
idx = int(args[1]) - 1
|
||||
i = 0
|
||||
for src_id, tgt_id in self.bridges.items():
|
||||
if i == idx:
|
||||
del self.bridges[src_id]
|
||||
await bot.send_text(room, f'Unbridged {src_id} and {tgt_id}.')
|
||||
bot.save_settings()
|
||||
return
|
||||
i = i + 1
|
||||
|
||||
def help(self):
|
||||
return 'Simple relaybot between two Matrix rooms'
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["bridges"] = self.bridges
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("bridges"):
|
||||
self.bridges = data["bridges"]
|
|
@ -1,30 +0,0 @@
|
|||
from modules.common.module import BotModule
|
||||
import d20
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
|
||||
if args[0] == 'help':
|
||||
await bot.send_text(room, self.long_help())
|
||||
else:
|
||||
try:
|
||||
result = d20.roll(' '.join(args), stringifier=d20.SimpleStringifier())
|
||||
await bot.send_text(room, str(result), event=event)
|
||||
except:
|
||||
await bot.send_text(room, 'Invalid roll syntax', event=event)
|
||||
|
||||
def help(self):
|
||||
return 'Rolls dice in XdY format'
|
||||
|
||||
def long_help(self, bot=None, event=None, **kwargs):
|
||||
text = self.help() + (
|
||||
'\n- "!roll 1d20": roll a single d20'
|
||||
'\n- "!roll 1d20+4": A skill check or attack roll'
|
||||
'\n- "!roll 1d20+1 adv": A skill check or attack roll with advantage'
|
||||
'\n- "!roll 1d20-1 dis": A skill check or attack roll with disadvantage'
|
||||
'\n- "!roll help": show this help'
|
||||
'\n'
|
||||
'\nFor more syntax help, see https://d20.readthedocs.io/en/latest/start.html#dice-syntax')
|
||||
return text
|
|
@ -1,66 +0,0 @@
|
|||
from modules.common.pollingservice import PollingService
|
||||
from urllib.request import urlopen
|
||||
import json
|
||||
import time
|
||||
|
||||
class MatrixModule(PollingService):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.accountroomid_laststatus = {}
|
||||
self.template = '{spacename} is now {open_closed}'
|
||||
self.i18n = {'open': 'open 🔓', 'closed': 'closed 🔒'}
|
||||
|
||||
async def poll_implementation(self, bot, account, roomid, send_messages):
|
||||
self.logger.debug(f'polling space api {account}.')
|
||||
spacename, is_open = MatrixModule.open_status(account)
|
||||
|
||||
open_str = self.i18n['open'] if is_open else self.i18n['closed']
|
||||
text = self.template.format(spacename=spacename, open_closed=open_str)
|
||||
self.logger.debug(text)
|
||||
|
||||
last_status = self.accountroomid_laststatus.get(account+roomid, False)
|
||||
if send_messages and last_status != is_open:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), text)
|
||||
self.accountroomid_laststatus[account+roomid] = is_open
|
||||
bot.save_settings()
|
||||
|
||||
@staticmethod
|
||||
def open_status(spaceurl):
|
||||
with urlopen(spaceurl, timeout=5) as response:
|
||||
js = json.load(response)
|
||||
|
||||
return js['space'], js['state']['open']
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['laststatus'] = self.accountroomid_laststatus
|
||||
data['template'] = self.template
|
||||
data['i18n'] = self.i18n
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('laststatus'):
|
||||
self.accountroomid_laststatus = data['laststatus']
|
||||
if data.get('template'):
|
||||
self.template = data['template']
|
||||
if data.get('i18n'):
|
||||
self.i18n = data['i18n']
|
||||
|
||||
def help(self):
|
||||
return "Notify about Space-API status changes (open or closed)."
|
||||
|
||||
def long_help(self, bot, event, **kwargs):
|
||||
text = self.help() + \
|
||||
' This is a polling service. Therefore there are additional ' + \
|
||||
'commands: list, debug, poll, clear, add URL, del URL\n' + \
|
||||
'!spaceapi add URL: to add a space-api endpoint\n' + \
|
||||
'!spaceapi list: to list the endpoint configured for this room.\n' + \
|
||||
f'I will look for changes roughly every {self.poll_interval_min} ' + \
|
||||
'minutes. Find out more about Space-API at https://spaceapi.io/.'
|
||||
if bot.is_owner(event):
|
||||
text += '\nA template and I18N can be configured via settings of ' + \
|
||||
'the module. Use "!bot export spacepi", then change the ' + \
|
||||
'settings and import again with "!bot import spacepi SETTINGS".'
|
||||
|
||||
return text
|
|
@ -1,64 +0,0 @@
|
|||
import html
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
"""
|
||||
This is a substitute for matrix' (element's?) missing user status feature.
|
||||
Save a custom (status) message for users and allows to query them.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.status = dict()
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
args.pop(0)
|
||||
if len(args) < 1 or args[0] == "help":
|
||||
await bot.send_text(room, self.help())
|
||||
elif args[0] == "show":
|
||||
if len(args) > 1:
|
||||
await self.send_status(bot=bot, room=room, user=args[1])
|
||||
else:
|
||||
await self.send_status(bot=bot, room=room)
|
||||
elif args[0] == "clear":
|
||||
self.status.pop(event.sender)
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, f"Cleared status of {event.sender}")
|
||||
else:
|
||||
self.status[event.sender] = " ".join(args)
|
||||
bot.save_settings()
|
||||
await self.send_status(bot=bot, room=room, user=event.sender)
|
||||
|
||||
async def send_status(self, bot, room, user=None):
|
||||
if user:
|
||||
if user in self.status:
|
||||
await bot.send_text(room, f"Status message of {user}: {self.status[user]}")
|
||||
else:
|
||||
await bot.send_text(room, f"No status known for {user}")
|
||||
else:
|
||||
await bot.send_html(room, "<b>All status messages:</b><br/><ul><li>" +
|
||||
"</li><li>".join([f"<b>{html.escape(key)}:</b> {html.escape(value)}" for key, value in self.status.items()]) +
|
||||
"</li></ul>", f"All status messages:\n{self.status}")
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["user_status_list"] = self.status
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("user_status_list"):
|
||||
self.status = data["user_status_list"]
|
||||
|
||||
def help(self):
|
||||
return """
|
||||
Store a status message per user and display them.
|
||||
Usage:
|
||||
!status clear - clear my status
|
||||
!status show [user] - show the status of user. If no user is given, show all status messages
|
||||
!status help - show this text
|
||||
!status [status] - set your status
|
||||
"""
|
|
@ -1,23 +0,0 @@
|
|||
import urllib.request
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 2:
|
||||
icao = args[1]
|
||||
taf_url = "https://aviationweather.gov/adds/dataserver_current/httpparam?dataSource=tafs&requestType=retrieve&format=csv&hoursBeforeNow=3&timeType=issue&mostRecent=true&stationString=" + icao.upper()
|
||||
response = urllib.request.urlopen(taf_url)
|
||||
lines = response.readlines()
|
||||
if len(lines) > 6:
|
||||
taf = lines[6].decode("utf-8").split(',')[0]
|
||||
await bot.send_text(room, taf.strip())
|
||||
else:
|
||||
await bot.send_text(room, 'Cannot find taf for ' + icao)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !taf <icao code>')
|
||||
|
||||
def help(self):
|
||||
return ('Taf data access (usage: !taf <icao code>)')
|
|
@ -1,247 +0,0 @@
|
|||
import time
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
import aiohttp.web
|
||||
import requests
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from aiohttp import web
|
||||
from future.moves.urllib.parse import urlencode
|
||||
from nio import MatrixRoom
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
import nest_asyncio
|
||||
nest_asyncio.apply()
|
||||
|
||||
rooms = dict()
|
||||
global_bot = None
|
||||
|
||||
send_entry_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def send_entry(blob, content_type, fmt_params, rooms):
|
||||
async with send_entry_lock:
|
||||
for room_id in rooms:
|
||||
room = MatrixRoom(room_id=room_id, own_user_id=os.getenv("BOT_OWNERS"),
|
||||
encrypted=rooms[room_id])
|
||||
if blob and content_type:
|
||||
await global_bot.upload_and_send_image(room, blob, text="", blob=True, blob_content_type=content_type)
|
||||
|
||||
await global_bot.send_html(room, msg_template_html.format(**fmt_params),
|
||||
msg_template_plain.format(**fmt_params))
|
||||
|
||||
|
||||
def get_image(img=None, width=1000, height=1500):
|
||||
"""
|
||||
Return image data as array.
|
||||
Array contains the image content type and image binary
|
||||
|
||||
Parameters required: img { Plex image location }
|
||||
Optional parameters: width { the image width }
|
||||
height { the image height }
|
||||
Output: array
|
||||
"""
|
||||
|
||||
pms_url = os.getenv("PLEX_MEDIA_SERVER_URL")
|
||||
pms_token = os.getenv("PLEX_MEDIA_SERVER_TOKEN")
|
||||
if not pms_url or not pms_token:
|
||||
return None
|
||||
|
||||
width = width or 1000
|
||||
height = height or 1500
|
||||
|
||||
if img:
|
||||
params = {'url': 'http://127.0.0.1:32400%s' % (img), 'width': width, 'height': height, 'format': "png"}
|
||||
|
||||
uri = pms_url + '/photo/:/transcode?%s' % urlencode(params)
|
||||
|
||||
headers = {'X-Plex-Token': pms_token}
|
||||
|
||||
session = requests.Session()
|
||||
try:
|
||||
r = session.request("GET", uri, headers=headers)
|
||||
r.raise_for_status()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
response_status = r.status_code
|
||||
response_content = r.content
|
||||
response_headers = r.headers
|
||||
if response_status in (200, 201):
|
||||
return response_content, response_headers['Content-Type']
|
||||
|
||||
|
||||
def get_from_entry(entry):
|
||||
blob = None
|
||||
content_type = ""
|
||||
if "art" in entry:
|
||||
pms_image = get_image(entry["art"], 600, 300)
|
||||
if pms_image:
|
||||
(blob, content_type) = pms_image
|
||||
|
||||
fmt_params = {
|
||||
"title": entry["title"],
|
||||
"year": entry["year"],
|
||||
"audience_rating": entry["audience_rating"],
|
||||
"directors": ", ".join(entry["directors"]),
|
||||
"actors": ", ".join(entry["actors"]),
|
||||
"summary": entry["summary"],
|
||||
"tagline": entry["tagline"],
|
||||
"genres": ", ".join(entry["genres"])
|
||||
}
|
||||
|
||||
return (blob, content_type, fmt_params)
|
||||
|
||||
|
||||
msg_template_html = """
|
||||
<b>{title} -({year})- Rating: {audience_rating}</b><br>
|
||||
Director(s): {directors}<br>
|
||||
Actors: {actors}<br>
|
||||
<I>{summary}</I><br>
|
||||
{tagline}<br>
|
||||
Genre(s): {genres}<br><br>"""
|
||||
|
||||
msg_template_plain = """*{title} -({year})- Rating: {audience_rating}*
|
||||
Director(s): {directors}
|
||||
Actors: {actors}
|
||||
{summary}
|
||||
{tagline}
|
||||
Genre(s): {genres}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class WebServer:
|
||||
def __init__(self, host, port):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.app = web.Application()
|
||||
self.app.router.add_post('/notify', self.notify)
|
||||
|
||||
async def run(self):
|
||||
if not self.host or not self.port:
|
||||
return
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
runner = web.AppRunner(self.app)
|
||||
loop.run_until_complete(runner.setup())
|
||||
site = web.TCPSite(runner, host=self.host, port=self.port)
|
||||
loop.run_until_complete(site.start())
|
||||
|
||||
async def notify(self, request: web.Request) -> web.Response:
|
||||
try:
|
||||
data = await request.json()
|
||||
if "genres" in data:
|
||||
data["genres"] = data["genres"].split(",")
|
||||
|
||||
if "actors" in data:
|
||||
data["actors"] = data["actors"].split(",")
|
||||
|
||||
if "directors" in data:
|
||||
data["directors"] = data["directors"].split(",")
|
||||
|
||||
global rooms
|
||||
(blob, content_type, fmt_params) = get_from_entry(data)
|
||||
await send_entry(blob, content_type, fmt_params, rooms)
|
||||
|
||||
except Exception as exc:
|
||||
message = str(exc)
|
||||
return web.HTTPBadRequest(body=message)
|
||||
|
||||
return web.Response()
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
httpd = None
|
||||
rooms = dict()
|
||||
api_key = None
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.httpd = WebServer(os.getenv("TAUTULLI_NOTIFIER_ADDR"), os.getenv("TAUTULLI_NOTIFIER_PORT"))
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
global global_bot
|
||||
global_bot = bot
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(self.httpd.run())
|
||||
|
||||
def matrix_stop(self, bot):
|
||||
super().matrix_stop(bot)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3 and args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
elif len(args) == 2:
|
||||
media_type = args[1]
|
||||
if media_type != "movie" and media_type != "show" and media_type != "artist":
|
||||
await bot.send_text(room, "media type '%s' provided not valid" % media_type)
|
||||
return
|
||||
|
||||
try:
|
||||
url = "{}/api/v2?apikey={}&cmd=get_recently_added&count=10".format(os.getenv("TAUTULLI_URL"), self.api_key)
|
||||
req = urllib.request.Request(url + "&media_type=" + media_type)
|
||||
connection = urllib.request.urlopen(req).read()
|
||||
entries = json.loads(connection)
|
||||
if "response" not in entries and "data" not in entries["response"] and "recently_added" not in entries["response"]["data"]:
|
||||
await bot.send_text(room, "no recently added for %s" % media_type)
|
||||
return
|
||||
|
||||
for entry in entries["response"]["data"]["recently_added"]:
|
||||
(blob, content_type, fmt_params) = get_from_entry(entry)
|
||||
await send_entry(blob, content_type, fmt_params, {room.room_id: room})
|
||||
|
||||
except urllib.error.HTTPError as err:
|
||||
raise ValueError(err.read())
|
||||
except Exception as exc:
|
||||
message = str(exc)
|
||||
await bot.send_text(room, message)
|
||||
elif len(args) == 4:
|
||||
if args[1] == "add" or args[1] == "remove":
|
||||
room_id = args[2]
|
||||
encrypted = args[3]
|
||||
if args[1] == "add":
|
||||
self.rooms[room_id] = encrypted == "encrypted"
|
||||
await bot.send_text(room, f"Added {room_id} to rooms notification list")
|
||||
else:
|
||||
del self.rooms[room_id]
|
||||
await bot.send_text(room, f"Removed {room_id} to rooms notification list")
|
||||
|
||||
bot.save_settings()
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !tautulli <movie|show|artist>|<add|remove> %room_id% %encrypted%')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data["api_key"] = self.api_key
|
||||
data["rooms"] = self.rooms
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("rooms"):
|
||||
self.rooms = data["rooms"]
|
||||
global rooms
|
||||
rooms = self.rooms
|
||||
if data.get("api_key"):
|
||||
self.api_key = data["api_key"]
|
||||
|
||||
def help(self):
|
||||
return ('Tautulli recently added bot')
|
||||
|
|
@ -1,165 +0,0 @@
|
|||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from pyteamup import Calendar
|
||||
|
||||
#
|
||||
# TeamUp calendar notifications
|
||||
#
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.api_key = None
|
||||
self.calendar_rooms = dict() # Roomid -> [calid, calid..]
|
||||
self.calendars = dict() # calid -> Calendar
|
||||
self.enabled = False
|
||||
|
||||
async def matrix_poll(self, bot, pollcount):
|
||||
if self.api_key:
|
||||
if pollcount % (6 * 5) == 0: # Poll every 5 min
|
||||
await self.poll_all_calendars(bot)
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 1:
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
for calendarid in self.calendar_rooms.get(room.room_id):
|
||||
calendar = self.calendars[calendarid]
|
||||
events = calendar.get_event_collection()
|
||||
for event in events:
|
||||
s = '<b>' + str(event.start_dt.day) + \
|
||||
'.' + str(event.start_dt.month)
|
||||
if not event.all_day:
|
||||
s = s + ' ' + \
|
||||
event.start_dt.strftime(
|
||||
"%H:%M") + ' (' + str(event.duration) + ' min)'
|
||||
s = s + '</b> ' + event.title + \
|
||||
" " + (event.notes or '')
|
||||
await bot.send_html(room, s, s)
|
||||
elif len(args) == 2:
|
||||
if args[1] == 'list':
|
||||
await bot.send_text(room, f'Calendars in this room: {self.calendar_rooms.get(room.room_id) or []}')
|
||||
elif args[1] == 'poll':
|
||||
bot.must_be_owner(event)
|
||||
await self.poll_all_calendars(bot)
|
||||
elif len(args) == 3:
|
||||
if args[1] == 'add':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
calid = args[2]
|
||||
self.logger.info(f'Adding calendar {calid} to room id {room.room_id}')
|
||||
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
if calid not in self.calendar_rooms[room.room_id]:
|
||||
self.calendar_rooms[room.room_id].append(calid)
|
||||
else:
|
||||
await bot.send_text(room, 'This teamup calendar already added in this room!')
|
||||
return
|
||||
else:
|
||||
self.calendar_rooms[room.room_id] = [calid]
|
||||
|
||||
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Added new teamup calendar to this room')
|
||||
if args[1] == 'del':
|
||||
bot.must_be_admin(room, event)
|
||||
|
||||
calid = args[2]
|
||||
self.logger.info(f'Removing calendar {calid} from room id {room.room_id}')
|
||||
|
||||
if self.calendar_rooms.get(room.room_id):
|
||||
self.calendar_rooms[room.room_id].remove(calid)
|
||||
|
||||
self.logger.info(f'Calendars now for this room {self.calendar_rooms.get(room.room_id)}')
|
||||
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Removed teamup calendar from this room')
|
||||
if args[1] == 'apikey':
|
||||
bot.must_be_owner(event)
|
||||
|
||||
self.api_key = args[2]
|
||||
bot.save_settings()
|
||||
self.setup_calendars()
|
||||
await bot.send_text(room, 'Api key set')
|
||||
|
||||
def help(self):
|
||||
return ('Polls teamup calendar.')
|
||||
|
||||
async def poll_all_calendars(self, bot):
|
||||
delete_rooms = []
|
||||
for roomid in self.calendar_rooms:
|
||||
if roomid in bot.client.rooms:
|
||||
calendars = self.calendar_rooms[roomid]
|
||||
for calendarid in calendars:
|
||||
events, timestamp = self.poll_server(
|
||||
self.calendars[calendarid])
|
||||
self.calendars[calendarid].timestamp = timestamp
|
||||
for event in events:
|
||||
await bot.send_text(bot.get_room_by_id(roomid), 'Calendar: ' + self.eventToString(event))
|
||||
else:
|
||||
delete_rooms.append(roomid)
|
||||
|
||||
for roomid in delete_rooms:
|
||||
self.calendar_rooms.pop(roomid, None)
|
||||
|
||||
def poll_server(self, calendar):
|
||||
events, timestamp = calendar.get_changed_events(calendar.timestamp)
|
||||
return events, timestamp
|
||||
|
||||
def to_datetime(self, dts):
|
||||
try:
|
||||
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S')
|
||||
except ValueError:
|
||||
pos = len(dts) - 3
|
||||
dts = dts[:pos] + dts[pos + 1:]
|
||||
return datetime.strptime(dts, '%Y-%m-%dT%H:%M:%S%z')
|
||||
|
||||
def eventToString(self, event):
|
||||
startdt = self.to_datetime(event['start_dt'])
|
||||
if len(event['title']) == 0:
|
||||
event['title'] = '(empty name)'
|
||||
|
||||
if (event['delete_dt']):
|
||||
s = event['title'] + ' deleted.'
|
||||
else:
|
||||
s = event['title'] + " " + (event['notes'] or '') + \
|
||||
' ' + str(startdt.day) + '.' + str(startdt.month)
|
||||
if not event['all_day']:
|
||||
s = s + ' ' + \
|
||||
startdt.strftime("%H:%M") + \
|
||||
' (' + str(event['duration']) + ' min)'
|
||||
# todo: proper html stripper..
|
||||
s = s.replace('<p>', '')
|
||||
s = s.replace('</p>', '\n')
|
||||
return s
|
||||
|
||||
def setup_calendars(self):
|
||||
self.calendars = dict()
|
||||
if self.api_key:
|
||||
for roomid in self.calendar_rooms:
|
||||
calendars = self.calendar_rooms[roomid]
|
||||
for calid in calendars:
|
||||
self.calendars[calid] = Calendar(calid, self.api_key)
|
||||
self.calendars[calid].timestamp = int(time.time())
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['apikey'] = self.api_key
|
||||
data['calendar_rooms'] = self.calendar_rooms
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get('calendar_rooms'):
|
||||
self.calendar_rooms = data['calendar_rooms']
|
||||
if data.get('apikey'):
|
||||
self.api_key = data['apikey']
|
||||
if self.api_key and len(self.api_key) == 0:
|
||||
self.api_key = None
|
||||
self.setup_calendars()
|
134
modules/wa.py
134
modules/wa.py
|
@ -1,134 +0,0 @@
|
|||
import urllib.request
|
||||
import wolframalpha
|
||||
from html import escape
|
||||
import json
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
app_id = ''
|
||||
|
||||
def matrix_start(self, bot):
|
||||
super().matrix_start(bot)
|
||||
self.add_module_aliases(bot, ['wafull'])
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
if len(args) == 3:
|
||||
if args[1] == "appid":
|
||||
bot.must_be_owner(event)
|
||||
self.app_id = args[2]
|
||||
bot.save_settings()
|
||||
await bot.send_text(room, 'App id set')
|
||||
return
|
||||
|
||||
if len(args) > 1:
|
||||
if self.app_id == '':
|
||||
await bot.send_text(room, 'Please get and set a appid: https://products.wolframalpha.com/simple-api/documentation/')
|
||||
return
|
||||
|
||||
query = event.body[len(args[0])+1:]
|
||||
client = wolframalpha.Client(self.app_id)
|
||||
res = client.query(query)
|
||||
result = "?SYNTAX ERROR"
|
||||
if res['@success']:
|
||||
self.logger.debug(f"room: {room.name} sender: {event.sender} sent a valid query to wa")
|
||||
else:
|
||||
self.logger.info(f"wa error: {res['@error']}")
|
||||
short, full = self.parse_api_response(res)
|
||||
if full[0] and 'full' in args[0]:
|
||||
html, plain = full
|
||||
elif short[0]:
|
||||
html, plain = short
|
||||
else:
|
||||
plain = 'Could not find response for ' + query
|
||||
html = plain
|
||||
await bot.send_html(room, html, plain)
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !wa <query>')
|
||||
|
||||
def get_settings(self):
|
||||
data = super().get_settings()
|
||||
data['app_id'] = self.app_id
|
||||
return data
|
||||
|
||||
def set_settings(self, data):
|
||||
super().set_settings(data)
|
||||
if data.get("app_id"):
|
||||
self.app_id = data["app_id"]
|
||||
|
||||
def parse_api_response(self, res):
|
||||
"""Parses the pods from wa and prepares texts to send to matrix
|
||||
|
||||
:param res: the result from wolframalpha.Client
|
||||
:type res: dict
|
||||
:return: a tuple of tuples: ((primary_html, primary_plaintext), (full_html, full_plaintext))
|
||||
:rtype: tuple
|
||||
"""
|
||||
htmls = []
|
||||
texts = []
|
||||
primary = None
|
||||
fallback = None
|
||||
|
||||
pods = res.get('pod')
|
||||
if not pods:
|
||||
return (('<em>(data not available)</em>', '(data not available)'), ) * 2
|
||||
|
||||
# workaround for bug(?) in upstream wa package
|
||||
if hasattr(pods, 'get'):
|
||||
pods = [pods]
|
||||
for pod in res['pod']:
|
||||
pod_htmls = []
|
||||
pod_texts = []
|
||||
spods = pod.get('subpod')
|
||||
if not spods:
|
||||
continue
|
||||
|
||||
# workaround for bug(?) in upstream wa package
|
||||
if hasattr(spods, 'get'):
|
||||
spods = [spods]
|
||||
for spod in spods:
|
||||
title = spod.get('@title')
|
||||
text = spod.get('plaintext')
|
||||
if not text:
|
||||
continue
|
||||
|
||||
if title:
|
||||
html = f'<strong>{escape(title)}</strong>: {escape(text)}'
|
||||
text = f'{title}: {text}'
|
||||
else:
|
||||
html = escape(text)
|
||||
pod_htmls += html.split('\n')
|
||||
pod_texts += text.split('\n')
|
||||
|
||||
if pod_texts:
|
||||
title = pod.get('@title')
|
||||
pod_html = '\n'.join([f'<p><strong>{escape(title)}</strong>\n<ul>']
|
||||
+ [f'<li>{s}</li>' for s in pod_htmls]
|
||||
+ ['</ul></p>'])
|
||||
pod_text = '\n'.join([title]
|
||||
+ [f'- {s}' for s in pod_texts])
|
||||
htmls.append(pod_html)
|
||||
texts.append(pod_text)
|
||||
if not primary and self.is_primary(pod):
|
||||
primary = (f'<strong>{escape(title)}</strong>: ' + ' | '.join(pod_htmls),
|
||||
f'{title}: ' + ' | '.join(pod_texts))
|
||||
else:
|
||||
fallback = fallback or (' | '.join(pod_htmls), ' | '.join(pod_texts))
|
||||
|
||||
return (primary or fallback, ('\n'.join(htmls), '\n'.join(texts)))
|
||||
|
||||
def is_primary(self, pod):
|
||||
return pod.get('@primary') or 'Definition' in pod.get('@title') or 'Result' in pod.get('@title')
|
||||
|
||||
def help(self):
|
||||
return ('Wolfram Alpha search')
|
||||
|
||||
def long_help(self, bot=None, event=None, **kwargs):
|
||||
text = self.help() + (
|
||||
'\n- "!wa [query]": Query WolframAlpha and return the primary pod'
|
||||
'\n- "!wafull [query]": Query WolframAlpha and return all pods'
|
||||
)
|
||||
if bot and event and bot.is_owner(event):
|
||||
text += '\n- "!wa appid [appid]": Set appid'
|
||||
return text
|
|
@ -1,79 +0,0 @@
|
|||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from modules.common.module import BotModule
|
||||
|
||||
|
||||
# This module searches wikipedia for query, returns page summary and link.
|
||||
class MatrixModule(BotModule):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.api_url = 'https://en.wikipedia.org/w/api.php'
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
args = event.body.split()
|
||||
|
||||
if len(args) > 1:
|
||||
query = event.body[len(args[0]) + 1:]
|
||||
try:
|
||||
response = requests.get(self.api_url, params={
|
||||
'action': 'query',
|
||||
'format': 'json',
|
||||
'exintro': True,
|
||||
'explaintext': True,
|
||||
'prop': 'extracts',
|
||||
'redirects': 1,
|
||||
'titles': query,
|
||||
})
|
||||
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# Get the page id
|
||||
page_id = list(data['query']['pages'].keys())[0]
|
||||
|
||||
if page_id == '-1':
|
||||
await bot.send_text(room, 'No results found')
|
||||
return
|
||||
|
||||
# Get the page title
|
||||
title = data['query']['pages'][page_id]['title']
|
||||
|
||||
# Get the page summary
|
||||
summary = data['query']['pages'][page_id]['extract']
|
||||
|
||||
# Remove all html tags
|
||||
extract = re.sub('<[^<]+?>', '', summary)
|
||||
# Remove any multiple spaces
|
||||
extract = re.sub(' +', ' ', extract)
|
||||
# Remove any new lines
|
||||
extract = re.sub('', '', extract)
|
||||
# Remove any tabs
|
||||
extract = re.sub('\t', '', extract)
|
||||
|
||||
# Truncate the extract, Element URL preview contains nonsense Wikipedia meta content
|
||||
if len(extract) <= 256:
|
||||
pass
|
||||
else:
|
||||
extract = ' '.join(extract[:256 + 1].split(' ')[0:-1]) + '...'
|
||||
|
||||
# Get the page url
|
||||
url = f'https://en.wikipedia.org/wiki/{title}'
|
||||
|
||||
# Convert all spaces to underscores in url
|
||||
url = re.sub(r'\s', '_', url)
|
||||
|
||||
# Format the response
|
||||
response = f'{title}: {extract} \n{url}'
|
||||
|
||||
# Send the response
|
||||
await bot.send_text(room, response)
|
||||
return
|
||||
except Exception as exc:
|
||||
await bot.send_text(room, str(exc))
|
||||
else:
|
||||
await bot.send_text(room, 'Usage: !wikipedia <query>')
|
||||
|
||||
def help(self):
|
||||
return ('Wikipedia bot')
|
|
@ -1,92 +0,0 @@
|
|||
import re
|
||||
import html
|
||||
|
||||
import requests
|
||||
|
||||
from modules.common.module import BotModule
|
||||
from modules.common.exceptions import UploadFailed
|
||||
|
||||
|
||||
class Xkcd:
|
||||
"""
|
||||
Uses the XKCD (json) api https://xkcd.com/json.html to fetch web comics and metadata and display them in chats.
|
||||
"""
|
||||
def __init__(self, title, img, alt, num):
|
||||
self.title = title
|
||||
self.img = img
|
||||
self.alt = alt
|
||||
self.num = num
|
||||
|
||||
@staticmethod
|
||||
def create_from_json(json):
|
||||
return Xkcd(json.get("title"), json.get("img"), json.get("alt"), json.get("num"))
|
||||
|
||||
def __str__(self):
|
||||
return "title: {} || explanation: {} || date: {} || original-url: {}".format(self.title,
|
||||
self.explanation,
|
||||
self.date,
|
||||
self.hdurl)
|
||||
|
||||
|
||||
class MatrixModule(BotModule):
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.uri_get_latest = 'https://xkcd.com/info.0.json'
|
||||
|
||||
async def matrix_message(self, bot, room, event):
|
||||
self.logger.debug(f"room: {room.name} sender: {event.sender} queried the xkcd module with body: {event.body}")
|
||||
|
||||
args = event.body.split()
|
||||
|
||||
if len(args) == 1:
|
||||
await self.send_xkcd(bot, room, self.uri_get_latest)
|
||||
elif len(args) == 2:
|
||||
if args[1] == "help":
|
||||
await self.command_help(bot, room)
|
||||
else:
|
||||
xkcd_id = args[1]
|
||||
if re.match("\\d+", xkcd_id) is not None:
|
||||
await self.send_xkcd(bot, room, self.uri_get_by_id(xkcd_id))
|
||||
else:
|
||||
await bot.send_text(room, "Invalid comic id. ids must be positive integers.")
|
||||
|
||||
async def send_xkcd(self, bot, room, uri):
|
||||
self.logger.debug(f"send request using uri {uri}")
|
||||
response = requests.get(uri)
|
||||
|
||||
if response.status_code != 200:
|
||||
self.logger.error("unable to request api. response: [status: %d text: %s]", response.status_code, response.text)
|
||||
return await bot.send_text(room, "sorry. something went wrong accessing the api")
|
||||
|
||||
xkcd = Xkcd.create_from_json(response.json())
|
||||
self.logger.debug(xkcd)
|
||||
|
||||
img_url = xkcd.img
|
||||
try:
|
||||
matrix_uri = None
|
||||
matrix_uri, mimetype, w, h, size = bot.get_uri_cache(img_url)
|
||||
except (TypeError, ValueError):
|
||||
self.logger.debug(f"Not found in cache: {img_url}")
|
||||
try:
|
||||
matrix_uri, mimetype, w, h, size = await bot.upload_image(img_url)
|
||||
except (UploadFailed, TypeError, ValueError):
|
||||
await bot.send_text(room, f"Something went wrong uploading {img_url}.")
|
||||
|
||||
await bot.send_html(room, f"<b>{html.escape(xkcd.title)} ({html.escape(str(xkcd.num))})</b>", f"{xkcd.title} ({str(xkcd.num)})")
|
||||
await bot.send_image(room, matrix_uri, img_url, None, mimetype, w, h, size)
|
||||
await bot.send_text(room, f"{xkcd.alt}")
|
||||
|
||||
def uri_get_by_id(self, id):
|
||||
return 'https://xkcd.com/' + str(int(id)) + '/info.0.json'
|
||||
|
||||
def help(self):
|
||||
return 'Sends latest/any specified XCKD web comic to the room. See https://xkcd.com/ '
|
||||
|
||||
async def command_help(self, bot, room):
|
||||
msg = """commands:
|
||||
- no arguments: fetch latest xkcd comic
|
||||
- (\\d+) fetch and display the xkcd comic with the given id
|
||||
- help - show command help
|
||||
"""
|
||||
await bot.send_text(room, msg)
|
Loading…
Reference in New Issue