Add marudor api for departures for stations in whole germany

Succeeds vvs command which uses an api which is no longer reachable
This commit is contained in:
JuliusFreudenberger 2021-02-12 20:38:08 +01:00
parent d734f355f7
commit 47b438bd0b
3 changed files with 82 additions and 141 deletions

12
bot.py
View file

@ -5,11 +5,11 @@ import time
import schedule import schedule
from telegram import InlineKeyboardMarkup, ParseMode from telegram import InlineKeyboardMarkup, ParseMode
from telegram.error import BadRequest from telegram.error import BadRequest
from telegram.ext import Updater, CommandHandler, InlineQueryHandler, CallbackQueryHandler from telegram.ext import Updater, CommandHandler
from exceptions import * from exceptions import *
from marudor_departures import handle_marudor_departures
from push_information import __init__ as push_init from push_information import __init__ as push_init
from vvs import inline_station_search, handle_vvs, handle_multiple_stations_reply
from weather_meteomedia import handle_meteomedia from weather_meteomedia import handle_meteomedia
from weather_openweathermap import __init__ as openweathermap_init from weather_openweathermap import __init__ as openweathermap_init
from weather_openweathermap import handle_weather from weather_openweathermap import handle_weather
@ -53,19 +53,15 @@ class ScheduleThread(threading.Thread):
def __main__(): def __main__():
inline_station_search_handler = InlineQueryHandler(inline_station_search)
dispatcher.add_handler(inline_station_search_handler)
dispatcher.add_handler(CommandHandler('vvs', handle_vvs))
dispatcher.add_handler(CommandHandler('meteomedia', handle_meteomedia)) dispatcher.add_handler(CommandHandler('meteomedia', handle_meteomedia))
dispatcher.add_handler(CommandHandler("weather", handle_weather)) dispatcher.add_handler(CommandHandler('weather', handle_weather))
dispatcher.add_handler(CommandHandler('departure', handle_marudor_departures))
openweathermap_init() openweathermap_init()
dispatcher.add_handler(push_init()) dispatcher.add_handler(push_init())
dispatcher.add_error_handler(error_callback) dispatcher.add_error_handler(error_callback)
dispatcher.add_handler(CallbackQueryHandler(handle_multiple_stations_reply, pattern="^\/vvs"))
continuous_thread = ScheduleThread() continuous_thread = ScheduleThread()
continuous_thread.start() continuous_thread.start()

78
marudor_departures.py Normal file
View file

@ -0,0 +1,78 @@
import re
from datetime import timezone, timedelta
import requests
from dateutil.parser import parse
from exceptions import ServerCommunicationError, StationNotFoundError
parse_regex = r' to | times | in '
def handle_marudor_departures(update, context):
command = ' '.join(context.args)
arguments = parse_arguments(command)
eva_id = get_eva_id_for_station_name(arguments['station'])
departures = get_departures(eva_id)
if ' in ' in arguments:
departures = filter_line_names(departures, arguments[' in '])
if ' to ' in arguments:
departures = filter_destination(departures, arguments[' to '])
departure_count = 4
if ' times ' in arguments:
departure_count = int(arguments[' times '])
departures = departures[:departure_count]
for reply in build_reply(departures):
update.message.reply_text(reply)
def parse_arguments(command: str) -> dict:
included_commands = re.findall(parse_regex, command)
parameters = re.split(parse_regex, command)
return dict(zip(['station'] + included_commands, parameters))
def get_eva_id_for_station_name(station_name: str) -> str:
request = requests.get(f'https://marudor.de/api/hafas/v1/station/{station_name}')
if request.status_code != 200:
raise ServerCommunicationError
response = request.json()
if len(response) == 0 or 'id' not in response[0]:
raise StationNotFoundError
return response[0]['id']
def get_departures(eva_id: str) -> list:
request = requests.get(f'https://marudor.de/api/hafas/experimental/irisCompatibleAbfahrten/{eva_id}?lookbehind=1')
if request.status_code != 200:
raise ServerCommunicationError
response = request.json()
return response['departures']
def filter_destination(departures: list, destination: str) -> list:
departures[:] = [departure for departure in departures if
destination.casefold() in departure['destination'].casefold() or destination.casefold() in departure[
'scheduledDestination'].casefold()]
return departures
def filter_line_names(departures: list, line_name: str) -> list:
departures[:] = [departure for departure in departures if
line_name in departure['train']['name']]
return departures
def build_reply(departures: list) -> list:
if len(departures) == 0:
return ['No departures found!']
reply = [f'Next departures for station: {departures[0]["currentStation"]["title"]}']
for departure in departures:
reply_text = f'{departure["train"]["name"]} to "{departure["destination"]}" at ' \
f'{parse(departure["departure"]["time"]).astimezone(timezone(timedelta(hours=1))).strftime("%H:%M")}'
if 'delay' in departure['departure']:
reply_text += f' (+{str(departure["departure"]["delay"])})'
reply.append(reply_text)
return reply

