telegram-bot/bot.py

204 lines
7.2 KiB
Python
Raw Normal View History

import logging
import re
import requests
from telegram import InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, InlineQueryHandler, CallbackQueryHandler
from exceptions import *
from vvs import inline_station_search, handle_vvs, handle_multiple_stations_reply
from weather_meteomedia import handle_meteomedia
from weather_openweathermap import handle_weather, __init__
# Read the bot token from the local "token" file. The context manager closes
# the file even when constructing the Updater fails.
with open("token", "r") as token_file:
    # Strip surrounding whitespace so a trailing newline left by an editor
    # does not become part of the token.
    updater = Updater(token=token_file.read().strip(), use_context=True)
dispatcher = updater.dispatcher
# The timestamp field of a log record is named "asctime"; a misspelled field
# name makes every log call fail while formatting.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
# Maps a chat id to the message id of the "Multiple stations found:" prompt
# that is currently shown in that chat.
sent_multiple_station_message_ids = dict()
class Query:
    """A parsed departure request: one station plus optional filters.

    The argument words are joined with single spaces and then split on the
    keywords " to ", " in " and " times ". The text before the first keyword
    names the station; the text after each keyword sets the matching filter.
    """

    # Defaults that stay in effect when the request does not override them.
    # station_id == -1 is the "not resolved" marker that parse_station checks.
    station_id = -1
    departure_count = 4
    line = ''
    destination = ""

    def __init__(self, args):
        """Build the query from the list of argument words.

        Raises:
            MultipleStationsFoundError: the station text is not numeric and
                the station search did not return exactly one entry.
            ValueError: the text after " times " is not an integer.
            Whatever search_vvs_station raises is propagated unchanged.
        """
        joined_request = ' '.join(args)
        keyword_pattern = r' to | in | times '
        keywords = re.findall(keyword_pattern, joined_request)
        parts = re.split(keyword_pattern, joined_request)
        station_text = parts[0]

        if station_text.isdigit():
            # A purely numeric first argument is used as the station id as-is.
            self.station_id = station_text
        else:
            matches = search_vvs_station(station_text)
            if len(matches) != 1:
                raise MultipleStationsFoundError(joined_request, station_text, matches)
            self.station_id = matches[0]['stationId']

        def value_after(keyword):
            # parts[i + 1] is the text that follows the i-th keyword found.
            return parts[keywords.index(keyword) + 1]

        if ' to ' in keywords:
            self.destination = value_after(' to ')
        if ' in ' in keywords:
            self.line = value_after(' in ')
        if ' times ' in keywords:
            self.departure_count = int(value_after(' times '))
def build_menu(buttons,
               n_cols,
               header_buttons=None,
               footer_buttons=None):
    """Arrange a flat list of buttons into rows of at most ``n_cols`` items.

    A truthy ``header_buttons`` value is wrapped in a one-element row and
    placed first; a truthy ``footer_buttons`` value is wrapped the same way
    and placed last. Returns the list of rows.
    """
    rows = []
    for start in range(0, len(buttons), n_cols):
        rows.append(buttons[start:start + n_cols])
    if header_buttons:
        rows = [[header_buttons]] + rows
    if footer_buttons:
        rows = rows + [[footer_buttons]]
    return rows
def reply_multiple_stations(message, message_text, queried_station, station_list):
    """Ask the user to pick one of several stations via inline buttons.

    Args:
        message: the incoming message; the prompt is sent as a reply to it.
        message_text: the full request text the user sent after the command.
        queried_station: the station part of ``message_text``.
        station_list: station entries with 'fullName' and 'stationId' keys.

    Each button carries a "/vvs ..." command in which the station text is
    replaced by that station's id. The id of the sent prompt is stored in
    sent_multiple_station_message_ids under the chat id.
    """
    button_list = []
    for station in station_list:
        # Replace only the first occurrence: the station text leads the
        # request, and a destination containing the same text must be kept.
        callback_text = "/vvs " + message_text.replace(queried_station, station['stationId'], 1)
        # NOTE(review): Telegram limits callback_data length; very long
        # requests may be rejected -- confirm against the Bot API limits.
        button_list.append(InlineKeyboardButton(station['fullName'], callback_data=callback_text))
    reply_markup = InlineKeyboardMarkup(build_menu(button_list, n_cols=2))
    sent_message = message.reply_text("Multiple stations found:", reply_markup=reply_markup)
    sent_multiple_station_message_ids[message.chat_id] = sent_message.message_id
