@suzu6

主にWEBと解析の技術について書きます。

広く浅くも続ければ深くなるはず。

【Python】Boto3でDynamoDBにデータを保存する

OpenWeatherMapで取得したデータをDynamoDBに保存します。 Pythonでコードを書き、AWS Lambdaを使って定期的に気象情報を保存することを目的としています。 DynamoDBの操作はAWSのSDKであるBoto3を利用しました。

OpenWeatherMapで都市名から気象情報を取得するの続きです。
ファイルの権限を変更できない。Googleドライブのシンボリックリンクが原因でした こんなこともあったので環境を少し変えています。

今回はローカルからBoto3でデータを保存するところまで。

環境

  • Mac
  • Python 3.6
  • Boto3 : v1.9.86
  • Requests : v2.21

Requestsはインストール済みとします。

Boto3

AWS謹製のPythonモジュール。 Boto3を利用するとAmazon S3、Amazon EC2、Amazon DynamoDB などAWSの各種サービスと容易に統合できます。

ドキュメントはこちら

インストール方法

pipでインストールできます。

$ pip install boto3

DynamoDBにテーブルを用意する

DynamoDBはNoSQLです。 PostgreSQLなどのリレーショナルDBとデータの持ち方が違うため、まずそれ用のテーブルの設計をします。

テーブル設計

保存したいのは気象情報の時系列データです。 また、複数の観測点がありそれぞれ都市名が付いています。

よって、プライマリパーティションキーcity_name (文字列)プライマリソートキーtimestamp (数値)にしました。 これで、都市ごとのとある期間のデータをまとめて取得できるはず。

その他の項目は自由に追加できます。 気象データは項目dataにJSON形式でぶち込む予定です。

hongkong

登録してみた香港の気象データ。 dataの中身は変更があると思うので適当。

テーブル名はcurrent-weatherとした。 scanなどテーブル全体の操作で無駄を減らすため、月ごとにテーブルを分けていく。 年月をつけてcurrent-weather.201901とした。

テーブルを作成する

設計を基にDynamoDBに気象情報を溜め込むテーブルを作成します。 リージョンは**アジアパシフィック(東京)**にしています。

createdb

ダッシュボードからテーブルを作成する。

init-dynamodb

テーブル名、プライマーキーを設定する。 ソートキーとしてtimestampを登録した。

AWSのアクセスキーを取得する。

セキュリティのため、AWSの各種サービスの操作にはアクセスキーが必要です。 DynamoDBの読み書きやテーブルの作成するためAmazonDynamoDBFullAccessを持ったユーザーのアクセスキーを取得します。

マスターユーザーで試すことは良いですが、アクセスキーが流出したときえらいことになるので権限を絞ったユーザーを使います。

サービスのIAMから、左メニューの「ユーザー」→「ユーザーを追加」から気象情報をDynamoDBに書き込むユーザーを追加します。

user-name

名前をweatherAPIとして「プログラムによるアクセス」をチェックする。

policy

「既存のポリシーを直接アタッチ」を選んで、AmazonDynamoDBFullAccessを探してチェックします。 データを読むだけならAmazonDynamoDBReadOnlyAccessがあります。

access-key

必要であればタグを付け、進めるとアクセスキーが表示されます。 アクセスキーIDシークレットアクセスキーを保存してください。 シークレットアクセスキーはここでしか見ることができない(はず)です。 csvも保存できます。

くれぐれも、Githubにアクセスキーを上げないよう気をつけてください。

※ 正直このセキュリティ関連は詳しくないため、公式や他のサイトも参考にしてください。

プログラムからデータを保存する

やっと本題。 気象情報を取得する部分は前回作成したコードを使います。

全体像は、定期的に気象情報を取得して保存するプログラム。

  • 複数の都市名をキーに現在の気象情報を取得する。
  • データを整形する。
  • 都市名をパーティションキーにしてDynamoDBに保存する。

※ テーブルを月ごとに分けるよう設計していますが、テーブルの作成はまだ機能していません。

以下のソースコード部分を繋いで一つのファイルにすると実行できます。

使用するモジュール

"""
OpenWeatherMapから現在の気象データを取得してDynamoDBに保存する。
"""

import boto3    # AWSのSDK
import requests # OpenWeatheMapのAPI取得
import json
import datetime
import decimal  # DynamoDB用にFloat型をDecimal型に変換する

