【Apache Kafka】Debezium ConnectorをMSK Connectで試してみた
はじめに
本記事はエーピーコミュニケーションズ Advent Calendar 2021の17日目の記事です。
2021年9月16日にリリースされた、AWSマネージドのKafka Connectクラスター管理機能「MSK Connect」を試してみました。
本記事の検証では、上記の記事を参考に、DBはPostgreSQL(RDSのDBインスタンス)とし、コネクタープラグインはDebezium Postgres Connectorに置き換えています。
2021/12/11時点のAWSマネジメントコンソールから操作しています。
AWSマネジメントコンソールのUIはアップデートされる可能性があります。
環境構築
構成図
1. VPC/Subnet/Bastion(EC2)の作成
本記事では、詳しい作成方法は割愛します。
2. MSKクラスター設定の作成
- AWSマネジメントコンソールのMSKサービスのメニュー「クラスター構成」画面を開く
- 「クラスター設定を作成」を押下
- 「構成名」には任意の文字列、「リビジョン1の設定プロパティ」の以下の部分を修正
auto.create.topics.enable=false
=>auto.create.topics.enable=true
- 「作成」を押下
3. MSK/RDS/MSK Connect用リソースの構築
CloudFormationを使用して以下のリソースを作成しました。
- MSK用のSecurityGroup
- MSK Cluster
- DB用のSecurityGroup
- RDS DBインスタンス
- RDS DBサブネットグループ
- RDS DBパラメータグループ
- MSK Connectに設定するIAMロール
- IAMロールにアタッチするIAMポリシー(参考)
- MSK Connectのログを格納するロググループ
以下、使用したCloudFormationテンプレートです。
AWSTemplateFormatVersion: "2010-09-09" Parameters: ClusterName: Type: String Description: MSK Cluster Name DBInstanceIdentifier: Type: String Description: DB Instance Identifier Vpc: Type: AWS::EC2::VPC::Id Description: Target VPC id Subnet1: Type: AWS::EC2::Subnet::Id Description: Target Subnet id Subnet2: Type: AWS::EC2::Subnet::Id Description: Target Subnet id ClientSecurityGroup: Type: AWS::EC2::SecurityGroup::Id Description: Security Group of Client(Bastion) DefaultSecurityGroup: Type: AWS::EC2::SecurityGroup::Id Description: Default Secrity Group of Target VPC MSKConfigurationInfoArn: Type: String Description: Arn of MSK Configration(use resource created in step2) DBUser: Type: String Description: Username of DB Instance DBPassword: Type: String Description: Password of DB Instance user Resources: MSKSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: debezium test SecurityGroupIngress: - SourceSecurityGroupId: !Ref ClientSecurityGroup IpProtocol: tcp ToPort: 9092 FromPort: 9092 - SourceSecurityGroupId: !Ref ClientSecurityGroup IpProtocol: tcp ToPort: 2181 FromPort: 2181 - SourceSecurityGroupId: !Ref ClientSecurityGroup IpProtocol: tcp ToPort: 9098 FromPort: 9098 VpcId: !Ref Vpc MSKCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - !Ref Subnet1 - !Ref Subnet2 InstanceType: kafka.m5.large SecurityGroups: - !Ref MSKSecurityGroup - !Ref DefaultSecurityGroup StorageInfo: EBSStorageInfo: VolumeSize: 30 ClusterName: !Ref ClusterName ClientAuthentication: Unauthenticated: Enabled: true Sasl: Iam: Enabled: true KafkaVersion: 2.7.1 ConfigurationInfo: Arn: !Ref MSKConfigurationInfoArn Revision: 1 EncryptionInfo: EncryptionInTransit: ClientBroker: TLS_PLAINTEXT InCluster: true EnhancedMonitoring: DEFAULT NumberOfBrokerNodes: 2 DBSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: debezium test SecurityGroupIngress: - SourceSecurityGroupId: !Ref ClientSecurityGroup IpProtocol: tcp ToPort: 5432 FromPort: 5432 VpcId: !Ref Vpc DBInstance: Type: AWS::RDS::DBInstance DeletionPolicy: Delete Properties: DBInstanceIdentifier: !Ref DBInstanceIdentifier DBInstanceClass: db.t3.small DBSubnetGroupName: !Ref DBSubnetGroup AllocatedStorage: 10 Engine: postgres EngineVersion: 13.4 MasterUsername: !Ref DBUser MasterUserPassword: !Ref DBPassword StorageType: gp2 BackupRetentionPeriod: 0 VPCSecurityGroups: - !Ref DBSecurityGroup - !Ref DefaultSecurityGroup DBParameterGroupName: !Ref DBParameterGroup DBSubnetGroup: Type: AWS::RDS::DBSubnetGroup Properties: DBSubnetGroupDescription: subnet group for debezium test SubnetIds: - !Ref Subnet1 - !Ref Subnet2 DBParameterGroup: Type: AWS::RDS::DBParameterGroup Properties: Family: postgres13 Description: for debezium test Parameters: rds.logical_replication: 1 IAMPolicy: Type: AWS::IAM::Policy Properties: PolicyName: msk-connect-role PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - kafka-cluster:Connect - kafka-cluster:DescribeCluster - kafka-cluster:AlterCluster - kafka-cluster:DescribeClusterDynamicConfiguration Resource: - !Ref MSKCluster - Effect: Allow Action: - kafka-cluster:*Topic* - kafka-cluster:ReadData - kafka-cluster:WriteData Resource: - !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/${ClusterName}/*" - Effect: Allow Action: - kafka-cluster:AlterGroup - kafka-cluster:DescribeGroup Resource: - !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/${ClusterName}/*" - Effect: Allow Action: - logs:CreateLogDelivery - logs:PutResourcePolicy - logs:DescribeResourcePolicies - logs:DescribeLogGroups Resource: - !GetAtt LogGroup.Arn Roles: - !Ref IAMRole IAMRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - kafkaconnect.amazonaws.com Action: - sts:AssumeRole Description: for MSK Connect LogGroup: Type: AWS::Logs::LogGroup Properties: RetentionInDays: 7
4. PostgreSQLにデータベース・テーブル作成
BastionからPostgreSQLに接続し、データベース・テーブルを作成します。
Bastionにはpostgresqlをインストールし、psqlコマンドが使用できるようにしておきます。
データベース名はdebezium-test
として作成しました。
- Bastionから以下のコマンドでPostgreSQLに接続
psql -h <DBインスタンスのエンドポイント> -p 5432 -U <DBのユーザー名> -d postgres
- パスワードを入力し、ログインする
- 以下のコマンドでデータベース作成し、接続する
CREATE DATABASE "debezium-test"; \c debezium-test
- 以下のコマンドでテーブル作成
CREATE TABLE users ( id int, name varchar(32) ); ALTER TABLE users REPLICA IDENTITY FULL;
5. MSK Connectの設定
- コネクタプラグイン格納用のS3バケットを事前に準備
- https://debezium.io/releases/ から最新のDebeziumコネクターをダウンロード
- 検証時は1.7.1のPostgres Connector Pluginを使用
- tarコマンド、zipコマンドが使用できる環境で、アップロードするための構成に変更
tar xvf debezium-connector-postgres-1.7.1.Final-plugin.tar.gz cd debezium-connector-postgres zip -9 ../debezium-connector-postgres-1.7.1.zip * cd ..
- debezium-connector-postgres-1.7.1.zipをS3バケットにアップロード
- AWSマネジメントコンソールのMSKサービスのメニュー「コネクタ」画面を開く
- 「コネクタを作成」を押下
- 「カスタムプラグインタイプ」の項目で「カスタムプラグインを作成」を選択
- 「S3 URI - カスタムプラグインオブジェクト」では手順4でアップロードしたzipファイルを設定する
- カスタムプラグイン名を設定して「次へ」を押下
- コネクタのプロパティ画面に遷移するので以下の設定をし、「次へ」を押下
- 基本情報-コネクタ名:任意のコネクター名を設定
- Apache Kafka-クラスタータイプ:MSKクラスター
- CloudFormationで作成したMSKクラスターを選択
- 認証メソッドは「IAM」を選択
- コネクタ設定
connector.class=io.debezium.connector.postgresql.PostgresConnector tasks.max=1 database.hostname=<DBインスタンスのエンドポイント> database.dbname=<データベース名> database.port=5432 database.user=<DBのユーザー名> database.password=<DBのパスワード> database.server.id=123456 database.server.name=debezium-test-db database.history.kafka.bootstrap.servers=<MSKのBootstrapServersのエンドポイント(認証タイプ「IAM」)> database.history.consumer.security.protocol=SASL_SSL database.history.consumer.sasl.mechanism=AWS_MSK_IAM database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler database.history.producer.security.protocol=SASL_SSL database.history.producer.sasl.mechanism=AWS_MSK_IAM database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler include.schema.changes=true plugin.name=pgoutput
- コネクタのキャパシティー:(すべてデフォルト)
- ワーカー設定:MSK のデフォルト設定を使用
- アクセス許可:CloudFormationで作成したIAMロール
- 「次へ」を押下
- ログ配信で「Amazon CloudWatch Logs への配信」を選択
- CloudFormationで作成したロググループを設定し、「次へ」を選択
- 設定内容を確認し、「コネクタを作成」を押下
- コネクタの状態が「Active」になるのを待つ
本検証のコネクタが「Failed」になることがしばしばありました。
なぜFailedになっているのかは、コネクタのログを見ないと推測できないようです。
ログ配信設定は有効にしてお試しすることを推奨します。
(本記事ではCloudWatch Logsを使用してます。)
Kafka Topicの確認
BastionにKafkaクライアントツールをインストールしておきます。
本検証では、v2.8.1を使用しました。
Topicにどんなメッセージが格納されるのか確認します。
以下のコマンドでTopic一覧を確認します。
<Kafkaクライアントツールインストールパス>/bin/kafka-topic.sh --list --zookeeper <MSKクラスターのzookeeperのエンドポイント>
DBに何も操作をしていない時点では、以下のTopicが存在します。
__amazon_msk_connect_*
はコネクタによって自動的に作成されたTopicです。
__amazon_msk_canary __amazon_msk_connect_configs_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4 __amazon_msk_connect_offsets_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4 __amazon_msk_connect_status_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4 __consumer_offsets
INSERT文
試しに適当なINSERT文を実行します。
INSERT INTO users VALUES(1,'piyomaru');
DB操作を行った後、Topic一覧を確認すると、
debezium-test-db.public.users
というTopicが自動的に生成されていました。
以下のコマンドで、Topicのメッセージを確認します。
<Kafkaクライアントツールインストールパス>/bin/kafka-console-consumer.sh --bootstrap-server <MSKクラスターのBootstrap Serverのエンドポイント> --topic <Topic名> --from-beginning
以下のメッセージが格納されていました。
Struct{after=Struct{id=1,name=piyomaru},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639304674844,db=debezium-test,sequence=["1207988808","1275070024"],schema=public,table=users,txId=597,lsn=1275070024},op=c,ts_ms=1639304675254}
他にも実行してみます。
UPDATE文
UPDATE users SET name="Piyomaru" WHERE id=1;
メッセージ
Struct{before=Struct{id=1,name=piyomaru},after=Struct{id=1,name=Piyomaru},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639305735822,db=debezium-test,sequence=["1543504608","1543505184"],schema=public,table=users,txId=603,lsn=1543505184},op=u,ts_ms=1639305736034}
DELETE文
DELETE FROM users WHERE id=2;
メッセージ
Struct{before=Struct{id=2,name=piyokichi},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639305770016,db=debezium-test,sequence=["1543505456","1543505512"],schema=public,table=users,txId=604,lsn=1543505512},op=d,ts_ms=1639305770418}
DBの操作をTopicに格納できていることが確認できました。
おわりに
マイクロサービスが個別にDBを持つ構成の場合に、Kafka、Kafka Connect(Debezium Connector)を活用することで、各DB間の整合性を保つ仕組みを構築することができるようです。(以下、参考記事)
今回の検証ではここまでできませんでしたが、とても興味深いので、別の機会に検証したいと考えてます。
今年はKafka漬けの一年でした。少し仲良くなれたと思います。
よいお年を!