def handle_multiple_stations_reply(update, context):
    """Handle a press on one of the "Multiple stations found:" buttons.

    The callback data is a "/vvs ..." command; its words after the command
    are parsed, the departures are sent to the chat, and the selection prompt
    recorded for this chat is deleted.
    """
    callback_query = update.callback_query
    # Telegram clients show a progress indicator on the pressed button until
    # the callback query is answered, so answer before doing any work.
    callback_query.answer()
    chat_id = update.effective_chat['id']
    query = parse_station(callback_query.data.split(' ')[1:])
    departures = get_vvs_departures(query)
    for reply in departures:
        context.bot.send_message(chat_id, reply)
    # The prompt id may be missing (e.g. after a restart or a second press on
    # the same button); in that case there is nothing left to delete.
    prompt_message_id = sent_multiple_station_message_ids.pop(chat_id, None)
    if prompt_message_id is not None:
        context.bot.delete_message(chat_id=chat_id, message_id=prompt_message_id)
def search_vvs_station(query):
    """Look up stations matching ``query`` via the EFA station search API.

    Returns:
        The decoded JSON response body.

    Raises:
        ServerCommunicationError: the request failed or the server answered
            with a status other than 200.
        StationNotFoundError: the response body is at most two characters
            long (e.g. an empty JSON list "[]").
    """
    try:
        # Passing the text via ``params`` URL-encodes it, so characters such
        # as '&' or '#' in user input cannot alter the request. The timeout
        # keeps an unresponsive server from blocking the handler forever.
        request = requests.get("https://efa-api.asw.io/api/v1/station/",
                               params={"search": query},
                               timeout=10)
    except requests.RequestException as error:
        raise ServerCommunicationError from error
    if request.status_code != 200:
        raise ServerCommunicationError
    if len(request.text) <= 2:
        raise StationNotFoundError
    return request.json()
def handle_vvs(update, context):
    """Handle the /vvs command: reply with the next departures.

    When the station text is ambiguous, a selection prompt is sent instead
    and no departures are fetched.
    """
    try:
        query = parse_station(context.args)
    except MultipleStationsFoundError as ambiguous:
        reply_multiple_stations(update.message,
                                ambiguous.message_text,
                                ambiguous.queried_station,
                                ambiguous.station_list)
    else:
        # One silent reply per line of the departure listing.
        for text in get_vvs_departures(query):
            update.message.reply_text(text, disable_notification=True)
def parse_station(args):
    """Turn the argument words of a request into a Query.

    Raises:
        NoArgError: ``args`` is empty.
        StationNotFoundError: no station id was resolved.
        Exceptions raised while building the Query (including
        StationNotFoundError and MultipleStationsFoundError) propagate
        unchanged.
    """
    if not len(args):
        raise NoArgError
    parsed = Query(args)
    # -1 is the class default, i.e. the constructor never set a station id.
    if parsed.station_id == -1:
        raise StationNotFoundError
    return parsed
