220 lines
8.1 KiB
Python
220 lines
8.1 KiB
Python
import json
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
|
|
from exceptions import ServerCommunicationError, NoArgError, WeatherStationNotFoundError
|
|
|
|
# OpenWeatherMap API token. Only declared here; the value is assigned by the
# module-level __init__() function and sent as the "appid" query parameter
# by City.get_weather().
token: str
|
|
|
|
|
|
def __init__():
    """Load the OpenWeatherMap API token into the module-level ``token``.

    The token is read from the file ``openweathermap_token`` in the current
    working directory. Surrounding whitespace (typically the trailing newline
    most editors append) is stripped, because the value is later embedded in
    a request URL as the ``appid`` parameter.

    Raises:
        OSError: if the token file cannot be opened or read.
    """
    global token
    with open("openweathermap_token", encoding="utf-8") as openweathermap_token:
        token = openweathermap_token.read().strip()
|
|
|
|
|
|
class HourlyWeather:
    """Forecast for a single hour, parsed from one entry of the ``hourly`` list.

    Attributes:
        time: forecast time, converted from the ``dt`` Unix timestamp to a
            naive local ``datetime``.
        temperature: value of ``temp`` (rendered with a ``°C`` suffix).
        feels_like: value of ``feels_like`` (rendered with a ``°C`` suffix).
        wind_speed: value of ``wind_speed`` (rendered with an ``m/s`` suffix).
        probability_of_precipitation: value of ``pop``; a fraction that is
            multiplied by 100 for display.
        rain: rain amount (rendered with an ``mm`` suffix), ``0`` when the
            entry has no ``rain`` key.
        main: short weather group name from ``weather[0].main``.
        description: longer text from ``weather[0].description``.
    """

    time: datetime
    temperature: float
    feels_like: float
    wind_speed: float
    probability_of_precipitation: float
    rain: float
    main: str
    description: str

    def __init__(self, json_data: dict):
        """Populate the instance from one decoded ``hourly`` JSON object.

        Raises:
            KeyError: if a mandatory field is missing from ``json_data``.
        """
        self.time = datetime.fromtimestamp(json_data['dt'])
        self.temperature = json_data['temp']
        self.feels_like = json_data['feels_like']
        self.wind_speed = json_data['wind_speed']
        self.probability_of_precipitation = json_data['pop']

        # The One Call API reports hourly rain as an object ({"1h": <mm>}),
        # not as a plain number; unwrap it so it formats as "<n>mm" instead of
        # a dict repr. A plain number is still accepted unchanged.
        rain = json_data.get('rain', 0)
        if isinstance(rain, dict):
            rain = rain.get('1h', 0)
        self.rain = rain

        self.main = json_data['weather'][0]['main']
        self.description = json_data['weather'][0]['description']

    def __str__(self):
        """Return a one-line, Markdown-flavoured summary of this hour.

        The precipitation section is omitted when the probability is zero.
        """
        if self.probability_of_precipitation == 0:
            template = "*{time}*: {main} | {temperature}°C 🧑: {feels_like}°C" \
                       " 💨: {wind_speed}m/s"
        else:
            template = "*{time}*: {main} | {temperature}°C 🧑: {feels_like}°C ☔: {pop:.0f}% {rain}mm 💨: {wind_speed}m/s"
        # str.format ignores keyword arguments the template does not use, so
        # one call serves both templates.
        return template.format(time=self.time.strftime("%Hh"),
                               main=self.main,
                               temperature=self.temperature,
                               feels_like=self.feels_like,
                               pop=(100 * self.probability_of_precipitation),
                               rain=self.rain,
                               wind_speed=self.wind_speed)
