qtatsuの週報

初心者ですわぁ

DynamoDBから1MB以上のデータを取得する(boto3)

前書き

DynamoDBに蓄積したデータをグラフ表示するシステムを作成していました。開発の中途では問題なく動作し、テストコードもパスしていたのですが、ある時表示されるはずのグラフの一部が描画されていないことに気がつきました。 その原因はページネーションでした。DynamoDBのscanやqueryメソッドでは、指定したデータの全てを 1回のリクエストで取得できるとは限りません。 1MBまでしか取得できず、残りのデータを取得するにはもう一度DynamoDBにリクエストする必要があります。

本記事では、boto3からscanおよびqueryを実行してDynamoDBから1MBを越えるデータを取得する方法と注意点についてまとめます。

結論

  • DynamoDBからのレスポンスに含まれるLastEvaluatedKeyを調べ、whileループ中でデータ取得のリクエストを投げ続けることで取得します。
  • コードだけ知りたい方はこちら

参考リンク

DynamoDBのページネートを説明している公式ドキュメント。

Working with Scans in DynamoDB - Amazon DynamoDB

boto3でscanを行うサンプルコード公式ドキュメント。

Step 4: Query and Scan the Data - Amazon DynamoDB

環境

バージョン
MacOS Catalina 10.15.6
Python3 3.8.2
boto3 1.14.16

DynamoDB

  • ローカルでdockerを使って動かせる、公式のツール DynamoDB Local を利用して擬似的なDynamoDBを用意します。
  • 導入は、よろしければ以下の記事を参考にしてください。

ローカルでDynamoDBを動かす - qtatsuの週報

DynamoDBのセットアップ

  • 以下はpythonの対話モードから実行しました

テーブルの作成

ハッシュキーとして"name"、レンジキーとして"size"を持つテーブル、images を作成します。

import boto3
# ローカルのDynamoDBを使います。(上記記事参照)
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

table_name = 'images'
key_schema = [
    {
        "AttributeName": 'name',
        "KeyType": 'HASH',
    },
    {
         "AttributeName": 'size',
         "KeyType": "RANGE",
     }
]
attribute_definitions = [
    {
        'AttributeName': 'name',
        'AttributeType': 'S'
    },
    {
         'AttributeName': 'size',
         'AttributeType': 'N'
    }
]
provisioned_throughput = {
    'ReadCapacityUnits': 5,
    'WriteCapacityUnits': 5
}
table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    ProvisionedThroughput=provisioned_throughput
)

データの投入

今回は画像のバイナリデータを投入することにします。 現実的には画像をDynamoDBに入れるべきではないと思いますが、一レコードの情報量が大きい方と、1Mbの制限に早く到達できてわかりやすいと思ったためです。

以下の作業も全てPythonの対話モードで行います。

補足: base64

Dynamodbにバイナリデータを登録する場合、base64エンコードする必要があります。

AttributeValue - Amazon DynamoDB

B An attribute of type Binary. For example:

"B": "dGhpcyB0ZXh0IGlzIGJhc2U2NC1lbmNvZGVk"

Type: Base64-encoded binary data object

base64エンコードを行うには、以下の標準ライブラリを利用します。

バイト列を渡してエンコードするという点には注意しておきます。

base64 — Base16, Base32, Base64, Base85 Data Encodings — Python 3.8.6rc1 documentation

from base64 import b64encode
b64encode(b'abcdefg1234')
from base64 import b64decode
b64decode(b'YWJjZGVmZzEyMzQ=')


scan用のデータ

画像データ(PNG)を読み込み、エンコードします。 今回はハッシュキー(name)だけ変えて、画像データは同じものを使い回そうと思います。

注意点ですが、DynamoDBは1レコードあたりの上限サイズが決まっています。大きすぎる画像は登録できないので注意します。

Service, Account, and Table Quotas in Amazon DynamoDB - Amazon DynamoDB

with open('smallreimu.png', 'rb') as b:
    img = b.read()
    encoded_img = b64encode(img)
    
# バイナリでエンコードしているので、19644バイト。
len(encoded_img)
19644

試しに一件、登録してからscanで取得してみます。

data = {'name': '1件目画像', 'size': 19644,'image': encoded_img}
table.put_item(Item=data)

table.scan()
# .....省略.......
'Count': 1
# .....省略.......

データ一件について20,000バイト程度は増えると考え、

 1Mb \fallingdotseq  1,000,000 B
だから、
 1,000,000 \div 20,000 = 50
なので、3ページ分くらいでページネートすることを想定して110件登録しましょう。

for i in range(110):
    data = {'name': f'{i}件目-複数投入', 'size': 19644,'image': encoded_img}
    table.put_item(Item=data)

table.scan()
# .....省略.......
'Count': 54,
'LastEvaluatedKey': {'size': Decimal('19644'), 'name': '47件目-複数投入'},
# .....省略.......

大体予想通りです。データは110件入れましたが、おおよそ50件ほどしか取得できない状態になっています。これは1Mbの制限を超えたため、一度のリクエストでデータを取得しきれなくなった状態です。

また、LastEvaluatedKey という項目が取得されています。これは今回のリクエストで取得できたデータの内、最後のデータのキーの値です。(なお「47件目」と表示されていますが、dynamoDBは取得する順番が投入順とは全く関係ない点には注意します。)

ページネーションして、55件目からのデータを取得するにはこのLastEvaluatedKeyの値を、ExclusiveStartKeyというパラメータでscanメソッドに渡してやればOKです。渡したキーの"続き"の値を取得できます。

