In this blog, we’ll show you some examples of the functions and extensions available in TDengine to perform simple time-series analyses.
Getting Started with TDengine
To get started quickly and follow along, you can register for a free TDengine Cloud account at cloud.tdengine.com. The process takes just a minute and no credit card is required.
After entering the confirmation code, as part of the registration process, make sure you select the checkbox to create a sample database. The sample database is synthetic data from smart meters and has voltage, current, and phase as measurements/metrics and location as a tag/label.
When you first log in to TDengine Cloud, it walks you through some of the novel concepts in TDengine. Specifically, it covers “supertables” and the idea of “one table per device” in a time-series database (TSDB). At any time, you can click the TDengine logo in the top left-hand corner to bring up the walkthrough of these concepts again.
Ingesting Beijing Multi-site Air Quality Data
In addition to data from the sample database, we will also use data from the Beijing Multi-site Air Quality Dataset. Inserting the data from this dataset into TDengine is a good exercise in figuring out the different ways to ingest data into TDengine.
The first step in ingesting this data is, of course, to design a schema to hold this data. The data looks as follows:
"No","year","month","day","hour","PM2.5","PM10","SO2","NO2","CO","O3","TEMP","PRES","DEWP","RAIN","wd","WSPM","station"
1,2013,3,1,0,4,4,4,7,300,77,-0.7,1023,-18.8,0,"NNW",4.4,"Aotizhongxin"
2,2013,3,1,1,8,8,4,7,300,77,-1.1,1023.2,-18.2,0,"N",4.7,"Aotizhongxin"
3,2013,3,1,2,7,7,5,10,300,73,-1.1,1023.5,-18.2,0,"NNW",5.6,"Aotizhongxin"
4,2013,3,1,3,6,6,11,11,300,72,-1.4,1024.5,-19.4,0,"NW",3.1,"Aotizhongxin"
Each station has its own .csv file. We can treat each station as a device, and so we come up with a schema as follows. First we create a database. We then create a supertable with the station as a tag/label.
CREATE DATABASE weather;
CREATE STABLE weather.pollution (
    ts TIMESTAMP, pm25 FLOAT, pm10 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, o3 FLOAT,
    temperature FLOAT, pressure FLOAT, dewp FLOAT, rain FLOAT,
    winddirection VARCHAR(8), windspeed FLOAT
) TAGS (station VARCHAR(64));
ETL Using Python
As with any data, we need to do a little bit of ET (extraction and transformation) before loading the data. It’s really easy to write a Python script to ingest this data using the TDengine Python connector. The simple Python script is shown below. Note that if you copy and paste this script, make sure the indentation is preserved after the paste. Also note that this is not the most efficient way of ingesting data into your Cloud instance, since it sends one INSERT statement per row, but we are just trying to demonstrate some concepts here. You can also use this script to transform these data files into files that can be uploaded into TDengine through the taos CLI.
import os
import fnmatch

import taosrest
from dotenv import load_dotenv

# Environment: load a local .env file (if any), then read the Cloud URL and token
load_dotenv()
url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

# SQL statements
# createDatabase = 'create database if not exists weather'
createStable = 'CREATE STABLE if not exists weather.pollution (ts TIMESTAMP, pm25 FLOAT, pm10 FLOAT,'
createStable += 'so2 FLOAT, no2 FLOAT, co FLOAT, o3 FLOAT,'
createStable += 'temperature FLOAT, pressure FLOAT, dewp FLOAT, rain FLOAT, winddirection VARCHAR(8),'
createStable += 'windspeed FLOAT) TAGS (station VARCHAR(64))'

# Create connection and make sure the supertable exists
conn = taosrest.connect(url=url, token=token)
conn.execute(createStable)

path = "./data"
fileList = fnmatch.filter(os.listdir(path), "*.csv")

# This counter is just used to increment the sub-table name
counter = 1
totalRows = 0
totalLines = 0

for eachFile in fileList:
    tablename = 'p' + str(counter)
    print("Inputfilename is:", eachFile)
    print("Table name is:", tablename)
    infile = open(path + '/' + eachFile, 'r')
    # Skip the header line of each file
    next(infile)
    for eachline in infile:
        totalLines += 1
        # If there is an NA in the line, skip the line
        if 'NA' in eachline:
            print('Skipping - ' + eachline)
        else:
            myfields = eachline.split(',')
            # The last field is the (already quoted) station name, used as the tag
            insertstr = 'insert into weather.' + tablename + ' using weather.pollution tags (' + myfields[-1].strip()
            insertstr += ') values ('
            # The next 2 lines create the timestamp from the year/month/day/hour fields
            insertstr += '"' + "-".join(myfields[1:4])
            insertstr += ' ' + myfields[4] + ':00:00",'
            insertstr += ",".join(myfields[5:-1])
            insertstr += ')'
            # outfile.write(insertstr)
            # print(insertstr)
            af = conn.execute(insertstr)
            totalRows += af
    infile.close()
    # outfile.close()
    print('Inserted ' + str(totalRows) + ' so far\n')
    counter += 1
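Since the script above sends one statement per row, a natural next step is to batch several rows into a single INSERT. The sketch below only builds the SQL text and does not connect to anything. The helper name build_insert and the third sample row (a made-up row with an NA gap) are our own illustrations and not part of the original script.

```python
import csv
import io

def build_insert(table, station, rows):
    """Build one multi-row INSERT for a station; rows containing NA are skipped."""
    values = []
    for f in rows:
        if "NA" in f:
            continue
        # Zero-padded timestamp from the year/month/day/hour columns
        ts = f"{int(f[1]):04d}-{int(f[2]):02d}-{int(f[3]):02d} {int(f[4]):02d}:00:00"
        # Columns 5..14 are numeric, 15 is the wind direction (text), 16 is wind speed
        cols = f[5:15] + ["'" + f[15] + "'", f[16]]
        values.append("('" + ts + "'," + ",".join(cols) + ")")
    if not values:
        return None
    return (f"INSERT INTO weather.{table} USING weather.pollution "
            f"TAGS ('{station}') VALUES " + " ".join(values))

sample = '''"No","year","month","day","hour","PM2.5","PM10","SO2","NO2","CO","O3","TEMP","PRES","DEWP","RAIN","wd","WSPM","station"
1,2013,3,1,0,4,4,4,7,300,77,-0.7,1023,-18.8,0,"NNW",4.4,"Aotizhongxin"
2,2013,3,1,1,8,8,4,7,300,77,-1.1,1023.2,-18.2,0,"N",4.7,"Aotizhongxin"
3,2013,3,1,2,NA,7,5,10,300,73,-1.1,1023.5,-18.2,0,"NNW",5.6,"Aotizhongxin"
'''

reader = csv.reader(io.StringIO(sample))
next(reader)                      # skip the header line
rows = list(reader)
sql = build_insert("p1", rows[0][-1], rows)
print(sql)
```

The resulting string could be passed to conn.execute in place of the per-row statement. How many rows to put in one batch depends on statement size limits, so treat the batch size as something to tune rather than a fixed rule.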
TDengine SQL Functions
Time-series data can easily run to millions of rows, so we are usually interested in downsampling to see the data in reasonable time frames. With pollution data, for example, we want to see the exposure on a daily, weekly, or monthly basis. Let’s say we want to see the weekly exposure to PM2.5, that is, particles that are 2.5 microns or less and are able to travel into the lungs and cause respiratory diseases. An AQI (air quality index) of 0-50 is considered Good, above 150 is considered Unhealthy, above 200 Very Unhealthy, and above 300 is Hazardous by US standards.
We can use the following SQL statement to quickly get this information.
_wstart is a “pseudo-column” that marks the start of each downsampled interval, which in this case is a week. We also use the AVG function, which calculates the average within each interval. Furthermore, we want to see this by station and not mix the data, so we partition by station. If you want to see the exposure by day, simply change the interval to 1d, or to 1n for monthly exposure.
SELECT _wstart, AVG(pm25), station FROM weather.pollution PARTITION BY station INTERVAL (1w);
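To make the idea of windowed downsampling concrete, here is a small pure-Python sketch of what a query like the one above computes, using daily windows for simplicity. It is only an illustration: the function name daily_average is ours, and TDengine's actual window alignment (time zone, week boundaries) may differ from the simple truncation to midnight assumed here.

```python
from collections import defaultdict
from datetime import datetime

def daily_average(rows):
    """rows: iterable of (timestamp, station, value).
    Returns {(station, window_start): mean}, one entry per station per day."""
    buckets = defaultdict(list)
    for ts, station, value in rows:
        # Truncate to midnight: this plays the role of _wstart for a 1d interval
        wstart = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        buckets[(station, wstart)].append(value)
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}

# The four PM2.5 readings from the sample rows shown earlier
rows = [
    (datetime(2013, 3, 1, 0), "Aotizhongxin", 4.0),
    (datetime(2013, 3, 1, 1), "Aotizhongxin", 8.0),
    (datetime(2013, 3, 1, 2), "Aotizhongxin", 7.0),
    (datetime(2013, 3, 1, 3), "Aotizhongxin", 6.0),
]
result = daily_average(rows)
for (station, wstart), mean in result.items():
    print(wstart, station, mean)
```

Grouping by the (station, window start) pair is the Python counterpart of combining PARTITION BY with INTERVAL.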
For exposure, it’s usually better to see a time-weighted average. For this, TDengine provides the TWA function, which you can use in the same way as AVG:
SELECT _wstart, TWA(pm25), station FROM weather.pollution PARTITION BY station INTERVAL (1w);
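The difference between a plain average and a time-weighted average is easiest to see with unevenly spaced samples. The sketch below uses one common definition (the trapezoidal area under the curve divided by the elapsed time). We are assuming this matches the spirit of TWA, but TDengine's exact handling of window boundaries may differ.

```python
def time_weighted_average(samples):
    """samples: list of (time_in_seconds, value), sorted by time, at least two points."""
    area = 0.0
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        # Trapezoid between two consecutive readings
        area += (v0 + v1) / 2 * (t1 - t0)
    return area / (samples[-1][0] - samples[0][0])

# The value sits at 10 for 8 seconds, then climbs to 50 over the last 2 seconds
samples = [(0, 10.0), (8, 10.0), (10, 50.0)]
plain = sum(v for _, v in samples) / len(samples)
weighted = time_weighted_average(samples)
print(round(plain, 2), weighted)
```

The plain average treats the brief spike as one third of the data, while the time-weighted average reflects that the value was low for most of the period.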
When performing time-series analysis and downsampling, you may have to deal with missing data. TDengine makes this easy: adding the FILL clause to the query lets you choose how missing values are handled. For example, I can choose a LINEAR fill here, which fills each gap by linear interpolation between the nearest non-null values on either side.
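As a rough illustration of what a linear fill does to a downsampled series, the sketch below interpolates missing windows between their nearest known neighbors. The function name linear_fill is ours, and leaving leading or trailing gaps unfilled is an assumption of this sketch rather than a statement about TDengine. The query in the comment shows where the clause would go; the time range in it is illustrative.

```python
# Illustrative query (not executed here):
#   SELECT _wstart, AVG(pm25), station FROM weather.pollution
#   WHERE ts >= '2013-03-01' AND ts < '2013-04-01'
#   PARTITION BY station INTERVAL(1d) FILL(LINEAR);

def linear_fill(series):
    """series: values for evenly spaced windows; None marks a window with no data."""
    filled = list(series)
    known = [i for i, v in enumerate(series) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (series[right] - series[left]) / (right - left)
        for i in range(left + 1, right):
            filled[i] = series[left] + step * (i - left)
    return filled

filled = linear_fill([10.0, None, None, 40.0, None])
print(filled)
```

In this sketch the last window stays empty because there is no later value to interpolate toward.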
You can also use a sliding window to look at the time series. The sliding window slides your interval window forward by the time unit specified. This is particularly useful for stream processing. For example, you can use a sliding window of 1 day, as shown below.
SELECT _wstart, avg(pm25), twa(pm25),station FROM weather.pollution PARTITION BY station INTERVAL(1w) SLIDING (1d);
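To visualize how a one-week interval with a one-day slide produces overlapping windows, the following sketch simply enumerates the window boundaries. The generator name sliding_windows is hypothetical, and the start date is chosen only for illustration.

```python
from datetime import datetime, timedelta

def sliding_windows(start, end, interval, sliding):
    """Yield (window_start, window_end) pairs; each window covers [start, start + interval)."""
    wstart = start
    while wstart < end:
        yield wstart, wstart + interval
        wstart += sliding

windows = list(sliding_windows(
    datetime(2013, 3, 1), datetime(2013, 3, 4),
    interval=timedelta(weeks=1), sliding=timedelta(days=1),
))
for wstart, wend in windows:
    print(wstart.date(), "->", wend.date())
```

Each window shares six of its seven days with the next one, which is why the averages from a sliding query change more smoothly than those from back-to-back weekly windows.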
There are several aggregate functions supported by TDengine.
If I want to see the 10 highest values of PM2.5 in the dataset, I can simply do the following. Note that I am ordering by the PM2.5 value in ascending order.
SELECT ts, TOP(pm25,10) AS high, station FROM weather.pollution ORDER BY high ASC;
DIFF, which returns the difference between the current value and the previous value, is also very useful when setting up a time series as a supervised machine learning dataframe. Note that in this case you have to select from an individual station’s table, not the supertable. In case it isn’t clear, subtracting the result of DIFF from the current value gives you the previous value.
SELECT ts, pm25-DIFF(pm25), pm25 FROM weather.p1;
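The arithmetic behind this query can be checked in a few lines of plain Python. The sketch below builds (previous, current) pairs the same way, by subtracting the difference from the current value. The function name lag_pairs is ours. The first reading has no predecessor, so the result has one row fewer than the input.

```python
def lag_pairs(values):
    """Return (previous, current) pairs, mirroring `cur - DIFF(cur)` next to `cur`."""
    pairs = []
    for prev, cur in zip(values, values[1:]):
        diff = cur - prev              # what DIFF reports for this row
        pairs.append((cur - diff, cur))
    return pairs

pairs = lag_pairs([4, 8, 7, 6])        # PM2.5 readings from the sample rows
print(pairs)
```

Each pair is one training example for a one-step-ahead model: the previous value is the feature and the current value is the target.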
You can, of course, do the same thing if you are setting up a multivariate series. For example, if you are looking at PM2.5, CO, NO2, and wind speed, you can do the following:
SELECT ts, pm25-DIFF(pm25), co-DIFF(co), no2-DIFF(no2), windspeed-DIFF(windspeed), pm25,co,no2,windspeed FROM weather.p1;
Using the Python connector, you can get the results of any of the above queries into a Pandas dataframe.
Moving averages are frequently used in time-series analysis. TDengine provides the MAVG function, which takes the column and the number of values over which the moving average is calculated. In our case, since the measurement is collected hourly, a weekly moving average spans 24 × 7 = 168 values, so we can do the following:
SELECT ts, MAVG(pm25,168) FROM weather.p1;
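For readers who want to see the mechanics, here is a minimal pure-Python moving average over the last k values. It is a sketch of the general technique, not TDengine's implementation, and the function name moving_average is ours.

```python
def moving_average(values, k):
    """Average of each run of k consecutive values; returns len(values) - k + 1 results."""
    if k < 1 or k > len(values):
        raise ValueError("k must be between 1 and len(values)")
    window_sum = sum(values[:k])
    out = [window_sum / k]
    for i in range(k, len(values)):
        # Slide the window: add the newest value, drop the oldest
        window_sum += values[i] - values[i - k]
        out.append(window_sum / k)
    return out

averages = moving_average([3, 6, 9, 12], 2)
print(averages)
```

One thing to keep in mind is that a moving average over a fixed number of rows counts rows, not hours. Because the ingestion script skipped rows containing NA, 168 rows can cover somewhat more than one calendar week.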
TDengine also provides a HISTOGRAM function, which we could use to see how many measurements fall into the good, unhealthy, very unhealthy, and hazardous categories. We can look at this on a yearly basis to see whether the air quality is getting better. The HISTOGRAM function returns a table/grid.
SELECT _wstart, HISTOGRAM(pm25, "user_input","[50,100,200,300,350]",0), station FROM weather.pollution PARTITION BY station INTERVAL(1y);
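To see how user-supplied bin edges turn readings into counts, the sketch below bins a handful of made-up PM2.5 values using the same edges as the query. We assume each bin includes its upper edge and excludes its lower edge, and that values outside the outermost edges are not counted. Please check the TDengine function reference for the exact boundary rules.

```python
from bisect import bisect_left

def histogram(values, edges):
    """Count values per bin, where bin i covers (edges[i], edges[i + 1]]."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        i = bisect_left(edges, v) - 1
        if 0 <= i < len(counts):       # values outside the edges are ignored
            counts[i] += 1
    return counts

edges = [50, 100, 200, 300, 350]
readings = [12, 50, 75, 100, 180, 260, 320, 400]   # made-up values for illustration
counts = histogram(readings, edges)
print(counts)
```

With these edges, readings of 50 or below never show up in the result, so adding 0 as the first edge would be one way to count the lowest category as well.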
Simple Analysis on TDengine Sample Database
Let’s shift our attention to the sample database in TDengine Cloud. As we mentioned, this has synthetic data from smart meters in various cities.
To see what the supertable looks like, we can describe it. The name of the database is test, and meters is the supertable.
For each location we have the current, voltage, and phase.
If you wanted to get the hourly energy consumption in kWh, the following simple query would suffice. Note that we use the cosine function, COS, that is provided by TDengine. We divide by 1000 to get the result in kW. Because the sum is taken over one-hour windows, it approximates the consumption in kWh when each meter reports about once per hour; at other sampling rates, the sum needs to be scaled by the sampling interval. I also constrain the timestamp and exclude the first and last days because there is not enough data there.
SELECT _wstart, SUM(current*voltage*COS(phase))/1000 AS kWh, location FROM test.meters WHERE ts>'2017-07-15' and ts < '2017-07-24' PARTITION BY location INTERVAL (1h);
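The unit bookkeeping in this calculation is worth spelling out. Real power in kW is current × voltage × cos(phase) / 1000, and energy in kWh is power multiplied by the time each reading represents. The sketch below makes the sampling interval explicit. The readings and the 15-minute interval are made up for illustration and say nothing about how the sample database is actually sampled.

```python
import math

def energy_kwh(samples, interval_seconds):
    """samples: (current_A, voltage_V, phase_rad) readings spaced interval_seconds apart."""
    total_kw = sum(i * v * math.cos(p) for i, v, p in samples) / 1000
    # Each reading stands for interval_seconds of consumption
    return total_kw * interval_seconds / 3600

# Four readings in one hour, each drawing 10 A at 220 V with zero phase shift
samples = [(10.0, 220.0, 0.0)] * 4
kwh = energy_kwh(samples, interval_seconds=900)
print(round(kwh, 3))
```

A constant 2.2 kW load running for one hour should come out at about 2.2 kWh, which is a handy sanity check for whatever scaling you apply to the SQL result.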
When I visualize this in Grafana, I get the following.
You can get the difference between the maximum and minimum values of a column by using the SPREAD function. Note that you can always constrain the time spans by using a WHERE clause on the timestamp field.
SELECT SPREAD(voltage), SPREAD(current), location FROM test.meters PARTITION BY location;
In addition to these, TDengine also provides functions like STDDEV (standard deviation), MODE (the value with the highest frequency), and several other useful functions for basic and easy time-series analysis.
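If you pull raw values into Python, the same summary statistics are available in the standard library, which is useful for spot-checking query results. The voltages below are made up. We use the population standard deviation here as an assumption; if your numbers differ slightly from the database, the sample version (statistics.stdev) is the first thing to compare against.

```python
import statistics

voltages = [220, 221, 219, 220, 223]     # made-up readings for illustration

spread = max(voltages) - min(voltages)   # what SPREAD computes: max minus min
stddev = statistics.pstdev(voltages)     # population standard deviation (assumed)
mode = statistics.mode(voltages)         # most frequent value

print(spread, round(stddev, 4), mode)
```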
We hope this has been useful. If you have questions you can always visit the TDengine Discord channel.
The following video gives a walkthrough of the information discussed in this article.