昨年のAWS Ambassador Global Summitでのシアトル体験を機に、英語学習のモチベーションが上がっています。2024年11月にKinesis Client Library(KCL)がメジャーバージョンアップしたことをご存じでしょうか。このメジャーバージョンアップによって、KCL3では大きな変更が加えられました。この投稿では、KCL3とSpring Bootを用いたKinesis Data Streamのコンシューマーアプリケーションの構築例をご紹介します。
Kinesis Client Library(KCL)とは
Kinesis Client Library(KCL)は、Amazon Kinesis Data Streamsからのデータの消費と処理のプロセスを簡素化するために設計されたスタンドアロンのJavaソフトウェアライブラリです。ライブラリであるため、依存関係を管理するMavenやGradleから利用することができます。KCLは2024年11月にメジャーバージョンアップされてVersion 3になりました。このVersion 3では以下のような特徴的なアップデートが導入されています。
- KCLが利用するDynamoDBのキャパシティプランが、デフォルトでオンデマンドモードに変更された。
- 新しいロードバランシングアルゴリズムが採用され、新たにワーカーメトリクスを保存するDynamoDBテーブルが追加された。
Spring BootをKCLのコンシュマーアプリケーションにするには
公式ドキュメントでは、KCLを用いたメインコンシューマーアプリケーションの実装例が紹介されています。そこでは、SampleConsumerクラスによる起動が示されています。シーケンス図を以下に示します。
実際のワークロードに適用する場合、上記のような単純なmainメソッドの起動ではなく、フレームワークの恩恵を受けたいと考える方は多いはずです。特にJavaは、Jakarta EEやSpringなどの有名なフレームワークが多くのアプリケーションで利用されてきた歴史があります。では、Spring Bootを使ってコンシューマーアプリケーションを構築するにはどうすればいいのでしょうか。
Spring BootにおけるKCLの利用
Spring BootをKCLで利用することによる恩恵としては、以下のような点があります。
- Dependency Injection(依存性の注入)による効率的な実装
- Aspect Oriented Programmingによる共通処理の実装
これに加えて、Lombokのプラグインを使用してSetterやGetterの実装を省略することが強力で、簡単にオブジェクトのカプセル化を実現することができます。Spring Bootでは、アプリケーションの起動となるクラスに@SpringBootApplicationアノテーションを付与します。コード例は以下の通りです。
@SpringBootApplication public class AwsKcl3SpringBootApplication { public static void main(String[] args) { SpringApplication.run(AwsKcl3SpringBootApplication.class, args); } }
このままではKCLのスケジューラーが起動しないため、Spring Bootのアプリケーションの起動をカスタマイズする必要があります。CommandLineRunnerを実装し、オーバーライドしたrunメソッドの処理でスケジューラーを起動することで実現できます。コード例は以下の通りです。
@SpringBootApplication public class AwsKcl3SpringBootApplication implements CommandLineRunner { @Autowired private Scheduler scheduler; public static void main(String[] args) { SpringApplication.run(AwsKcl3SpringBootApplication.class, args); } @Override public void run(String... args) throws Exception { scheduler.run(); } }
Configurationアノテーションを活用したKCLスケジューラーの設定
上記の@Autowiredで指定しているKCLのスケジューラーについて解説します。Spring Frameworkでは、@Configurationアノテーションと@Beanアノテーションを使用することで、@Autowiredアノテーションで依存性を注入できるクラスを指定できます。KCLの各種設定を含めたスケジューラークラスを@Beanアノテーションで指定することで、依存性を注入します。コード例は以下の通りです。
@Configuration public class SampleKinesisReaderConfig { @Autowired private SampleRecordProcessorFactory sampleRecordProcessorFactory; @Bean public Scheduler scheduler() { ConfigsBuilder configsBuilder = configsBuilder(); return new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); } }
これにより、KCLをSpring Bootの恩恵を受けながら利用することができます。完全に動作する例をGitHubで公開していますので、よろしければご参照ください。
KCL3をAmazon ECSで動かそう
KCL3をAmazon ECSで動かすには、Spring BootアプリケーションをDockerイメージとしてビルドし、ECSサービスのタスクとして常駐させます。
次に、以下のコマンドでシンプルなJSONをKinesisに投入します。
aws kinesis put-record --stream-name kinesis-consumer-stream-name --partition-key 123 --data testdata.json
CloudWatch Logsに以下のようなログが出力され、KCLがKinesisのレコードを正常に処理していることが確認できました。
KCL3で作成されるDynamoDBについて
KCL3では、KCLによって利用されるDynamoDBテーブルが2つ追加されました。また、既存のリーステーブルにもグローバルセカンダリインデックス(GSI)が追加されています。以下に、それぞれのテーブルの基本情報や属性を記載します。
リーステーブル
リーステーブルは、KCLコンシューマーアプリケーションのスケジューラーによって処理されるKinesisのレコードのチェックポイントを管理するDynamoDBテーブルです。リーステーブルには以下のような属性が存在します。各属性の詳細な説明は公式ドキュメントをご参照ください。
属性名 | PK | SK | タイプ |
---|---|---|---|
leaseKey | ◯ | 文字列 | |
checkpoint | 文字列 | ||
checkpointSubSequenceNumber | 数値 | ||
endingHashKey | 文字列 | ||
leaseCounter | 数値 | ||
leaseOwner | 文字列 | ||
ownerSwitchesSinceCheckpoint | 数値 | ||
startingHashKey | 文字列 |
KCL3から、リーステーブルにGSI(グローバルセカンダリインデックス)が追加されています。GSIは以下のような設定内容になっています。IaC(Infrastructure as Code)で定義する際には、このGSIの作成を忘れないように注意が必要です。
GSI設定項目 | 設定内容 |
---|---|
インデックス名 | LeaseOwnerToLeaseKeyIndex |
パーティションキー | leaseOwner(文字列) |
ソートキー | leaseKey(文字列) |
読み込みキャパシティ | オンデマンド |
書き込みキャパシティ | オンデマンド |
射影される属性 | キーのみ |
ワーカーメトリクステーブル
KCL3で新たに追加されたワーカーメトリクステーブルは、ワーカーのCPU使用率メトリクスを記録するために利用されるDynamoDBテーブルです。ワーカーメトリクステーブルで作成される属性は以下の表の通りです。ここで、widはワーカーを一意に識別するID、lutは最終更新時刻(Last Update Time)、stsはCPU使用率を記録したステータスであると推測できます。
属性名 | PK | SK | タイプ |
---|---|---|---|
wid | ◯ | 文字列 | |
lut | 数値 | ||
opr | マップ | ||
» C | リスト | ||
sts | マップ | ||
» C | リスト |
コーディネーター状態テーブル
KCL3で新たに追加されたコーディネーター状態テーブルは、ワーカーの内部状態情報を保存するために利用されるDynamoDBテーブルです。このテーブルには、主にリーダーの選択に関するデータと、KCL2からKCL3への移行時のマイグレーション情報が保存されています。共通属性は、プライマリーキーとなる'key'のみでした。
属性名 | PK | SK | タイプ |
---|---|---|---|
key | ◯ | 文字列 |
Leader属性一覧
リーダーの選択に関するデータには下表の属性がありました。ownerNameはワーカーIDと一致しています。leaseDurationという属性については、その意味や役割が気になるところです。これが何を表しているのか、興味深い点ですね。
属性名 | タイプ |
---|---|
leaseDuration | 文字列 |
ownerName | 文字列 |
recordVersionNumber | 文字列 |
Migration属性一覧
マイグレーション情報には、KCL2からKCL3への移行状態を示すための、比較的分かりやすい属性がありました。cvは現在のバージョンを表しており、今回の検証ではKCL3を利用して構築したため、初めからKCL3のレコードのみが登録されていました。KCL2からの移行の場合、どのような項目が登録されるのかは興味深い点です。
属性名 | タイプ |
---|---|
cv | 文字列 |
mb | |
mts | |
h | リスト |
まとめ
本稿では、Amazon KinesisのKinesis Client Library 3(KCL3)とSpring Bootを使用したコンシューマーアプリケーションの構築例をご紹介しました。完全に動作するコードをGitHubで公開していますので、ご興味があればぜひご参照ください。
KCLは、デベロッパーがデータ処理のためのビジネスロジックに集中できる、非常に便利なライブラリだと考えます。今回の経験を通じて、Amazon Kinesisへの理解がさらに深まりました。データ駆動型の開発は魅力的なアーキテクチャであり、今後も積極的にAmazon Kinesisを活用していきたいと思います。この記事が皆様のお役に立てば幸いです。