A Look Inside Kafka MirrorMaker 2
In our previous blog, A Case for MirrorMaker 2, we discussed how enterprises rely on Apache Kafka as an essential component of their data pipelines and require that its data availability and durability guarantees cover entire cluster or datacenter failures. As discussed there, the current Apache Kafka solution, MirrorMaker 1 (MM1), has known limitations in providing an enterprise-managed disaster recovery solution.
MirrorMaker 2 (MM2, KIP-382) addresses the limitations of MirrorMaker 1: it can change configurations dynamically, keeps topic properties in sync across clusters, and improves performance significantly by reducing rebalances to a minimum. Moreover, active-active clusters and disaster recovery are use cases that MM2 supports out of the box.
MirrorMaker 2 Architecture
MM2 is based on the Kafka Connect framework and, at its core, can be viewed as a combination of a Kafka source connector and a sink connector. In a typical Connect configuration, the source connector writes data into a Kafka cluster from an external source, and the sink connector reads data from a Kafka cluster and writes it to an external repository. As with MM1, the remote-consume, local-produce pattern is recommended, so in the simplest source-target replication pair the MM2 Connect cluster is paired with the target Kafka cluster. Connect always needs a Kafka cluster to store its internal state; this is called the “primary” cluster, which in this case is the target cluster. Where multiple clusters across multiple data centers replicate in active-active settings, it would be prohibitive to run an MM2 cluster for each target cluster. In MM2, only one Connect cluster is needed for all the cross-cluster replication between a pair of datacenters. If we simply deployed a Kafka source connector and a sink connector in tandem to do replication, the data would need to hop through an intermediate Kafka cluster. MM2 avoids this unnecessary copy by passing data directly from source to sink.
In MM1, a topic typically has the same name at the target cluster as at the source and is created automatically in the downstream cluster. With such naming, a bidirectional active-active setup creates an infinite loop, because each cluster keeps mirroring the other's copy back. MM2 fixes this by automatically adding a preconfigured prefix (e.g., the source cluster alias, which is a human-readable alias of the cluster ID) to the target topic name.
For example, consider two clusters in two datacenters, DC-X and DC-Y, replicating in an active-active setup:
DC-X Topics: Topic_1, Topic_2, …
DC-Y Topics: Topic_A, Topic_B, …
If MM2 is set up to replicate from DC-X to DC-Y and vice versa, the following topics will exist in both clusters:
DC-X Topics: Topic_1, Topic_2, …, DC-Y.Topic_A, DC-Y.Topic_B,…
DC-Y Topics: Topic_A, Topic_B, …, DC-X.Topic_1, DC-X.Topic_2,…
MM2 filters out any topics that carry the target cluster name in the prefix, so DC-X.Topic_1 is never replicated back to DC-X and the loop is broken. A consumer can subscribe to the super topic, e.g., “*Topic_A”, to consume from the source cluster and continue consuming from the target cluster after failover.
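The prefixing and filtering rule can be sketched in a few lines of Python. This is a minimal illustration rather than MM2's actual code: the helper names (remote_topic, should_replicate, replicate) are hypothetical, and it assumes that "." is the separator and that topic names themselves contain no dots.

```python
SEPARATOR = "."  # assumed separator between cluster alias and topic name


def remote_topic(source_alias, topic):
    """Name that the replica of `topic` carries on the target cluster."""
    return f"{source_alias}{SEPARATOR}{topic}"


def should_replicate(topic, target_alias):
    """Skip topics whose prefix already contains the target cluster alias."""
    prefixes = topic.split(SEPARATOR)[:-1]
    return target_alias not in prefixes


def replicate(source_alias, source_topics, target_alias):
    """Replica topic names created on the target for one replication flow."""
    return sorted(
        remote_topic(source_alias, t)
        for t in source_topics
        if should_replicate(t, target_alias)
    )


# Each cluster already holds the other's replicas, as in the example above.
dc_x = ["Topic_1", "Topic_2", "DC-Y.Topic_A"]
dc_y = ["Topic_A", "Topic_B", "DC-X.Topic_1"]

x_to_y = replicate("DC-X", dc_x, "DC-Y")  # DC-Y.Topic_A is not sent back to DC-Y
y_to_x = replicate("DC-Y", dc_y, "DC-X")  # DC-X.Topic_1 is not sent back to DC-X
```

Because a replica is never mirrored back to the cluster named in its prefix, the set of topics stops growing after one round in each direction.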
More Balance, Less Rebalance
Internally, MirrorMaker 2 uses the Kafka Connect framework, which in turn uses the Kafka high-level consumer to read data from Kafka. The high-level consumer coordinates so that the partitions being consumed are balanced across the consumer group, and any change in metadata triggers a consumer rebalance. Similarly, Connect workers go through a stop/rebalance/start cycle each time there is a change in topics (a new topic is created, an old topic is deleted, or a partition count changes), a source cluster change event, a bounce of Connect nodes for a software upgrade, or a change in the number of Connect workers or in the worker configuration. Frequent rebalances cause hiccups and hurt mirroring throughput.
In MM2, the rebalances triggered by topic changes (a new topic being created or added to the whitelist, or source cluster change events) are avoided by using the low-level consumer to subscribe to a given list of partitions. MM2 uses an internal mechanism to farm out the partitions among the workers. Instead of each Connect worker’s high-level consumer subscribing directly to the source cluster partitions, MM2 manages the assignment in the connector leader, or controller. The controller tracks changes at the source cluster and farms out the partitions to the workers. The workers use the low-level consumer to subscribe directly to the partitions the controller assigned to them, thereby eliminating the majority of rebalances.
Thus, a change in the number of topics or partitions does not cause a full rebalance. Rebalances triggered by changes to the Connect cluster itself (for example, adding worker nodes) cannot be avoided, but in most cases these are much less frequent than topic changes.
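To make the controller's role concrete, here is a small Python sketch of farming out partitions to a fixed set of workers. The round-robin strategy, the worker names and the assign function are assumptions made for illustration; the text above does not specify how MM2 distributes partitions.

```python
from collections import defaultdict


def assign(partitions, workers):
    """Round-robin the sorted (topic, partition) pairs over the workers."""
    assignment = defaultdict(list)
    for i, tp in enumerate(sorted(partitions)):
        assignment[workers[i % len(workers)]].append(tp)
    return dict(assignment)


workers = ["worker-0", "worker-1"]  # hypothetical Connect workers
partitions = [("Topic_1", 0), ("Topic_1", 1), ("Topic_2", 0)]

before = assign(partitions, workers)

# A new topic appears at the source: the controller recomputes the lists,
# but the set of workers (the group membership) is unchanged.
after = assign(partitions + [("Topic_3", 0)], workers)
```

In this sketch the new partition is simply appended to one worker's list. The workers pick up their explicit partition lists, so no consumer group rebalance is involved.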
Offset Mapping
In MirrorMaker 1, the offsets of a topic in the source cluster and the offsets of the replica topic at the target cluster can be completely different, depending on the point in the topic's lifetime at which replication began. Thus the committed offsets in the consumer offsets topic track a completely different location at the source than at the target. If consumers switch to the target cluster, they cannot simply use the last committed offset at the source to continue. One approach to dealing with this offset mismatch is to rely on Kafka’s support for timestamps when doing a failover. Kafka (v0.10) added support for timestamps that are set by the producer at message create time (or by the broker when the message is written). If the timestamp is carried forward by the mirroring process, then the target cluster will have the timestamp of the source message.
Say a consumer at the source cluster had committed offset O_s (in the __consumer_offsets topic) and that message had a timestamp T_s. When the consumer fails over to the target cluster, it has to find the offset at the target, O_t, that corresponds to the original timestamp T_s. Recall that mirroring is an asynchronous process, so the target cluster may not yet have seen the message with timestamp T_s. If the consumer tries to find the offset corresponding to T_s, it may get an offset out of range, which would reset the offsets to the beginning or the end, neither of which is desirable. To find a reasonable “guess” of the target offset, one approach is to time-bound the lag in the mirroring pipeline to Δ. Knowing the maximum delay by which mirroring can lag, the consumer can, on failover, rewind to the offset corresponding to the timestamp T_s - Δ by calling consumer.offsetsForTimes() and then seeking to that offset with consumer.seek(). This timestamp, however, may not map to an actual message timestamp, so the consumer needs to “guess”: find the offset closest to that time and start consuming from there. A few messages may be reread in the process!
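The timestamp-based guess can be modeled with a short Python sketch. It is a stand-in for the Java consumer calls, not a client for a real cluster: offset_for_time imitates the documented contract of offsetsForTimes() (earliest offset whose timestamp is greater than or equal to the requested one, or nothing if no such message exists), and it assumes timestamps never decrease within the partition. The function names and the fallback to the log end are assumptions.

```python
from bisect import bisect_left


def offset_for_time(timestamps, target_ts):
    """Earliest offset whose timestamp is >= target_ts, or None if none exists.

    `timestamps[i]` is the timestamp of the record at offset i on the target.
    """
    i = bisect_left(timestamps, target_ts)
    return i if i < len(timestamps) else None


def failover_offset(target_timestamps, t_s, max_lag_ms):
    """Rewind by the maximum mirroring lag and pick the closest target offset."""
    guess = offset_for_time(target_timestamps, t_s - max_lag_ms)
    # Assumed fallback: nothing that recent has been mirrored yet,
    # so start at the end of the target log.
    return guess if guess is not None else len(target_timestamps)


# Timestamps (ms) of the records at target offsets 0..4.
target_timestamps = [1000, 1010, 1020, 1030, 1040]

# Last committed source message had timestamp 1035; mirroring lags by at most 12 ms.
resume_at = failover_offset(target_timestamps, t_s=1035, max_lag_ms=12)
```

Here the consumer resumes at target offset 3 (timestamp 1030), so the record with timestamp 1030 is read again, which is the duplication the paragraph above warns about.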
In MM2, this guesswork is eliminated. MM2 uses 2 internal topics: one tracks the mapping between source and target offsets, and the other tracks the mapping from the source consumer offsets to the target offsets. The offset_sync topic at the target cluster maps each source topic, partition and offset to the corresponding offset at the target. MM2 gets the target offset from the RecordMetadata returned by producer.send().
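As a rough model of the offset sync mapping, the Python sketch below mirrors records into an in-memory list that stands in for the target partition and records a (source offset, target offset) pair for each one. The names (offset_syncs, mirror) are hypothetical, and writing one sync per record is a simplification chosen for clarity, not a statement about how often MM2 emits syncs.

```python
# (topic, partition) -> list of (source_offset, target_offset) pairs
offset_syncs = {}


def mirror(topic, partition, source_records, target_log):
    """Copy records to the target log and record where each one landed."""
    for source_offset, value in source_records:
        target_log.append(value)
        # Stands in for the offset reported by RecordMetadata after send().
        target_offset = len(target_log) - 1
        offset_syncs.setdefault((topic, partition), []).append(
            (source_offset, target_offset)
        )


# Replication began late in the topic's life, so source offsets start at 500
# while the replica at the target starts at 0.
target_log = []
mirror("Topic_1", 0, [(500, "a"), (501, "b"), (502, "c")], target_log)
```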
For consumers that rely on the __consumer_offsets topic to track progress, MM2 maps the consumer offsets in a separate log-compacted __checkpoint topic per source cluster. MM2 periodically queries the source cluster for the committed offsets of all consumer groups, filters for the topics and consumer groups that need to be replicated, and emits a message to the internal checkpoint topic at the target cluster. These checkpoint records are emitted at a configurable interval that can be controlled dynamically.
Using the checkpoint topic, a consumer can, on failover, directly determine (using the MM2 utilities) the target offset that corresponds to its committed source offset and start consuming from there.
How failover and failback are handled is discussed in the next blog in the series: “Handling Disaster Recovery with MM2”.
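The checkpoint step can be sketched in Python as follows. This is an illustration under stated assumptions, not the MM2 utilities themselves: the function names are hypothetical, and translate assumes that source and target offsets advance in lockstep after the most recent sync point (no gaps from compaction or transactions).

```python
from bisect import bisect_right


def translate(syncs, source_offset):
    """Map a source offset to a target offset via the latest sync at or before it."""
    idx = bisect_right([s for s, _ in syncs], source_offset) - 1
    if idx < 0:
        return None  # committed before the first synced record
    sync_source, sync_target = syncs[idx]
    return sync_target + (source_offset - sync_source)


def emit_checkpoints(committed, syncs_by_tp, groups_to_replicate):
    """Build checkpoint records for the consumer groups selected for replication."""
    checkpoints = {}
    for (group, topic, partition), source_offset in committed.items():
        if group not in groups_to_replicate:
            continue
        syncs = syncs_by_tp.get((topic, partition))
        if not syncs:
            continue
        target_offset = translate(syncs, source_offset)
        if target_offset is not None:
            checkpoints[(group, topic, partition)] = (source_offset, target_offset)
    return checkpoints


syncs_by_tp = {("Topic_1", 0): [(500, 0), (800, 300)]}
committed = {("billing", "Topic_1", 0): 650, ("adhoc", "Topic_1", 0): 700}

checkpoints = emit_checkpoints(committed, syncs_by_tp, {"billing"})
```

On failover, a consumer in the "billing" group would look up its checkpoint and seek to the target offset stored there instead of guessing from timestamps.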
Consolidated MirrorMaker Clusters
Traditionally, a MirrorMaker cluster is paired with the target cluster, so there is a mirroring cluster for each target cluster, following the remote-consume, local-produce pattern. For example, for 2 data centers with 8 clusters each and 8 bidirectional replication pairs, there are 16 MirrorMaker clusters, one for each direction of each pair. For large data centers, this can significantly increase the operational cost. Ideally there should be one MirrorMaker cluster per target data center. In the above example, there would then be 2 MirrorMaker clusters, one in each data center.
Internally, the Kafka Connect framework assumes that a source connector reads from an external source and writes to Kafka, while a sink connector reads from Kafka and writes to an external sink. MM2 needs only one Connect cluster per target data center, so all clusters replicating across a pair of data centers can be handled by a single MM2 cluster.
Flexible Whitelists and Blacklists
To control which topics get replicated between the source and target clusters, MirrorMaker uses whitelists and blacklists with regular expressions or explicit topic listings. But these are statically configured. When a new topic is created that matches the whitelist, it is usually created at the target and replicated automatically. However, when the whitelist itself has to be updated, the MirrorMaker instances must be bounced. Restarting MirrorMaker each time the list changes creates backlogs in the replication pipeline and is an operational pain point. In MM2, the topic lists and regular expressions can be changed dynamically using a REST API.
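The filtering logic itself is simple to sketch. The Python class below is a hypothetical illustration of whitelist and blacklist matching with regular expressions, with an update method standing in for a dynamic configuration change; it does not call the REST API, and the rule that the blacklist wins over the whitelist is an assumption.

```python
import re


class TopicFilter:
    """Decide whether a topic is replicated, based on regex lists."""

    def __init__(self, whitelist, blacklist=()):
        self.update(whitelist, blacklist)

    def update(self, whitelist, blacklist=()):
        """Swap in new lists without restarting anything."""
        self._allow = [re.compile(p) for p in whitelist]
        self._deny = [re.compile(p) for p in blacklist]

    def should_replicate(self, topic):
        # Assumed precedence: a blacklist match always excludes the topic.
        if any(p.fullmatch(topic) for p in self._deny):
            return False
        return any(p.fullmatch(topic) for p in self._allow)


topic_filter = TopicFilter([r"orders\..*"], [r".*\.internal"])
before_update = topic_filter.should_replicate("payments.eu")

# The whitelist changes at runtime; no bounce of the mirroring process is modeled.
topic_filter.update([r"orders\..*", r"payments\..*"], [r".*\.internal"])
after_update = topic_filter.should_replicate("payments.eu")
```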
What is coming next in MirrorMaker 2
Cross-cluster Exactly Once Guarantee
Kafka provides support for exactly-once processing, but that guarantee holds only within a given Kafka cluster and does not apply across multiple clusters. Cross-cluster replication cannot directly take advantage of the exactly-once support within a Kafka cluster. This means MM2 can provide only at-least-once semantics when replicating data between the source and target clusters, which implies there could be duplicate records downstream.
Does this mean MM2 cannot support exactly-once semantics for replicated data?
Actually, it can. Let’s dig a bit into where exactly-once processing breaks in replication. If you view replication as, at its core, a consume from the source cluster and a produce to the target cluster, then the consumer state is tracked by a write to the __consumer_offsets topic at the source, while the data is written to the topic at the target cluster. These two “writes” cannot be wrapped in one atomic transaction because they span two different clusters, and there is always a chance that on failure they will diverge, causing duplicates.
How do we get these 2 writes into one atomic transaction? Hint: in MM2 we have a checkpoint topic at the target cluster that tracks the state of the consumer at the source. MM2 can thus provide exactly-once semantics, even across clusters, by making the checkpoint topic write part of the same transaction as the produce to the target topic. This feature will be coming soon in MM2’s next iteration.
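The idea can be shown with a toy model in Python. Everything here is hypothetical and in memory: TargetCluster and Transaction imitate a cluster whose staged writes become visible only on commit, and the topic names are placeholders. It is not how MM2 implements the feature, only a sketch of why putting both writes in one transaction removes duplicates.

```python
class TargetCluster:
    """In-memory stand-in for the target Kafka cluster."""

    def __init__(self):
        self.topics = {}  # topic name -> list of committed records

    def transaction(self):
        return Transaction(self)


class Transaction:
    """Writes are staged and become visible together on commit."""

    def __init__(self, cluster):
        self._cluster = cluster
        self._staged = []

    def send(self, topic, record):
        self._staged.append((topic, record))

    def commit(self):
        for topic, record in self._staged:
            self._cluster.topics.setdefault(topic, []).append(record)
        self._staged = []

    def abort(self):
        self._staged = []


def mirror_batch(cluster, batch, fail_before_commit=False):
    """Write data and the next source offset to read in one transaction."""
    txn = cluster.transaction()
    for _source_offset, value in batch:
        txn.send("DC-X.Topic_1", value)
    txn.send("checkpoint", batch[-1][0] + 1)
    if fail_before_commit:
        txn.abort()  # simulated crash: neither data nor checkpoint is visible
        return
    txn.commit()


def resume_position(cluster):
    """Source offset to resume from, read from the target-side checkpoint."""
    checkpoints = cluster.topics.get("checkpoint", [])
    return checkpoints[-1] if checkpoints else 0


source = [(0, "a"), (1, "b"), (2, "c"), (3, "d")]  # (source offset, value)
cluster = TargetCluster()

mirror_batch(cluster, source[0:2])
mirror_batch(cluster, source[2:4], fail_before_commit=True)

position = resume_position(cluster)    # still 2, because the second batch aborted
mirror_batch(cluster, source[position:])
```

Because the data and the checkpoint commit or abort together, the retry resumes exactly where the last committed batch ended and the target ends up with each record once.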
High Throughput Identity Mirroring
Let’s go back yet again to replication being basically a combo deal of consume from source and produce to target. The APIs used to read and write data are those of any standard producer and consumer, which are both record-oriented. If the source and target are identical, with the same topic, the same number of partitions, the same hash function, the same compression and the same serde, we call this identity mirroring. In this scenario, which is in fact quite common, the ideal would be to read a batch of records as a byte stream and write it out without any processing. The batch does not need to be decompressed and recompressed, or deserialized and reserialized, if nothing has to change. Identity mirroring can therefore achieve much higher throughput than the traditional approach. This is another feature that will be coming soon in MM2.
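The difference between the two paths can be illustrated with a small Python sketch. It uses zlib and JSON purely as stand-ins for Kafka's batch compression and serdes, and the function names are hypothetical; the point is only that the identity path forwards the bytes untouched while the record-oriented path does a full decode and encode to produce equivalent output.

```python
import json
import zlib


def record_oriented_mirror(compressed_batch):
    """Traditional path: decompress, deserialize, reserialize, recompress."""
    records = json.loads(zlib.decompress(compressed_batch))
    return zlib.compress(json.dumps(records).encode("utf-8"))


def identity_mirror(compressed_batch):
    """Identity path: forward the batch bytes without touching them."""
    return compressed_batch


# A batch as it would sit on the source: serialized, then compressed.
records = [{"key": f"k{i}", "value": "x" * 50} for i in range(1000)]
source_batch = zlib.compress(json.dumps(records).encode("utf-8"))

via_records = record_oriented_mirror(source_batch)
via_identity = identity_mirror(source_batch)
```

Both results decode to the same records, but the identity path skips all of the CPU work in between, which is where the throughput gain would come from.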