Merge branch 'restructuring' into master

This commit is contained in:
JuliusFreudenberger 2020-09-24 11:17:24 +02:00
commit 0ad11a4a20

169
bot.py
View file

@ -1,8 +1,5 @@
import logging
import re
import requests
from telegram import InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, InlineQueryHandler, CallbackQueryHandler
from exceptions import *
@ -18,170 +15,6 @@ dispatcher = updater.dispatcher
logging.basicConfig(format='%(acstime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
sent_multiple_station_message_ids = dict()
class Query:
station_id = -1
departure_count = 4
line = ''
destination = ""
def __init__(self, args):
request_tmp = ' '.join(args)
argument_names = re.findall(r' to | in | times ', request_tmp)
arguments = re.split(r' to | in | times ', request_tmp)
if not arguments[0].isdigit():
reply = search_vvs_station(arguments[0])
if len(reply) == 1:
self.station_id = reply[0]['stationId']
else:
raise MultipleStationsFoundError(request_tmp, arguments[0], reply)
else:
self.station_id = arguments[0]
if ' to ' in argument_names:
self.destination = arguments[argument_names.index(' to ') + 1]
if ' in ' in argument_names:
self.line = arguments[argument_names.index(' in ') + 1]
if ' times ' in argument_names:
self.departure_count = int(arguments[argument_names.index(' times ') + 1])
def build_menu(buttons,
n_cols,
header_buttons=None,
footer_buttons=None):
menu = [buttons[i:i + n_cols] for i in range(0, len(buttons), n_cols)]
if header_buttons:
menu.insert(0, [header_buttons])
if footer_buttons:
menu.append([footer_buttons])
return menu
def reply_multiple_stations(message, message_text, queried_station, station_list):
button_list = []
for station in station_list:
button_list.append(InlineKeyboardButton(station['fullName'],
callback_data="/vvs " + message_text
.replace(queried_station, station['stationId'])))
reply_markup = InlineKeyboardMarkup(build_menu(button_list, n_cols=2))
sent_multiple_station_message_ids[message.chat_id] = message.reply_text("Multiple stations found:",
reply_markup=reply_markup).message_id
def handle_multiple_stations_reply(update, context):
query = parse_station(update.callback_query.data.split(' ')[1:])
departures = get_vvs_departures(query)
for reply in departures:
context.bot.send_message(update.effective_chat['id'], reply)
context.bot.delete_message(chat_id=update.effective_chat['id'],
message_id=sent_multiple_station_message_ids[update.effective_chat['id']])
del sent_multiple_station_message_ids[update.effective_chat['id']]
def search_vvs_station(query):
request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + query)
if request.status_code != 200:
raise ServerCommunicationError
if request.status_code == 200 and len(request.text) <= 2:
raise StationNotFoundError
else:
return request.json()
def handle_vvs(update, context):
try:
query = parse_station(context.args)
except MultipleStationsFoundError as error:
reply_multiple_stations(update.message, error.message_text, error.queried_station, error.station_list)
return
departures = get_vvs_departures(query)
for reply in departures:
update.message.reply_text(reply, disable_notification=True)
def parse_station(args):
if len(args) == 0:
raise NoArgError
try:
query = Query(args)
except StationNotFoundError:
raise
except MultipleStationsFoundError:
raise
if query.station_id == -1:
raise StationNotFoundError
return query
def get_vvs_departures(query):
reply = []
request = requests.get("https://efa-api.asw.io/api/v1/station/" + query.station_id + "/departures")
if request.status_code != 200:
raise ServerCommunicationError
reply.append("Next departures for station: " + request.json()[0]['stopName'])
printed_departures = 0
for station in request.json():
if station['direction'].casefold().find(query.destination.casefold()) != -1:
if query.line == '' or (query.line != -1 and station['number'] == query.line):
reply.append(
"Line " + station['number'] + " to \"" + station['direction'] + "\" at "
+ station['departureTime']['hour'].zfill(2) + ':' + station['departureTime']['minute'].zfill(2)
+ " (+" + str(station['delay']) + ")")
printed_departures += 1
if printed_departures >= query.departure_count:
break
return reply
def inline_station_search(update, context):
query = update.inline_query.query
if len(query) < 5 or not query:
return
results = list()
for station in get_station_id_list(query):
results.append(
InlineQueryResultArticle(
id=station['stationId'],
title=station['name'],
input_message_content=InputTextMessageContent("/vvs " + station['stationId'])
)
)
context.bot.answer_inline_query(update.inline_query.id, results)
def get_station_id_list(name):
request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + name)
return request.json()
def search_weather_station(query: str):
station_file = open('stations', 'r')
for line in station_file:
if line.casefold().find(query.casefold()) != -1:
station_file.close()
return line
return 'none'
def error_callback(update, context):
try:
raise context.error
except NoArgError:
update.message.reply_text('No argument specified!')
return
except StationNotFoundError:
update.message.reply_text('No station matching this name found!')
return
except WeatherStationNotFoundError:
update.message.reply_text('No weather station matching this name found!')
except ServerCommunicationError:
update.message.reply_text('Error with server communication')
return
def __main__():
inline_station_search_handler = InlineQueryHandler(inline_station_search)
@ -194,7 +27,7 @@ def __main__():
dispatcher.add_error_handler(error_callback)
dispatcher.add_handler(CallbackQueryHandler(handle_multiple_stations_reply))
dispatcher.add_handler(CallbackQueryHandler(handle_multiple_stations_reply, pattern="^\/vvs"))
updater.start_polling()
updater.idle()