scanメソッドで1Mb以上のデータを取得する場合

Step 4: Query and Scan the Data - Amazon DynamoDB この公式リンクのStep4.3: Scanの項目を参考に、合計1Mb以上のデータがあっても全てscanして取り出せるメソッドscan_allを定義します。

def scan_all(table, scan_kwargs):
    items = []
    done = False
    start_key = None
    while not done:
        if start_key:
            scan_kwargs['ExclusiveStartKey'] = start_key
        response = table.scan(**scan_kwargs)
        items.extend(response.get('Items', []))
        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None
    return items

先ほどの説明通り、レスポンスに含まれるLastEvaluatedKeyを、次回のリクエストでExclusiveStartKeyとして渡してあげればOKです。 データを全てとりつくしたら、LastEvaludatedKeyはレスポンスに含まれなくなるので、そこでストップします。

なお、scan_kwargsはscanメソッドに渡す引数となりますが(ProjectionExpressionなどを渡せます。)、今回は必要ないので空のdictを渡しておきます。

tableは、今回作成したimagesテーブルです。

data = scan_all(table, {})
len(data)
110  # 連続投入110件

全件のデータが取れました!

queryメソッドで1MB以上のデータを取得する場合

DynamoDBでは、可能な限りscanよりもqueryメソッドを利用すべきです。ある程度条件を指定してデータを絞って取得しないと、余計なリードコストと通信量、取得したデータによるメモリの圧迫が起こります。

ちなみにFilterExpressionを利用したとしても、Fileterはscanの後に行われるためリードにかかるコストは節約できていません。受けとるデータは減るようなので、メモリは節約できると思います。

queryメソッドを使った場合も、scanと全く同じ仕組みでページネートに対処して1Mb以上のデータを取得できます。

queryは、ハッシュキーを固定で指定してレンジキーの条件を指定、という形でしか使えないので、同一のハッシュキー(name)かつ、ことなるレンジキー(size)のデータを新しく作ります。

sizeにfor文の回した回数を入れているのでデータとしてはおかしいですが、そこは目をつぶります...

for i in range(110):
    data = {'name': f'query用のデータ', 'size': i,'image': encoded_img}
    table.put_item(Item=data)

queryを、size > 10 のデータを取得するように実行すると以下の様になります。

from boto3.dynamodb.conditions import Key

t.query(KeyConditionExpression=Key('name').eq('query用のデータ') & Key('size').gt(10))

# .....省略.......
'Count': 54,
'LastEvaluatedKey': {'size': Decimal('64'), 'name': 'query用のデータ'},
# .....省略.......

scanと全く同じ仕組みの関数を定義して、全件取得します。

def query_all(table, query_kwargs):
    items = []
    done = False
    start_key = None
    while not done:
        if start_key:
            query_kwargs['ExclusiveStartKey'] = start_key
        response = table.query(**query_kwargs)
        items.extend(response.get('Items', []))
        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None
    return items
res = query_all(table, {'KeyConditionExpression': Key('name').eq('query用のデータ') & Key('size').gt(10)}) 


len(res)
99  # 110件のデータのうち、0 ~ 10の11件は弾かれるのであっている。

注意: limit句がある場合には気をつける

上記のようなラッパー関数を定義すればscanやqueryで1Mb以上のデータをページネートして取得可能なのですが、注意が必要なケースがあります。

それは limit句を使用していた場合です。

通常のlimit句は、以下の様な挙動をします。直感通り、limitで指定した件数だけデータを取得します。

table.scan(Limit=2)

{'Items': 
 [
   {'name': '11件目-複数投入',
    'size': Decimal('19644'),
    'image': Binary(b'iVBORw0KGgoAAAANS.....')}, # 省略
   {'name': '91件目-複数投入',
    'size': Decimal('19644'),
    'image': Binary(b'iVBORw0KGgoAAAANS.....')}, # 省略
 ]
 'Count': 2,
 'ScannedCount': 2,
 'LastEvaluatedKey': {'size': Decimal('19644'), 'name': '91件目-複数投入'},
 # ............省略..............  
   

レスポンスにLastEvaluatedKeyが入ってきます。これはqueryメソッドも同様です。

例えばこのLimitScanIndexForwardを組み合わせ、DynamoDBからレンジキーを用いて、「最大の値」のItemを取得する様な処理を実行したいとします。

上記のLastEvaluatedKeyを処理してページネートする関数にうっかり通してしまうと1リクエストで1件データ取得という処理を、dynamodbに入っている全てのデータをとり尽くすまで実行してしまうということになりかねません。

DynamoDBを操作するラッパークラスを定義するのは良いですが、あまり密な実装にならない様にした方が良さそうです。

まとめ

  • DynamoDBは1回のリクエストで1Mbまでのデータしか取得できない。
  • 1Mb以上のデータをscanやqueryで取得するにはページネートする必要がある。
  • LastEvaluatedKeyに、「最後に取得した」データのキーが入っている。これを次のリクエストのExclusiveStartKeyに指定することでページネートを実現する。
  • 最終的にLastEvaluatedKeyがレスポンスに含まれていないということが、データを全てとり尽くしたことの証明になる。
  • 上記はwhile文で処理するとわかりやすい。
  • Limitを指定した場合にもLastEvaluatedKeyは返ってくる。Limitを使う場合にはwhileによるページネーションを併用しない様に気をつける。