ぴよ丸水産

週末ファゴッティストによる技術ブログ

【Apache Kafka】Mirror Maker2でKafkaクラスターをレプリケーション

はじめに

Apache KafkaクラスターのレプリケーションツールMirror Maker2を触ってみました。
今回の検証ではActive/Stanbyの構成を想定しています。
実行手順と気づいた注意点を記載します。

実行環境

Apache Kafka動作環境

ベストプラクティスでは、Mirror Maker2はセカンダリクラスター側で動作させることを推奨しています。
※使用サービスが異なるのは、深い意味はないです。(MSKの検証してた名残です)
※本手順はMSK to MSK(Mirror Maker2は別途EC2で動作)/ MSK to EC2 / EC2 to EC2でも応用できると思います。
※バージョンが異なるのも深い意味はないです。(後から気づいた)

Producer/Consumerアプリケーション

検証には自作のquorkusアプリケーションを使用しています。
Producer/ConsumerともにEKS上のDeploymentとして起動させています。
本題ではないので詳細は割愛しますが、概要は以下の通りです。

  • Producer
    • HTTPリクエストのPOSTメソッドで送信されたデータをKafkaのmessagesトピックに格納する
  • Consumer
    • Kafkaのmessagesトピックに格納されたデータを受信しログに出力する

※アプリケーションは環境変数でKafkaの向き先を切り替えられるように仕込んであります。

Mirror Maker2実行手順

※本手順の前提として、Mirror Maker2実行環境のKafkaインストールパスはKAFKA_PATHと表現します。

1. 設定ファイル

  • KAFKA_PATH/config/connect-mirror-maker.properties
# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

# specify any number of cluster aliases
clusters = primary, secondary

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
primary.bootstrap.servers = localhost:9092
secondary.bootstrap.servers = b-1.msk-cluster-name.xxxxxx.kafka.ap-northeast-1.amazonaws.com:9092,msk-cluster-name.xxxxxx.kafka.ap-northeast-1.amazonaws.com:9092

# enable and configure individual replication flows
primary->secondary.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
primary->secondary.topics = .*

secondary->primary.enabled = false
secondary->primary.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=2

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
primary.checkpoints.topic.replication.factor=1
primary.heartbeats.topic.replication.factor=1
primary.offset-syncs.topic.replication.factor=1

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
primary.offset.storage.replication.factor=1
primary.status.storage.replication.factor=1
primary.config.storage.replication.factor=1

secondary.offset.storage.replication.factor=2
secondary.status.storage.replication.factor=2
secondary.config.storage.replication.factor=2

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
primary->secondary.sync.group.offsets.enabled = true

ポイント

  • primary->secondary.enabled = truesecondary->primary.enabled = falseと設定することで、トピックの同期は、プライマリークラスターからセカンダリクラスターの一方向のみで実施されます。セカンダリクラスターからプライマリークラスターには同期されません。
    • Active/Active構成の場合は、primary->secondary.enabled = truesecondary->primary.enabled = trueとします。
  • 移行先(MSK)には(移行元論理名).(トピック名)のトピックが自動的に作成されます。
    • セパレーターはデフォルトで.ですが、replication.policy.separatorでカスタム可能です。
  • *.replication.factorに設定する数字は、ブローカー台数と合わせます。
    • 移行元と移行先のブローカー台数が一致する場合は設定はまとめられます。
      • offset.storage.replication.factor=nのように定義します。
    • 移行元と移行先のブローカー台数が異なる場合は、それぞれに設定する必要があるようです。
      • 今回は異なるので、(論理名).offset.storage.replication.factor=nのように記述しています。
  • primary->secondary.sync.group.offsets.enabled = trueを設定することで、オフセット(Consumerがどのメッセージまで読み込んだか)の情報も同期されます。

2. Producer/Consumerアプリケーション起動

Kafkaの向き先をプライマリークラスターに設定してProducer/Consumerアプリケーションを起動します。

3. メッセージ発行シェルスクリプト実行

以下のシェルスクリプトを実行し、Producerアプリケーションに対して、HTTPリクエストで毎秒データをPOSTします。
送信したデータはsend_message.logに追記されるようにしています。

#!/bin/bash

url=http://(ProducerアプリケーションのURL):8080
message=`date`
curl --header "Content-Type: text/plain" --request POST --data "$[start] {message}" ${url}

while true; do
  message=`date`
  echo "${message}" >> send_message.log
  curl --header "Content-Type: text/plain" --request POST --data "${message}" ${url}
  sleep 1
done

4. Mirror Maker2実行

以下のコマンドでMirror Maker2を実行します。

KAFKA_URL/bin/connect-mirror-maker.sh kafka_2.13-2.7.0/config/connect-mirror-maker.properties

5. ProducerアプリケーションのKafkaの向き先をセカンダリクラスターに変更

マニフェストファイルのKafkaのブローカーを指定している環境変数の値を修正します。
マニフェストファイルをkubectl apply -f (マニフェストファイル名)で読み込み、アプリケーションをデプロイします。

6. ConsumerアプリケーションのKafkaの向き先をセカンダリクラスターに変更

起動しているConsumerアプリケーションのログ出力が止まったら、
マニフェストファイルのKafkaのブローカーを指定している環境変数の値を修正します。
マニフェストファイルをkubectl apply -f (マニフェストファイル名)で読み込み、アプリケーションをデプロイします。

7. 送信データと受信データの確認

送信データ(send_message.log)と受信データ(Consumerコンテナログ出力)を確認します。

データを欠損させないための注意点

Consumerアプリケーション側の設定

  • Kafkaの向き先変更後のauto.offset.resetの設定をnoneとします。

    • デフォルト設定のlatestになっていると、移行先のKafkaからのメッセージ読み出し時点でオフセットはリセットされます。
    • 参考:Apache Kafka
  • consumer groupを定義します。

    • Consumer Groupが定義されていないと、Consumer Group IDはランダムな文字列が割り当てられるため、Kafka自身が、Kafkaの向き先を変える前後で別物のアプリケーションと認識するため、オフセット情報が引き継がれません

Mirror Maker2側の設定

  • sync.group.offsets.enabledを有効化します。(前述)

データを重複させないための注意点

※検証結果からの推測です。情報の確度は低いですが参考までに。

ConsumerアプリケーションのKafka向き先変更タイミング

ProducerアプリケーションとConsumerアプリケーションをほぼ同時にKafkaの向き先を変えてみたところ、
Consumerが受信したデータが重複していました。
おそらく、移行先KafkaにOffset情報が同期される前に、Consumerがメッセージの消費を開始したためだと思われます。

おわりに

Kafkaに集約するデータの欠損/重複を許容できるかは、システムの性格に依存すると思います。
要件に応じて参考にしていただければ幸いです。

参考

kafka.apache.org