qtatsuの週報

Python/Django/TypeScript/React/AWS

【AWS CLI】Amazon DynamoDBとAmazon OpenSearch Serverlessのzero-ETL integrationを AWS CLI で構築する

前書き

2023年の年末に公開された、DynamoDBとOpenSearch Serviceのzero-ETL integrationを AWS CLI で構築する手順を記録しておきます。 zero-ETL integrationは、DynamoDBに投入したデータをOpenSearch Serviceに同期させる仕組みです。 イベントを拾ってLambdaでOpenSearchに投入する処理を自前で書く必要が無くなるため、コーディングの面でもリソース管理の面でも便利になるはずです。

以下の作業は全てコピペで実行できます。(※削除手順も記載してあります)

注意点

リソースは課金されます。 テストが終わったら削除しておきましょう。以下の手順を実行して発生する問題について、筆者は一切の責任を取ることができません。自己責任でお願いします。

参考リンク

AWS公式の紹介記事です。

Amazon DynamoDB の Amazon OpenSearch Service とのゼロ ETL 統合が利用可能になりました | Amazon Web Services ブログ

公式チュートリアルです。こちらはGUIベースです。本記事では、下のcollection(serverless)版のリソースを、AWS CLIを使って構築します。

Tutorial: Ingesting data into a domain using Amazon OpenSearch Ingestion - Amazon OpenSearch Service

Tutorial: Ingesting data into a collection using Amazon OpenSearch Ingestion - Amazon OpenSearch Service

DynamoDB zero-ETL integration with Amazon OpenSearch Service - Amazon DynamoDB

AWS CLIの基本となる実行手順について、以下のシリーズを参考にさせていただいています。

JAWS-UG CLI専門支部 - connpass

環境

バージョン
MacOS Sonoma 14.4.1
AWS CLI 2.15.34
awscurl 0.33

全体像

以下のリソースを作ります。

figure1

Pipelineは以下の働きをします。

  1. DynamoDBの監視(データが投入されたことを感知)
  2. OpenSearchへのデータ投入/削除/更新
  3. S3へバックアップなどをアップロード

また、Amazon OpenSearch Serviceはリソースベースのポリシーを持ちます。 IAM RoleでAmazon OpenSearch Serviceへのアクセスを許可するだけでは不十分で、Amazon OpenSearch Serviceの Data access policies でIAM Roleに対して許可を出す必要があります(後述)。

0. 事前準備

AWS CLIを利用できるようにしておきます。 また、CLIを実行するユーザに必要な権限をつけておきます。

作成するリソース名などを、変数で定義しておきます。以下をシェルで実行し、後半の手順に進んでください。

export AWS_DEFAULT_REGION='ap-northeast-1'

COLLECTION_NAME=ingestion-collection
PIPELINE_NAME=serverless-ingestion

TABLE_NAME=ingestion-table

BUCKET_NAME="ingestion-dynamodb"
BUCKET_ARN=arn:aws:s3:::${BUCKET_NAME} && echo $BUCKET_ARN
PATH_PREFIX1="opensearch-export"
PATH_PREFIX2="dynamodb-pipeline"

IAM_POLICY_NAME=pipeline-policy
IAM_ROLE_NAME=PipelineRole
ACCOUNT_ID=$(aws sts get-caller-identity --query 'Account' --output text) && echo $ACCOUNT_ID
COLLECTION_ARN="arn:aws:es:${AWS_DEFAULT_REGION}:${ACCOUNT_ID}:domain/${COLLECTION_NAME}" && echo $COLLECTION_ARN

1. S3とDynamoDBの作成

S3のBucketを作成します。

aws s3api create-bucket --bucket $BUCKET_NAME --create-bucket-configuration LocationConstraint=$AWS_DEFAULT_REGION

完了確認をします。指定した名前のbucketができていれば成功です。

aws s3 ls | grep $BUCKET_NAME

DynamoDBを作成します。 今回は、nameとageフィールドを持ち、スループットを最小に固定したテーブルを作成することにします。(適当に変更しても大丈夫です)

