Change Data Capture using Maxwell for MySQL
April 15, 2021
Robust database decoupling gives developers greater freedom when fine-tuning our service and infrastructure. Plus, it also makes our data traffic more efficient across the board. However, it doesn’t always come built-in. We asked one of our lead database admins to discuss how we achieved greater decoupling in MySQL
We decided to do something unconventional by looking for a Change Data Capture(CDC) solution from MySQL into, well, MySQL.
What we wanted to achieve was a better decoupling between our source and target databases. We wanted to:
Leave DDL changes unreplicated (including dangerous statements like drop database, drop table, truncate table, drop index, alter constraint, etc.);
Only replicate data changes;
Only replicate a small subset of tables;
Only replicate a subset of table columns;
As far as the consistency issue is concerned, we were blessed that eventual consistency was satisfactory for our purposes.
Eventual referential integrity was also satisfactory, so there were no foreign key constraints on the target database.
GTID and Galera Cluster support was an absolute must for us. We want to have MySQL Galera Clusters as both source and a target database servers.
Why didn’t we use MySQL’s replication? MySQL replication works very well if two databases or schemas have to be kept in 100% sync in terms of both data and structure. Every structure modification(DDL) is immediately replicated, which is exactly what we wanted to avoid.
Moreover, MySQL replication and Galera Clusters, especially when the target databases are Galera Clusters, is a whole other can of worms.
Here’s a short description of how the CDC solution works:
MySQL -> Maxwell CDC -> Kafka topic -> Maxwell-sink managed by Kafka Connect -> MySQL
Maxwell captures changes from the MySQL Galera Cluster binlog and puts them into the Kafka topic (queue) called Maxwell. The Maxwell-sink reads change records from the Kafka topic and applies them into the target MySQL database (Galera Cluster). The Maxwell-sink is managed by Kafka Connect.
We decided to use Maxwell’s Daemon as the MySQL Change Data Capture engine.
As an aside, the name “Maxwell’s Daemon” has quite a story behind it. Maxwell’s Daemon was originally a captivating thought experiment that is still provoking discussions more than a hundred years after its introduction.
What Maxwell’s Daemon does is consume MySQL binary logs and produce JSON messages of its own format. The messages are reasonably well-documented and simple.
Maxwell’s Daemon also has an initial data load procedure called bootstrapping. Bootstrapping can be started and stopped at any time. The one problem with Maxwell’s bootstrap is that changes (at least those being made to the bootstrapped table) are halted while the table is being bootstrapped. But once the bootstrap finishes, the flow of changes continues. I strongly recommend reading Maxwell’s description of how to configure and run the bootstrap.
Maxwell’s Daemon integrates with a number of queuing servers to transfer the captured change messages. We chose Kafka.
So far so good. However, one part is still missing here. We need some sort of software to read the change messages and apply them to the target MySQL database. (The reason Maxwell has its own message format is that Kafka Connect came later than Maxwell’s Daemon. See this discussion for more info.)
Our solution was to fork this particular Kafka Connect sink, which is capable of parsing Maxwell’s messages and applying them to the MySQL target.
What we did with the Maxwell-sink fork
We made some major changes to the Maxwell-sink fork:
Update/insert/delete logic changed (see below for more details);
Support for primary key column updates;
Support for column filtering;
Modified diagnostic logging, including more informative diagnostic messages on errors;
Support for Maxwell’s bootstrap messages;
The last-applied GTID is recorded for diagnostic purposes;
Throw exceptions and stop on any “apply” errors (we do not want to lose a single message from Maxwell);
Changed the logic for overlapping data. If a record already exists, it is overwritten.
Our design criteria was to handle overlapping data. We did not want the CDC flow to stop if:
a record to be inserted already exists (in which case we just update it);
a record to be deleted has already been deleted or never existed in the first place;
or a record to be updated doesn’t exist (in which case we create it).
That covers pretty much every case of data overlap.
The discerning reader will notice that handling data in this way can lead to issues with referential integrity and, consequently, with foreign keys. However, as we said at the beginning, we have no foreign keys in the target database. We ensure that every child record eventually makes it to the target database. The same rule applies to every parent record. Our purpose is to achieve eventual consistency, i.e. making sure that the newest data will arrive eventually.
High availability considerations
Maxwell’s Daemon is a well-thought-out CDC engine.
Attempting to start more than one instance of the Daemon will produce an error. Judging from the error message, Maxwell uses some sort of lock-step method. It complains about an out-of-order step sequence and quits.
This property is extremely important. It’s easy to imagine that two parallel Maxwell’s streams can create data issues on the receiving end. If those two (or more) streams run in parallel indefinitely, then things might turn out to be fine, apart from the performance impact. But if one stream ends at some point, you may very well lose a delete. Deletes, as operations to be replicated, are in their own class and have to be dealt with carefully.
On the sink end, we rely on Kafka-Connect’s ability to ensure that only one sink is running at a time.
Ultimately, we need to make sure that we have at least 2 instances of every component: Maxwell’s Daemon, a multi-node Kafka cluster with all Kafka topics properly replicated, the Maxwell-sink, and, last but not least, multi-node MySQL Galera Clusters for both source and target databases.
While testing Kafka Connect, we discovered a minor issue with Kafka Connect’s configuration. The default value of consumer.max.poll.interval.ms was quite large. As a result, it took quite some time for the sink to rebalance to a healthy node after any of the Kafka nodes crashed. Setting it to 30 seconds (use the value 30000) helped.
Here's one of the documents on the issue.
This solution has been running in the development environment for over half a year with no issues.
However, there is always (at least) one last thing to consider. One such issue is rollback.
Maxwell’s Daemon produces transactional information in its messages, including a xid (a transaction id) and commit: true fields. See this document for more.
Fortunately, our source database binary log stream has never produced records that have been rolled back. Our application code does use transactions and potentially even a rollback, but it looks like the transactions are rolled back before ever being entered into the binary logs. According to this document, MySQL should not write records into the binary log that are uncommitted or that have potentially been rolled back, but there is an interesting Debezium document on the matter.
In that document, it says:
“Under specific conditions it is possible that MySQL binlog contains uncommitted data finished by a ROLLBACK statement. Typical examples are using savepoints or mixing temporary and regular table changes in a single transaction.”
Regardless, we regularly execute simple source and destination table comparisons to ensure that the data matches up to a recent fixed point in time. So far, all is good.
We have given the solution a good run, examined it, and learned from it. We eventually developed yet another solution based on the first. It has its own drawbacks, but also has some very nice properties that were missing in the first version. You know what they say – good things come to those who iterate. But that’s a story for another Engineering blog post.