Synchronizing data from Kafka to TDengine

This article discusses how to use the TDengine Kafka Connector to synchronize data from Kafka to TDengine and provides a sample script to test its performance in a data synchronization scenario.

Background

Kafka is a general-purpose message broker featuring a distributed architecture. You can use the Kafka Connect component to read from and write to Kafka, and its plug-ins can be used to read from and write to a variety of data sources. Kafka Connect supports fault tolerance, restarting, logging, elastic scaling, and serialization and deserialization.

To make integrating Kafka and TDengine a simpler process, the TDengine Team developed the TDengine Kafka Connector as a Kafka Connect plug-in. The TDengine Kafka Connector consists of the TDengine Source Connector and TDengine Sink Connector. In this article, the TDengine Sink Connector is used to integrate Kafka with TDengine.

TDengine Sink Connector implementation

The TDengine Sink Connector synchronizes the data from specified Kafka topics to a TDengine time-series database (TSDB) in batches or in real time.

Before running the TDengine Sink Connector, you must create a properties file. For information about the properties file, see the official documentation.

The implementation of the TDengine Sink Connector is as follows:

  1. The Kafka Connect framework starts a specified number of consumer threads.
  2. These consumers simultaneously subscribe to data and deserialize the data according to the values of key.converter and value.converter set in the configuration file.
  3. The Kafka Connect framework sends the deserialized data to a specified number of SinkTask instances.
  4. SinkTask uses the schemaless write interface provided by TDengine to write data into the database.

In this process, the first three steps are implemented automatically by the Kafka Connect framework, and the TDengine Sink Connector performs the final step on its own.

Supported data formats

The TDengine Sink Connector writes data in schemaless mode. It supports the InfluxDB line format, OpenTSDB telnet format, and OpenTSDB JSON format. You can modify the value of the db.schemaless parameter to choose the format that you want to use. As an example, the following configuration enables InfluxDB line format:

db.schemaless=line

If the data in your Kafka deployment is already in one of the three formats mentioned, set value.converter to the built-in Kafka Connect string converter:

value.converter=org.apache.kafka.connect.storage.StringConverter

Otherwise, you must implement your own converter to process your data into one of the supported formats. For more information, see Converters.
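To make the target format concrete, the following Python sketch turns one record into an InfluxDB line format string. It is only an illustration of the transformation: an actual Kafka Connect converter is a Java class, and the function name to_line_protocol and the sample record are hypothetical. The sketch assumes numeric field values and names that need no escaping.

```python
def to_line_protocol(measurement, tags, fields, ts_ns):
    """Format one record as an InfluxDB line format string.

    Simplified: assumes names and tag values contain no spaces, commas,
    or equals signs (no escaping) and that all field values are numeric.
    """
    tag_part = ",".join(f"{k}={v}" for k, v in tags.items())
    field_part = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement},{tag_part} {field_part} {ts_ns}"


# Hypothetical source record, for example decoded from a JSON message.
record = {
    "location": "SanFrancisco",
    "groupid": 2,
    "current": 10.3,
    "voltage": 219,
    "phase": 0.31,
    "ts": 1648432611249000000,
}

line = to_line_protocol(
    "meters",
    {"location": record["location"], "groupid": record["groupid"]},
    {name: record[name] for name in ("current", "voltage", "phase")},
    record["ts"],
)
print(line)
```

The resulting string has the same shape as the test data generated later in this article: a measurement name and tags, then the fields, then a nanosecond timestamp.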

In this implementation, Kafka Connect is acting as the consumer. Consumer behavior can therefore be controlled by modifying the Kafka Connect configuration.

Topic configuration

The topics to which the consumer subscribes are controlled by the topics parameter. To override the default value of any other consumer parameter, add the parameter to the properties file with the consumer.override prefix. For example, the following line changes the maximum records per poll to 3000:

consumer.override.max.poll.records=3000

Thread configuration

In a Kafka Connect sink connector, each task is a consumer thread that receives data from the partitions of a topic. You can use the tasks.max parameter to control the maximum number of tasks and thereby the maximum number of threads. However, the number of tasks that are actually initiated is related to the number of topic partitions. For example, if you have ten partitions and the value of the tasks.max parameter is 5, each task will receive data from two partitions and keep track of the offsets of two partitions.

Note that if the value of the tasks.max parameter is larger than the number of partitions, the number of tasks that Kafka Connect initiates is equal to the number of partitions. The number of tasks is not related to the number of topics.
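The relationship between tasks.max and the partition count described above can be sketched in a few lines of Python. The function name plan_tasks is hypothetical, and the even spread of partitions is an assumption made for illustration; the actual assignment is decided by Kafka at run time.

```python
def plan_tasks(tasks_max, partitions):
    """Return how many partitions each started task would read.

    Follows the rule described in the text: no more tasks than
    partitions. Assumes partitions are spread as evenly as possible.
    """
    tasks = min(tasks_max, partitions)
    base, extra = divmod(partitions, tasks)
    return [base + 1 if i < extra else base for i in range(tasks)]


print(plan_tasks(5, 10))   # ten partitions, tasks.max=5
print(plan_tasks(10, 3))   # tasks.max larger than the partition count
```

The first call yields five tasks with two partitions each, matching the example above. The second yields three tasks, one per partition.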

Procedure

  1. Install Kafka.
    1. Download and decompress the Kafka installation package.
      wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
      tar -xzf kafka_2.13-3.2.0.tgz
    2. Edit the .bash_profile file and add the following text:
      export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0
      export PATH=$PATH:$KAFKA_HOME/bin
    3. Reload the bash profile.
      source .bash_profile
  2. Configure Kafka.
    1. Edit the Kafka Connect properties file:
      cd kafka_2.13-3.2.0/config/
      vi connect-standalone.properties
    2. Add the plug-in directory:
      plugin.path=/home/bding/connectors
    3. Edit the log configuration of Kafka Connect:
      vi connect-log4j.properties
    4. Set the logging level for the TDengine Sink Connector to DEBUG:
      log4j.logger.com.taosdata.kafka.connect.sink=DEBUG
      This modification is necessary because these logs are used to calculate the time spent in synchronizing data.
  3. Compile and install the TDengine Sink Connector.
   git clone git@github.com:taosdata/kafka-connect-tdengine.git
   cd kafka-connect-tdengine
   mvn clean package
   unzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip
  4. Start the ZooKeeper and Kafka servers.
   zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
   kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  5. Create a topic.
   kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092
  6. Generate test data.
    1. Save the following script as gen-data.py:
      #!/usr/bin/python3
      import random
      import sys
      topic = sys.argv[1]
      count = int(sys.argv[2])
      start_ts = 1648432611249000000
      location = ["SanFrancisco", "LosAngeles", "SanDiego"]
      for i in range(count):
          ts = start_ts + i
          row = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"
          print(row)
    2. Run gen-data.py:
      python3 gen-data.py meters 10000 | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

      The script generates 10,000 data points in InfluxDB line format and adds them to the meters topic. Each data point has two tags (location and groupid) and three fields (current, voltage, and phase).

  7. Start Kafka Connect.
    1. Save the following configuration as sink-test.properties:
      name=TDengineSinkConnector
      connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
      tasks.max=1
      topics=meters
      connection.url=jdbc:TAOS://127.0.0.1:6030
      connection.user=root
      connection.password=taosdata
      connection.database=power
      db.schemaless=line
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter
    2. Start Kafka Connect:
      connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
  8. Use the TDengine CLI to query the meters table in the power database to verify that there are 10,000 data points.
   [bding@vm95 test]$ taos

   Welcome to the TDengine shell from Linux, Client Version:2.6.0.4
   Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
   taos> select count(*) from power.meters;
          count(*)        |
   ========================
                    10000 |

TDengine Sink Connector performance testing

This performance test is similar to steps 4 through 7 in the previous section. Note the following:

  • The performance test script takes two arguments. The first is the number of partitions, and the second is the number of data points to generate.
  • The tasks.max parameter is set to the number of partitions. This controls the number of threads in each test.
  • The test database must be empty before the test begins.
  • After the test is complete, stop Kafka, ZooKeeper, and Kafka Connect manually.

In each test, data is first written to Kafka and then synchronized by Kafka Connect to TDengine. This ensures that the sink connector bears the full load of the synchronization task. The total synchronization time is measured from when the sink connector receives the first batch of data to when it receives the last batch.

To run the performance test, save the following script as run-test.sh:

#!/bin/bash
if [ $# -lt 2 ];then
        echo  "Usage: ./run-test.sh <num_of_partitions>  <total_records>"
        exit 1
fi
echo "---------------------------TEST STARTED---------------------------------------"
echo clean data and logs
taos -s "DROP DATABASE IF EXISTS power"
rm -rf /tmp/kafka-logs /tmp/zookeeper
rm -f $KAFKA_HOME/logs/connect.log
np=$1     # number of partitions
total=$2  # number of records
echo number of partitions is $np, number of records is $total.
echo start zookeeper
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
echo start kafka
sleep 3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
sleep 5
echo create topic
kafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092
echo generate test data
python3 gen-data.py meters $total  | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

echo alter connector configuration setting tasks.max=$np
sed -i  "s/tasks.max=.*/tasks.max=${np}/"  sink-test.properties
echo start kafka connect
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
     
echo -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"
read
     
echo stop connect
jps | grep ConnectStandalone | awk '{print $1}' | xargs kill
echo stop kafka server
kafka-server-stop.sh
echo stop zookeeper
zookeeper-server-stop.sh
# extract timestamps of receiving the first batch of data and the last batch of data
grep "records" $KAFKA_HOME/logs/connect.log  | grep meters- > tmp.log
start_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`
stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`
    
echo "--------------------------TEST FINISHED------------------------------------"
echo "| records | partitions | start time | stop time |"
echo "|---------|------------|------------|-----------|"
echo "| $total | $np | $start_time | $stop_time |"

You can then run the script and specify a number of partitions and data points. As an example, run the following command for a performance test with one partition and 1 million data points:

./run-test.sh 1 1000000

The test process is shown in the following figure.

Note that you must monitor the connect.log file in a separate console and press Enter once all data has been consumed. You can use the tail -f connect.log command to monitor progress:

[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/
[bding@vm95 logs]$ tail -f connect.log
[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)

When new entries are no longer being written to the log file, this indicates that the data consumption has been completed.
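The run-test.sh script prints only the start and stop timestamps, so the write rate still has to be derived from them. The following Python sketch shows one way to do that calculation. The function name records_per_second is hypothetical, and the timestamps in the example are illustrative values, not results from an actual run.

```python
from datetime import datetime

# Timestamp layout used in connect.log, e.g. "2022-06-21 17:39:00,176".
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def records_per_second(total, start_time, stop_time):
    """Average write rate between the first and last batch received."""
    start = datetime.strptime(start_time, LOG_TS_FORMAT)
    stop = datetime.strptime(stop_time, LOG_TS_FORMAT)
    elapsed = (stop - start).total_seconds()
    if elapsed <= 0:
        raise ValueError("stop_time must be later than start_time")
    return total / elapsed


# Hypothetical values for illustration only; not taken from a real run.
rate = records_per_second(
    1_000_000, "2022-06-21 17:39:00,176", "2022-06-21 17:39:10,176"
)
print(f"{rate:,.0f} records per second")
```

Pass the record count and the two timestamps printed in the final table of run-test.sh to obtain the average for a given run.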

The following table shows the data write performance determined by this test.

Records       1 thread   3 threads   5 threads   10 threads
1 million      105,219     232,937     333,000      489,956
3 million      107,650     239,330     363,240      573,175
5 million      108,321     246,573     364,087      580,720
10 million     107,803     248,855     372,912      562,936
15 million     106,651     249,671     377,283      541,927
20 million     103,626     244,921     371,402      557,460
Table 1. Average records written per second

The data points indicate the average number of entries written per second for configurations with one, three, five, and ten threads.

Conclusion

From these results, we can see that for a data set of a given size, write speed increases with the number of threads.

  • With a single thread, approximately 100,000 data points are written per second.
  • With five threads, the speed increases to approximately 350,000 data points per second.
  • With ten threads, the speed increases to approximately 550,000 data points per second.

The write speed is relatively stable and is not clearly associated with the total size of the data set.

However, the performance improvement per thread declines as the number of threads increases. Going from one to ten threads increases speed only by a factor of approximately five. This may be caused by uneven distribution of data among partitions. Some tasks take longer to complete than others, and this imbalance grows with the size of the data set.
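The scaling factor can be checked directly against Table 1. The following Python sketch computes the speedup relative to one thread and the per-thread efficiency for the 10 million data set. The function name scaling is hypothetical; the input values are taken from the table.

```python
# Average records per second for the 10 million data set (from Table 1).
throughput = {1: 107_803, 3: 248_855, 5: 372_912, 10: 562_936}


def scaling(rates):
    """Map thread count to (speedup vs. one thread, per-thread efficiency)."""
    baseline = rates[1]
    return {
        threads: (rate / baseline, rate / baseline / threads)
        for threads, rate in rates.items()
    }


for threads, (speedup, efficiency) in scaling(throughput).items():
    print(f"{threads:>2} threads: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

For this row, ten threads give a speedup of about 5.2, which corresponds to roughly half of ideal linear scaling.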

As an example, if the data set is created with 10 million data points spread across 10 partitions, the distribution of data points per partition is as follows:

[bding@vm95 kafka-logs]$ du -h ./ -d 1
125M    ./meters-8
149M    ./meters-7
119M    ./meters-9
138M    ./meters-4
110M    ./meters-3
158M    ./meters-6
131M    ./meters-5
105M    ./meters-0
113M    ./meters-2
99M     ./meters-1
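The imbalance in the listing above can be quantified with a short Python sketch. It assumes that on-disk size is a reasonable proxy for the number of records in a partition and that a task's run time is proportional to the data it reads. Both are simplifications, and the function name skew_report is hypothetical.

```python
# Partition sizes in MB, copied from the du output above.
sizes_mb = {0: 105, 1: 99, 2: 113, 3: 110, 4: 138,
            5: 131, 6: 158, 7: 149, 8: 125, 9: 119}


def skew_report(sizes):
    """Summarize how unevenly data is spread across partitions."""
    total = sum(sizes.values())
    largest = max(sizes, key=sizes.get)
    smallest = min(sizes, key=sizes.get)
    return {
        "largest_partition": largest,
        "smallest_partition": smallest,
        "max_over_min": sizes[largest] / sizes[smallest],
        # If each task's time is proportional to its data, the job ends
        # when the largest partition is done, which caps the speedup.
        "speedup_bound": total / sizes[largest],
    }


report = skew_report(sizes_mb)
for name, value in report.items():
    print(f"{name}: {value:.2f}" if isinstance(value, float) else f"{name}: {value}")
```

Under these assumptions, the largest partition holds about 1.6 times as much data as the smallest, and the uneven distribution alone would limit ten tasks to a speedup of just under eight.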

Another factor influencing multithreaded write speed is out-of-order data. Because this test allocates data randomly across partitions, only a single-partition configuration can result in strictly ordered data, which provides the highest performance. As the number of threads is raised, the degree to which the data is out of order increases.

For this reason, it is recommended that all data belonging to a given subtable be stored in the same Kafka partition.
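One common way to follow this recommendation is to give every message a key, because Kafka routes messages with the same key to the same partition. The following Python sketch illustrates the idea under two assumptions: that each distinct combination of measurement and tags corresponds to one subtable, and that a CRC32 hash can stand in for Kafka's own key hashing. The function names subtable_key and partition_for are hypothetical.

```python
import zlib


def subtable_key(row):
    """Use the measurement and tag set (text before the first space) as key."""
    return row.split(" ", 1)[0]


def partition_for(key, num_partitions):
    # Stand-in hash for illustration only. Kafka's default partitioner
    # uses a different hash, so real partition numbers will differ.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


rows = [
    "meters,location=SanFrancisco,groupid=2 current=10.3,voltage=219,phase=0.31 1648432611249000000",
    "meters,location=LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000001",
    "meters,location=SanFrancisco,groupid=2 current=10.5,voltage=220,phase=0.33 1648432611249000002",
]

for row in rows:
    key = subtable_key(row)
    print(partition_for(key, 10), key)
```

The first and third rows share a key and therefore always map to the same partition, which keeps the data for that subtable in timestamp order within the partition.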

Appendix

  • All source code and test results for this article have been uploaded to GitHub.
  • The test environment used in this article is described in the following table.
Java            OS           Memory   CPU                                   Hard Disk   TDengine   Kafka
OpenJDK 1.8.0   CentOS 7.9   64 GB    16-core i7-10700 (x86-64, 2.90 GHz)   HDD         2.6.0.4    3.2