学习如何从全球获取丰富、实时的空气质量数据
本文详细介绍了如何使用Python中的Google Maps空气质量API来获取和探索实时空气污染数据、时间序列和地图。查看完整代码请点击这里。
1. 背景
Google在2023年8月宣布将空气质量服务添加到其映射API列表中。您可以在这里阅读更多相关信息。虽然从API获取的数据确实更丰富,但现在这些信息似乎也可以从Google地图应用程序内获取。
根据公告,Google将结合来自不同分辨率的多个来源的信息-地面污染传感器、卫星数据、实时交通信息和来自数值模型的预测-以生成更新动态的、分辨率高达500m的100个国家空气质量数据集。这听起来是一个非常有趣和潜在有用的数据集,适用于各种映射、医疗保健和规划应用!
首次阅读这些内容时,我计划在“与数据交流”应用程序中尝试它,使用从构建此旅行地图工具中学到的一些知识,例如绘制您最喜欢城市空气污染浓度的时间序列,或者帮助人们计划当地的徒步旅行以避免有害空气的工具。
这里有三个API工具可以帮助解决这个问题-一个“当前状况”服务,提供给定位置的当前空气质量指数值和污染物浓度;一个“历史状况”服务,以每小时为间隔提供过去30天的相同数据;以及一个“热图”服务,提供给定区域的当前条件作为图像。
以前,我曾经使用过优秀的googlemaps
包来调用Python中的Google地图API,但这些新的API尚未支持。令人惊讶的是,在官方文档之外,我几乎找不到人们使用这些新工具的示例,也没有预先设计用于调用它们的Python包。但如果有人知道的话,我会乐意纠正!
因此,我自己构建了一些快速工具,在本文中我们将了解它们的工作原理以及如何使用它们。我希望对任何想在Python中尝试这些新API并寻找入门位置的人有所帮助。本项目的所有代码都可以在这里找到,并且我可能会随着时间的推移不断扩展这个存储库,增加更多功能并构建某种利用空气质量数据的映射应用程序。
2. 获取给定位置的当前空气质量
让我们开始吧!在本节中,我们将介绍如何使用Google Maps在给定位置获取空气质量数据。您首先需要一个API密钥,可以通过您的Google Cloud账户生成。他们提供90天的免费试用期,之后您将根据使用的API服务付费。在开始大量调用之前,请确保您启用了“Air Quality API”,并了解价格政策!
我通常将API密钥保存在.env
文件中,并使用类似下面的函数使用dotenv
加载它
from dotenv import load_dotenvfrom pathlib import Pathdef load_secets(): load_dotenv() env_path = Path(".") / ".env" load_dotenv(dotenv_path=env_path) google_maps_key = os.getenv("GOOGLE_MAPS_API_KEY") return { "GOOGLE_MAPS_API_KEY": google_maps_key, }
获取当前条件需要一个POST请求,具体信息请参考这里。我们将从googlemaps包中获得灵感,以一种可以泛化的方式进行操作。首先,我们构建一个使用requests
进行调用的客户端类。目标非常简单-我们想要构建一个像下面这样的URL,并包括用户查询特定的所有请求选项。
https://airquality.googleapis.com/v1/currentConditions:lookup?key=YOUR_API_KEY
Client 类接收我们的 API 密钥作为 key
,然后根据查询构建 request_url
。它接受请求选项作为 params
字典,然后将它们放入 JSON 请求体中,并通过调用 self.session.post()
处理。
import requestsimport ioclass Client(object): DEFAULT_BASE_URL = "https://airquality.googleapis.com" def __init__(self, key): self.session = requests.Session() self.key = key def request_post(self, url, params): request_url = self.compose_url(url) request_header = self.compose_header() request_body = params response = self.session.post( request_url, headers=request_header, json=request_body, ) return self.get_body(response) def compose_url(self, path): return self.DEFAULT_BASE_URL + path + "?" + "key=" + self.key @staticmethod def get_body(response): body = response.json() if "error" in body: return body["error"] return body @staticmethod def compose_header(): return { "Content-Type": "application/json", }
现在我们可以编写一个函数,帮助用户组装当前情况 API 的有效请求选项,然后使用此 Client 类来进行请求。这受到了 googlemaps 包设计的启发。
def current_conditions( client, location, include_local_AQI=True, include_health_suggestion=False, include_all_pollutants=True, include_additional_pollutant_info=False, include_dominent_pollutant_conc=True, language=None,): """ 查看此 API 的文档 https://developers.google.com/maps/documentation/air-quality/reference/rest/v1/currentConditions/lookup """ params = {} if isinstance(location, dict): params["location"] = location else: raise ValueError( "位置参数必须是包含纬度和经度的字典" ) extra_computations = [] if include_local_AQI: extra_computations.append("LOCAL_AQI") if include_health_suggestion: extra_computations.append("HEALTH_RECOMMENDATIONS") if include_additional_pollutant_info: extra_computations.append("POLLUTANT_ADDITIONAL_INFO") if include_all_pollutants: extra_computations.append("POLLUTANT_CONCENTRATION") if include_dominent_pollutant_conc: extra_computations.append("DOMINANT_POLLUTANT_CONCENTRATION") if language: params["language"] = language params["extraComputations"] = extra_computations return client.request_post("/v1/currentConditions:lookup", params)
此 API 的选项相对简单明了。它需要一个包含所需点的经度和纬度的字典,并可根据需要接收多个其他参数以控制返回的信息量。让我们使用所有参数设置为 True
来演示一下。
# 设置客户端client = Client(key=GOOGLE_MAPS_API_KEY)# 洛杉矶的一个位置location = {"longitude":-118.3,"latitude":34.1}# 获取 JSON 响应current_conditions_data = current_conditions( client, location, include_health_suggestion=True, include_additional_pollutant_info=True)
返回了许多有趣的信息!不仅提供了来自通用 AQI 和美国 AQI 指数的空气质量指数值,而且还提供了主要污染物的浓度、每个污染物的描述以及当前空气质量的健康建议。
{'dateTime': '2023-10-12T05:00:00Z', 'regionCode': 'us', 'indexes': [{'code': 'uaqi', 'displayName': '通用 AQI', 'aqi': 60, 'aqiDisplay': '60', 'color': {'red': 0.75686276, 'green': 0.90588236, 'blue': 0.09803922}, 'category': '良好空气质量', 'dominantPollutant': 'pm10'}, {'code': 'usa_epa', 'displayName': 'AQI(美国)', 'aqi': 39, 'aqiDisplay': '39', 'color': {'green': 0.89411765}, 'category': '良好空气质量', 'dominantPollutant': 'pm10'}], 'pollutants': [{'code': 'co', 'displayName': 'CO', 'fullName': '一氧化碳', 'concentration': {'value': 292.61, 'units': 'PARTS_PER_BILLION'}, 'additionalInfo': {'sources': '通常由碳燃料不完全燃烧(如汽车发动机和发电厂)引起。', 'effects': '吸入一氧化碳可阻止血液携带氧气。接触可能导致头晕、恶心和头痛。极高浓度的接触可能导致失去意识。'}}, {'code': 'no2', 'displayName': 'NO2', 'fullName': '二氧化氮', 'concentration': {'value': 22.3, 'units': 'PARTS_PER_BILLION'}, 'additionalInfo': {'sources': '主要来源是工业和交通中的燃料燃烧过程。', 'effects': '接触可能导致哮喘患者的支气管反应增强,慢性阻塞性肺疾病(COPD)患者的肺功能下降,并增加儿童呼吸道感染的风险。'}}, {'code': 'o3', 'displayName': 'O3', 'fullName': '臭氧', 'concentration': {'value': 24.17, 'units': 'PARTS_PER_BILLION'}, 'additionalInfo': {'sources': '臭氧是在日光下大气氧气、氮氧化物、一氧化碳和有机化合物之间发生的化学反应中形成的。', 'effects': '臭氧可以刺激呼吸道,引起咳嗽、灼热感、喘息和呼吸急促。此外,臭氧是光化学烟雾的主要成分之一。'}}, {'code': 'pm10', 'displayName': 'PM10', 'fullName': '可吸入颗粒物(<10µm)', 'concentration': {'value': 44.48, 'units': 'MICROGRAMS_PER_CUBIC_METER'}, 'additionalInfo': {'sources': '主要来源是燃烧过程(例如室内供暖、野火)、机械过程(例如建筑、矿物尘、农业)和生物颗粒物(如花粉、细菌、霉菌)。', 'effects': '可3. 获取给定位置的空气质量时间序列
如果能够获取给定位置的空气质量指数(AQI)和污染物数值的时间序列岂不是很好吗?这可能会揭示出污染物之间的相关性或者由交通或与天气相关因素引起的每日波动等有趣的模式。
我们可以通过向历史情况API发送另一个POST请求来实现这一功能,这将给我们提供一个小时历史记录。这个过程与获取当前情况的方式非常相似,唯一的主要区别是,由于结果可能很长,它们以多个页面的形式返回,需要一些额外的逻辑来处理。
我们来修改Client
类的request_post
方法以处理这个情况。
def request_post(self,url,params): request_url = self.compose_url(url) request_header = self.compose_header() request_body = params response = self.session.post( request_url, headers=request_header, json=request_body, ) response_body = self.get_body(response) # 将第一页放入响应字典中 page = 1 final_response = { "page_{}".format(page) : response_body } # 获取所有页面(如果需要) while "nextPageToken" in response_body: # 使用下一页的令牌再次调用API request_body.update({ "pageToken":response_body["nextPageToken"] }) response = self.session.post( request_url, headers=request_header, json=request_body, ) response_body = self.get_body(response) page += 1 final_response["page_{}".format(page)] = response_body return final_response
这段代码处理了response_body
中包含了名为nextPageToken
的字段的情况,该字段是已生成并准备提取的下一页数据的标识符。当这个信息存在时,我们只需要使用一个名为pageToken
的新参数再次调用API,将其定向到相应的页面。我们在一个while循环中重复这个过程,直到没有更多的页面为止。因此,我们的final_response
字典现在包含了另一个由页码表示的层级。对于对current_conditions
的调用,通常只有一个页面,但对于对historical_conditions
的调用,可能会有多个页面。
处理完这一点后,我们可以以与current_conditions
非常相似的方式编写一个historical_conditions
函数。
def historical_conditions( client, location, specific_time=None, lag_time=None, specific_period=None, include_local_AQI=True, include_health_suggestion=False, include_all_pollutants=True, include_additional_pollutant_info=False, include_dominant_pollutant_conc=True, language=None,): """ 请参阅此API的文档:https://developers.google.com/maps/documentation/air-quality/reference/rest/v1/history/lookup """ params = {} if isinstance(location, dict): params["location"] = location else: raise ValueError( "位置参数必须是一个包含纬度和经度的字典" ) if isinstance(specific_period, dict) and not specific_time and not lag_time: assert "startTime" in specific_period assert "endTime" in specific_period params["period"] = specific_period elif specific_time and not lag_time and not isinstance(specific_period, dict): # 注意,时间必须以"Zulu"格式提供 # 例如:datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%dT%H:%M:%SZ") params["dateTime"] = specific_time # 延迟时间以小时为单位 elif lag_time and not specific_time and not isinstance(specific_period, dict): params["hours"] = lag_time else: raise ValueError( "必须提供specific_time、specific_period或lag_time参数" ) extra_computations = [] if include_local_AQI: extra_computations.append("LOCAL_AQI") if include_health_suggestion: extra_computations.append("HEALTH_RECOMMENDATIONS") if include_additional_pollutant_info: extra_computations.append("POLLUTANT_ADDITIONAL_INFO") if include_all_pollutants: extra_computations.append("POLLUTANT_CONCENTRATION") if include_dominant_pollutant_conc: extra_computations.append("DOMINANT_POLLUTANT_CONCENTRATION") if language: params["language"] = language params["extraComputations"] = extra_computations # 这里将页面大小默认设置为100 params["pageSize"] = 100 # 如果需要,页面令牌将会被request_post方法填充 params["pageToken"] = "" return client.request_post("/v1/history:lookup", params)
为了定义历史时期,API可以接受一个lag_time
,单位为小时,最多可达720小时(30天)。它还可以接受一个specific_period
字典,其中定义了起始时间和结束时间的格式如上方的注释所述。最后,为了获取单个小时的数据,它可以只接受一个时间戳,由specific_time
提供。还请注意pageSize
参数的使用,该参数控制每次调用API时返回多少时间点。默认值为100。
让我们试一试。
# 设置客户端client = Client(key=GOOGLE_MAPS_API_KEY)# 洛杉矶的一个位置CAlocation = {"longitude":-118.3,"latitude":34.1}# 一个JSON响应history_conditions_data = 历史条件(client, location, lag_time=720)
我们应该获得一个长、嵌套的JSON响应,其中包含过去720小时内每小时增加的AQI指数和特定污染物值。有许多方法可以将其格式化成更适合可视化和分析的结构,下面的函数将其转换为一个"长"格式的pandas数据帧,这在使用seaborn
进行绘图时非常有效。
from itertools import chainimport pandas as pddef 历史条件转为数据框(response_dict): chained_pages = list(chain(*[response_dict[p]["hoursInfo"] for p in [*response_dict]])) all_indexes = [] all_pollutants = [] for i in range(len(chained_pages)): # 需要进行这个检查,以防其中一个时间戳丢失数据,有时会发生 if "indexes" in chained_pages[i]: this_element = chained_pages[i] # 获取时间 time = this_element["dateTime"] # 获取所有指数值并添加元数据 all_indexes += [(time , x["code"],x["displayName"],"index",x["aqi"],None) for x in this_element['indexes']] # 获取所有污染物值并添加元数据 all_pollutants += [(time , x["code"],x["fullName"],"pollutant",x["concentration"]["value"],x["concentration"]["units"]) for x in this_element['pollutants']] all_results = all_indexes + all_pollutants # 生成"长格式"的数据帧 res = pd.DataFrame(all_results,columns=["time","code","name","type","value","unit"]) res["time"]=pd.to_datetime(res["time"]) return res
在历史条件
的输出上运行此函数将生成一个数据框,其列格式便于进行分析。
df = 历史条件转为数据框(history_conditions_data)
现在我们可以在seaborn
或其他可视化工具中绘制结果。
import seaborn as snsns.relplot绘制( x="time", y="value", data=df[df["code"].isin(["uaqi","usa_epa","pm25","pm10"])], kind="line", col="name", col_wrap=4, hue="type", height=4, facet_kws={'sharey': False, 'sharex': False})g.set_xticklabels(rotation=90)
这已经非常有趣了!污染物时间序列中明显存在几种周期性,并且预期的是美国AQI与pm25和pm10浓度密切相关。我对Google在这里提供的通用AQI了解得不多,所以无法解释它为什么与pm25和p10呈相反的相关性。较小的UAQI是否意味着空气质量更好?尽管我在搜索中找了很多,但仍然找不到一个好的答案。
4. 获取空气质量热力图瓷砖
现在是使用谷歌地图空气质量 API 的最后一个用例 — 生成热力图瓷砖。关于这个的文档非常稀疏,这很可惜,因为这些瓷砖是用于可视化当前空气质量的强大工具,尤其是与 Folium
地图相结合时。
我们使用 GET 请求获取它们,这涉及构建以以下格式为 URL 的 URL,其中瓷砖的位置由 zoom
、x
和 y
指定。
GET https://airquality.googleapis.com/v1/mapTypes/{mapType}/heatmapTiles/{zoom}/{x}/{y}
zoom
、x
和 y
是什么意思?我们可以通过了解谷歌地图如何将经纬度坐标转换为 “瓷砖坐标”,在这里详细描述了这一点。基本上,谷歌地图将图像存储在网格中,每个单元格测量 256 x 256 像素,并且单元格的实际尺寸是缩放级别的函数。当我们调用 API 时,我们需要指定从哪个网格绘制 —— 这由缩放级别决定 —— 以及从网格的哪个位置绘制 —— 这由 x
和 y
瓷砖坐标决定。返回的是一个字节数组,可以被 Python Imaging Library (PIL) 或类似的图像处理包读取。
在上述格式中形成了我们的 url
后,我们可以为 Client
类添加一些方法,使我们能够获取相应的图像。
def request_get(self,url): request_url = self.compose_url(url) response = self.session.get(request_url) # 用于从热力图瓷砖服务中获取的图像 return self.get_image(response) @staticmethod def get_image(response): if response.status_code == 200: image_content = response.content # 注意在这里使用了 PIL 的 Image # 需要 from PIL import Image image = Image.open(io.BytesIO(image_content)) return image else: print("获取图像的 GET 请求返回了一个错误") return None
这很好,但我们真正需要的是将经度和纬度的一组坐标转换为瓷砖坐标的能力。文档解释了如何做到这一点 —— 我们首先将坐标转换为 Mercator 投影,然后再使用指定的缩放级别将其转换为 “像素坐标”。最后将其转换为瓷砖坐标。为了处理所有这些转换,我们可以使用下面的 TileHelper
类。
import mathimport numpy as npclass TileHelper(object): def __init__(self, tile_size=256): self.tile_size = tile_size def location_to_tile_xy(self,location,zoom_level=4): # 基于这个函数 # https://developers.google.com/maps/documentation/javascript/examples/map-coordinates#maps_map_coordinates-javascript lat = location["latitude"] lon = location["longitude"] world_coordinate = self._project(lat,lon) scale = 1 << zoom_level pixel_coord = (math.floor(world_coordinate[0]*scale), math.floor(world_coordinate[1]*scale)) tile_coord = (math.floor(world_coordinate[0]*scale/self.tile_size),math.floor(world_coordinate[1]*scale/self.tile_size)) return world_coordinate, pixel_coord, tile_coord def tile_to_bounding_box(self,tx,ty,zoom_level): # 参见 https://developers.google.com/maps/documentation/javascript/coordinates # 了解详情 box_north = self._tiletolat(ty,zoom_level) # 瓷砖号朝南增加 box_south = self._tiletolat(ty+1,zoom_level) box_west = self._tiletolon(tx,zoom_level) # 瓷砖号朝东增加 box_east = self._tiletolon(tx+1,zoom_level) # (latmin, latmax, lonmin, lonmax) return (box_south, box_north, box_west, box_east) @staticmethod def _tiletolon(x,zoom): return x / math.pow(2.0,zoom) * 360.0 - 180.0 @staticmethod def _tiletolat(y,zoom): n = math.pi - (2.0 * math.pi * y)/math.pow(2.0,zoom) return math.atan(math.sinh(n))*(180.0/math.pi) def _project(self,lat,lon): siny = math.sin(lat*math.pi/180.0) siny = min(max(siny,-0.9999), 0.9999) return (self.tile_size*(0.5 + lon/360), self.tile_size*(0.5 - math.log((1 + siny) / (1 - siny)) / (4 * math.pi))) @staticmethod def find_nearest_corner(location,bounds): corner_lat_idx = np.argmin([ np.abs(bounds[0]-location["latitude"]), np.abs(bounds[1]-location["latitude"]) ]) corner_lon_idx = np.argmin([ np.abs(bounds[2]-location["longitude"]), np.abs(bounds[3]-location["longitude"]) ]) if (corner_lat_idx == 0) and (corner_lon_idx == 0): # closests 是 latmin,lonmin direction = "西南" elif (corner_lat_idx == 0) and (corner_lon_idx == 1): direction = "东南" elif (corner_lat_idx == 1) and (corner_lon_idx == 0): direction = "西北" else: direction = "东北" corner_coords = (bounds[corner_lat_idx],bounds[corner_lon_idx+2]) return corner_coords, direction @staticmethod def get_ajoining_tiles(tx,ty,direction): if direction == "西南": return [(tx-1,ty),(tx-1,ty+1),(tx,ty+1)] elif direction == "东南": return [(tx+1,ty),(tx+1,ty-1),(tx,ty-1)] elif direction == "西北": return [(tx-1,ty-1),(tx-1,ty),(tx,ty-1)] else: return [(tx+1,ty-1),(tx+1,ty),(tx,ty-1)]
我们可以看到location_to_tile_xy
函数接受一个位置字典和缩放级别作为参数,并返回该点所在的瓦片。另一个有用的函数是tile_to_bounding_box
,它会找到指定网格单元的边界坐标。如果我们要对该单元进行地理定位并在地图上标记出来,就需要这个函数。
让我们来看看下面的air_quality_tile
函数是如何工作的。这个函数会接收我们的client
、location
和一个指示要获取哪种类型瓦片的字符串作为参数。我们还需要指定一个缩放级别,初次选择可能比较困难,需要进行一些试错。我们稍后会讨论get_adjoining_tiles
参数。
def air_quality_tile(client, location, pollutant="UAQI_INDIGO_PERSIAN", zoom=4, get_adjoining_tiles = True): # 参见 https://developers.google.com/maps/documentation/air-quality/reference/rest/v1/mapTypes.heatmapTiles/lookupHeatmapTile assert pollutant in [ "UAQI_INDIGO_PERSIAN", "UAQI_RED_GREEN", "PM25_INDIGO_PERSIAN", "GBR_DEFRA", "DEU_UBA", "CAN_EC", "FRA_ATMO", "US_AQI" ] # 包含处理瓦片坐标的有用方法 helper = TileHelper() # 获取位置所在的瓦片 world_coordinate, pixel_coord, tile_coord = helper.location_to_tile_xy(location, zoom_level=zoom) # 获取该瓦片的边界框 bounding_box = helper.tile_to_bounding_box(tx=tile_coord[0], ty=tile_coord[1], zoom_level=zoom) if get_adjoining_tiles: nearest_corner, nearest_corner_direction = helper.find_nearest_corner(location, bounding_box) adjoining_tiles = helper.get_ajoining_tiles(tile_coord[0], tile_coord[1], nearest_corner_direction) else: adjoining_tiles = [] tiles = [] # 获取所有相邻的瓦片,以及所研究的瓦片 for tile in adjoining_tiles + [tile_coord]: bounding_box = helper.tile_to_bounding_box(tx=tile[0], ty=tile[1], zoom_level=zoom) image_response = client.request_get( "/v1/mapTypes/" + pollutant + "/heatmapTiles/" + str(zoom) + '/' + str(tile[0]) + '/' + str(tile[1]) ) # 将PIL图像转换为numpy数组 try: image_response = np.array(image_response) except: image_response = None tiles.append({ "bounds": bounding_box, "image": image_response }) return tiles
从代码中我们可以看出,流程如下:首先,找到感兴趣位置的瓦片坐标,这确定了我们要获取的网格单元。然后,找到这个网格单元的边界坐标。如果我们要获取周围的瓦片,就找到边界框的最近角,并使用它来计算三个相邻网格单元的瓦片坐标。然后调用API,并将每个瓦片作为带有相应边界框的图像返回。
我们可以按照以下方式运行,
client = Client(key=GOOGLE_MAPS_API_KEY)location = {"longitude":-118.3,"latitude":34.1}zoom = 7tiles = air_quality_tile( client, location, pollutant="UAQI_INDIGO_PERSIAN", zoom=zoom, get_adjoining_tiles=False)
然后使用folium绘制可缩放地图!请注意,这里使用了leafmap,因为该软件包可以生成与gradio兼容的Folium地图,gradio是用于生成python应用程序简单用户界面的强大工具。参考这篇文章了解示例。
import leafmap.foliumap as leafmap
import folium
lat = location["latitude"]
lon = location["longitude"]
map = leafmap.Map(location=[lat, lon], tiles="OpenStreetMap", zoom_start=zoom)
for tile in tiles:
latmin, latmax, lonmin, lonmax = tile["bounds"]
AQ_image = tile["image"]
folium.raster_layers.ImageOverlay(
image=AQ_image,
bounds=[[latmin, lonmin], [latmax, lonmax]],
opacity=0.7
).add_to(map)
或许有些令人失望的是,在这个缩放级别上,包含我们位置的瓷砖大部分是海洋,尽管在详细地图的顶部绘制空气污染仍然很不错。如果你放大地图,你会发现交通道路信息被用来提供城市地区的空气质量信号。
设置get_adjoining_tiles=True
可以得到一个更好的地图,因为它在该缩放级别上获取了三个最接近、不重叠的瓷砖。在我们的案例中,这对于使地图更具展示性有很大帮助。
我个人更喜欢使用pollutant=US_AQI
生成的图像,但还有其他几种选择。不幸的是,该API没有返回一个颜色比例尺,尽管可以使用图像中的像素值和对颜色的了解来生成一个。
结论
感谢您一直阅读到这里!在这里,我们探讨了如何使用Google Maps空气质量API在Python中交付结果,这些结果可以用于各种有趣的应用程序。未来,我希望能够撰写另一篇关于air_quality_mapper工具的文章,随着它的进一步发展,但我希望在本文中讨论的脚本本身就能有所帮助。如往常一样,对于进一步发展的任何建议都将不胜感激!