ぴよ丸水産

週末ファゴッティストによる技術ブログ

【Apache Kafka】Debezium ConnectorをMSK Connectで試してみた


はじめに

本記事はエーピーコミュニケーションズ Advent Calendar 2021の17日目の記事です。

2021年9月16日にリリースされた、AWSマネージドのKafka Connectクラスター管理機能「MSK Connect」を試してみました。

aws.amazon.com

本記事の検証では、上記の記事を参考に、DBはPostgreSQL(RDSのDBインスタンス)とし、コネクタープラグインはDebezium Postgres Connectorに置き換えています。
2021/12/11時点のAWSマネジメントコンソールから操作しています。
AWSマネジメントコンソールのUIはアップデートされる可能性があります。

環境構築

構成図

f:id:blue-38:20211212221530p:plain
構成図

1. VPC/Subnet/Bastion(EC2)の作成

本記事では、詳しい作成方法は割愛します。

2. MSKクラスター設定の作成

  1. AWSマネジメントコンソールのMSKサービスのメニュー「クラスター構成」画面を開く
  2. クラスター設定を作成」を押下
  3. 「構成名」には任意の文字列、「リビジョン1の設定プロパティ」の以下の部分を修正
    • auto.create.topics.enable=false=>auto.create.topics.enable=true
  4. 「作成」を押下

3. MSK/RDS/MSK Connect用リソースの構築

CloudFormationを使用して以下のリソースを作成しました。

  • MSK用のSecurityGroup
  • MSK Cluster
  • DB用のSecurityGroup
  • RDS DBインスタンス
  • RDS DBサブネットグループ
  • RDS DBパラメータグループ
  • MSK Connectに設定するIAMロール
  • IAMロールにアタッチするIAMポリシー(参考)
  • MSK Connectのログを格納するロググループ

以下、使用したCloudFormationテンプレートです。

AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  ClusterName:
    Type: String
    Description: MSK Cluster Name
  DBInstanceIdentifier:
    Type: String
    Description: DB Instance Identifier
  Vpc:
    Type: AWS::EC2::VPC::Id
    Description: Target VPC id
  Subnet1:
    Type: AWS::EC2::Subnet::Id
    Description: Target Subnet id
  Subnet2:
    Type: AWS::EC2::Subnet::Id
    Description: Target Subnet id
  ClientSecurityGroup:
    Type: AWS::EC2::SecurityGroup::Id
    Description: Security Group of Client(Bastion)
  DefaultSecurityGroup:
    Type: AWS::EC2::SecurityGroup::Id
    Description: Default Secrity Group of Target VPC
  MSKConfigurationInfoArn:
    Type: String
    Description: Arn of MSK Configration(use resource created in step2)
  DBUser:
    Type: String
    Description: Username of DB Instance
  DBPassword:
    Type: String
    Description: Password of DB Instance user  