|
|
|
|
|
|
class DailyWeather:
    """Forecast for a single day, parsed from one entry of the ``daily`` list.

    Attributes:
        time: forecast time, converted from the ``dt`` Unix timestamp to a
            naive local ``datetime``.
        temp: dict with the keys ``min``, ``day`` and ``max`` copied from the
            entry's ``temp`` object.
        feels_like: the ``day`` value of the entry's ``feels_like`` object.
        wind_speed: value of ``wind_speed`` (rendered with an ``m/s`` suffix).
        probability_of_precipitation: value of ``pop``; a fraction that is
            multiplied by 100 for display.
        rain: value of ``rain`` (rendered with an ``mm`` suffix), ``0`` when
            the entry has no ``rain`` key.
        main: short weather group name from ``weather[0].main``.
        description: longer text from ``weather[0].description``.
    """

    time: datetime
    temp: dict
    feels_like: float
    wind_speed: float
    probability_of_precipitation: float
    rain: float
    main: str
    description: str

    def __init__(self, json_data: dict):
        """Populate the instance from one decoded ``daily`` JSON object.

        Raises:
            KeyError: if a mandatory field is missing from ``json_data``.
        """
        self.time = datetime.fromtimestamp(json_data['dt'])
        # Build a fresh dict per instance. A class-level dict would be shared
        # by every DailyWeather, so all days would show the last day's values.
        self.temp = {
            'min': json_data['temp']['min'],
            'day': json_data['temp']['day'],
            'max': json_data['temp']['max'],
        }
        self.feels_like = json_data['feels_like']['day']
        self.wind_speed = json_data['wind_speed']
        self.probability_of_precipitation = json_data['pop']
        self.rain = json_data.get('rain', 0)
        self.main = json_data['weather'][0]['main']
        self.description = json_data['weather'][0]['description']

    def __str__(self):
        """Return a one-line, Markdown-flavoured summary of this day.

        The precipitation section is omitted when the probability is zero.
        """
        if self.probability_of_precipitation == 0:
            template = "*{time}*: {main} | {min} - {day} - {max} 🧑: {feels_like_day}" \
                       " 💨: {wind_speed}m/s"
        else:
            template = "*{time}*: {main} | {min} - {day} - {max} 🧑: {feels_like_day}" \
                       " ☔: {pop:.0f}% {rain}mm 💨: {wind_speed}m/s"
        # str.format ignores keyword arguments the template does not use, so
        # one call serves both templates.
        return template.format(time=self.time.strftime("%d"),
                               main=self.main,
                               min=self.temp['min'],
                               day=self.temp['day'],
                               max=self.temp['max'],
                               feels_like_day=self.feels_like,
                               pop=(100 * self.probability_of_precipitation),
                               rain=self.rain,
                               wind_speed=self.wind_speed)
|
|
|
|
|
|
class TotalWeather:
    """Complete forecast: a list of hourly entries plus a list of daily entries.

    Attributes:
        hourly_weather: ``HourlyWeather`` objects built from ``json_data['hourly']``.
        daily_weather: ``DailyWeather`` objects built from ``json_data['daily']``.
    """

    hourly_weather: list
    daily_weather: list

    def __init__(self, json_data: dict):
        """Parse the ``daily`` and ``hourly`` lists of a decoded API response.

        Raises:
            KeyError: if ``json_data`` lacks the ``daily`` or ``hourly`` key, or
                an entry lacks a mandatory field.
        """
        # Create the lists per instance. Class-level lists would be shared, so
        # every new forecast would be appended to all earlier ones.
        self.daily_weather = [DailyWeather(entry) for entry in json_data['daily']]
        self.hourly_weather = [HourlyWeather(entry) for entry in json_data['hourly']]

    def __str__(self):
        """Render a thinned-out hourly overview followed by all daily entries.

        The hourly part keeps the first five entries, then every second entry
        of indices 6..19, then every fourth entry from index 21 on. A blank
        line separates hours that fall on different days of the month, and
        one more blank line separates the hourly part from the daily part.
        """
        hours = self.hourly_weather
        # NOTE(review): indices 5 and 20 are never shown by this selection;
        # kept as in the original - confirm whether that is intended.
        short_hourly_weather = hours[:5] + hours[6:20:2] + hours[21::4]

        parts = []
        day_before = None  # None until the first entry; avoids indexing an empty list
        for weather in short_hourly_weather:
            day = weather.time.strftime("%d")
            if day_before is not None and day_before != day:
                parts.append('\n')
            parts.append(str(weather) + '\n')
            day_before = day

        parts.append('\n')
        parts.extend(str(weather) + '\n' for weather in self.daily_weather)
        return "".join(parts)

    def clear(self):
        """Remove all hourly and daily entries from this forecast."""
        self.hourly_weather.clear()
        self.daily_weather.clear()
|
|
|
|
|
|
class City:
    """A city from the city list together with its most recently fetched forecast.

    Attributes:
        cid: city id.
        name: city name.
        lat: latitude sent to the weather API.
        lon: longitude sent to the weather API.
        weather: ``TotalWeather`` set by ``get_weather()``; ``None`` until then.
    """

    cid: int
    name: str
    lat: float
    lon: float
    weather = None

    def __init__(self, cid: int, name: str, lat: float, lon: float):
        """Store the city's identifying data; no network access happens here."""
        self.cid = cid
        self.name = name
        self.lat = lat
        self.lon = lon

    def get_weather(self):
        """Fetch the forecast for this city and store it in ``self.weather``.

        Calls the OpenWeatherMap ``onecall`` endpoint with German language,
        metric units and the minutely data excluded, authenticating with the
        module-level ``token``.

        Raises:
            ServerCommunicationError: if the HTTP status code is not 200.
            requests.exceptions.RequestException: propagated unchanged on
                connection problems or when the timeout expires.
        """
        # Passing the query as ``params`` lets requests build and escape the URL;
        # a backslash continuation inside the string literal would embed the next
        # line's indentation in the URL. https keeps the token off the wire in
        # clear text, and the timeout stops an unresponsive server from blocking
        # the caller forever.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/onecall",
            params={
                "lang": "de",
                "units": "metric",
                "exclude": "minutely",
                "lat": self.lat,
                "lon": self.lon,
                "appid": token,
            },
            timeout=10,
        )
        if response.status_code != 200:
            raise ServerCommunicationError
        self.weather = TotalWeather(response.json())

    def print_weather(self):
        """Print the stored forecast (``None`` if nothing was fetched yet)."""
        print(self.weather)
