【Apache Kafka】Mirror Maker2でKafkaクラスターをレプリケーション
はじめに
Apache KafkaクラスターのレプリケーションツールMirror Maker2を触ってみました。
今回の検証ではActive/Stanbyの構成を想定しています。
実行手順と気づいた注意点を記載します。
実行環境
Apache Kafka動作環境
※ベストプラクティスでは、Mirror Maker2はセカンダリークラスター側で動作させることを推奨しています。
※使用サービスが異なるのは、深い意味はないです。(MSKの検証してた名残です)
※本手順はMSK to MSK(Mirror Maker2は別途EC2で動作)/ MSK to EC2 / EC2 to EC2でも応用できると思います。
※バージョンが異なるのも深い意味はないです。(後から気づいた)
Producer/Consumerアプリケーション
検証には自作のquorkusアプリケーションを使用しています。
Producer/ConsumerともにEKS上のDeploymentとして起動させています。
本題ではないので詳細は割愛しますが、概要は以下の通りです。
- Producer
- HTTPリクエストのPOSTメソッドで送信されたデータをKafkaのmessagesトピックに格納する
- Consumer
- Kafkaのmessagesトピックに格納されたデータを受信しログに出力する
※アプリケーションは環境変数でKafkaの向き先を切り替えられるように仕込んであります。
Mirror Maker2実行手順
※本手順の前提として、Mirror Maker2実行環境のKafkaインストールパスはKAFKA_PATH
と表現します。
1. 設定ファイル
- KAFKA_PATH/config/connect-mirror-maker.properties
# Licensed to the Apache Software Foundation (ASF) under A or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see org.apache.kafka.clients.consumer.ConsumerConfig for more details # Sample MirrorMaker 2.0 top-level configuration file # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties # specify any number of cluster aliases clusters = primary, secondary # connection information for each cluster # This is a comma separated host:port pairs for each cluster # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092" primary.bootstrap.servers = localhost:9092 secondary.bootstrap.servers = b-1.msk-cluster-name.xxxxxx.kafka.ap-northeast-1.amazonaws.com:9092,msk-cluster-name.xxxxxx.kafka.ap-northeast-1.amazonaws.com:9092 # enable and configure individual replication flows primary->secondary.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" primary->secondary.topics = .* secondary->primary.enabled = false secondary->primary.topics = .* # Setting replication factor of newly created remote topics replication.factor=2 ############################# Internal Topic Settings ############################# # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and # "mm2-offset-syncs.B.internal" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. primary.checkpoints.topic.replication.factor=1 primary.heartbeats.topic.replication.factor=1 primary.offset-syncs.topic.replication.factor=1 # The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and # "mm2-status.B.internal" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. primary.offset.storage.replication.factor=1 primary.status.storage.replication.factor=1 primary.config.storage.replication.factor=1 secondary.offset.storage.replication.factor=2 secondary.status.storage.replication.factor=2 secondary.config.storage.replication.factor=2 # customize as needed # replication.policy.separator = _ # sync.topic.acls.enabled = false # emit.heartbeats.interval.seconds = 5 primary->secondary.sync.group.offsets.enabled = true
ポイント
primary->secondary.enabled = true
とsecondary->primary.enabled = false
と設定することで、トピックの同期は、プライマリークラスターからセカンダリークラスターの一方向のみで実施されます。セカンダリクラスターからプライマリークラスターには同期されません。- Active/Active構成の場合は、
primary->secondary.enabled = true
とsecondary->primary.enabled = true
とします。
- Active/Active構成の場合は、
- 移行先(MSK)には
(移行元論理名).(トピック名)
のトピックが自動的に作成されます。- セパレーターはデフォルトで
.
ですが、replication.policy.separator
でカスタム可能です。
- セパレーターはデフォルトで
*.replication.factor
に設定する数字は、ブローカー台数と合わせます。- 移行元と移行先のブローカー台数が一致する場合は設定はまとめられます。
offset.storage.replication.factor=n
のように定義します。
- 移行元と移行先のブローカー台数が異なる場合は、それぞれに設定する必要があるようです。
- 今回は異なるので、
(論理名).offset.storage.replication.factor=n
のように記述しています。
- 今回は異なるので、
- 移行元と移行先のブローカー台数が一致する場合は設定はまとめられます。
primary->secondary.sync.group.offsets.enabled = true
を設定することで、オフセット(Consumerがどのメッセージまで読み込んだか)の情報も同期されます。- 2.7.0はデフォルトは
false
のため、明示的に設定が必要です。2.8.0はデフォルトがtrue
になっています。 - KIP-545: support automated consumer offset sync across clusters in MM 2.0 - Apache Kafka - Apache Software Foundation
- 2.7.0はデフォルトは
2. Producer/Consumerアプリケーション起動
Kafkaの向き先をプライマリークラスターに設定してProducer/Consumerアプリケーションを起動します。
3. メッセージ発行シェルスクリプト実行
以下のシェルスクリプトを実行し、Producerアプリケーションに対して、HTTPリクエストで毎秒データをPOSTします。
送信したデータはsend_message.logに追記されるようにしています。
#!/bin/bash url=http://(ProducerアプリケーションのURL):8080 message=`date` curl --header "Content-Type: text/plain" --request POST --data "$[start] {message}" ${url} while true; do message=`date` echo "${message}" >> send_message.log curl --header "Content-Type: text/plain" --request POST --data "${message}" ${url} sleep 1 done
4. Mirror Maker2実行
以下のコマンドでMirror Maker2を実行します。
KAFKA_URL/bin/connect-mirror-maker.sh kafka_2.13-2.7.0/config/connect-mirror-maker.properties
5. ProducerアプリケーションのKafkaの向き先をセカンダリークラスターに変更
マニフェストファイルのKafkaのブローカーを指定している環境変数の値を修正します。
マニフェストファイルをkubectl apply -f (マニフェストファイル名)
で読み込み、アプリケーションをデプロイします。
6. ConsumerアプリケーションのKafkaの向き先をセカンダリークラスターに変更
起動しているConsumerアプリケーションのログ出力が止まったら、
マニフェストファイルのKafkaのブローカーを指定している環境変数の値を修正します。
マニフェストファイルをkubectl apply -f (マニフェストファイル名)
で読み込み、アプリケーションをデプロイします。
7. 送信データと受信データの確認
送信データ(send_message.log)と受信データ(Consumerコンテナログ出力)を確認します。
データを欠損させないための注意点
Consumerアプリケーション側の設定
Kafkaの向き先変更後の
auto.offset.reset
の設定をnone
とします。- デフォルト設定の
latest
になっていると、移行先のKafkaからのメッセージ読み出し時点でオフセットはリセットされます。 - 参考:Apache Kafka
- デフォルト設定の
consumer groupを定義します。
- Consumer Groupが定義されていないと、Consumer Group IDはランダムな文字列が割り当てられるため、Kafka自身が、Kafkaの向き先を変える前後で別物のアプリケーションと認識するため、オフセット情報が引き継がれません
Mirror Maker2側の設定
sync.group.offsets.enabled
を有効化します。(前述)
データを重複させないための注意点
※検証結果からの推測です。情報の確度は低いですが参考までに。
ConsumerアプリケーションのKafka向き先変更タイミング
ProducerアプリケーションとConsumerアプリケーションをほぼ同時にKafkaの向き先を変えてみたところ、
Consumerが受信したデータが重複していました。
おそらく、移行先KafkaにOffset情報が同期される前に、Consumerがメッセージの消費を開始したためだと思われます。
おわりに
Kafkaに集約するデータの欠損/重複を許容できるかは、システムの性格に依存すると思います。
要件に応じて参考にしていただければ幸いです。