boto3をインポートする。 DynamoDBにFloat型がないためDecimal型に変換するようにdecimalを利用する。decimalは標準モジュール。

気象情報の取得

詳細は前回を見てください。

def getWheather(city_name):
    API_KEY = "xxxxxxxxxxxxxxxxxxxxxxxx"  # xxxにOWMのAPI Keyを入力。
    api = "http://api.openweathermap.org/data/2.5/weather?units=metric&q={city}&APPID={key}"

    url = api.format(city=city_name, key=API_KEY)
    print(url)

    response = requests.get(url)
    # APIレスポンスの表示
    jsonText = json.dumps(response.json(), indent=2)
    print(jsonText)

    return response.json(parse_float=decimal.Decimal) # dict型に変換する際にFloatをDecimalにする

Requestsで受け取ったresponseをdict型にする際、Float型をDecimal型にします。 そうしないと以下のエラーが出ます。

TypeError: Float types are not supported. Use Decimal types instead.

参考:DynamoDB x Python / Decimal を登録する

データを整形する

DynamoDBのテーブルに合わせてデータを整形する。

def formatter(response):
    data = response
    # 表示用時刻
    unix = data["dt"]
    now = datetime.datetime.fromtimestamp(unix)

    # 保存するデータを作成
    item = {
        'city_name': data['name'],  # プライマリパーティションキー
        'timestamp': data["dt"],   # プライマリソートキー
        'datetime': now.strftime("%Y-%m-%dT%H:%M:%S"),
        'latitude': data['coord']['lat'],
        'longitude': data['coord']['lon'],
        'data': {
            'weather': data['weather'],
            'temp': data['main']['temp'],
            'humidity': data['main']['humidity'],
            'pressure': data['main']['pressure'],
            'clouds': data['clouds']['all'],
            'wind': data['wind']
        }
    }
    return item

itemの形式で保存される。 timestampはUNIX時間。

DynamoDBへ登録する

前半にDynamoDBと接続をして、後半にitemを保存している。

先ほど登録したAWSのユーザーのアクセスキーを渡す。 コードに書きたくない場合は、設定ファイルや環境変数に登録して使うと良い。

def insert(items):
    # データベース接続の初期化
    session = boto3.session.Session(
        region_name='DBのリージョン',
        aws_access_key_id='アクセスキーID',
        aws_secret_access_key='シークレットアクセスキー'
    )
    dynamodb = session.resource('dynamodb')

    # tableのパーティション用
    now = datetime.datetime.now()
    ym = now.strftime("%Y%m")

    # テーブルと接続
    table_name = 'current-weather.' + ym
    table = dynamodb.Table(table_name)

    for item in items:
        # 追加する
        response = table.put_item(
            TableName=table_name,
            Item=item
        )
        if response['ResponseMetadata']['HTTPStatusCode'] is not 200:
            # 失敗処理
            print(response)
        else:
            # 成功処理
            print('Successed :', item['city_name'])
    return

table.put_item()ではなくtable..batch_writer()を使い一括で登録しても良さそう。

本体部分

取得する都市の名前をリストで持ち、順次処理していく。

def weather_api(cities):
    items = []

    for city_name in cities:
        response = getWheather(city_name)

        if response['cod'] is not 200:
            print(city_name, ', status code :',
                  response['cod'], response['sys']['message'])
            continue
        item = formatter(response)
        print(item)
        items.append(item)

    insert(items)
    return True


def lambda_handler(event, context):
    # 都市と送信先を指定する。
    # OWMで指定可能な都市名は以下で確認する。
    # https://openweathermap.org/weathermap?basemap=map&cities=true&layer=temperature&lat=44.0079&lon=144.2487&zoom=12
    cities = [
        'Sapporo',
        'Tokyo',
        'Osaka-shi',
        'Okinawa',
        'Hong Kong',
        'New York',
        'City of London'
    ]
    weather_api(cities)

    return


if __name__ == "__main__":
    lambda_handler(None, None)

終わりに

無事にDynamoDBに気象データを保存することができました。 続いてはAWS Lambdaにあげて定期的に取得するように自動化していきます。

AWS LambdaにPythonコードをzipでアップして実行する方法を詳しく