import logging import requests import re from telegram import InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Updater, CommandHandler, InlineQueryHandler, CallbackQueryHandler from exceptions import * token_file = open("token", "r") updater = Updater(token=token_file.read(), use_context=True) token_file.close() dispatcher = updater.dispatcher logging.basicConfig(format='%(acstime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) class Query: station_id = -1 departure_count = 4 line = '' destination = "" def __init__(self, args): request_tmp = ' '.join(args) argument_names = re.findall(r' to | in | times ', request_tmp) arguments = re.split(r' to | in | times ', request_tmp) if not arguments[0].isdigit(): reply = search_station(arguments[0]) if len(reply) == 1: self.station_id = reply[0]['stationId'] else: raise MultipleStationsFoundError(request_tmp, arguments[0], reply) else: self.station_id = arguments[0] if ' to ' in argument_names: self.destination = arguments[argument_names.index(' to ') + 1] if ' in ' in argument_names: self.line = arguments[argument_names.index(' in ') + 1] if ' times ' in argument_names: self.departure_count = int(arguments[argument_names.index(' times ') + 1]) def build_menu(buttons, n_cols, header_buttons=None, footer_buttons=None): menu = [buttons[i:i + n_cols] for i in range(0, len(buttons), n_cols)] if header_buttons: menu.insert(0, [header_buttons]) if footer_buttons: menu.append([footer_buttons]) return menu def reply_multiple_stations(message, message_text, queried_station, station_list): button_list = [] for station in station_list: button_list.append(InlineKeyboardButton(station['fullName'], callback_data="/vvs " + message_text .replace(queried_station, station['stationId']))) reply_markup = InlineKeyboardMarkup(build_menu(button_list, n_cols=2)) message.reply_text("Multiple stations found:", reply_markup=reply_markup) def handle_multiple_stations_reply(update, context): query = parse_station(update.callback_query.data.split(' ')[1:]) departures = get_vvs_departures(query) for reply in departures: context.bot.send_message(update.effective_chat['id'], reply) def search_station(query): request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + query) if request.status_code != 200: raise ServerCommunicationError if request.status_code == 200 and len(request.text) <= 2: raise StationNotFoundError else: return request.json() def handle_vvs(update, context): try: query = parse_station(context.args) except MultipleStationsFoundError as error: reply_multiple_stations(update.message, error.message_text, error.queried_station, error.station_list) return departures = get_vvs_departures(query) for reply in departures: update.message.reply_text(reply) def parse_station(args): if len(args) == 0: raise NoArgError try: query = Query(args) except StationNotFoundError: raise except MultipleStationsFoundError: raise if query.station_id == -1: raise StationNotFoundError return query def get_vvs_departures(query): reply = [] request = requests.get("https://efa-api.asw.io/api/v1/station/" + query.station_id + "/departures") if request.status_code != 200: raise ServerCommunicationError reply.append("Next departures:") printed_departures = 0 for station in request.json(): if station['direction'].casefold().find(query.destination) != -1 or station['direction'].find( query.destination) != -1: if query.line == '' or (query.line != -1 and station['number'] == query.line): reply.append( "Line " + station['number'] + " to \"" + station['direction'] + "\" at " + station['departureTime']['hour'].zfill(2) + ':' + station['departureTime']['minute'].zfill(2) + " +" + str(station['delay'])) printed_departures += 1 if printed_departures >= query.departure_count: break return reply def inline_station_search(update, context): query = update.inline_query.query if len(query) < 5 or not query: return results = list() for station in get_station_id_list(query): results.append( InlineQueryResultArticle( id=station['stationId'], title=station['name'], input_message_content=InputTextMessageContent("/vvs " + station['stationId']) ) ) context.bot.answer_inline_query(update.inline_query.id, results) def get_station_id_list(name): request = requests.get("https://efa-api.asw.io/api/v1/station/?search=" + name) return request.json() def error_callback(update, context): try: raise context.error except NoArgError: update.message.reply_text('No argument specified!') return except StationNotFoundError: update.message.reply_text('No station matching this name found!') return except ServerCommunicationError: update.message.reply_text('Error with server communication') return inline_station_search_handler = InlineQueryHandler(inline_station_search) dispatcher.add_handler(inline_station_search_handler) dispatcher.add_handler(CommandHandler('vvs', handle_vvs)) dispatcher.add_error_handler(error_callback) dispatcher.add_handler(CallbackQueryHandler(handle_multiple_stations_reply)) updater.start_polling() updater.idle()