Kinesis Stream から AWS Lambda でデータを取得して処理
[History] [Last Modified] (2017/10/07 06:04:11)
ここは
趣味のプログラミングを楽しむための情報共有サービス。記事の一部は有料設定にして公開できます。 詳しくはこちらをクリック📝
Recent posts
Popular pages

概要

Kinesis Stream はデータ集計などに便利な機能を有するキューのようなものを提供します。SQS と似ていますが、リアルタイム性の有無など用途が異なります。本ページでは、こちらのページで使い方を把握した AWS Lambda が Kinesis Stream からデータを定期的に取得して処理するための設定方法をまとめます。

zip パッケージの作成

以下のようなファイルを作ります。Kinesis Stream に入力された文字列を CloudWatch にログ出力しています。

ProcessKinesisRecords.js

console.log('Loading function');

exports.handler = function(event, context, callback) {
  event.Records.forEach(function(record) {
    // base64 エンコーディングされたデータをデコードします。
    // https://nodejs.org/dist/latest-v6.x/docs/api/buffer.html#buffer_class_method_buffer_from_string_encoding
    var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
    console.log('Decoded payload:', payload);
  });
  callback(null, "message");
};

依存ライブラリはないため js ファイルのみを zip 化します。

zip -r ProcessKinesisRecords.zip ProcessKinesisRecords.js

IAM ロールの作成

AWS コンソールを利用して、Lambda 関数が利用する lambda-kinesis-execution-role ロールを作成します。ロールには Kinesis Stream からデータを取得するための AWSLambdaKinesisExecutionRole 権限を付与します。

Lambda 用の設定がなされたロールを選択

Uploaded Image

Kinesis 権限を付与

Uploaded Image

ロール名を設定

Uploaded Image

Lambda 関数の登録

以下のコマンドでアップロードします。lambda-test-user-20170929こちらのページの事前準備で登録した、Lambda 関数全般の権限が付与された IAM です。

aws lambda create-function \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--zip-file fileb://ProcessKinesisRecords.zip \
--role arn:aws:iam::123412341234:role/lambda-kinesis-execution-role \
--handler ProcessKinesisRecords.handler \
--runtime nodejs6.10

Lambda 関数のテスト実行

CLI から aws lambda invoke コマンドでテストすることもできますが、ここでは簡単のためコンソールからテスト実行します。様々なテンプレートが用意されています。ここでは Kinesis Stream からのデータを模擬した Kinesis テンプレートを選択します。

Uploaded Image

Kinesis Stream の作成

AmazonKinesisFullAccess 権限を lambda-test-user-20170929 に付与してから、以下のコマンドで Kinesis Stream を作成します。

aws kinesis create-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--shard-count 1

正常に作成されたことを確認します。

aws kinesis describe-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream

Kinesis Stream からデータを取得するための Lambda 設定

S3 が Lambda を実行する場合と異なり、Lambda が Kinesis Stream から能動的にデータを取得します。そのための設定 Event Source Mapping を以下の create-event-source-mapping コマンドで行います。

aws lambda create-event-source-mapping \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream \
--batch-size 100 \
--starting-position TRIM_HORIZON

一度に最大 100 個のデータを取得するように --batch-size で設定しています。また --starting-position では Kinesis Stream のどの位置のイベントからデータ取得を開始するかを指定しています。TRIM_HORIZON はキューの最も古いイベントからデータ取得を開始するための設定です。その他に以下のようなものがあります。

  • LATEST 最新のイベントからデータ取得を開始
  • AT_SEQUENCE_NUMBER 指定した位置 StartingSequenceNumber のイベントからデータ取得を開始
  • AFTER_SEQUENCE_NUMBER 指定した位置 StartingSequenceNumber の「次の」イベントからデータ取得を開始
  • AT_TIMESTAMP 指定した時刻 Timestamp のタイムスタンプを有するイベントからデータ取得を開始

正しく設定されたことは list-event-source-mappings コマンドで確認できます。

aws lambda list-event-source-mappings \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream

Kinesis Stream にデータを投入

実際には何らかの集計対象ログ等を監視するクライアントアプリケーションが行うことですが、ここでは検証のため put-record コマンドを用いてデータを投入してみます。CloudWatch ログに --data で指定した文字列が出力されていることが確認できれば成功です。

aws kinesis put-record \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--data "TEST TEST TEST" \
--partition-key shardId-000000000000
Related pages
    概要 DynamoDB は MongoDB/Cassandra/Couchbase といった NoSQL データベースの一つです。DynamoDB のオプション機能としてテーブルのストリームを有効にできます。こちらのページの Kinesis Stream と同様に、ストリームが有効なテーブルを更新すると、その更新内容を Lambda 関数で取得して何らかの処理を行うことができます
    概要 AWS Lambda はイベントドリブンな「関数」を登録できるサービスです。例えば S3 に画像がアップロードされたときにサムネイル用のサイズに加工する処理が記述された関数を登録できます。基本的な使い方をまとめます。 事前準備 関数の登録はブラウザで AWS コンソールにログインして行うこともできますが、本ページでは