aws dynamodb create-table \
    --table-name $TABLE_NAME \
    --attribute-definitions \
        AttributeName=name,AttributeType=S \
        AttributeName=age,AttributeType=N \
    --key-schema \
        AttributeName=name,KeyType=HASH \
        AttributeName=age,KeyType=RANGE \
    --provisioned-throughput \
        ReadCapacityUnits=1,WriteCapacityUnits=1

完了確認を兼ねて、テーブルのARNを取得しておきます。

TABLE_ARN=$(aws dynamodb describe-table --table-name $TABLE_NAME --query Table.TableArn --output text) && echo $TABLE_ARN

PITR(Point-in-Time-Recovery)の有効化を行います。OpenSearch Ingestion初期データ利用時に必要です。

aws dynamodb update-continuous-backups --table-name $TABLE_NAME --point-in-time-recovery-specification PointInTimeRecoveryEnabled=true

# 確認
aws dynamodb describe-continuous-backups --table-name $TABLE_NAME 

DynamoDB StreamをNEW_IMAGEに変更します。 ( ※変更発生時、新しいイメージをキャプチャできるようになります。)

aws dynamodb update-table --table-name $TABLE_NAME --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

# 確認
aws dynamodb describe-table --table-name $TABLE_NAME --query Table.StreamSpecification

2. PipelineのRoleを作成

先にRoleだけ作ります。Policyは手順4で作成します(collectionを作成し、そのIDを手に入れてからpolicyを作成したいためです。)

まずは Trust policy をファイルに保存しておきます。 「このRoleを使うのはosis-pipelines.amazonaws.comですよ」と示すためのものです。

IAM_ROLE_PRINCIPAL='osis-pipelines.amazonaws.com'
FILE_IAM_ROLE_DOC="role-document.json"

cat << EOF > ${FILE_IAM_ROLE_DOC}
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "${IAM_ROLE_PRINCIPAL}"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
cat ${FILE_IAM_ROLE_DOC}

上記のTrust policyを指定してRoleを作成します。

aws iam create-role --role-name $IAM_ROLE_NAME  --assume-role-policy-document file://$FILE_IAM_ROLE_DOC

作成したRoleを確認します。

aws iam list-roles --query "Roles[?RoleName == '${IAM_ROLE_NAME}'].RoleName"

3. Amazon OpenSearch Collectionの作成

少し複雑ですが、以下のリソースを作成します。

  • Data access policies
    • リソースポリシーです。先ほど作成したRoleにアクセス許可を出します。
  • Encryption policies
    • 必須です。OpenSearch collection保存データの暗号化方法を指定します。
  • Network policies
    • 今回はPublicアクセスを許可します。AWSログインでダッシュボードが開けるようになります。
  • collection
    • 今回はDomainではなく、OpenSearchのserverless版を使います。

ポイント: 中間テーブルのようなオブジェクトはありません。単に作る予定のcollection名を指定した各種ポリシーを前もって作成しておくことでポリシーを有効化します。 特にEncryption policiesは先に作っておかないと、collectionを作成することができません。

Data access policiesの作成

policy documentをファイルに保存しておきます。 今回はCLIユーザと先ほど作成したRoleを許可します。GUIでも確認したければ、AWSのマネコンにログインしているユーザも追加で指定してください。(後から追加することもできます)

FILE_ACCESS_POLICY_DOC="access-policy-document.json" 

CALLER_ARN=$(aws sts get-caller-identity --query Arn --output text) && echo $CALLER_ARN
IAM_ROLE_ARN=$(aws iam get-role --role-name $IAM_ROLE_NAME --query Role.Arn --output text) && echo $IAM_ROLE_ARN