|
|
|
|
|
|
def get_city_by_name(city_name: str) -> City:
    """Look up a city in ``city.list.json`` by name, ignoring case.

    An exact (case-folded) name match is preferred. If there is none, the
    first city whose name contains ``city_name`` is used instead.

    Args:
        city_name: name, or part of a name, to search for.

    Returns:
        A ``City`` for the first match in file order. When nothing matches, a
        placeholder ``City`` with ``cid=-1`` and ``name="none"`` is returned.

    Raises:
        OSError: if ``city.list.json`` cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # City names contain non-ASCII characters; do not rely on the platform's
    # default encoding to decode them.
    with open("city.list.json", "r", encoding="utf-8") as city_list_file:
        city_json = json.load(city_list_file)

    wanted = city_name.casefold()  # fold once instead of once per candidate
    city_list = [city for city in city_json if city['name'].casefold() == wanted]
    if not city_list:
        city_list = [city for city in city_json if wanted in city['name'].casefold()]
    if not city_list:
        return City(cid=-1, name="none", lat=0, lon=0)

    best = city_list[0]
    return City(cid=best['id'], name=best['name'], lat=best['coord']['lat'],
                lon=best['coord']['lon'])
|
|
|
|
# city_lat = city['coord']['lat']
|
|
# city_lon = city['coord']['lon']
|
|
#
|
|
# print(str(city_lat) + " " + str(city_lon))
|
|
|
|
|
|
def check_city_list():
    """Print groups of same-named cities that lie close to each other.

    Reads ``city.list.json`` and, for each city name, prints once the list of
    entries with that name whose latitude and longitude are both within one
    degree (exclusive) of the first entry for which such a neighbour exists.
    The printed list includes that entry itself.

    Raises:
        OSError: if ``city.list.json`` cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    from collections import defaultdict

    with open("city.list.json", "r", encoding="utf-8") as city_list_file:
        city_json = json.load(city_list_file)

    # Every city is checked; the original had a commented-out country filter
    # (city['country'] == "DE") at this point.
    # Grouping by name first means each city is only compared with its
    # namesakes instead of with the whole list.
    cities_by_name = defaultdict(list)
    for city in city_json:
        cities_by_name[city['name']].append(city)

    already_mentioned = set()
    for current_city in city_json:
        name = current_city['name']
        if name in already_mentioned:
            continue
        lat = current_city['coord']['lat']
        lon = current_city['coord']['lon']
        # "lat - 1 < x < lat + 1" selects the surrounding window. The reversed
        # operators ("lat - 1 > x > lat + 1") can never be true, which made
        # the check silently report nothing.
        matched_city_list = [
            other for other in cities_by_name[name]
            if lat - 1 < other['coord']['lat'] < lat + 1
            and lon - 1 < other['coord']['lon'] < lon + 1
        ]
        if len(matched_city_list) > 1:
            print(matched_city_list)
            already_mentioned.add(name)
|
|
|
|
# city_list = get_city_list_by_name("Berlin")
|
|
# print(city_list[0])
|
|
# for city in city_list:
|
|
# if city['country'] == "DE":
|
|
# print(city['name'] + " in " + city['country'] + " is " + str(city['id']) + " at " + str(
|
|
# city['coord']['lat']) + "," + str(city['coord']['lon']))
|
|
|
|
|
|
def handle_weather(update, context):
    """Reply to a weather request with the forecast for the named city.

    The words in ``context.args`` are joined with spaces to form the city
    name. (Presumably this is a bot command handler in the style of
    python-telegram-bot - confirm against the code that registers it.)

    Raises:
        NoArgError: if ``context.args`` is empty.
        WeatherStationNotFoundError: if the lookup returns the placeholder
            city named ``"none"``.
    """
    words = context.args
    if len(words) == 0:
        raise NoArgError

    city = get_city_by_name(' '.join(words))
    if city.name == "none":
        raise WeatherStationNotFoundError

    city.get_weather()

    # Send the rendered forecast without notifying the recipient.
    send_reply = update.message.reply_text
    send_reply(str(city.weather), disable_notification=True, parse_mode="Markdown")

    # Empty the forecast lists once the reply has been sent.
    city.weather.clear()
|