242 lines
8.7 KiB
Python
242 lines
8.7 KiB
Python
import json
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
from telegram import ParseMode
|
|
|
|
from exceptions import ServerCommunicationError, NoArgError, WeatherStationNotFoundError
|
|
|
|
token: str
|
|
|
|
EMOJIS = {'feels_like': '🧑', 'rain': '☔', 'snow': '🌨️', 'wind': '💨', 'sun': '☀️'}
|
|
|
|
|
|
def __init__():
|
|
with open("openweathermap_token") as openweathermap_token:
|
|
global token
|
|
token = openweathermap_token.read()
|
|
|
|
|
|
class CurrentWeather:
|
|
sunrise: datetime
|
|
sunset: datetime
|
|
temperature: float
|
|
feels_like: float
|
|
wind_speed: float
|
|
rain: float
|
|
main: str
|
|
description: str
|
|
|
|
def __init__(self, json_data: json):
|
|
self.sunrise = datetime.fromtimestamp(json_data['sunrise'])
|
|
self.sunset = datetime.fromtimestamp(json_data['sunset'])
|
|
self.temperature = json_data['temp']
|
|
self.feels_like = json_data['feels_like']
|
|
self.wind_speed = json_data['wind_speed']
|
|
if 'rain' in json_data:
|
|
self.rain = json_data['rain']['1h']
|
|
else:
|
|
self.rain = -1
|
|
self.main = json_data['weather'][0]['main']
|
|
self.description = json_data['weather'][0]['description']
|
|
|
|
def __str__(self):
|
|
string = f"*Now*: {self.main} | {self.temperature:.0f}°C {EMOJIS['feels_like']}: {self.feels_like:.0f}°C "
|
|
if self.rain != -1:
|
|
if self.main == 'Snow':
|
|
string += EMOJIS['snow']
|
|
else:
|
|
string += EMOJIS['rain']
|
|
string += f": {self.rain:.0f}mm"
|
|
|
|
string += f" {EMOJIS['wind']}: {3.6 * self.wind_speed:.1f}km/h" \
|
|
f" {EMOJIS['sun']}: {self.sunrise.strftime('%H:%M')} - {self.sunset.strftime('%H:%M')}"
|
|
return string
|
|
|
|
|
|
class HourlyWeather:
|
|
time: datetime
|
|
temperature: float
|
|
feels_like: float
|
|
wind_speed: float
|
|
probability_of_precipitation: float
|
|
rain: float
|
|
main: str
|
|
description: str
|
|
|
|
def __init__(self, json_data: json):
|
|
self.time = datetime.fromtimestamp(json_data['dt'])
|
|
self.temperature = json_data['temp']
|
|
self.feels_like = json_data['feels_like']
|
|
self.wind_speed = json_data['wind_speed']
|
|
self.probability_of_precipitation = json_data['pop']
|
|
if 'rain' in json_data:
|
|
self.rain = json_data['rain']['1h']
|
|
else:
|
|
self.rain = 0
|
|
self.main = json_data['weather'][0]['main']
|
|
self.description = json_data['weather'][0]['description']
|
|
|
|
def __str__(self):
|
|
string = f"*{self.time.strftime('%Hh')}*: {self.main} | {self.temperature:.0f}°C" \
|
|
f" {EMOJIS['feels_like']}: {self.feels_like:.0f}°C "
|
|
if self.probability_of_precipitation != 0:
|
|
if self.main == 'Snow':
|
|
string += EMOJIS['snow']
|
|
else:
|
|
string += EMOJIS['rain']
|
|
string += f": {100 * self.probability_of_precipitation:.0f}% {self.rain}mm"
|
|
|
|
string += f" {EMOJIS['wind']}: {3.6 * self.wind_speed:.1f}km/h"
|
|
return string
|
|
|
|
|
|
class DailyWeather:
|
|
time: datetime
|
|
temp: dict
|
|
feels_like: float
|
|
wind_speed: float
|
|
probability_of_precipitation: float
|
|
rain: float
|
|
main: str
|
|
description: str
|
|
|
|
def __init__(self, json_data: json):
|
|
self.time = datetime.fromtimestamp(json_data['dt'])
|
|
self.temp = {'min': json_data['temp']['min'], 'day': json_data['temp']['day'], 'max': json_data['temp']['max']}
|
|
self.feels_like = json_data['feels_like']['day']
|
|
self.wind_speed = json_data['wind_speed']
|
|
self.probability_of_precipitation = json_data['pop']
|
|
if 'rain' in json_data:
|
|
self.rain = json_data['rain']
|
|
else:
|
|
self.rain = 0
|
|
self.main = json_data['weather'][0]['main']
|
|
self.description = json_data['weather'][0]['description']
|
|
|
|
def __str__(self):
|
|
string = f"*{self.time.strftime('%d')}*: {self.main} | {self.temp['min']:.0f}°C - {self.temp['day']:.0f}°C" \
|
|
f" - {self.temp['max']:.0f}°C {EMOJIS['feels_like']}: {self.feels_like:.0f}°C "
|
|
if self.probability_of_precipitation != 0:
|
|
if self.main == 'Snow':
|
|
string += EMOJIS['snow']
|
|
else:
|
|
string += EMOJIS['rain']
|
|
string += f": {100 * self.probability_of_precipitation:.0f}% {self.rain}mm"
|
|
string += f" {EMOJIS['wind']}: {3.6 * self.wind_speed:.1f}km/h"
|
|
return string
|
|
|
|
|
|
class TotalWeather:
|
|
current_weather: CurrentWeather
|
|
hourly_weather = []
|
|
daily_weather = []
|
|
|
|
def __init__(self, json_data: json):
|
|
for single_daily_weather in json_data['daily'][2:]:
|
|
self.daily_weather.append(DailyWeather(single_daily_weather))
|
|
for single_hourly_weather in json_data['hourly'][1:]:
|
|
self.hourly_weather.append(HourlyWeather(single_hourly_weather))
|
|
self.current_weather = CurrentWeather(json_data['current'])
|
|
|
|
def __str__(self):
|
|
string = ""
|
|
string += (str(self.current_weather) + '\n')
|
|
short_hourly_weather = self.hourly_weather[:5] + self.hourly_weather[6:20:2] + self.hourly_weather[21::4]
|
|
day_before = short_hourly_weather[0].time.strftime("%d")
|
|
for weather in short_hourly_weather:
|
|
if day_before != weather.time.strftime("%d"):
|
|
string += '\n'
|
|
string += (str(weather) + '\n')
|
|
day_before = weather.time.strftime("%d")
|
|
string += '\n'
|
|
for weather in self.daily_weather:
|
|
string += (str(weather) + '\n')
|
|
return string
|
|
|
|
def clear(self):
|
|
self.hourly_weather.clear()
|
|
self.daily_weather.clear()
|
|
|
|
|
|
class City:
|
|
cid: int
|
|
name: str
|
|
lat: int
|
|
lon: int
|
|
weather = TotalWeather
|
|
|
|
def __init__(self, cid: int, name: str, lat: int, lon: int):
|
|
self.cid = cid
|
|
self.name = name
|
|
self.lat = lat
|
|
self.lon = lon
|
|
|
|
def query_weather(self):
|
|
request = requests.get(
|
|
f"http://api.openweathermap.org/data/2.5/onecall?\
|
|
lang=de&units=metric&exclude=minutely&lat={self.lat}&lon={self.lon}&appid={token}")
|
|
if request.status_code != 200:
|
|
raise ServerCommunicationError
|
|
self.weather = TotalWeather(request.json())
|
|
|
|
def print_weather(self):
|
|
print(self.weather)
|
|
|
|
def get_weather_str(self) -> str:
|
|
return f"Weather for *{self.name}*:\n" + str(self.weather)
|
|
|
|
|
|
def get_city_by_name(city_name: str) -> City:
|
|
with open("city.list.json", "r") as city_list_file:
|
|
city_data = city_list_file.read()
|
|
city_json = json.loads(city_data)
|
|
city_list = list(filter(lambda x: x['name'].casefold() == city_name.casefold(), city_json))
|
|
if len(city_list) == 0:
|
|
city_list = list(filter(lambda x: (x['name'].casefold().find(city_name.casefold())) != -1, city_json))
|
|
if len(city_list) == 0:
|
|
return City(cid=-1, name="none", lat=0, lon=0)
|
|
return City(cid=city_list[0]['id'], name=city_list[0]['name'], lat=city_list[0]['coord']['lat'],
|
|
lon=city_list[0]['coord']['lon'])
|
|
|
|
|
|
def check_city_list():
|
|
with open("city.list.json", "r") as city_list_file:
|
|
city_data = city_list_file.read()
|
|
city_json = json.loads(city_data)
|
|
de_city_list = list()
|
|
for city in city_json:
|
|
# if city['country'] == "DE":
|
|
de_city_list.append(city)
|
|
already_mentioned_city_list = []
|
|
for current_city in de_city_list:
|
|
matched_city_list = list(
|
|
filter(
|
|
lambda x: x['name'] == current_city['name'] and current_city['coord']['lat'] - 1 > x['coord'][
|
|
'lat'] >
|
|
current_city['coord']['lat'] + 1 and current_city['coord']['lon'] - 1 > x['coord'][
|
|
'lon'] >
|
|
current_city['coord']['lon'] + 1, de_city_list))
|
|
if len(matched_city_list) > 1 and current_city['name'] not in already_mentioned_city_list:
|
|
print(matched_city_list)
|
|
already_mentioned_city_list.append(current_city['name'])
|
|
|
|
# city_list = get_city_list_by_name("Berlin")
|
|
# print(city_list[0])
|
|
# for city in city_list:
|
|
# if city['country'] == "DE":
|
|
# print(city['name'] + " in " + city['country'] + " is " + str(city['id']) + " at " + str(
|
|
# city['coord']['lat']) + "," + str(city['coord']['lon']))
|
|
|
|
|
|
def handle_weather(update, context):
|
|
if len(context.args) == 0:
|
|
raise NoArgError
|
|
requested_city = get_city_by_name(' '.join(context.args))
|
|
if requested_city.name == "none":
|
|
raise WeatherStationNotFoundError
|
|
requested_city.query_weather()
|
|
update.message.reply_text(requested_city.get_weather_str(), disable_notification=True,
|
|
parse_mode=ParseMode.MARKDOWN)
|
|
|
|
requested_city.weather.clear()
|