133
vvs.py
View file

@ -1,133 +0,0 @@
import re
import requests
from telegram import InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup
from exceptions import MultipleStationsFoundError, NoArgError, ServerCommunicationError, StationNotFoundError
from telegram_helpers import build_menu
sent_multiple_station_message_ids = dict()
class Query:
station_id = -1
departure_count = 4
line = ''
destination = ""
def __init__(self, args):
request_tmp = ' '.join(args)
argument_names = re.findall(r' to | in | times ', request_tmp)
arguments = re.split(r' to | in | times ', request_tmp)
if not arguments[0].isdigit():
reply = search_vvs_station(arguments[0])
if len(reply) == 1:
self.station_id = reply[0]['stationId']
else:
raise MultipleStationsFoundError(request_tmp, arguments[0], reply)
else:
self.station_id = arguments[0]
if ' to ' in argument_names:
self.destination = arguments[argument_names.index(' to ') + 1]
if ' in ' in argument_names:
self.line = arguments[argument_names.index(' in ') + 1]
if ' times ' in argument_names:
self.departure_count = int(arguments[argument_names.index(' times ') + 1])
def reply_multiple_stations(message, message_text, queried_station, station_list):
button_list = []
for station in station_list:
button_list.append(InlineKeyboardButton(station['fullName'],
callback_data="/vvs " + message_text
.replace(queried_station, station['stationId'])))
reply_markup = InlineKeyboardMarkup(build_menu(button_list, n_cols=2))
sent_multiple_station_message_ids[message.chat_id] = message.reply_text("Multiple stations found:",
reply_markup=reply_markup).message_id
def handle_multiple_stations_reply(update, context):
query = parse_station(update.callback_query.data.split(' ')[1:])
departures = get_vvs_departures(query)
for reply in departures:
context.bot.send_message(update.effective_chat['id'], reply)
context.bot.delete_message(chat_id=update.effective_chat['id'],
message_id=sent_multiple_station_message_ids[update.effective_chat['id']])
del sent_multiple_station_message_ids[update.effective_chat['id']]
def search_vvs_station(query):
request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + query)
if request.status_code != 200:
raise ServerCommunicationError
if request.status_code == 200 and len(request.text) <= 2:
raise StationNotFoundError
else:
return request.json()
def handle_vvs(update, context):
try:
query = parse_station(context.args)
except MultipleStationsFoundError as error:
reply_multiple_stations(update.message, error.message_text, error.queried_station, error.station_list)
return
departures = get_vvs_departures(query)
for reply in departures:
update.message.reply_text(reply, disable_notification=True)
def parse_station(args):
if len(args) == 0:
raise NoArgError
try:
query = Query(args)
except StationNotFoundError:
raise
except MultipleStationsFoundError:
raise
if query.station_id == -1:
raise StationNotFoundError
return query
def get_vvs_departures(query):
reply = []
request = requests.get("https://efa-api.asw.io/api/v1/station/" + query.station_id + "/departures")
if request.status_code != 200:
raise ServerCommunicationError
reply.append("Next departures for station: " + request.json()[0]['stopName'])
printed_departures = 0
for station in request.json():
if station['direction'].casefold().find(query.destination.casefold()) != -1:
if query.line == '' or (query.line != -1 and station['number'] == query.line):
reply.append(
"Line " + station['number'] + " to \"" + station['direction'] + "\" at "
+ station['departureTime']['hour'].zfill(2) + ':' + station['departureTime']['minute'].zfill(2)
+ " (+" + str(station['delay']) + ")")
printed_departures += 1
if printed_departures >= query.departure_count:
break
return reply
def inline_station_search(update, context):
query = update.inline_query.query
if len(query) < 5 or not query:
return
results = list()
for station in get_station_id_list(query):
results.append(
InlineQueryResultArticle(
id=station['stationId'],
title=station['name'],
input_message_content=InputTextMessageContent("/vvs " + station['stationId'])
)
)
context.bot.answer_inline_query(update.inline_query.id, results)
def get_station_id_list(name):
request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + name)
return request.json()