深圳幻海软件技术有限公司 欢迎您!

用Python获取和存储时间序列数据

2023-02-28

译者| 布加迪审校| 孙淑娟本教程将介绍如何使用Python从OpenWeatherMapAPI获取时间序列数据,并将其转换成PandasDataFrame。接下来,我们将使用InfluxDBPythonClient,将该数据写入到时间序列数据平台InfluxDB。我们会将来自A

译者 | 布加迪

审校 | 孙淑娟

本教程将介绍如何使用Python从OpenWeatherMap API获取时间序列数据,并将其转换成Pandas DataFrame。接下来,我们将使用InfluxDB Python Client,将该数据写入到时间序列数据平台InfluxDB。

我们会将来自API调用的JSON响应转换成Pandas DataFrame,因为这是将数据写入到InfluxDB的最简单方法。由于InfluxDB是一个专门构建的数据库,我们写入到InfluxDB旨在满足时间序列数据在摄取方面的高要求。

要求

本教程在通过Homebrew已安装Python 3的macOS系统上完成。建议安装额外的工具,比如virtualenv、pyenv或conda-env,以简化Python和Client的安装。完整的要求在这里:

txt
influxdb-client=1.30.0
pandas=1.4.3
requests>=2.27.1
  • 1.
  • 2.
  • 3.
  • 4.

本教程还假设您已经创建Free Tier InfluxDB云帐户或正在使用InfluxDB OSS,您也已经:

  • 创建了存储桶。您可以将存储桶视为数据库或InfluxDB中最高层次的数据组织。
  • 创建了令牌。

最后,该教程要求您已经使用OpenWeatherMap创建了一个帐户,并已创建了令牌。

请求天气数据

首先,我们需要请求数据。我们将使用请求库,通过OpenWeatherMap API从指定的经度和纬度返回每小时的天气数据。

# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': 
"minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

将数据转换成Pandas DataFrame

接下来,将JSON数据转换成Pandas DataFrame。我们还将时间戳从秒精度的Unix时间戳转换成日期时间对象。之所以进行这种转换,是由于InfluxDB写入方法要求时间戳为日期时间对象格式。接下来,我们将使用这种方法,将数据写入到InfluxDB。我们还删除了不想写入到InfluxDB的列。

python
# Convert data to Pandas DataFrame and convert timestamp to datetime 
object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

将Pandas DataFrame写入到InfluxDB

现在为InfluxDB Python客户端库创建实例,并将DataFrame写入到InfluxDB。我们指定了测量名称。测量含有存储桶中的数据。您可以将其视为InfluxDB的数据组织中仅次于存储桶的第二高层次结构。

您还可以使用data_frame__tag_columns参数指定将哪些列转换成标签。

由于我们没有将任何列指定为标签,我们的所有列都将转换成InfluxDB中的字段。标签用于写入有关您的时间序列数据的元数据,可用于更有效地查询数据子集。字段是您在 InfluxDB中存储实际时间序列数据的位置。该文档(https://docs.influxdata.com/influxdb/cloud/reference/key-concepts/?utm_source=vendor&utm_medium=referral&utm_campaign=2022-07_spnsr-ctn_obtaining-storing-ts-pything_tns)更详细地介绍了InfluxDB中的这些数据概念。

on
# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
df = df
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
data_frame_measurement_name="weather",
data_frame_timestamp_column="dt")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

完整脚本

回顾一下,不妨看看完整的脚本。 我们采取以下步骤:

1. 导入库。

2. 收集以下内容:

  • InfluxDB存储桶
  • InfluxDB组织
  • InfluxDB令牌
  • InfluxDB URL
  • OpenWeatherMap URL
  • OpenWeatherMap 令牌

3. 创建请求。

4. 将JSON响应转换成Pandas DataFrame。

5. 删除您不想写入到InfluxDB的任何列。

6. 将时间戳列从Unix时间转换成Pandas日期时间对象。

7. 为InfluxDB Python Client库创建实例。

8. 编写DataFrame,并指定测量名称和时间戳列。

python
import requests
import influxdb_client
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "OpenWeather"
org = "" # or email you used to create your Free Tier 
InfluxDB Cloud account
token = " 
url = "" # for example, 
https://us-west-2-1.aws.cloud2.influxdata.com/
openWeatherMap_token = ""
openWeatherMap_lat = "33.44"
openWeatherMap_lon = "-94.04"
openWeather_url = "https://api.openweathermap.org/data/2.5/onecall"
# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': 
"minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']
# Convert data to Pandas DataFrame and convert timestamp to datetime 
object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)
# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
df = df
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
data_frame_measurement_name="weather",
data_frame_timestamp_column="dt")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.

查询数据

现在,我们已经将数据写入到InfluxDB,可以使用InfluxDB UI来查询数据了。导航到数据资源管理器(从左侧导航栏中)。使用Query Builder(查询构建器),选择想要可视化的数据和想要为之可视化的范围,然后点击“提交”。

图1. 天气数据的默认物化视图。InfluxDB自动聚合时间序列数据,这样新用户就不会意外查询太多数据而导致超时

专业提示:当您使用查询构建器查询数据时,InfluxDB自动对数据进行下采样。要查询原始数据,导航到Script Editor(脚本编辑器)以查看底层Flux查询。Flux是面向InfluxDB的原生查询和脚本语言,可用于使用您的时间序列数据来分析和创建预测。使用aggregateWindow()函数取消行注释或删除行,以查看原始数据。

图2. 导航到脚本编辑器,并取消注释或删除aggregateWindow()函数,以查看原始天气数据

结语

但愿本文能帮助您充分利用InfluxDB Python Client库,获取时间序列数据并存储到InfluxDB中。如果您想进一步了解使用Python Client库从InfluxDB查询数据,建议您看看这篇文章(https://thenewstack.io/getting-started-with-python-and-influxdb/)。另外值得一提的是,您可以使用Flux从OpenWeatherMap API获取数据,并将其存储到InfluxDB。如果您使用InfluxDB Cloud,这意味着该Flux脚本将被托管和定期执行,因此您可以获得可靠的天气数据流,并馈入到实例中。想进一步了解如何使用Flux按用户定义的时间表获取天气数据,请阅读这篇文章(https://www.influxdata.com/blog/tldr-influxdb-tech-tips-handling-json-objects-mapping-arrays/?utm_source=vendor&utm_medium=referral&utm_campaign=2022-07_spnsr-ctn_obtaining-storing-ts-pything_tns)。