Simplified, efficient monitoring and storage of industrial equipment metrics

About the Author

DaLuo is a solution architect at HeyGears who focuses on cloud-native systems, big data processing system development, and industrial IoT system development.

Introduction

For an industrial IoT or IoT system, the most basic requirement is to plot curves from the series of data points reported by each device. Take the power curve as an example. As with a stock price chart, the last power value reported by the device within each minute is taken as the power of the device for that minute. If the device does not report a power value in a certain minute, the value from the previous minute is carried forward.

[Figure: example of reported data points]

[Figure: the resulting per-minute power curve]
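The rule described above (last value per minute, carried forward when a minute has no report) can be sketched in plain Java. This is only an illustration of the requirement, not how TDengine or our production code computes it; the class name, method name, and the initialValue parameter are hypothetical.

```java
import java.util.Arrays;

final class MinuteCurve {
    private static final long MINUTE_MS = 60_000L;

    /**
     * Returns one value per minute in [startMs, endMs): the last value reported
     * in that minute, or the most recent earlier value if nothing was reported.
     * Minutes before the first report get initialValue (an assumption of this sketch).
     * timestamps must be sorted ascending; values[i] belongs to timestamps[i].
     */
    static double[] lastPerMinute(long[] timestamps, double[] values,
                                  long startMs, long endMs, double initialValue) {
        int minutes = (int) ((endMs - startMs) / MINUTE_MS);
        double[] curve = new double[minutes];
        double carried = initialValue;
        int i = 0;
        // Reports older than the range still define the value carried into it.
        while (i < timestamps.length && timestamps[i] < startMs) {
            carried = values[i++];
        }
        for (int m = 0; m < minutes; m++) {
            long windowEnd = startMs + (m + 1) * MINUTE_MS;
            // Consume every report in this minute; the last one wins.
            while (i < timestamps.length && timestamps[i] < windowEnd) {
                carried = values[i++];
            }
            curve[m] = carried;
        }
        return curve;
    }

    public static void main(String[] args) {
        long[] ts = {5_000L, 50_000L, 130_000L};
        double[] power = {1.0, 2.0, 3.0};
        // Minute 1 has no report, so it repeats the value of minute 0.
        System.out.println(Arrays.toString(lastPerMinute(ts, power, 0L, 240_000L, 0.0)));
        // prints [2.0, 2.0, 3.0, 3.0]
    }
}
```

Doing this by hand for every device and every metric is exactly the work we wanted the storage layer to take over.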

Pain Points in the Usage Scenarios

Typically, we first write the reported data into Apache Kafka. In an offline computing scenario, we write the data into Hive, read it at regular intervals with Spark SQL, and then write the results into HBase. In a real-time computing scenario, we use Apache Flink to consume the data from Kafka and write the results into HBase; in this case we also have to deal with issues such as irregular data and delayed messages.

Moreover, because this stack is based on Hadoop, we have to integrate ZooKeeper, HDFS, Hive, and HBase to meet our requirements, which leads to high O&M costs. In addition, HBase stores time-series data as key-value (KV) pairs, so storage space is spent on repetitive KVs.

Besides the pain points mentioned above, we also need to consider requirements such as data growth, data reconciliation (DR), and disaster recovery.

HeyGears provides customers with full-stack solutions based on 3D printing technology, so we need to continuously monitor the operating status of the equipment and store large volumes of metric data. We therefore evaluated the prevailing time-series databases and selected TDengine as our solution.

According to our research, TDengine is an open-source IoT big data platform capable of handling large data volumes with high performance and high storage efficiency. It has been well received by the developer community since it was made available under an open-source license two years ago, garnering nearly 16,000 GitHub stars so far and hundreds of clones on a daily basis.

According to the TDengine documentation, the problems we encountered can be solved easily with its simple SQL-like syntax.

select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(value, 0);

Remarkable STable and Table Design

Why is TDengine so efficient? The answer lies in its design of STables (super tables) and tables. On the one hand, TDengine uses “one table for one data collection point (device)”, which fits the typical IoT scenario in which data is generated by a single device and timestamped at regular intervals. In actual usage, whether for real-time queries or offline data analysis, reads from a single device are always filtered by a time range.

On the other hand, to make management more efficient, the TDengine development team introduced the STable, which groups data collection points (devices) of the same type.

For instance, to store the temperature and humidity data of our equipment, an STable can be created as follows:

create stable if not exists s_device (ts TIMESTAMP,
  temperature double,
  humidity double
) TAGS (device_sn BINARY(1000));

In actual use, the SQL statements to insert data for devices ‘d1’ and ‘d2’ are as follows.

insert into s_device_d1 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d1') values (1623157875000, 35.34, 80.24);
insert into s_device_d2 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d2') values (1623157891000, 29.63, 79.48);

A query for the data of device ‘d1’ in a given time range looks like this:

select * from s_device where device_sn = 'd1' and ts > 1623157871000 and ts < 1623157890000 ;

To select the average temperature over the last 7 days in 1-hour intervals, the statement can be written as follows (the #{...} placeholders are MyBatis parameters):

select avg(temperature) temperature from s_device where  device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime}  interval(1h)
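The startTime and endTime parameters of this query have to be computed by the caller. A minimal sketch, assuming epoch-millisecond timestamps and an exclusive end aligned to the top of the current hour so that every 1-hour window is complete; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class SevenDayRange {

    /** Returns {startTime, endTime} in epoch milliseconds; start inclusive, end exclusive. */
    static long[] lastSevenDays(Instant now) {
        // Cut the range at the start of the current hour (alignment is an assumption of this sketch).
        Instant end = now.truncatedTo(ChronoUnit.HOURS);
        Instant start = end.minus(Duration.ofDays(7));
        return new long[] {start.toEpochMilli(), end.toEpochMilli()};
    }

    public static void main(String[] args) {
        long[] range = lastSevenDays(Instant.now());
        System.out.println("startTime=" + range[0] + ", endTime=" + range[1]);
    }
}
```

The two values can then be passed as #{startTime} and #{endTime}. Whether a partially filled current hour should be included is a product decision; dropping the truncation includes it.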

TDengine also provides many aggregation functions. Besides ‘last’ and the ‘fill’ clause used in the examples above, other commonly used functions such as ‘sum’ and ‘max’ are also available.

To integrate TDengine with our applications, we use MyBatis, a flexible and easy-to-use ORM framework (the code below also uses MyBatis-Plus annotations). For example, for the STable ‘s_device’ above, we first defined the entity:

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;

/**
 * @author: DaLuo
 * @date: 2021/06/25
 * @description: Entity mapped to the s_device STable; one instance is one reported sample.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName(value = "s_device")
public class TestSuperDeviceEntity {

    private Timestamp ts;
    private Float temperature;
    private Float humidity;
    @TableField(value = "device_sn")
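    // Named deviceSn so that Lombok generates getDeviceSn(), which the mapper's #{entity.deviceSn} relies on.
    private String deviceSn;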
}

Then we defined the mapper:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hg.device.kafka.tdengine.entity.TestSuperDeviceEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.sql.Timestamp;
import java.util.List;

/**
 * @author: DaLuo
 * @date: 2021/06/25
 * @description: MyBatis mapper for writing to and querying the s_device STable.
 */
@Mapper
public interface TestSuperDeviceMapper extends BaseMapper<TestSuperDeviceEntity> {

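    /**
     * Inserts one row; the subtable s_device_{deviceSn} is created on first use via USING ... TAGS.
     * Note: ${...} is plain string substitution, so deviceSn must be validated by the caller.
     * @param entity the sample to write
     * @return the number of affected rows
     */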
    @Insert({
            "INSERT INTO 's_device_${entity.deviceSn}' (ts ,temperature, humidity ) ",
            "USING s_device (device_sn) TAGS (#{entity.deviceSn}) ",
            "VALUES (#{entity.ts}, #{entity.temperature}, #{entity.humidity})"
    })
    int insertOne(@Param(value = "entity") TestSuperDeviceEntity entity);


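    /**
     * Batch writing: all rows are sent in a single INSERT statement, one subtable clause per row.
     * Note: ${...} is plain string substitution, so each deviceSn must be validated by the caller.
     * @param entities the samples to write
     * @return the number of affected rows
     */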
    @Insert({
            "<script>",
            "INSERT INTO ",
            "<foreach collection='list' item='item' separator=' '>",
            "'s_device_${item.deviceSn}' (ts ,temperature, humidity) USING s_device (device_sn) TAGS (#{item.deviceSn}) ",
            "VALUES (#{item.ts}, #{item.temperature}, #{item.humidity})",
            "</foreach>",
            "</script>"
    })
    int batchInsert(@Param("list") List<TestSuperDeviceEntity> entities);

    /**
     *  select the average temperature data for the last 7 days by 1-hour interval
     * @param deviceSn
     * @param startTime inclusive
     * @param endTime   exclusive
     * @return
     */
    @Select("select avg(temperature) temperature from s_device where  device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime}  interval(1h)")
    List<TempSevenDaysTemperature> selectSevenDaysTemperature(
            @Param(value = "deviceSn") String deviceSn,
            @Param(value = "startTime") long startTime,
            @Param(value = "endTime") long endTime);

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    @Builder
    class TempSevenDaysTemperature {
        private Timestamp ts;
        private float temperature;
    }
}
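Because batchInsert concatenates every row into one SQL statement, a very large list produces a very long statement. A cautious approach, on the assumption that the server or driver limits statement length, is to split the list into chunks first. The helper below is a generic sketch; the class name is hypothetical and the chunk size is something to tune, not a recommended value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class Batches {

    /** Splits items into consecutive chunks of at most chunkSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));
        // prints [[1, 2], [3, 4], [5]]
    }
}
```

Each chunk would then be passed to batchInsert in a loop, for example with a chunk size of a few hundred rows.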

Another convenient feature of TDengine is that tables do not need to be created in advance. Because we make the tag value part of the table name and name the STable in the ‘USING ... TAGS’ clause, TDengine automatically creates the table when data is first inserted.
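Since the device serial number becomes part of a table name, and the mapper splices it into the SQL with ${...}, it is worth validating it before use. The following is a minimal sketch; the class name, the allowed character set, and the 64-character cap are assumptions for illustration, not TDengine rules.

```java
import java.util.regex.Pattern;

final class SubTableNames {
    // Allow-list chosen for this sketch: letters, digits, underscore, 1 to 64 characters.
    private static final Pattern SAFE_SN = Pattern.compile("[A-Za-z0-9_]{1,64}");

    /** Returns true if the serial number can be embedded in a table name as-is. */
    static boolean isSafe(String deviceSn) {
        return deviceSn != null && SAFE_SN.matcher(deviceSn).matches();
    }

    /** Builds the subtable name used in the article, e.g. s_device_d1 for device d1. */
    static String forDevice(String deviceSn) {
        if (!isSafe(deviceSn)) {
            throw new IllegalArgumentException("unsafe device serial number: " + deviceSn);
        }
        return "s_device_" + deviceSn;
    }

    public static void main(String[] args) {
        System.out.println(forDevice("d1")); // prints s_device_d1
    }
}
```

Rejecting unexpected characters up front keeps a malformed serial number from creating a stray table or altering the statement.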

Connectors

We can connect to TDengine through the TDengine JDBC driver in two ways: JDBC-JNI and JDBC-RESTful. JDBC-JNI offers higher ingestion performance, but it requires the TDengine client to be installed on the server where the application runs.
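As far as we understand the taos-jdbcdriver documentation, the two modes differ in the JDBC URL prefix, the default port, and the driver class. The sketch below only builds those strings; the helper class is hypothetical, and the values should be checked against the driver version in use.

```java
final class TaosJdbcUrls {

    enum Mode { JNI, RESTFUL }

    /** Builds a JDBC URL using the default ports (6030 for JNI, 6041 for RESTful). */
    static String url(Mode mode, String host, String database) {
        switch (mode) {
            case JNI:
                return "jdbc:TAOS://" + host + ":6030/" + database;
            case RESTFUL:
                return "jdbc:TAOS-RS://" + host + ":6041/" + database;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Returns the driver class name to configure for the given mode. */
    static String driverClass(Mode mode) {
        switch (mode) {
            case JNI:
                return "com.taosdata.jdbc.TSDBDriver";
            case RESTFUL:
                return "com.taosdata.jdbc.rs.RestfulDriver";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // "tdengine-host" and "device_db" are placeholder names.
        System.out.println(url(Mode.JNI, "tdengine-host", "device_db"));
        System.out.println(driverClass(Mode.JNI));
    }
}
```

Only the JNI mode needs the native client library on the machine, which is why the image described next bundles the TDengine client.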

We deploy our application in a Kubernetes cluster, where the program runs in Docker containers. Therefore, we built an image for the application. Here is the Dockerfile of the base image, which installs the TDengine client:

FROM openjdk:8-jdk-oraclelinux7
 
COPY TDengine-client-2.0.16.0-Linux-x64.tar.gz /
 
RUN tar -xzvf /TDengine-client-2.0.16.0-Linux-x64.tar.gz &&  cd /TDengine-client-2.0.16.0 &&  pwd && ls && ./install_client.sh

Build the base image:

docker build -t tdengine-openjdk-8-runtime:2.0.16.0 -f Dockerfile .

The Dockerfile of the application image is:

FROM tdengine-openjdk-8-runtime:2.0.16.0
 
ENV JAVA_OPTS="-Duser.timezone=Asia/Shanghai -Djava.security.egd=file:/dev/./urandom"
 
COPY app.jar /app.jar
 
# Run through a shell so that $JAVA_OPTS is expanded and passed to the JVM
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar /app.jar"]

Thus, the application can be scheduled on any node in Kubernetes.

In addition, our program involves automated scheduling, which requires frequent message exchange between upper machines and lower machines via MQTT. For example, the cloud sends command 1000, “start task A”, and the lower machine responds with command 2000, “received task A”. If we treat the command as a device and the sequence number of the command as its attribute, this kind of data can also be stored in TDengine.

*************************** 1.row ***************************
       ts: 2021-06-23 16:10:30.000
      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"start"}
device_sn: deviceA
     kind: 1000
*************************** 2.row ***************************
       ts: 2021-06-23 16:10:31.000
      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}
device_sn: deviceA
     kind: 2000

Conclusion

Finally, TDengine also provides an STable, ‘log.dn’, which stores metric data such as memory usage and CPU usage, so we can use Grafana to visualize those metrics as a reliable reference for monitoring.