- 前書き
- 注意点
- 参考リンク
- 環境
- 全体像
- 0. 事前準備
- 1. S3とDynamoDBの作成
- 2. PipelineのRoleを作成
- 3. Amazon OpenSearch Collectionの作成
- 4. Policyの作成
- 5. Pipelineの作成
- 6. テスト実行
- リソースの削除
- 余談: ChatGPT と AWS CLI
- まとめ
前書き
2023年の年末に公開された、DynamoDBとOpenSearch Serviceのzero-ETL integrationを AWS CLI で構築する手順を記録しておきます。 zero-ETL integrationは、DynamoDBに投入したデータをOpenSearch Serviceに同期させる仕組みです。 イベントを拾ってLambdaでOpenSearchに投入する処理を自前で書く必要が無くなるため、コーディングの面でもリソース管理の面でも便利になるはずです。
以下の作業は全てコピペで実行できます。(※削除手順も記載してあります)
注意点
リソースは課金されます。 テストが終わったら削除しておきましょう。以下の手順を実行して発生する問題について、筆者は一切の責任を取ることができません。自己責任でお願いします。
参考リンク
AWS公式の紹介記事です。
Amazon DynamoDB の Amazon OpenSearch Service とのゼロ ETL 統合が利用可能になりました | Amazon Web Services ブログ
公式チュートリアルです。こちらはGUIベースです。本記事では、下のcollection(serverless)版のリソースを、AWS CLIを使って構築します。
Tutorial: Ingesting data into a domain using Amazon OpenSearch Ingestion - Amazon OpenSearch Service
DynamoDB zero-ETL integration with Amazon OpenSearch Service - Amazon DynamoDB
AWS CLIの基本となる実行手順について、以下のシリーズを参考にさせていただいています。
環境
バージョン | |
---|---|
MacOS Sonoma | 14.4.1 |
AWS CLI | 2.15.34 |
awscurl | 0.33 |
全体像
以下のリソースを作ります。
- Amazon S3
- Amazon DynamoDBのTable
- Amazon OpenSearch Service の collection
- 3種のポリシー
- Data access policies
- Encryption policies
- Network policies
- 3種のポリシー
- Pipeline
- IAM Role
- IAM Policy : OpenSearchとDynamoDBへのアクセス権
Pipelineは以下の働きをします。
- DynamoDBの監視(データが投入されたことを感知)
- OpenSearchへのデータ投入/削除/更新
- S3へバックアップなどをアップロード
また、Amazon OpenSearch Serviceはリソースベースのポリシーを持ちます。 IAM RoleでAmazon OpenSearch Serviceへのアクセスを許可するだけでは不十分で、Amazon OpenSearch Serviceの Data access policies でIAM Roleに対して許可を出す必要があります(後述)。
0. 事前準備
AWS CLIを利用できるようにしておきます。 また、CLIを実行するユーザに必要な権限をつけておきます。
作成するリソース名などを、変数で定義しておきます。以下をシェルで実行し、後半の手順に進んでください。
export AWS_DEFAULT_REGION='ap-northeast-1' COLLECTION_NAME=ingestion-collection PIPELINE_NAME=serverless-ingestion TABLE_NAME=ingestion-table BUCKET_NAME="ingestion-dynamodb" BUCKET_ARN=arn:aws:s3:::${BUCKET_NAME} && echo $BUCKET_ARN PATH_PREFIX1="opensearch-export" PATH_PREFIX2="dynamodb-pipeline" IAM_POLICY_NAME=pipeline-policy IAM_ROLE_NAME=PipelineRole ACCOUNT_ID=$(aws sts get-caller-identity --query 'Account' --output text) && echo $ACCOUNT_ID COLLECTION_ARN="arn:aws:es:${AWS_DEFAULT_REGION}:${ACCOUNT_ID}:domain/${COLLECTION_NAME}" && echo $COLLECTION_ARN
1. S3とDynamoDBの作成
S3のBucketを作成します。
aws s3api create-bucket --bucket $BUCKET_NAME --create-bucket-configuration LocationConstraint=$AWS_DEFAULT_REGION
完了確認をします。指定した名前のbucketができていれば成功です。
aws s3 ls | grep $BUCKET_NAME
DynamoDBを作成します。 今回は、nameとageフィールドを持ち、スループットを最小に固定したテーブルを作成することにします。(適当に変更しても大丈夫です)
aws dynamodb create-table \ --table-name $TABLE_NAME \ --attribute-definitions \ AttributeName=name,AttributeType=S \ AttributeName=age,AttributeType=N \ --key-schema \ AttributeName=name,KeyType=HASH \ AttributeName=age,KeyType=RANGE \ --provisioned-throughput \ ReadCapacityUnits=1,WriteCapacityUnits=1
完了確認を兼ねて、テーブルのARNを取得しておきます。
TABLE_ARN=$(aws dynamodb describe-table --table-name $TABLE_NAME --query Table.TableArn --output text) && echo $TABLE_ARN
PITR(Point-in-Time-Recovery)の有効化を行います。OpenSearch Ingestion初期データ利用時に必要です。
aws dynamodb update-continuous-backups --table-name $TABLE_NAME --point-in-time-recovery-specification PointInTimeRecoveryEnabled=true # 確認 aws dynamodb describe-continuous-backups --table-name $TABLE_NAME
DynamoDB StreamをNEW_IMAGEに変更します。 ( ※変更発生時、新しいイメージをキャプチャできるようになります。)
aws dynamodb update-table --table-name $TABLE_NAME --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE # 確認 aws dynamodb describe-table --table-name $TABLE_NAME --query Table.StreamSpecification
2. PipelineのRoleを作成
先にRoleだけ作ります。Policyは手順4で作成します(collectionを作成し、そのIDを手に入れてからpolicyを作成したいためです。)
まずは Trust policy をファイルに保存しておきます。 「このRoleを使うのはosis-pipelines.amazonaws.comですよ」と示すためのものです。
IAM_ROLE_PRINCIPAL='osis-pipelines.amazonaws.com' FILE_IAM_ROLE_DOC="role-document.json" cat << EOF > ${FILE_IAM_ROLE_DOC} { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Principal": { "Service": "${IAM_ROLE_PRINCIPAL}" }, "Effect": "Allow", "Sid": "" } ] } EOF cat ${FILE_IAM_ROLE_DOC}
上記のTrust policyを指定してRoleを作成します。
aws iam create-role --role-name $IAM_ROLE_NAME --assume-role-policy-document file://$FILE_IAM_ROLE_DOC
作成したRoleを確認します。
aws iam list-roles --query "Roles[?RoleName == '${IAM_ROLE_NAME}'].RoleName"
3. Amazon OpenSearch Collectionの作成
少し複雑ですが、以下のリソースを作成します。
- Data access policies
- リソースポリシーです。先ほど作成したRoleにアクセス許可を出します。
- Encryption policies
- 必須です。OpenSearch collection保存データの暗号化方法を指定します。
- Network policies
- collection
- 今回はDomainではなく、OpenSearchのserverless版を使います。
ポイント: 中間テーブルのようなオブジェクトはありません。単に作る予定のcollection名を指定した各種ポリシーを前もって作成しておくことでポリシーを有効化します。 特にEncryption policiesは先に作っておかないと、collectionを作成することができません。
Data access policiesの作成
policy documentをファイルに保存しておきます。 今回はCLIユーザと先ほど作成したRoleを許可します。GUIでも確認したければ、AWSのマネコンにログインしているユーザも追加で指定してください。(後から追加することもできます)
FILE_ACCESS_POLICY_DOC="access-policy-document.json" CALLER_ARN=$(aws sts get-caller-identity --query Arn --output text) && echo $CALLER_ARN IAM_ROLE_ARN=$(aws iam get-role --role-name $IAM_ROLE_NAME --query Role.Arn --output text) && echo $IAM_ROLE_ARN cat << EOS > $FILE_ACCESS_POLICY_DOC [ { "Rules": [ { "Resource": [ "index/${COLLECTION_NAME}/*" ], "Permission": [ "aoss:CreateIndex", "aoss:UpdateIndex", "aoss:DescribeIndex", "aoss:ReadDocument", "aoss:WriteDocument" ], "ResourceType": "index" } ], "Principal": [ "${IAM_ROLE_ARN}", "${CALLER_ARN}" ], "Description": "Rule 1" } ] EOS cat $FILE_ACCESS_POLICY_DOC
Data access policiesを作成します。
詳しくはaws opensearchserverless create-access-policy help
をみてください。
なお、2024/06現在 typeはdata以外選択できません。
ACCESS_POLICY_NAME=access-policy aws opensearchserverless create-access-policy --name $ACCESS_POLICY_NAME --policy file://$FILE_ACCESS_POLICY_DOC --type data
Encryption policiesの作成
policy documentをファイルに保存しておきます。今回はAWSが提供するkeyを使うので、AWSOwnedKeyをtrueにします。
FILE_ENC_POLICY_DOC="access-policy-document.json" cat << EOS > $FILE_ENC_POLICY_DOC { "Rules": [ { "ResourceType": "collection", "Resource": [ "collection/${COLLECTION_NAME}" ] } ], "AWSOwnedKey": true } EOS cat $FILE_ENC_POLICY_DOC
Encryption policiesを作成します。 わかりにくいですが、こちらはcreate-security-policyのAPIを使い、typeとしてencryptionを指定します。
ENC_POLICY_NAME=encryption-policy aws opensearchserverless create-security-policy --name $ENC_POLICY_NAME --policy file://$FILE_ENC_POLICY_DOC --type encryption
Network policiesの作成
Network policiesを作成します。 policy documentをまずファイルに書き込みます
FILE_NET_POLICY_DOC="access-policy-document.json" cat << EOS > $FILE_NET_POLICY_DOC [ { "Rules": [ { "ResourceType": "dashboard", "Resource": [ "collection/${COLLECTION_NAME}"] }, { "ResourceType": "collection", "Resource": [ "collection/${COLLECTION_NAME}"] } ], "AllowFromPublic": true } ] EOS cat $FILE_NET_POLICY_DOC
Network policiesを作成します。 わかりにくいですが、こちらはcreate-security-policyのAPIを使い、typeとしてnetworkを指定します。
NET_POLICY_NAME=network-policy aws opensearchserverless create-security-policy --name $NET_POLICY_NAME --policy file://$FILE_NET_POLICY_DOC --type network
Collectionを作成します。 テスト用なのでreplicasは無効にして、typeはSEARCHにしておきます。
aws opensearchserverless create-collection --name $COLLECTION_NAME --standby-replicas DISABLED --type SEARCH
4. Policyの作成
pipelineに付与するPolicyを作成します。
まずは必要な権限をJSONで記載し、ファイルに保存しておきます。
COLLECTION_ID=$(aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].id' --output text) && echo $COLLECTION_ID FILE_IAM_POLICY_DOC="policy-document.json" cat << EOS > $FILE_IAM_POLICY_DOC { "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "${TABLE_ARN}" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "${TABLE_ARN}/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "${TABLE_ARN}/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "${BUCKET_ARN}/${PATH_PREFIX1}/*", "${BUCKET_ARN}/${PATH_PREFIX2}/*" ] }, { "Action": [ "aoss:BatchGetCollection", "aoss:APIAccessAll" ], "Effect": "Allow", "Resource": "arn:aws:aoss:${AWS_DEFAULT_REGION}:${ACCOUNT_ID}:collection/${COLLECTION_ID}" }, { "Action": [ "aoss:CreateSecurityPolicy", "aoss:GetSecurityPolicy", "aoss:UpdateSecurityPolicy" ], "Effect": "Allow", "Resource": "*", "Condition": { "StringEquals": { "aoss:collection": "${COLLECTION_NAME}" } } } ] } EOS cat $FILE_IAM_POLICY_DOC
Policyを作成します。
aws iam create-policy --policy-name ${IAM_POLICY_NAME} --policy-document file://$FILE_IAM_POLICY_DOC
作成したPolicyを確認します。
IAM_POLICY_ARN="arn:aws:iam::${ACCOUNT_ID}:policy/${IAM_POLICY_NAME}" && echo $IAM_POLICY_ARN aws iam get-policy --policy-arn $IAM_POLICY_ARN
手順2で作成したpipeline用のRoleに、Policyを紐つけます。
aws iam attach-role-policy --role-name $IAM_ROLE_NAME --policy-arn $IAM_POLICY_ARN
Policyが紐ついていることを確認します。
aws iam list-attached-role-policies --role-name $IAM_ROLE_NAME
5. Pipelineの作成
こちらの公式ドキュメントも参考にしてください。
Amazon OpenSearch Ingestion パイプラインの作成 - Amazon OpenSearch サービス
PipelineのETL処理に当たる部分はymlで定義します。0から記載するのは大変なので、公式が用意しているblueprintを使います。 blueprintの一覧を確認し、DynamoDBの名前が入っているものを探します。
aws osis list-pipeline-blueprints
AWS-DynamoDBChangeDataCapturePipeline
を使いましょう。
blueprintを手に入れます。
aws osis get-pipeline-blueprint --blueprint-name AWS-DynamoDBChangeDataCapturePipeline --query Blueprint.PipelineConfigurationBody --output text > blueprint.yml
blueprintを元に、ETL(DynamoDBのテーブルの中身をOpenSearchにコピーする)に必要な情報を設定していきます。
hostsの設定内容については、collectionが作成完了し、endpointが手に入るまでしばらく待ちます。 以下のコマンドがCREATINGだと、endpointが得られません。ACTIVEになるまで待ちます。
aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].status'
collectionが完了していれば、以下を使ってymlを書いてもOKです。(blueprintを編集した後の内容例です。)
TABLE_ARN=$(aws dynamodb describe-table --table-name $TABLE_NAME --query Table.TableArn --output text) && echo $TABLE_ARN IAM_ROLE_ARN=$(aws iam get-role --role-name $IAM_ROLE_NAME --query Role.Arn --output text) && echo $IAM_ROLE_ARN HOST=$(aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].collectionEndpoint' --output text) && echo $HOST FILE_INGESTION_DOCUMENT=injestion.yml cat << EOS > $FILE_INGESTION_DOCUMENT version: "2" dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: - table_arn: "${TABLE_ARN}" stream: start_position: "LATEST" export: s3_bucket: "${BUCKET_NAME}" s3_region: "${AWS_DEFAULT_REGION}" s3_prefix: "${PATH_PREFIX1}/" aws: sts_role_arn: "${IAM_ROLE_ARN}" region: "${AWS_DEFAULT_REGION}" sink: - opensearch: hosts: [ "${HOST}" ] index: "\${getMetadata(\"table_name\")}" index_type: custom normalize_index: true document_id: "\${getMetadata(\"primary_key\")}" action: "\${getMetadata(\"opensearch_action\")}" document_version: "\${getMetadata(\"document_version\")}" document_version_type: "external" aws: sts_role_arn: "${IAM_ROLE_ARN}" region: "${AWS_DEFAULT_REGION}" serverless: true dlq: s3: bucket: "${BUCKET_NAME}" key_path_prefix: "${PATH_PREFIX2}/dlq" region: "${AWS_DEFAULT_REGION}" sts_role_arn: "${IAM_ROLE_ARN}" EOS cat $FILE_INGESTION_DOCUMENT
パイプラインを作成します。 min/max-unitは読み書きのcapacityで、時間あたりの課金に影響します。 OpenSearch Compute Unit (OCU)と言われる単位です。
aws osis create-pipeline \ --pipeline-name $PIPELINE_NAME \ --min-units 1\ --max-units 2\ --pipeline-configuration-body file://${FILE_INGESTION_DOCUMENT}
こちらも作成に時間が数分かかります。
6. テスト実行
DynamoDBにデータを投入してみます。
aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "saki"}, "age": {"N": "16"}, "height": {"N": "152"}}' aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "temari"}, "age": {"N": "15"}, "height": {"N": "162"}}' aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "kotone"}, "age": {"N": "15"}, "height": {"N": "156"}}'
OpenSearchを確認します。反映されるまで、少し時間がかかるかもしれません。
awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/_cat/indices awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/${TABLE_NAME}/_search | jq .
以下のように、DynamoDBに投入したデータがOpenSearchに反映されていれば成功です!
{ "took": 1494, "timed_out": false, "_shards": { "total": 0, "successful": 0, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "ingestion-table", "_id": "temari|15", "_score": 1, "_source": { "name": "temari", "age": 15, "height": 162 } }, { "_index": "ingestion-table", "_id": "kotone|15", "_score": 1, "_source": { "name": "kotone", "age": 15, "height": 156 } }, { "_index": "ingestion-table", "_id": "saki|16", "_score": 1, "_source": { "name": "saki", "age": 16, "height": 152 } } ] } }
リソースの削除
以下の順にリソースを削除していきます。
Pipeline
aws osis delete-pipeline --pipeline-name $PIPELINE_NAME
Amazon OpenSearch Service Collection
COLLECTION_ID=$( aws opensearchserverless list-collections --query "collectionSummaries[?name=='${COLLECTION_NAME}'].id" --output text) && $COLLECTION_ID aws opensearchserverless delete-collection --id $COLLECTION_ID
Amazon OpenSearch Service 3種のポリシー
aws opensearchserverless delete-access-policy --name $ACCESS_POLICY_NAME --type data aws opensearchserverless delete-security-policy --name $ENC_POLICY_NAME --type encryption aws opensearchserverless delete-security-policy --name $NET_POLICY_NAME --type network
Amazn DynamoDB
aws dynamodb delete-table --table-name $TABLE_NAME
Amazn S3
aws s3 rm s3://$BUCKET_NAME --recursive aws s3api delete-bucket --bucket $BUCKET_NAME
Pipelineに付与していたPolicyとRole
aws iam detach-role-policy --role-name $IAM_ROLE_NAME --policy-arn $IAM_POLICY_ARN aws iam delete-policy --policy-arn $IAM_POLICY_ARN aws iam delete-role --role-name $IAM_ROLE_NAME
余談: ChatGPT と AWS CLI
AWS CLI や CDK はChatGPTと相性がよく、指示が適切であればリソース作成においてとても便利です。しかし、複数のリソースを組み合わせた正確な構成手順を一度に出力するのはまだまだ難しいようです。
個人的にChatGPTが便利だと感じるのは以下のケースです。
AWS CLI や CDK はリソースの作成手順を残せると言う意味で重宝していますが、パラメータなどを調べるのがやや面倒です。 個人的にはaws cliのヘルプとCDKの予測変換とドキュメント(typescript)を併用しつつ、ChatGPTに質問しながら最終的にCDKのコードに落とし込むのが一番楽だと感じています。
まとめ
Amazon DynamodbとAmazon OpenSearch Serviceのzero-ETL integrationをAWS CLIで構築する手順をまとめました。 AWSはリソースが色々あってややこしいですが、GUIのチュートリアルをこなした後に同じものをCLIで構築し、ノートにまとめておくと理解が深まると思っています。 CDKはCLIと同じAPIの上に作られているので、後々CDKを記述する時も楽になります。