def get_vvs_departures(query):
    """Fetch departures for ``query.station_id`` and format them as text.

    Args:
        query: object providing ``station_id``, ``destination``, ``line``
            and ``departure_count``.

    Returns:
        A list of strings: a header line naming the stop, followed by one
        line per departure whose direction contains ``query.destination``
        (case-insensitive) and, when ``query.line`` is set, whose line number
        equals it. Listing stops once ``query.departure_count`` departures
        were added. When the server reports no departures at all, a single
        informational line is returned instead.

    Raises:
        ServerCommunicationError: the request failed or the server answered
            with a status other than 200.
    """
    try:
        request = requests.get(
            "https://efa-api.asw.io/api/v1/station/" + str(query.station_id) + "/departures",
            timeout=10)
    except requests.RequestException as error:
        raise ServerCommunicationError from error
    if request.status_code != 200:
        raise ServerCommunicationError

    # Decode the body once and reuse it for the header and the listing.
    departures = request.json()
    if not departures:
        # The stop name is taken from the first departure, so an empty list
        # has to be handled before building the header.
        return ["No departures found for station: " + str(query.station_id)]

    reply = ["Next departures for station: " + departures[0]['stopName']]
    wanted_destination = query.destination.casefold()
    printed_departures = 0
    for departure in departures:
        if wanted_destination not in departure['direction'].casefold():
            continue
        if query.line != '' and departure['number'] != query.line:
            continue
        reply.append(
            "Line " + departure['number'] + " to \"" + departure['direction'] + "\" at "
            + departure['departureTime']['hour'].zfill(2) + ':' + departure['departureTime']['minute'].zfill(2)
            + " (+" + str(departure['delay']) + ")")
        printed_departures += 1
        if printed_departures >= query.departure_count:
            break
    return reply
def inline_station_search(update, context):
    """Answer an inline query with one article per matching station.

    Queries shorter than five characters are ignored. Selecting a result
    inserts "/vvs <stationId>" as the message text.
    """
    search_text = update.inline_query.query
    too_short = len(search_text) < 5
    if too_short or not search_text:
        return
    articles = [
        InlineQueryResultArticle(
            id=station['stationId'],
            title=station['name'],
            input_message_content=InputTextMessageContent("/vvs " + station['stationId'])
        )
        for station in get_station_id_list(search_text)
    ]
    context.bot.answer_inline_query(update.inline_query.id, articles)
def get_station_id_list(name):
    """Return the decoded JSON station search result for ``name``."""
    # Passing the text via ``params`` URL-encodes it, so characters such as
    # '&' or '#' typed by the user cannot alter the request.
    request = requests.get("https://efa-api.asw.io/api/v1/station/", params={"search": name})
    return request.json()
def search_weather_station(query: str):
    """Return the first line of the 'stations' file that contains ``query``.

    The comparison is case-insensitive. The matching line is returned
    unchanged (including its line ending); the string 'none' is returned
    when no line matches.
    """
    needle = query.casefold()
    # The context manager closes the file on every path, including the
    # no-match path.
    with open('stations', 'r') as station_file:
        for line in station_file:
            if needle in line.casefold():
                return line
    return 'none'
def error_callback(update, context):
    """Translate known bot errors into a reply to the user.

    ``context.error`` is matched against the known error types in order; the
    first match decides the reply text. Errors of any other type are logged
    with their traceback instead of being raised from the error handler.
    """
    user_messages = (
        (NoArgError, 'No argument specified!'),
        (StationNotFoundError, 'No station matching this name found!'),
        (WeatherStationNotFoundError, 'No weather station matching this name found!'),
        (ServerCommunicationError, 'Error with server communication'),
    )
    error = context.error
    for error_type, text in user_messages:
        if isinstance(error, error_type):
            # ``update`` may be None, and updates such as callback or inline
            # queries carry no ``message``; reply only when a message exists.
            target = getattr(update, 'effective_message', None)
            if target is not None:
                target.reply_text(text)
            return
    logging.getLogger(__name__).error("Unhandled error while processing an update", exc_info=error)
def __main__():
    """Register all handlers on the dispatcher, then start polling.

    Registration order is: inline query handler, the three command handlers,
    the error handler, and finally the callback query handler.
    """
    dispatcher.add_handler(InlineQueryHandler(inline_station_search))
    command_handlers = (
        ('vvs', handle_vvs),
        ('meteomedia', handle_meteomedia),
        ("weather", handle_weather),
    )
    for command, callback in command_handlers:
        dispatcher.add_handler(CommandHandler(command, callback))
    # __init__ is imported from weather_openweathermap; presumably it sets up
    # that module before the bot starts -- confirm against that module.
    __init__()
    dispatcher.add_error_handler(error_callback)
    dispatcher.add_handler(CallbackQueryHandler(handle_multiple_stations_reply))
    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    __main__()