cat << EOS > $FILE_ACCESS_POLICY_DOC
[
  {
    "Rules": [
      {
        "Resource": [
          "index/${COLLECTION_NAME}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "${IAM_ROLE_ARN}",
      "${CALLER_ARN}"
    ],
    "Description": "Rule 1"
  }
]
EOS
cat $FILE_ACCESS_POLICY_DOC

Data access policiesを作成します。 詳しくはaws opensearchserverless create-access-policy helpをみてください。 なお、2024/06現在 typeはdata以外選択できません。

ACCESS_POLICY_NAME=access-policy
aws opensearchserverless create-access-policy --name $ACCESS_POLICY_NAME --policy file://$FILE_ACCESS_POLICY_DOC --type data

Encryption policiesの作成

policy documentをファイルに保存しておきます。今回はAWSが提供するkeyを使うので、AWSOwnedKeyをtrueにします。

FILE_ENC_POLICY_DOC="access-policy-document.json" 
cat << EOS > $FILE_ENC_POLICY_DOC
{
  "Rules": [
    {
      "ResourceType": "collection",
      "Resource": [ "collection/${COLLECTION_NAME}" ]
    }
  ],
  "AWSOwnedKey": true
}
EOS
cat $FILE_ENC_POLICY_DOC

Encryption policiesを作成します。 わかりにくいですが、こちらはcreate-security-policyのAPIを使い、typeとしてencryptionを指定します。

ENC_POLICY_NAME=encryption-policy
aws opensearchserverless create-security-policy --name $ENC_POLICY_NAME --policy file://$FILE_ENC_POLICY_DOC --type encryption

Network policiesの作成

Network policiesを作成します。 policy documentをまずファイルに書き込みます

FILE_NET_POLICY_DOC="access-policy-document.json" 
cat << EOS > $FILE_NET_POLICY_DOC
[
  {
    "Rules": [
      {
        "ResourceType": "dashboard",
        "Resource": [ "collection/${COLLECTION_NAME}"]
      },
      {
        "ResourceType": "collection",
        "Resource": [ "collection/${COLLECTION_NAME}"]
      }
    ],
    "AllowFromPublic": true
  }
]
EOS
cat $FILE_NET_POLICY_DOC

Network policiesを作成します。 わかりにくいですが、こちらはcreate-security-policyのAPIを使い、typeとしてnetworkを指定します。

NET_POLICY_NAME=network-policy
aws opensearchserverless create-security-policy --name $NET_POLICY_NAME --policy file://$FILE_NET_POLICY_DOC --type network

Collectionを作成します。 テスト用なのでreplicasは無効にして、typeはSEARCHにしておきます。

aws opensearchserverless create-collection --name $COLLECTION_NAME --standby-replicas DISABLED --type SEARCH

4. Policyの作成

pipelineに付与するPolicyを作成します。

まずは必要な権限をJSONで記載し、ファイルに保存しておきます。

COLLECTION_ID=$(aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].id' --output text) && echo $COLLECTION_ID

FILE_IAM_POLICY_DOC="policy-document.json" 
cat << EOS > $FILE_IAM_POLICY_DOC
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowRunExportJob",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:DescribeContinuousBackups",
        "dynamodb:ExportTableToPointInTime"
      ],
      "Resource": [ "${TABLE_ARN}" ]
    },
    {
      "Sid": "allowCheckExportjob",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeExport"
      ],
      "Resource": [ "${TABLE_ARN}/export/*" ]
    },
    {
      "Sid": "allowReadFromStream",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator"
      ],
      "Resource": [ "${TABLE_ARN}/stream/*" ]
    },
    {
      "Sid": "allowReadAndWriteToS3ForExport",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": [
        "${BUCKET_ARN}/${PATH_PREFIX1}/*",
        "${BUCKET_ARN}/${PATH_PREFIX2}/*"
      ]
    },
    {
      "Action": [
        "aoss:BatchGetCollection",
        "aoss:APIAccessAll"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:aoss:${AWS_DEFAULT_REGION}:${ACCOUNT_ID}:collection/${COLLECTION_ID}"
    },
    {
      "Action": [
        "aoss:CreateSecurityPolicy",
        "aoss:GetSecurityPolicy",
        "aoss:UpdateSecurityPolicy"
      ],
      "Effect": "Allow",
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "aoss:collection": "${COLLECTION_NAME}"
        }
      }
    }
  ]
}
EOS
cat $FILE_IAM_POLICY_DOC

Policyを作成します。

aws iam create-policy --policy-name ${IAM_POLICY_NAME}  --policy-document file://$FILE_IAM_POLICY_DOC

