User-Defined Functions (UDF)
Introduction to UDF
In certain application scenarios, the query functions required by the application logic cannot be directly implemented using built-in functions. TDengine allows the writing of User-Defined Functions (UDF) to address the specific needs in such scenarios. Once the UDF is successfully registered in the cluster, it can be called in SQL just like system-built-in functions, with no difference in usage. UDFs are divided into scalar functions and aggregate functions. Scalar functions output a value for each row of data, such as calculating the absolute value (abs), sine function (sin), string concatenation function (concat), etc. Aggregate functions output a value for multiple rows of data, such as calculating the average (avg) or maximum value (max).
TDengine supports writing UDFs in both C and Python programming languages. UDFs written in C have performance almost identical to built-in functions, while those written in Python can leverage the rich Python computation libraries. To prevent exceptions during UDF execution from affecting database services, TDengine uses process separation technology to execute UDFs in another process. Even if a user-defined UDF crashes, it will not affect the normal operation of TDengine.
Developing UDFs in C
When implementing UDFs in C, the following interface functions need to be implemented:
- Scalar functions need to implement the scalar interface function `scalarfn`.
- Aggregate functions need to implement the aggregate interface functions `aggfn_start`, `aggfn`, and `aggfn_finish`.
- If initialization is required, implement `udf_init`.
- If cleanup is required, implement `udf_destroy`.
Interface Definition
The name of an interface function is either the UDF name itself or the UDF name combined with a specific suffix (_start, _finish, _init, _destroy). Function names described later, such as `scalarfn` and `aggfn`, need to be replaced with the UDF name.
Scalar Function Interface
A scalar function is a function that converts input data to output data, typically used for calculations and transformations on a single data value. The prototype for the scalar function interface is as follows.
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn);
The main parameters are described as follows:
- `inputDataBlock`: the input data block.
- `resultColumn`: the output column.
Aggregate Function Interface
An aggregate function is a special function used to group and calculate data to generate summary information. The workings of an aggregate function are as follows:
- Initialize the result buffer: first, the `aggfn_start` function is called to generate a result buffer for storing intermediate results.
- Group data: the relevant data is divided into multiple row data blocks, each containing a set of data with the same grouping key.
- Update intermediate results: for each data block, the `aggfn` function is called to update the intermediate results. The `aggfn` function computes the data according to the type of aggregate function (such as sum, avg, count, etc.) and stores the results in the result buffer.
- Generate the final result: after the intermediate results of all data blocks have been updated, the `aggfn_finish` function is called to extract the final result from the result buffer. The final result contains either 0 or 1 piece of data, depending on the type of aggregate function and the input data.
The prototype for the aggregate function interface is as follows.
int32_t aggfn_start(SUdfInterBuf *interBuf);
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result);
Where `aggfn` is a placeholder for the function name. First, `aggfn_start` is called to generate the result buffer; then the relevant data is divided into multiple row data blocks, and `aggfn` is called for each data block to update the intermediate result; finally, `aggfn_finish` is called to produce the final result from the intermediate result, which can contain only 0 or 1 piece of result data.
The main parameters are described as follows:
- `interBuf`: the intermediate result buffer.
- `inputBlock`: the input data block.
- `newInterBuf`: the new intermediate result buffer.
- `result`: the final result.
Initialization and Destruction Interfaces
The initialization and destruction interfaces are shared by both scalar and aggregate functions, with the relevant APIs as follows.
int32_t udf_init()
int32_t udf_destroy()
The `udf_init` function performs initialization, and the `udf_destroy` function handles cleanup. If there is no initialization work, the `udf_init` function need not be defined; if there is no cleanup work, the `udf_destroy` function need not be defined.
Scalar Function Template
The template for developing scalar functions in C is as follows.
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
// Initialization function.
// If no initialization, we can skip definition of it.
// The initialization function shall be concatenation of the udf name and _init suffix.
// @return error number defined in taoserror.h
int32_t scalarfn_init() {
// initialization.
return TSDB_CODE_SUCCESS;
}
// Scalar function main computation function.
// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
// @param resultColumn, output column
// @return error number defined in taoserror.h
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
// read data from inputDataBlock and process, then output to resultColumn.
return TSDB_CODE_SUCCESS;
}
// Cleanup function.
// If no cleanup related processing, we can skip definition of it.
// The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t scalarfn_destroy() {
// clean up
return TSDB_CODE_SUCCESS;
}
Aggregate Function Template
The template for developing aggregate functions in C is as follows.
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
// Initialization function.
// If no initialization, we can skip definition of it.
// The initialization function shall be concatenation of the udf name and _init suffix.
// @return error number defined in taoserror.h
int32_t aggfn_init() {
// initialization.
return TSDB_CODE_SUCCESS;
}
// Aggregate start function.
// The intermediate value or the state(@interBuf) is initialized in this function.
// The function name shall be concatenation of udf name and _start suffix.
// @param interbuf intermediate value to initialize
// @return error number defined in taoserror.h
int32_t aggfn_start(SUdfInterBuf* interBuf) {
// initialize intermediate value in interBuf
return TSDB_CODE_SUCCESS;
}
// Aggregate reduce function.
// This function aggregates the old state (@interBuf) and one data block (@inputBlock), and outputs a new state (@newInterBuf).
// @param inputBlock input data block
// @param interBuf old state
// @param newInterBuf new state
// @return error number defined in taoserror.h
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
// read from inputBlock and interBuf and output to newInterBuf
return TSDB_CODE_SUCCESS;
}
// Aggregate function finish function.
// This function transforms the intermediate value(@interBuf) into the final output(@result).
// The function name must be concatenation of aggfn and _finish suffix.
// @interBuf : intermediate value
// @result: final result
// @return error number defined in taoserror.h
int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
// read data from interBuf and process, then output to result
return TSDB_CODE_SUCCESS;
}
// Cleanup function.
// If no cleanup related processing, we can skip definition of it.
// The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t aggfn_destroy() {
// clean up
return TSDB_CODE_SUCCESS;
}
Compilation
In TDengine, to implement a UDF you need to write C source code and compile it into a dynamic link library file according to TDengine's specifications. Following the rules described above, prepare the UDF source code `bit_and.c`. On Linux, execute the following command to compile it and obtain the dynamic link library file.
gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
To ensure reliable operation, it is recommended to use GCC version 7.5 or above.
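After compilation, the library must be registered in the cluster with a CREATE FUNCTION statement before it can be called from SQL (the full syntax is described under Managing UDFs below). A minimal sketch for the `bit_and` example, assuming the library was placed under /root/udf/:

create function bit_and as '/root/udf/libbitand.so' outputtype int language 'C';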
C UDF Data Structures
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char *buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
The structure descriptions are as follows:
- `SUdfDataBlock` contains the number of rows `numOfRows` and the number of columns `numOfCols`; `udfCols[i]` (0 <= i <= numOfCols - 1) represents each column of data, of type `SUdfColumn*`.
- `SUdfColumn` contains the data type definition `colMeta` and the data `colData`.
- The members of `SUdfColumnMeta` are defined similarly to the data type definitions in `taos.h`.
- `SUdfColumnData` can hold data of variable length: `varLenCol` defines variable-length data, and `fixLenCol` defines fixed-length data.
- `SUdfInterBuf` defines the intermediate result buffer and the number of results in the buffer, `numOfResult`.
To make it easier to operate on the above data structures, some utility functions are provided, defined in `taosudf.h`.
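For example, the following minimal sketch (the helper name is illustrative, and it assumes column 0 is of type int) copies an integer input column to the result column while preserving nulls, using only utility functions that also appear in the examples below.

#include "taosudf.h"

// Sketch: copy the first int column of the input block into the result
// column, preserving nulls. Assumes col 0 has type TSDB_DATA_TYPE_INT.
static void copyIntColumn(SUdfDataBlock *block, SUdfColumn *resultCol) {
  SUdfColumn *col = block->udfCols[0];
  for (int32_t i = 0; i < block->numOfRows; ++i) {
    if (udfColDataIsNull(col, i)) {
      udfColDataSetNull(resultCol, i);
    } else {
      int32_t v = *(int32_t *)udfColDataGetData(col, i);
      udfColDataSet(resultCol, i, (char *)&v, false);
    }
  }
  resultCol->colData.numOfRows = block->numOfRows;
}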
C UDF Example Code
Scalar Function Example bit_and
`bit_and` implements the bitwise AND function across multiple columns. If there is only one column, it returns that column. `bit_and` ignores null values.
bit_and.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT int32_t bit_and_init() { return 0; }
DLL_EXPORT int32_t bit_and_destroy() { return 0; }
DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) {
udfTrace("block:%p, processing begins, rows:%d cols:%d", block, block->numOfRows, block->numOfCols);
if (block->numOfCols < 2) {
udfError("block:%p, cols:%d needs to be greater than 2", block, block->numOfCols);
return TSDB_CODE_UDF_INVALID_INPUT;
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
udfError("block:%p, col:%d type:%d should be int(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_INT);
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
SUdfColumnData* resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) {
if (udfColDataIsNull(block->udfCols[0], i)) {
udfColDataSetNull(resultCol, i);
udfTrace("block:%p, row:%d result is null since col:0 is null", block, i);
continue;
}
int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
udfTrace("block:%p, row:%d col:0 data:%d", block, i, result);
int32_t j = 1;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
udfTrace("block:%p, row:%d result is null since col:%d is null", block, i, j);
break;
}
char* colData = udfColDataGetData(block->udfCols[j], i);
result &= *(int32_t*)colData;
udfTrace("block:%p, row:%d col:%d data:%d", block, i, j, *(int32_t*)colData);
}
if (j == block->numOfCols) {
udfColDataSet(resultCol, i, (char*)&result, false);
udfTrace("block:%p, row:%d result is %d", block, i, result);
}
}
resultData->numOfRows = block->numOfRows;
udfTrace("block:%p, processing completed", block);
return TSDB_CODE_SUCCESS;
}
Aggregate Function Example 1: Numeric Return Value (l2norm)
`l2norm` implements the second-order norm of all data in the input columns, i.e., it squares each data point, sums them, and then takes the square root.
l2norm.c
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT int32_t l2norm_init() { return 0; }
DLL_EXPORT int32_t l2norm_destroy() { return 0; }
DLL_EXPORT int32_t l2norm_start(SUdfInterBuf* buf) {
int32_t bufLen = sizeof(double);
if (buf->bufLen < bufLen) {
udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen);
return TSDB_CODE_UDF_INVALID_BUFSIZE;
}
udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen);
*(double*)(buf->buf) = 0;
buf->bufLen = bufLen;
buf->numOfResult = 0;
return 0;
}
DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows);
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (col->colMeta.type != TSDB_DATA_TYPE_INT && col->colMeta.type != TSDB_DATA_TYPE_DOUBLE) {
udfError("block:%p, col:%d type:%d should be int(%d) or double(%d)", block, i, col->colMeta.type,
TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_DOUBLE);
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
double sumSquares = *(double*)interBuf->buf;
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
udfTrace("block:%p, col:%d row:%d is null", block, i, j);
continue;
}
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += (double)num * num;
udfTrace("block:%p, col:%d row:%d data:%d", block, i, j, num);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, num);
break;
}
default:
break;
}
++numNotNull;
}
udfTrace("block:%p, col:%d result is %f", block, i, sumSquares);
}
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
newInterBuf->numOfResult = 1;
udfTrace("block:%p, result is %f", block, sumSquares);
return 0;
}
DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) {
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
udfTrace("end aggregation, result is %f", *(double*)(resultData->buf));
return 0;
}
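Once compiled, the library must be registered before use, just as with `max_vol` below. A sketch of the corresponding statements, assuming the library resides at /root/udf/libl2norm.so and a table t with a numeric column v1 exists (a bufsize of 8 bytes matches the sizeof(double) intermediate state used above):

create aggregate function l2norm as '/root/udf/libl2norm.so' outputtype double bufsize 8 language 'C';
select l2norm(v1) from t;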
Aggregate Function Example 2: String Return Value (max_vol)
`max_vol` finds the maximum voltage across multiple input voltage columns and returns a combined string composed of the device ID + the location (row, column) of the maximum voltage + the maximum voltage value.
Create table:
create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16));
Create custom function:
create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C';
Use custom function:
select max_vol(vol1, vol2, vol3, deviceid) from battery;
max_vol.c
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
#define STR_MAX_LEN 256 // inter buffer length
DLL_EXPORT int32_t max_vol_init() { return 0; }
DLL_EXPORT int32_t max_vol_destroy() { return 0; }
DLL_EXPORT int32_t max_vol_start(SUdfInterBuf *buf) {
int32_t bufLen = sizeof(float) + STR_MAX_LEN;
if (buf->bufLen < bufLen) {
udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen);
return TSDB_CODE_UDF_INVALID_BUFSIZE;
}
udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen);
memset(buf->buf, 0, sizeof(float) + STR_MAX_LEN);
*((float *)buf->buf) = INT32_MIN;
buf->bufLen = bufLen;
buf->numOfResult = 0;
return 0;
}
DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows);
float maxValue = *(float *)interBuf->buf;
char strBuff[STR_MAX_LEN] = "inter1buf";
if (block->numOfCols < 2) {
udfError("block:%p, cols:%d needs to be greater than 2", block, block->numOfCols);
return TSDB_CODE_UDF_INVALID_INPUT;
}
// check data type
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn *col = block->udfCols[i];
if (i == block->numOfCols - 1) {
// last column is device id , must varchar
if (col->colMeta.type != TSDB_DATA_TYPE_VARCHAR) {
udfError("block:%p, col:%d type:%d should be varchar(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_VARCHAR);
return TSDB_CODE_UDF_INVALID_INPUT;
}
} else {
if (col->colMeta.type != TSDB_DATA_TYPE_FLOAT) {
udfError("block:%p, col:%d type:%d should be float(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_FLOAT);
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
}
// calc max voltage
SUdfColumn *lastCol = block->udfCols[block->numOfCols - 1];
for (int32_t i = 0; i < block->numOfCols - 1; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn *col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
udfTrace("block:%p, col:%d row:%d is null", block, i, j);
continue;
}
char *data = udfColDataGetData(col, j);
float voltage = *(float *)data;
if (voltage <= maxValue) {
udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, voltage);
} else {
maxValue = voltage;
char *valData = udfColDataGetData(lastCol, j);
int32_t valDataLen = udfColDataGetDataLen(lastCol, j);
// get device id
char *deviceId = valData + sizeof(uint16_t);
int32_t deviceIdLen = valDataLen < (STR_MAX_LEN - 1) ? valDataLen : (STR_MAX_LEN - 1);
strncpy(strBuff, deviceId, deviceIdLen);
snprintf(strBuff + deviceIdLen, STR_MAX_LEN - deviceIdLen, "_(%d,%d)_%f", j, i, maxValue);
udfTrace("block:%p, col:%d row:%d data:%f, as max_val:%s", block, i, j, voltage, strBuff);
}
}
}
*(float *)newInterBuf->buf = maxValue;
strncpy(newInterBuf->buf + sizeof(float), strBuff, STR_MAX_LEN);
newInterBuf->bufLen = sizeof(float) + strlen(strBuff) + 1;
newInterBuf->numOfResult = 1;
udfTrace("block:%p, result is %s", block, strBuff);
return 0;
}
DLL_EXPORT int32_t max_vol_finish(SUdfInterBuf *buf, SUdfInterBuf *resultData) {
char *str = buf->buf + sizeof(float);
// copy to des
char *des = resultData->buf + sizeof(uint16_t);
strcpy(des, str);
// set binary type len
uint16_t len = strlen(str);
*((uint16_t *)resultData->buf) = len;
// set buf len
resultData->bufLen = len + sizeof(uint16_t);
// set row count
resultData->numOfResult = 1;
udfTrace("end aggregation, result is %s", str);
return 0;
}
Developing UDFs in Python
Preparing the Environment
The specific steps to prepare the environment are as follows:
- Step 1: Prepare the Python runtime environment.
- Step 2: Install the Python package `taospyudf`: `pip3 install taospyudf`
- Step 3: Execute the command `ldconfig`.
- Step 4: Start the `taosd` service.
During installation, C++ source code will be compiled, so the system must have `cmake` and `gcc`. The compiled file `libtaospyudf.so` will be automatically copied to the `/usr/local/lib/` directory, so if you are a non-root user, you need to add `sudo` during installation. After installation, you can check whether this file exists in that directory:
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
Interface Definition
When developing UDFs using Python, you need to implement the specified interface functions. The specific requirements are as follows.
- Scalar functions need to implement the scalar interface function `process`.
- Aggregate functions need to implement the aggregate interface functions `start`, `reduce`, and `finish`.
- If initialization is required, implement the `init` function.
- If cleanup is required, implement the `destroy` function.
Scalar Function Interface
The interface for scalar functions is as follows.
def process(input: datablock) -> tuple[output_type]:
The main parameters are described as follows:
- `input`: a `datablock`, similar to a two-dimensional matrix; the member method `data(row, col)` reads the Python object located at row `row`, column `col`.
- The return value is a tuple of Python objects, each element of the output type.
Aggregate Function Interface
The interface for aggregate functions is as follows.
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes:
def finish(buf: bytes) -> output_type:
The above code defines three functions used to implement a custom aggregate function. The specific process is as follows.
First, the `start` function is called to generate the initial result buffer. This result buffer is used to store the internal state of the aggregate function and is continuously updated as the input data is processed.

Then, the input data is divided into multiple row data blocks. For each row data block, the `reduce` function is called with the current row data block (`inputs`) and the current intermediate result (`buf`) as parameters. The `reduce` function updates the internal state of the aggregate function based on the input data and the current state, and returns the new intermediate result.

Finally, when all row data blocks have been processed, the `finish` function is called. This function receives the final intermediate result (`buf`) as a parameter and generates the final output from it. Due to the nature of aggregate functions, the final output can contain only 0 or 1 piece of data. This result is returned to the caller as the computation result of the aggregate function.
Initialization and Destruction Interfaces
The initialization and destruction interfaces are as follows.
def init()
def destroy()
The parameters are described as follows:
- `init`: completes initialization work.
- `destroy`: completes cleanup work.

When developing UDFs in Python, the `init` and `destroy` functions must both be defined.
Scalar Function Template
The template for developing scalar functions in Python is as follows.
def init():
    # initialization
    pass

def destroy():
    # destroy
    pass

def process(input: datablock) -> tuple[output_type]:
    # compute one output value per input row and return them as a tuple
    ...
Aggregate Function Template
The template for developing aggregate functions in Python is as follows.
def init():
    # initialization
    pass

def destroy():
    # destroy
    pass

def start() -> bytes:
    # return serialize(init_state)
    ...

def reduce(inputs: datablock, buf: bytes) -> bytes:
    # deserialize buf into state
    # reduce the inputs and state into new_state;
    # use inputs.data(i, j) to access the python object at location (i, j)
    # serialize new_state into new_state_bytes
    return new_state_bytes

def finish(buf: bytes) -> output_type:
    # return an object of type output_type
    ...
Data Type Mapping
The following table describes the mapping between TDengine SQL data types and Python data types. A NULL value of any type is mapped to Python's `None` value.
| TDengine SQL Data Type | Python Data Type |
| --- | --- |
| TINYINT / SMALLINT / INT / BIGINT | int |
| TINYINT UNSIGNED / SMALLINT UNSIGNED / INT UNSIGNED / BIGINT UNSIGNED | int |
| FLOAT / DOUBLE | float |
| BOOL | bool |
| BINARY / VARCHAR / NCHAR | bytes |
| TIMESTAMP | int |
| JSON and other types | not supported |
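Note in particular that BINARY/VARCHAR/NCHAR values arrive in Python as bytes rather than str. As a minimal sketch (assuming a single VARCHAR input column and UTF-8 encoded content), a scalar process function that upper-cases its input would first decode each cell:

def process(block):
    rows, _ = block.shape()
    # VARCHAR cells are passed in as bytes; decode before using str methods
    return [None if block.data(i, 0) is None
            else block.data(i, 0).decode("utf-8").upper()
            for i in range(rows)]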
Development Examples
This section presents five example programs, progressing from simple to complex, along with many practical debugging tips.
Note that logs cannot be output through the `print` function within a UDF; you need to write to a file yourself or use Python's built-in logging library to write to a file.
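For example, a UDF can configure Python's built-in logging module in its init function; a minimal sketch, where the log file path is an arbitrary assumption:

import logging

def init():
    # print() output is not visible from inside a UDF, so route
    # diagnostics to a file via the logging module instead
    logging.basicConfig(filename="/var/log/taos/myudf.log",
                        level=logging.DEBUG,
                        format="%(asctime)s %(levelname)s %(message)s")
    logging.info("udf initialized")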
Example One
Write a UDF function that accepts a single integer: input `n`, output `ln(n^2 + 1)`.
First, write a Python file and place it in a system directory, such as `/root/udf/myfun.py`, with the following content.
from math import log
def init():
pass
def destroy():
pass
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
This file contains three functions: `init` and `destroy` are both empty functions; they are the lifecycle functions of the UDF and need to be defined even if they do nothing. The key function is `process`, which accepts a data block. This data block object has two methods.
- `shape()` returns the number of rows and columns of the data block.
- `data(i, j)` returns the data at row `i`, column `j`.

The `process` method of a scalar function must return as many rows of data as the input data block contains. The code above ignores the number of columns because it only needs to compute on the first column of each row.
Next, create the corresponding UDF function by executing the following statement in the TDengine CLI.
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
The output is as follows.
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)
It seems to go smoothly. Next, check all custom functions in the system to confirm that the creation was successful.
taos> show functions;
name |
=================================
myfun |
Query OK, 1 row(s) in set (0.005767s)
Generate test data by executing the following commands in the TDengine CLI.
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
Test the `myfun` function.
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
Unfortunately, the execution failed. What is the reason? Check the logs of the `udfd` process.
tail -10 /var/log/taos/udfd.log
The following error message is found.
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
The error is clear: the Python plugin `libtaospyudf.so` could not be loaded. If you encounter this error, please refer to the environment preparation section above.
After fixing the environment error, execute the command again as follows.
taos> select myfun(v1) from t;
myfun(v1) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
Thus, we have completed the first UDF and learned some simple debugging methods.
Example Two
Although the above `myfun` passed the test, it has two shortcomings.

- This scalar function accepts only one column of data as input, yet it does not raise an exception if the user passes in multiple columns.

taos> select myfun(v1, v2) from t;
myfun(v1, v2) |
============================
0.693147181 |
1.609437912 |
2.302585093 |

- It does not handle null values. We expect that if the input contains null values, it will raise an exception and terminate execution.

Therefore, the `process` function is improved as follows.
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception(f"require 1 parameter but given {cols}")
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
Execute the following statement to update the existing UDF.
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Passing two parameters to `myfun` now causes it to fail.
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
The custom exception message is printed in the plugin's log file, `/var/log/taos/taospyudf.log`.
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
At:
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
Thus, we have learned how to update the UDF and check the error logs produced by the UDF.
(Note: if the UDF does not take effect after being updated, TDengine versions earlier than 3.0.5.0 require restarting `taosd`; in versions 3.0.5.0 and later, no restart of `taosd` is needed for the update to take effect.)
Example Three
Input (x1, x2, ..., xn) and output the sum of each value multiplied by its index: 1 * x1 + 2 * x2 + ... + n * xn. If any of x1 to xn is null, the result is null.

The difference from Example One is that this function accepts any number of columns as input and must process the values of every column. Write the UDF file `/root/udf/nsum.py`.
.
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
result = []
for i in range(rows):
total = 0
for j in range(cols):
v = block.data(i, j)
if v is None:
total = None
break
total += (j + 1) * block.data(i, j)
result.append(total)
return result
Create the UDF.
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
Test the UDF.
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t;
ts | v1 | v2 | v3 | nsum(v1, v2, v3) |
================================================================================================
2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 |
2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 |
2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 |
2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |
Query OK, 4 row(s) in set (0.010653s)
Example Four
Write a UDF that takes a timestamp as input and outputs the next Sunday closest to that time. For example, if today is 2023-05-25, the next Sunday is 2023-05-28. This function uses the third-party library `moment`. First, install this library.
pip3 install moment
Then, write the UDF file `/root/udf/nextsunday.py`.
import moment
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception("require only 1 parameter")
if not type(block.data(0, 0)) is int:
raise Exception("type error")
return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
for i in range(rows)]
The UDF framework maps TDengine's TIMESTAMP type to Python's int type, so this function accepts only an integer representing milliseconds. The `process` method first performs parameter checks, then uses the `moment` package to replace the weekday of each timestamp with Sunday, and finally formats the output. The output string has a fixed length of 10 characters, so the UDF function can be created as follows.
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
At this point, test the function; if you started `taosd` using `systemctl`, you will definitely encounter an error.
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
tail -20 taospyudf.log
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
This is because the location of "moment" is not in the default library search path of the Python UDF plugin. How can we confirm this? Search `taospyudf.log` with the following command.
grep 'sys path' taospyudf.log | tail -1
The output is as follows:
2023-05-25 10:58:48.554 INFO [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']
It shows that the default third-party library search path of the Python UDF plugin is `/lib/python3/dist-packages`, while `moment` is installed by default in `/usr/local/lib/python3.8/dist-packages`. Next, we modify the default library search path of the Python UDF plugin.
First, open the Python 3 command line and check the current `sys.path`.
>>> import sys
>>> ":".join(sys.path)
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
Copy the output string from the above script, then edit `/etc/taos/taos.cfg` and add the following configuration.
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
After saving, execute `systemctl restart taosd`, and then test again; this time there are no errors.
, and then test again without errors.
taos> select ts, nextsunday(ts) from t;
ts | nextsunday(ts) |
===========================================
2023-05-01 12:13:14.000 | 2023-05-07 |
2023-05-03 08:09:10.000 | 2023-05-07 |
2023-05-10 07:06:05.000 | 2023-05-14 |
2023-05-25 09:09:15.000 | 2023-05-28 |
Query OK, 4 row(s) in set (1.011474s)
Example Five
Write an aggregate function to calculate the difference between the maximum and minimum values of a certain column.
The difference between aggregate functions and scalar functions is that a scalar function produces multiple outputs for multiple rows of input, while an aggregate function produces a single output for multiple rows of input. The execution process of an aggregate function is somewhat like that of the classic map-reduce framework: the data is divided into several blocks, each mapper processes one block, and the reducer aggregates the mappers' results. The difference is that in TDengine's Python UDF, the `reduce` function has both map and reduce functionality. The `reduce` function takes two parameters: one is the data it needs to process, and the other is the result of the reduce computation on other data blocks. The following example, `/root/udf/myspread.py`, demonstrates this.
import io
import math
import pickle
LOG_FILE: io.TextIOBase = None
def init():
global LOG_FILE
LOG_FILE = open("/var/log/taos/spread.log", "wt")
log("init function myspead success")
def log(o):
LOG_FILE.write(str(o) + '\n')
def destroy():
log("close log file: spread.log")
LOG_FILE.close()
def start():
return pickle.dumps((-math.inf, math.inf))
def reduce(block, buf):
max_number, min_number = pickle.loads(buf)
log(f"initial max_number={max_number}, min_number={min_number}")
rows, _ = block.shape()
for i in range(rows):
v = block.data(i, 0)
if v > max_number:
log(f"max_number={v}")
max_number = v
if v < min_number:
log(f"min_number={v}")
min_number = v
return pickle.dumps((max_number, min_number))
def finish(buf):
max_number, min_number = pickle.loads(buf)
return max_number - min_number
In this example, we not only define an aggregate function but also add logging functionality to record execution logs.
- The `init` function opens a file for logging.
- The `log` function records logs, automatically converting the passed object to a string and appending a newline.
- The `destroy` function closes the log file after execution.
- The `start` function returns the initial buffer for storing the intermediate results of the aggregate function, initializing the maximum value to negative infinity and the minimum value to positive infinity.
- The `reduce` function processes each data block and aggregates the results.
- The `finish` function converts the buffer into the final output.
Execute the following SQL statement to create the corresponding UDF.
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
This SQL statement has two important differences from the SQL statement for creating scalar functions.
- The `aggregate` keyword has been added.
- The `bufsize` keyword has been added to specify the memory size for storing intermediate results, in bytes. This value can be greater than the actual size used. In this case, the intermediate result is a tuple of two floating-point numbers, which occupies only 32 bytes after serialization, but the specified `bufsize` is 128. The actual number of bytes can be checked with Python:

>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32
Test this function, and you will see that the output of `myspread` is consistent with that of the built-in `spread` function.
taos> select myspread(v1) from t;
myspread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
spread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
Finally, check the execution log; you will see that the `reduce` function was executed three times, during which the maximum value was updated four times and the minimum value was updated once.
root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
max_number=1
min_number=1
initial max_number=1, min_number=1
max_number=2
max_number=3
initial max_number=3, min_number=1
max_number=6
close log file: spread.log
Through this example, we learned how to define aggregate functions and print custom log information.
More Python UDF Example Code
Scalar Function Example pybitand
`pybitand` implements the bitwise AND function across multiple columns. If there is only one column, it returns that column. `pybitand` ignores null values.
pybitand.py
def init():
pass
def process(block):
(rows, cols) = block.shape()
result = []
for i in range(rows):
r = 2 ** 32 - 1
for j in range(cols):
cell = block.data(i,j)
if cell is None:
result.append(None)
break
else:
r = r & cell
else:
result.append(r)
return result
def destroy():
pass
Aggregate Function Example pyl2norm
`pyl2norm` implements the second-order norm of all data in the input columns, i.e., it squares each data point, sums them, and then takes the square root.
pyl2norm.py
import json
import math
def init():
pass
def destroy():
pass
def start():
return json.dumps(0.0).encode('utf-8')
def finish(buf):
sum_squares = json.loads(buf)
result = math.sqrt(sum_squares)
return result
def reduce(datablock, buf):
(rows, cols) = datablock.shape()
sum_squares = json.loads(buf)
for i in range(rows):
for j in range(cols):
cell = datablock.data(i,j)
if cell is not None:
sum_squares += cell * cell
return json.dumps(sum_squares).encode('utf-8')
Aggregate Function Example pycumsum
`pycumsum` uses `numpy` to calculate the cumulative sum of all data in the input columns.
pycumsum.py
import pickle
import numpy as np
def init():
pass
def destroy():
pass
def start():
return pickle.dumps(0.0)
def finish(buf):
return pickle.loads(buf)
def reduce(datablock, buf):
    (rows, cols) = datablock.shape()
    state = pickle.loads(buf)
    row = []
    for i in range(rows):
        for j in range(cols):
            cell = datablock.data(i, j)
            if cell is not None:
                row.append(cell)
    if row:
        # accumulate this block's sum into the state carried over from previous blocks
        new_state = state + np.cumsum(row)[-1]
    else:
        new_state = state
    return pickle.dumps(new_state)
Managing UDFs
The process of managing UDFs in the cluster involves creating, using, and maintaining these functions. Users can create and manage UDFs in the cluster via SQL, and once created, all users in the cluster can use these functions in SQL. Since UDFs are stored on the mnode of the cluster, they remain available even after the cluster is restarted.
When creating UDFs, it is necessary to distinguish between scalar functions and aggregate functions. Scalar functions accept zero or more input parameters and return a single value. Aggregate functions accept a set of input values and return a single value through some computation (such as summation, counting, etc.). If the wrong function type is declared during creation, an error will occur when calling the function via SQL.
Additionally, users need to ensure that the input data types match the UDF program and that the UDF's output data type matches the `outputtype`. This means that when creating a UDF, the correct data types must be specified for both the input parameters and the output value. This helps ensure that when the UDF is called, the input data is passed to it correctly and its output matches the expected data type.
Creating Scalar Functions
The SQL syntax for creating scalar functions is as follows.
CREATE [OR REPLACE] FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
The parameters are described as follows:
- `or replace`: if the function already exists, its properties are modified.
- `function_name`: the name of the scalar function as called in SQL.
- `language`: supports C and Python (version 3.7 and above); the default is C.
- `library_path`: if the programming language is C, the path is the absolute path to the dynamic link library containing the UDF implementation, usually pointing to a .so file; if the programming language is Python, the path is that of the Python file containing the UDF implementation. The path must be enclosed in single or double quotes.
- `output_type`: the data type name of the function's computation result.
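For example, the statement used in Example One above registers a Python scalar function (the file path comes from that example):

CREATE FUNCTION myfun AS '/root/udf/myfun.py' OUTPUTTYPE DOUBLE LANGUAGE 'Python';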
Creating Aggregate Functions
The SQL syntax for creating aggregate functions is as follows.
CREATE [OR REPLACE] AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type BUFSIZE buffer_size LANGUAGE 'Python';
Where `buffer_size` indicates the buffer size for intermediate calculation results, measured in bytes. The meanings of the other parameters are the same as for scalar functions.

The following SQL creates a UDF named `l2norm`.
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
Deleting UDFs
The SQL syntax for deleting a UDF with the specified name is as follows.
DROP FUNCTION function_name;
Viewing UDFs
The SQL to display all currently available UDFs in the cluster is as follows.
show functions;
Viewing Function Information
The version number of the UDF increases by 1 each time it is updated.
select * from ins_functions \G;