Data Ingestion From RDBMS By Leveraging Confluent’s JDBC Kafka Connector
Kafka Connect assumes a significant part for streaming data between Apache Kafka and other data systems. As a tool, it holds the responsibility of a scalable and reliable way to move the data in and out of Apache Kafka. Importing data from the Database set to Apache Kafka is surely perhaps the most well-known use instance of JDBC Connector (Source & Sink) that belongs to Kafka Connect.
This article aims to elaborate the steps and procedure to integrate the Confluent’s JDBC Kafka connector with an operational multi-broker Apache Kafka Cluster for ingesting data from the functioning MySQL Database to Kafka topic. Since we are not going to export/pull out data from the topic via consumer and pass through any subsequent data pipeline, the Schema Registry has not clubbed/configured with the existing multi-broker Kafka cluster to support schema evolution, compatibility, etc. The Schema Registry is not mandatory to integrate or run any Kafka Connector like JDBC, File, etc. but it supports to use easily Avro, Protobuf, and JSON Schema as common data formats for the Kafka records that Kafka Connectors read from and write to. The Confluent Platform includes the Apache Kafka, additional tools, and services including Schema Registry but comes under community and commercial license.
Article Structure
This article has structured in seven parts:
- Assumptions w.r.t environment details about the Kafka Cluster
- Build/Download Confluent’s JDBC Source connector and installation
- Allow remote access to functioning MySQL Database
- Configurations changes/update on the Kafka Cluster as well as JDBC Connector
- Execution/Running the JDBC connector
- Data Verification in Messages
- Final Note
Environment details and Assumptions
Here we are considering the operational Kafka cluster having four nodes and each one is already installed and running Kafka of version 2.6.0 with Zookeeper (V 3.5.6) on top of OS Ubuntu 14.04 LTS and java version “1.8.0_101”. As mentioned, there won’t be any operation on the post ingested data, the Confluent’s Schema Registry won’t be running or included in this exercise. The transported data from MySQL database would be in raw JSON document. But It is advisable to integrate and run the Schema Registry in real time scenarios or in the production environment so that compatible data format can be maintained across the data sources (RDBMS) and multiple consumers along with enforced compatibility rules. You can read here how to integrate Confluent Schema Registry with multi-node Kafka cluster.
Build/Download Confluent’s JDBC Source connector and Installation
Using the Confluent Hub client, the JDBC connectors can be installed. But we downloaded manually confluentinc-kafka-connect-jdbc-10.0.2.zip file from here and extracted on a healthy node in the cluster. The version is 10.0.2 and the license is free to use.
You can manually build the development version by downloading the source code available at https://github.com/confluentinc/kafka-connect-jdbc .
Downloaded confluentinc-kafka-connect-jdbc-10.0.1.zip copied to the node, extracted, and eventually copied to the location. The location should be under /usr/share/java and to make it more specific from other connectors, created one sub folder named “kafka-connect-jdbc” with sudo privileges.
Not mandatory to copy all the jar files available inside the extracted folder, only kafka-connect-jdbc-10.0.1.jar should be copied to /usr/share/java/kafka-connect-jdbc as rest of the jars are already available inside Apache Kafka’s lib folder.
As we are importing data from the MySQL database, MySQL java Connector is required by the Kafka JDBC source connector to establish the JDBC connection. The jar file “mysql-connector-java-5.1.48.jar” too should be copied under /usr/share/java/kafka-connect-jdbc. This MySQL specific driver class can be downloaded from here.
Allow remote access to functioning MySQL Database
We have changed in the configurations of MySQL database server instance that runs on a separate node in the cluster in order to make it accessible from another node with root user. Normally, MySQL database server can be accessed using a separate database user but decided not to create a separate user. By default, MySQL server instance is only configured to listen for local connections. To enable in order to listen for an external IP address, we need to modify my.cnf file where MySQL server is running.
>sudo vi /etc/mysql/my.cnf
By default, this value is set to 127.0.0.1 for the key bind-address that means the MySQL server will only look for local connections. To avoid, we replaced 127.0.0.1 with the IP address of the node which is 192.168.10.130
After changing this line, save and close the file then restart the MySQL service to put the changes we made to my.cnf into effect.
> sudo systemctl restart mysql
To grants the root user with full access to the database on the remote host that is 192.168.10.110 (in this node Kafka JDBC Connector would be running), log in to MySQL server which is running at node 192.168.10.130 in the cluster and connect to the MySQL database as the root user. After opening then MySQL client, execute the following sql statements to grant all.
GRANT ALL ON <local database name>.* TO <remote username>@<remote node IP address> IDENTIFIED BY ‘<database user password>’;
In our scenario, it was
GRANT ALL ON Order_Streaming.* TO root@192.168.10.110 IDENTIFIED BY ‘mypassword’;
then flush previleges and exit from the MySQL client.
Configurations changes/update on the Kafka Cluster as well as JDBC Connector
a. connect-standalone.properties
As we are going to run the JDBC connector on standalone mode in one of the node of multi-broker Kafka cluster, the connect-standalone.properties file should be updated available under the config directory of Apache Kafka. In Standalone mode, a single process executes all connectors and their associated tasks. By default, the key ” bootstrap.servers ” in the properties file available with value ” localhost:9092” but the JDBC connectors would runs in a multi-node/multi-broker cluster so it has to be updated with the IP address of each node and port separated by comma (,)
bootstrap.servers=192.168.10.130:9092, 192.168.10.150:9092, 192.168.10.110:9092
Since, we won’t be preserving the versioned history of schemas while streaming data from MYSQL database to Kafka Topic, Schema Registry is not used or coupled with the cluster. Because of this reason the following keys need to set the value as “false”. By default values are as “true”.
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Similarly the values of ” key.converter ” and ” value.converter ” should not be updated as JDBC Kafka Connect won’t use with Schema Registry and continue with default “JsonConverter” that serializes the message keys and values into JSON documents.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename is the most important setting when running Connect in standalone mode. It defines where JDBC Connect should store its offset data. We updated as
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms defines the interval at which the worker tries to commit offsets for tasks. We kept as default value
offset.flush.interval.ms=10000
The value of the key “plugin.path” should be updated as “/usr/share/java” as mentioned above in installation section where the jar files kafka-connect-jdbc-10.0.1.jar and mysql-connector-java-5.1.48.jar are available.
plugin.path=/usr/share/java
b. connect-jdbc-mysql-source.properties
In the same node where the JDBC connector would run, created another configuration file with name “connect-jdbc-myql-source.properties” under the config directory of Apache Kafka. Update the “connect-jdbc-myql-source.properties” file with below properties.
name= local-mysql-source
name is a user-specified name for the connector instance
connector.class=io.confluent.connect.jdbc.JdbcSourcceConnector
connector.class specifies the implementing class, basically the kind of connector that is available in kafka-connect-jdbc-10.0.1.jar
tax.max=1
tasks.max specifies how many instances of our source connector should run in parallel
connection.url=jdbc:mysql://192.168.10.130:3306/order_streaming
connection.url specifies MySQL database server running on different node in the cluster with the name of the database
connection.user=root
connection.user specifies the user name of the MySQL database server
connection.password=XXXX
connection.password specifies password to login in MySQL
mode=bulk
mode=bulk performs a bulk load of the entire table each time it is polled. In bulk mode, the connector will load all the selected tables in each iteration. It can be useful if a periodical backup, or dumping the entire database.
topic.prefix=connect-test
Prefix to prepend to table names to generate the name of the Apache Kafka topic to publish data to.
Already created one topic named connect-test with replication factor 3 and partitions 5
Execution/Running the JDBC Connector
There is no complexity for running the JDBC Kafka connector on multi-node Apache Kafka cluster. The connector can be run from the same node where it installed and configured, make sure MySQL database server should be accessible that running on another node in the cluster.
If the multi-node Kafka cluster is up and running with a topic named in connect-jdbc-myql-source.properties, open a new terminal on the installed JDBC connector node and run the following command inside bin directory.
/kafka/bin $ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-jdbc-myql-source.properties
After running the above command, if all the settings, configurations correct then following log would occurs in the same terminal.
[2021–03–08 04:22:04,818] INFO Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:102)
[2021–03–08 04:22:04,819] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2021–03–08 04:22:04,887] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)
Besides, using built-in Kafka Connect REST API that runs on default port 8083 we can check the status of the JDBC connector. This is useful for getting status information, adding and removing connectors without stopping the process. Open another terminal and type the following command
$ curl -X GET http://IP of the node where connector is running:8083/connectors/{name of the connector}/status
$ curl -X GET http://192.168.10.110:8083/connectors/{local-mysql-source}/status
And above would returns
{“name”:”local-mysql-source”,”connector”:{“state”:”RUNNING”,”worker_id”:”192.168.10.110:8083″},”tasks”:[{“id”:0,”state“:”RUNNING“,”worker_id”:”192.168.10.110:8083″}],”type”:”source”}
Open another terminal to execute built-in console consumer script so that ingested data from MySQL database can be viewed immediately in raw JSON format.
Data Verification in Messages.
In this exercise, we have imported very small of dummy data from a database “order_streaming” having only one table named “order_details” with 3 rows.
As mentioned in the beginning that we have not clubbed or executed any data pipeline via consumer once data arrived on the topic, so verified or displayed the messages on the console only by running Apache Kafka’s built-in console consumer script. Due to absence of the Schema Registry and designed schema definition in it, the data in the messages are displaying in a raw JSON document.
In real time implementations if we are continuously ingesting data from the databases to Apache Kafka and subsequently passing though multiple data pipelines into other data systems, it is advisable to integrate the Schema Registry to reduce operational complexity. Click here to know more about Confluent Schema Registry.
Final Note
Ingesting data from RDBMS to Apache Kafka by leveraging JDBC Kafka Connector is a good way to go even though having some downsides. Without writing a single line of code, we can start shipping data continuously from tables and subsequently sink to other data systems. However, can’t be utilized completely for event streaming from the database tables. This connector is limited only to detecting new rows on incrementing columns. To assume the column is updated with a new row, the table should have an additional timestamp column. By using incrementing + timestamp columns, the JDBC connector detect and ingest the newly added raw data to the Kafka topic. Besides, JDBC Connector also lags to listen to DELETE event on the rows of tables as it uses only SELECT queries to retrieve data.
Hope you have enjoyed this read. Please like and share if you feel this composed is valuable.
Ref:-