作成したPolicyを確認します。

IAM_POLICY_ARN="arn:aws:iam::${ACCOUNT_ID}:policy/${IAM_POLICY_NAME}" && echo $IAM_POLICY_ARN
aws iam get-policy --policy-arn $IAM_POLICY_ARN

手順2で作成したpipeline用のRoleに、Policyを紐つけます。

aws iam attach-role-policy --role-name $IAM_ROLE_NAME --policy-arn $IAM_POLICY_ARN 

Policyが紐ついていることを確認します。

aws iam list-attached-role-policies --role-name $IAM_ROLE_NAME

5. Pipelineの作成

こちらの公式ドキュメントも参考にしてください。

Amazon OpenSearch Ingestion パイプラインの作成 - Amazon OpenSearch サービス

PipelineのETL処理に当たる部分はymlで定義します。0から記載するのは大変なので、公式が用意しているblueprintを使います。 blueprintの一覧を確認し、DynamoDBの名前が入っているものを探します。

aws osis list-pipeline-blueprints

AWS-DynamoDBChangeDataCapturePipelineを使いましょう。

blueprintを手に入れます。

aws osis get-pipeline-blueprint --blueprint-name AWS-DynamoDBChangeDataCapturePipeline --query Blueprint.PipelineConfigurationBody --output text > blueprint.yml

blueprintを元に、ETL(DynamoDBのテーブルの中身をOpenSearchにコピーする)に必要な情報を設定していきます。

hostsの設定内容については、collectionが作成完了し、endpointが手に入るまでしばらく待ちます。 以下のコマンドがCREATINGだと、endpointが得られません。ACTIVEになるまで待ちます。

aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].status'

collectionが完了していれば、以下を使ってymlを書いてもOKです。(blueprintを編集した後の内容例です。)

TABLE_ARN=$(aws dynamodb describe-table --table-name $TABLE_NAME --query Table.TableArn --output text) && echo $TABLE_ARN
IAM_ROLE_ARN=$(aws iam get-role --role-name $IAM_ROLE_NAME --query Role.Arn --output text) && echo $IAM_ROLE_ARN
HOST=$(aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].collectionEndpoint' --output text) && echo $HOST 

FILE_INGESTION_DOCUMENT=injestion.yml

cat << EOS > $FILE_INGESTION_DOCUMENT
version: "2"
dynamodb-pipeline:
  source:
    dynamodb:
      acknowledgments: true
      tables:
        - table_arn: "${TABLE_ARN}"
          stream:
            start_position: "LATEST"
          export:
            s3_bucket: "${BUCKET_NAME}"
            s3_region: "${AWS_DEFAULT_REGION}"
            s3_prefix: "${PATH_PREFIX1}/"
      aws:
        sts_role_arn: "${IAM_ROLE_ARN}"
        region: "${AWS_DEFAULT_REGION}"
  sink:
    - opensearch:
        hosts:
          [
            "${HOST}"
          ]
        index: "\${getMetadata(\"table_name\")}"
        index_type: custom
        normalize_index: true
        document_id: "\${getMetadata(\"primary_key\")}"
        action: "\${getMetadata(\"opensearch_action\")}"
        document_version: "\${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "${IAM_ROLE_ARN}"
          region: "${AWS_DEFAULT_REGION}"
          serverless: true
        dlq:
          s3:
            bucket: "${BUCKET_NAME}"
            key_path_prefix: "${PATH_PREFIX2}/dlq"
            region: "${AWS_DEFAULT_REGION}"
            sts_role_arn: "${IAM_ROLE_ARN}"
EOS
cat $FILE_INGESTION_DOCUMENT

パイプラインを作成します。 min/max-unitは読み書きのcapacityで、時間あたりの課金に影響します。 OpenSearch Compute Unit (OCU)と言われる単位です。

aws osis create-pipeline \
      --pipeline-name $PIPELINE_NAME \
      --min-units 1\
      --max-units 2\
      --pipeline-configuration-body file://${FILE_INGESTION_DOCUMENT}

こちらも作成に時間が数分かかります。

6. テスト実行

DynamoDBにデータを投入してみます。

