Using Async

Problem Analysis

The weather-fetching code from the previous section performs poorly, mainly because after we send a request to the website, we have to wait a long time before the reply comes back.

In this section we will find a way to improve that performance.
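
Before optimizing, it helps to confirm where the time actually goes: almost all of it is spent waiting on the network, not in our own code. Here is a minimal timing sketch, assuming the requests package used in the previous section (the coordinates are just an example):

import time
import requests

url = 'https://www.7timer.info/bin/api.pl?lon=116.39&lat=39.90&product=civil&output=json'
start = time.perf_counter()
resp = requests.get(url)
print(resp.status_code, f'{time.perf_counter() - start:.1f} s')  # typically around two seconds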

Let's start by copying the key code over to this section.

import pandas as pd
city_file = '../data/worldcities.csv'
city_df = pd.read_csv(city_file,encoding='utf-8')
city_df
city city_ascii lat lng country iso2 iso3 admin_name capital population id
0 Tokyo Tokyo 35.6897 139.6922 Japan JP JPN Tōkyō primary 37977000.0 1392685764
1 Jakarta Jakarta -6.2146 106.8451 Indonesia ID IDN Jakarta primary 34540000.0 1360771077
2 Delhi Delhi 28.6600 77.2300 India IN IND Delhi admin 29617000.0 1356872604
3 Mumbai Mumbai 18.9667 72.8333 India IN IND Mahārāshtra admin 23355000.0 1356226629
4 Manila Manila 14.6000 120.9833 Philippines PH PHL Manila primary 23088000.0 1608618140
... ... ... ... ... ... ... ... ... ... ... ...
40996 Tukchi Tukchi 57.3670 139.5000 Russia RU RUS Khabarovskiy Kray NaN 10.0 1643472801
40997 Numto Numto 63.6667 71.3333 Russia RU RUS Khanty-Mansiyskiy Avtonomnyy Okrug-Yugra NaN 10.0 1643985006
40998 Nord Nord 81.7166 -17.8000 Greenland GL GRL Sermersooq NaN 10.0 1304217709
40999 Timmiarmiut Timmiarmiut 62.5333 -42.2167 Greenland GL GRL Kujalleq NaN 10.0 1304206491
41000 Nordvik Nordvik 74.0165 111.5100 Russia RU RUS Krasnoyarskiy Kray NaN 0.0 1643587468

41001 rows × 11 columns

capital_china = city_df[city_df['country'] == 'China']  # all Chinese cities; 'capital' is NaN for most of them
capital_china
city city_ascii lat lng country iso2 iso3 admin_name capital population id
5 Shanghai Shanghai 31.1667 121.4667 China CN CHN Shanghai admin 22120000.0 1156073548
9 Guangzhou Guangzhou 23.1288 113.2590 China CN CHN Guangdong admin 20902000.0 1156237133
10 Beijing Beijing 39.9050 116.3914 China CN CHN Beijing primary 19433000.0 1156228865
17 Shenzhen Shenzhen 22.5350 114.0540 China CN CHN Guangdong minor 15929000.0 1156158707
29 Nanyang Nanyang 32.9987 112.5292 China CN CHN Henan NaN 12010000.0 1156192287
... ... ... ... ... ... ... ... ... ... ... ...
40725 Taoyan Taoyan 34.7706 103.7903 China CN CHN Gansu NaN 5329.0 1156019900
40744 Jingping Jingping 33.7844 104.3652 China CN CHN Gansu NaN 5149.0 1156005145
40776 Dayi Dayi 33.8312 104.0362 China CN CHN Gansu NaN 5114.0 1156108713
40782 Biancang Biancang 33.9007 104.0321 China CN CHN Gansu NaN 5040.0 1156724811
40938 Nichicun Nichicun 29.5333 94.4167 China CN CHN Tibet NaN 100.0 1156860651

1498 rows × 11 columns

def generate_url(longitude,latitude):
    url = f'https://www.7timer.info/bin/api.pl?lon={longitude}&lat={latitude}&product=civil&output=json'
    return url

def transform_weather_raw(text_j):
    weather_info = pd.DataFrame(text_j['dataseries'])
    # 'init' is the forecast start time; each 'timepoint' is an hour offset from it.
    start_time = pd.to_datetime(text_j['init'], format='%Y%m%d%H')
    weather_info['timepoint'] = pd.to_timedelta(weather_info['timepoint'], unit='h')
    weather_info['timestamp'] = start_time + weather_info['timepoint']
    weather_info.drop('timepoint', axis=1, inplace=True)
    # Flatten the nested wind10m dict into wind_direction / wind_speed columns.
    wind_df = pd.json_normalize(weather_info['wind10m'])
    wind_df.columns = ['wind_' + col for col in wind_df.columns]
    weather_info = pd.concat([weather_info, wind_df], axis=1)
    weather_info.drop('wind10m', axis=1, inplace=True)
    # Strip the '%' sign so relative humidity becomes a plain number string.
    weather_info['rh2m'] = weather_info['rh2m'].str.rstrip('%')
    return weather_info

def add_city_info(weather_info,longitude,latitude,city):
    weather_info['longitude'] = longitude
    weather_info['latitude'] = latitude
    weather_info['city'] = city
    return weather_info
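
To see these helpers end to end, here is a tiny hand-made payload shaped like the 7timer civil JSON (the values are invented for illustration; real responses carry more fields, such as temp2m and cloudcover):

sample_j = {
    'init': '2021060112',   # forecast start time: 2021-06-01 12:00 UTC
    'dataseries': [
        {'timepoint': 3, 'rh2m': '65%', 'weather': 'clearday',
         'wind10m': {'direction': 'N', 'speed': 2}},
        {'timepoint': 6, 'rh2m': '55%', 'weather': 'pcloudyday',
         'wind10m': {'direction': 'NE', 'speed': 3}},
    ],
}
print(generate_url(longitude=116.3914, latitude=39.905))
sample_df = transform_weather_raw(sample_j)
sample_df = add_city_info(sample_df, 116.3914, 39.905, 'Beijing')
sample_df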

Using Asynchronous Requests

As we briefly analyzed, after sending a single request we have to wait a long time (about two seconds) for the website to reply. With well over a thousand cities, we simply cannot afford that wait. The good news is that most web APIs accept requests from many clients at once, which means they can also handle several GET requests from us at the same time.

So instead of waiting for the first request to return, we can already send off the second, the third, ... the Nth request, and process each response whenever it comes back. We no longer sit around waiting between requests, and that is the core idea of asynchronous I/O.

Here we use grequests, an asynchronous companion to requests, to do exactly that.

Installation is as easy as before: pip install grequests

Getting started takes a little more effort, since it involves some asynchronous and map-style thinking. For our purposes, though, one function call is all we need: grequests.imap(rs, size=50)

Here size is the number of requests allowed to be in flight at the same time, i.e. the size of the worker pool.
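
As a quick illustration before the full run (the URLs below are just examples in the format generate_url produces), grequests.get only builds a request object; nothing is actually sent until we iterate over imap:

import grequests

demo_urls = [
    'https://www.7timer.info/bin/api.pl?lon=139.69&lat=35.69&product=civil&output=json',
    'https://www.7timer.info/bin/api.pl?lon=116.39&lat=39.90&product=civil&output=json',
    'https://www.7timer.info/bin/api.pl?lon=121.47&lat=31.17&product=civil&output=json',
]
reqs = (grequests.get(u) for u in demo_urls)   # build the requests lazily, nothing is sent yet
for resp in grequests.imap(reqs, size=2):      # at most 2 requests in flight at any moment
    print(resp.status_code, resp.url)          # responses arrive in completion order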

import grequests
import json
from tqdm.notebook import tqdm

city_list = []
lon_list = []
lat_list = []
url_list = []
for index, city_info in capital_china.iterrows():
    city = city_info['city']
    city_list.append(city)
    lon = city_info['lng']
    lon_list.append(lon)
    lat = city_info['lat']
    lat_list.append(lat)
    url = generate_url(longitude=lon, latitude=lat)
    url_list.append(url)

# imap yields responses in completion order, not in the order we sent them,
# so remember which position in the lists each URL belongs to.
url_to_index = {url: i for i, url in enumerate(url_list)}

rs = (grequests.get(u) for u in url_list)
all_cities_df = pd.DataFrame()
for r in tqdm(grequests.imap(rs, size=50), total=len(url_list)):
    i = url_to_index[r.request.url]
    text_j = json.loads(r.text)
    weather_info_df = transform_weather_raw(text_j)
    weather_info_df = add_city_info(weather_info_df, lon_list[i], lat_list[i], city_list[i])
    all_cities_df = pd.concat([all_cities_df, weather_info_df], axis=0)
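
With well over a thousand requests in flight, a few may time out or fail, and by default imap simply skips them. grequests accepts an exception_handler callback for this; a minimal sketch (the handler name and the 10-second timeout are our own choices):

def log_failure(request, exception):
    # Called once for every request that raised instead of returning a response.
    print('request failed:', request.url, exception)

rs = (grequests.get(u, timeout=10) for u in url_list)
for r in tqdm(grequests.imap(rs, size=50, exception_handler=log_failure), total=len(url_list)):
    ...  # same processing as in the loop above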

The data we have fetched needs to be saved to the database.

import os
import sys

# Make the project root importable so we can reuse the database models from earlier chapters.
module_path = os.path.abspath(os.path.join('../..'))
print(module_path)
if module_path not in sys.path:
    sys.path.append(module_path)
from weather_book.weather_app.models.db_models import engine, WeatherInfo

# Give every row a unique id, then append everything to the 'weather' table.
all_cities_df['id'] = range(all_cities_df.shape[0])
all_cities_df.to_sql('weather', engine, if_exists='append', index=False)  # don't write the DataFrame index
C:\Users\renb\PycharmProjects\weather_dashapp
83392
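
The db_models module imported above was built in an earlier chapter. For readers following along without it, a rough, hypothetical stand-in only needs to expose engine (to_sql will create the weather table itself if it does not exist yet); the real module maps the full WeatherInfo ORM model and points at its own database:

# Hypothetical stand-in for weather_book/weather_app/models/db_models.py;
# the real module from the earlier chapter will differ.
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base

engine = create_engine('sqlite:///weather.db')   # placeholder connection string
Base = declarative_base()

class WeatherInfo(Base):
    # Minimal mapping so the import works; the real model declares every weather column.
    __tablename__ = 'weather'
    id = Column(Integer, primary_key=True)
    city = Column(String)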