Resources:
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: debezium test
      SecurityGroupIngress:
        - SourceSecurityGroupId: !Ref ClientSecurityGroup
          IpProtocol: tcp
          ToPort: 9092
          FromPort: 9092
        - SourceSecurityGroupId: !Ref ClientSecurityGroup
          IpProtocol: tcp
          ToPort: 2181
          FromPort: 2181
        - SourceSecurityGroupId: !Ref ClientSecurityGroup
          IpProtocol: tcp
          ToPort: 9098
          FromPort: 9098
      VpcId: !Ref Vpc
  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref Subnet1
          - !Ref Subnet2
        InstanceType: kafka.m5.large
        SecurityGroups:
          - !Ref MSKSecurityGroup
          - !Ref DefaultSecurityGroup
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 30
      ClusterName: !Ref ClusterName
      ClientAuthentication:
        Unauthenticated:
          Enabled: true
        Sasl:
          Iam:
            Enabled: true
      KafkaVersion: 2.7.1
      ConfigurationInfo:
        Arn: !Ref MSKConfigurationInfoArn
        Revision: 1
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS_PLAINTEXT
          InCluster: true
      EnhancedMonitoring: DEFAULT
      NumberOfBrokerNodes: 2
  DBSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: debezium test
      SecurityGroupIngress:
        - SourceSecurityGroupId: !Ref ClientSecurityGroup
          IpProtocol: tcp
          ToPort: 5432
          FromPort: 5432
      VpcId: !Ref Vpc
  DBInstance:
    Type: AWS::RDS::DBInstance
    DeletionPolicy: Delete
    Properties:
      DBInstanceIdentifier: !Ref DBInstanceIdentifier
      DBInstanceClass: db.t3.small
      DBSubnetGroupName: !Ref DBSubnetGroup
      AllocatedStorage: 10
      Engine: postgres
      EngineVersion: 13.4
      MasterUsername: !Ref DBUser
      MasterUserPassword: !Ref DBPassword
      StorageType: gp2
      BackupRetentionPeriod: 0
      VPCSecurityGroups:
      - !Ref DBSecurityGroup
      - !Ref DefaultSecurityGroup
      DBParameterGroupName: !Ref DBParameterGroup
  DBSubnetGroup:
    Type: AWS::RDS::DBSubnetGroup
    Properties:
      DBSubnetGroupDescription: subnet group for debezium test
      SubnetIds:
        - !Ref Subnet1
        - !Ref Subnet2
  DBParameterGroup:
    Type: AWS::RDS::DBParameterGroup
    Properties:
      Family: postgres13
      Description: for debezium test
      Parameters:
        rds.logical_replication: 1
  IAMPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: msk-connect-role
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - kafka-cluster:Connect
              - kafka-cluster:DescribeCluster
              - kafka-cluster:AlterCluster
              - kafka-cluster:DescribeClusterDynamicConfiguration
            Resource:
              - !Ref MSKCluster
          - Effect: Allow
            Action:
              - kafka-cluster:*Topic*
              - kafka-cluster:ReadData
              - kafka-cluster:WriteData
            Resource:
              - !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/${ClusterName}/*"
          - Effect: Allow
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:DescribeGroup
            Resource:
              - !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/${ClusterName}/*"
          - Effect: Allow
            Action:
              - logs:CreateLogDelivery
              - logs:PutResourcePolicy
              - logs:DescribeResourcePolicies
              - logs:DescribeLogGroups
            Resource:
              - !GetAtt LogGroup.Arn
      Roles:
        - !Ref IAMRole
  IAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kafkaconnect.amazonaws.com
            Action:
              - sts:AssumeRole
      Description: for MSK Connect
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7

4. PostgreSQLにデータベース・テーブル作成

BastionからPostgreSQLに接続し、データベース・テーブルを作成します。
Bastionにはpostgresqlをインストールし、psqlコマンドが使用できるようにしておきます。
データベース名はdebezium-testとして作成しました。

  1. Bastionから以下のコマンドでPostgreSQLに接続
    • psql -h <DBインスタンスのエンドポイント> -p 5432 -U <DBのユーザー名> -d postgres
      
  2. パスワードを入力し、ログインする
  3. 以下のコマンドでデータベース作成し、接続する
    • CREATE DATABASE "debezium-test";
      \c debezium-test
      
  4. 以下のコマンドでテーブル作成
    • CREATE TABLE users (
      id int,
      name varchar(32)
      );
      
      ALTER TABLE users REPLICA IDENTITY FULL;
      

5. MSK Connectの設定

  1. コネクタプラグイン格納用のS3バケットを事前に準備
  2. https://debezium.io/releases/ から最新のDebeziumコネクターをダウンロード
    • 検証時は1.7.1のPostgres Connector Pluginを使用
  3. tarコマンド、zipコマンドが使用できる環境で、アップロードするための構成に変更
    • tar xvf debezium-connector-postgres-1.7.1.Final-plugin.tar.gz
      cd debezium-connector-postgres
      zip -9 ../debezium-connector-postgres-1.7.1.zip *
      cd ..
      
  4. debezium-connector-postgres-1.7.1.zipをS3バケットにアップロード
  5. AWSマネジメントコンソールのMSKサービスのメニュー「コネクタ」画面を開く
  6. 「コネクタを作成」を押下
  7. 「カスタムプラグインタイプ」の項目で「カスタムプラグインを作成」を選択
  8. 「S3 URI - カスタムプラグインオブジェクト」では手順4でアップロードしたzipファイルを設定する
  9. カスタムプラグイン名を設定して「次へ」を押下
  10. コネクタのプロパティ画面に遷移するので以下の設定をし、「次へ」を押下
    • 基本情報-コネクタ名:任意のコネクター名を設定
    • Apache Kafka-クラスタータイプ:MSKクラスタ
      • CloudFormationで作成したMSKクラスターを選択
      • 認証メソッドは「IAM」を選択
    • コネクタ設定
      • connector.class=io.debezium.connector.postgresql.PostgresConnector
        tasks.max=1
        database.hostname=<DBインスタンスのエンドポイント>
        database.dbname=<データベース名>
        database.port=5432
        database.user=<DBのユーザー名>
        database.password=<DBのパスワード>
        database.server.id=123456
        database.server.name=debezium-test-db
        database.history.kafka.bootstrap.servers=<MSKのBootstrapServersのエンドポイント(認証タイプ「IAM」)>
        database.history.consumer.security.protocol=SASL_SSL
        database.history.consumer.sasl.mechanism=AWS_MSK_IAM
        database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
        database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
        database.history.producer.security.protocol=SASL_SSL
        database.history.producer.sasl.mechanism=AWS_MSK_IAM
        database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
        database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
        include.schema.changes=true
        plugin.name=pgoutput
        
    • コネクタのキャパシティー:(すべてデフォルト)
    • ワーカー設定:MSK のデフォルト設定を使用
    • アクセス許可:CloudFormationで作成したIAMロール
  11. 「次へ」を押下
  12. ログ配信で「Amazon CloudWatch Logs への配信」を選択
  13. CloudFormationで作成したロググループを設定し、「次へ」を選択
  14. 設定内容を確認し、「コネクタを作成」を押下
  15. コネクタの状態が「Active」になるのを待つ

本検証のコネクタが「Failed」になることがしばしばありました。
なぜFailedになっているのかは、コネクタのログを見ないと推測できないようです。
ログ配信設定は有効にしてお試しすることを推奨します。
(本記事ではCloudWatch Logsを使用してます。)

Kafka Topicの確認

BastionにKafkaクライアントツールをインストールしておきます。
本検証では、v2.8.1を使用しました。
Topicにどんなメッセージが格納されるのか確認します。

以下のコマンドでTopic一覧を確認します。

<Kafkaクライアントツールインストールパス>/bin/kafka-topic.sh --list --zookeeper <MSKクラスターのzookeeperのエンドポイント>

DBに何も操作をしていない時点では、以下のTopicが存在します。
__amazon_msk_connect_*はコネクタによって自動的に作成されたTopicです。

__amazon_msk_canary
__amazon_msk_connect_configs_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4
__amazon_msk_connect_offsets_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4
__amazon_msk_connect_status_debezium-test_067bcbed-2e35-492b-8c4c-84ff98c90bcc-4
__consumer_offsets

INSERT文

試しに適当なINSERT文を実行します。

INSERT INTO users VALUES(1,'piyomaru');

DB操作を行った後、Topic一覧を確認すると、
debezium-test-db.public.usersというTopicが自動的に生成されていました。
以下のコマンドで、Topicのメッセージを確認します。

<Kafkaクライアントツールインストールパス>/bin/kafka-console-consumer.sh --bootstrap-server <MSKクラスターのBootstrap Serverのエンドポイント> --topic <Topic名> --from-beginning

以下のメッセージが格納されていました。

Struct{after=Struct{id=1,name=piyomaru},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639304674844,db=debezium-test,sequence=["1207988808","1275070024"],schema=public,table=users,txId=597,lsn=1275070024},op=c,ts_ms=1639304675254}

他にも実行してみます。

UPDATE文

UPDATE users SET name="Piyomaru" WHERE id=1;

メッセージ

Struct{before=Struct{id=1,name=piyomaru},after=Struct{id=1,name=Piyomaru},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639305735822,db=debezium-test,sequence=["1543504608","1543505184"],schema=public,table=users,txId=603,lsn=1543505184},op=u,ts_ms=1639305736034}

DELETE文

DELETE FROM users WHERE id=2;

メッセージ

Struct{before=Struct{id=2,name=piyokichi},source=Struct{version=1.7.1.Final,connector=postgresql,name=debezium-test-db,ts_ms=1639305770016,db=debezium-test,sequence=["1543505456","1543505512"],schema=public,table=users,txId=604,lsn=1543505512},op=d,ts_ms=1639305770418}

DBの操作をTopicに格納できていることが確認できました。

おわりに

マイクロサービスが個別にDBを持つ構成の場合に、Kafka、Kafka Connect(Debezium Connector)を活用することで、各DB間の整合性を保つ仕組みを構築することができるようです。(以下、参考記事)

speakerdeck.com

今回の検証ではここまでできませんでしたが、とても興味深いので、別の機会に検証したいと考えてます。

今年はKafka漬けの一年でした。少し仲良くなれたと思います。
よいお年を!

参考