aws dynamodb put-item \
    --table-name $TABLE_NAME \
    --item '{"name": {"S": "saki"}, "age": {"N": "16"}, "height": {"N": "152"}}'
aws dynamodb put-item \
    --table-name $TABLE_NAME \
    --item '{"name": {"S": "temari"}, "age": {"N": "15"}, "height": {"N": "162"}}'
aws dynamodb put-item \
    --table-name $TABLE_NAME \
    --item '{"name": {"S": "kotone"}, "age": {"N": "15"}, "height": {"N": "156"}}'

OpenSearchを確認します。反映されるまで、少し時間がかかるかもしれません。

awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/_cat/indices
awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/${TABLE_NAME}/_search | jq . 

以下のように、DynamoDBに投入したデータがOpenSearchに反映されていれば成功です!

{
  "took": 1494,
  "timed_out": false,
  "_shards": {
    "total": 0,
    "successful": 0,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ingestion-table",
        "_id": "temari|15",
        "_score": 1,
        "_source": {
          "name": "temari",
          "age": 15,
          "height": 162
        }
      },
      {
        "_index": "ingestion-table",
        "_id": "kotone|15",
        "_score": 1,
        "_source": {
          "name": "kotone",
          "age": 15,
          "height": 156
        }
      },
      {
        "_index": "ingestion-table",
        "_id": "saki|16",
        "_score": 1,
        "_source": {
          "name": "saki",
          "age": 16,
          "height": 152
        }
      }
    ]
  }
}

リソースの削除

以下の順にリソースを削除していきます。

Pipeline

aws osis delete-pipeline --pipeline-name $PIPELINE_NAME

Amazon OpenSearch Service Collection

COLLECTION_ID=$( aws opensearchserverless list-collections --query "collectionSummaries[?name=='${COLLECTION_NAME}'].id" --output text) && $COLLECTION_ID
aws opensearchserverless delete-collection  --id $COLLECTION_ID

Amazon OpenSearch Service 3種のポリシー

aws opensearchserverless delete-access-policy --name $ACCESS_POLICY_NAME --type data
aws opensearchserverless delete-security-policy --name $ENC_POLICY_NAME --type encryption
aws opensearchserverless delete-security-policy --name $NET_POLICY_NAME --type network 

Amazn DynamoDB

aws dynamodb delete-table --table-name $TABLE_NAME

Amazn S3

aws s3 rm s3://$BUCKET_NAME --recursive
aws s3api delete-bucket --bucket $BUCKET_NAME

Pipelineに付与していたPolicyとRole

aws iam detach-role-policy --role-name $IAM_ROLE_NAME --policy-arn $IAM_POLICY_ARN 
aws iam delete-policy --policy-arn $IAM_POLICY_ARN
aws iam delete-role --role-name $IAM_ROLE_NAME

余談: ChatGPT と AWS CLI

AWS CLI や CDK はChatGPTと相性がよく、指示が適切であればリソース作成においてとても便利です。しかし、複数のリソースを組み合わせた正確な構成手順を一度に出力するのはまだまだ難しいようです。

個人的にChatGPTが便利だと感じるのは以下のケースです。

  • 1リクエスト単位での質問(nameをxxとしたs3バケットを作る方法を教えて、など)
  • --queryパラメータの記述方法を質問する
  • リソース作成ログを与え、「これらのリソースの削除方法を教えて」

AWS CLI や CDK はリソースの作成手順を残せると言う意味で重宝していますが、パラメータなどを調べるのがやや面倒です。 個人的にはaws cliのヘルプとCDKの予測変換とドキュメント(typescript)を併用しつつ、ChatGPTに質問しながら最終的にCDKのコードに落とし込むのが一番楽だと感じています。

まとめ

Amazon DynamodbAmazon OpenSearch Serviceのzero-ETL integrationをAWS CLIで構築する手順をまとめました。 AWSはリソースが色々あってややこしいですが、GUIチュートリアルをこなした後に同じものをCLIで構築し、ノートにまとめておくと理解が深まると思っています。 CDKはCLIと同じAPIの上に作られているので、後々CDKを記述する時も楽になります。