diff --git a/docs/en/07-develop/09-udf.md b/docs/en/07-develop/09-udf.md
index 3b8dacc9d63b..cc9ac4554fb7 100644
--- a/docs/en/07-develop/09-udf.md
+++ b/docs/en/07-develop/09-udf.md
@@ -338,6 +338,78 @@ gcc -g -O0 -fPIC -shared extract_avg.c -o libextract_avg.so
+#### Aggregate Function Example 4: Accumulate All Data Then Compute — Permutation Entropy
+
+Permutation Entropy was proposed by Bandt and Pompe in 2002. It measures the complexity of a time series by analyzing the probability distribution of ordinal patterns and is widely used in fault detection, physiological signal analysis, and related fields.
+
+`perm_entropy` is a **full-accumulation** aggregate function: its algorithm requires all data in the window before computation can begin. Each AGG_PROC call therefore only accumulates data; the entropy is computed in the `perm_entropy_finish` callback. This is fundamentally different from functions like `l2norm` that can be computed incrementally row by row.
+
+This pattern involves **two independent memory layers** whose ownership must be kept clear:
+
+| Memory layer | Owner | Typical variable | Allocated / freed by |
+|---|---|---|---|
+| **Framework container** (fixed size = BUFSIZE) | Framework | `interBuf->buf`, `newInterBuf->buf`, `resultData->buf` | Framework `malloc`s before each callback and frees via `freeUdfInterBuf()` after; UDF may only write into it — **must not replace the pointer** |
+| **UDF heap content** (dynamic size) | UDF | `state->values` (a pointer embedded inside the container) | UDF grows it with `realloc` as needed; `freeUdfInterBuf()` frees only the container and is unaware of inner pointers; UDF **must** free it explicitly in `finish` and on every error path |
+
+Responsibilities of each callback:
+
+- `perm_entropy_start`: zero-initialises a `PermEntropyState` with `memset` into the framework-provided `interBuf->buf`; the `values` pointer is left `NULL` (heap content not yet allocated).
+- `perm_entropy` (AGG_PROC):
+ 1. Value-copy the state from `interBuf->buf` into a stack variable `newState`;
+ 2. If the current batch has valid rows, extend the UDF heap content (`newState.values`) with `realloc` and append the data;
+ 3. `memcpy` `newState` (with the updated `values` pointer) into the framework-preallocated `newInterBuf->buf` — **never** replace `newInterBuf->buf` with a fresh `malloc`, or the BUFSIZE bytes the framework allocated are lost on every AGG_PROC call;
+ 4. On `realloc` failure, free the original UDF heap content via `interBuf->buf` and zero the pointer, because `freeUdfInterBuf()` will not follow the inner `values` pointer.
+- `perm_entropy_finish`: compute the permutation entropy from all accumulated data, **free `state->values`**, and write the result into the framework-preallocated `resultData->buf`.
+
+Create tables:
+
+```sql
+-- Plain table for full-table aggregation and time-window queries
+CREATE TABLE vibration (ts TIMESTAMP, val DOUBLE);
+
+-- Super table for per-subtable grouped queries
+CREATE STABLE vibration_stb (ts TIMESTAMP, val DOUBLE) TAGS (device_id INT);
+CREATE TABLE vibration_d1 USING vibration_stb TAGS (1);
+CREATE TABLE vibration_d2 USING vibration_stb TAGS (2);
+```
+
+Generate `.so` file:
+
+```bash
+gcc -g -O0 -fPIC -shared perm_entropy.c -o libperm_entropy.so -lm
+```
+
+Create custom function:
+
+```sql
+CREATE AGGREGATE FUNCTION perm_entropy
+ AS '/path/to/libperm_entropy.so'
+ OUTPUTTYPE DOUBLE
+ BUFSIZE 256;
+```
+
+Use custom function:
+
+```sql
+-- Full-table aggregation: compute permutation entropy over the entire table
+SELECT perm_entropy(val) FROM vibration;
+
+-- Time-window aggregation: compute permutation entropy independently for each window
+SELECT perm_entropy(val) FROM vibration INTERVAL(10s);
+
+-- Grouped by subtable: compute permutation entropy separately for each device
+SELECT perm_entropy(val) FROM vibration_stb PARTITION BY tbname;
+```
+
+
+perm_entropy.c
+
+```c
+{{#include docs/examples/udf/perm_entropy.c}}
+```
+
+
+
## Developing UDFs in Python Language
### Environment Setup
diff --git a/docs/examples/udf/compile_udf.sh b/docs/examples/udf/compile_udf.sh
index 15067a286d9a..c708f6aae054 100755
--- a/docs/examples/udf/compile_udf.sh
+++ b/docs/examples/udf/compile_udf.sh
@@ -1,11 +1,13 @@
set +e
-rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so /tmp/udf/libgpd.so
+rm -rf /tmp/udf/libbitand.so /tmp/udf/libl2norm.so /tmp/udf/libgpd.so /tmp/udf/libperm_entropy.so
mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared cases/12-UDFs/sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so
gcc -fPIC -shared cases/12-UDFs/sh/l2norm.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libl2norm.so
gcc -fPIC -shared cases/12-UDFs/sh/gpd.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libgpd.so
+# perm_entropy: aggregate UDF (accumulate-all-data-then-compute pattern)
+gcc -fPIC -shared docs/examples/udf/perm_entropy.c -I../../include/libs/function/ -I../../include/client -I../../include/util -lm -o /tmp/udf/libperm_entropy.so
echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so
diff --git a/docs/examples/udf/perm_entropy.c b/docs/examples/udf/perm_entropy.c
new file mode 100644
index 000000000000..8f93f7d516d3
--- /dev/null
+++ b/docs/examples/udf/perm_entropy.c
@@ -0,0 +1,276 @@
+/*
+ * perm_entropy — TDengine C UDF: aggregate permutation entropy
+ *
+ * Permutation entropy (Bandt & Pompe, PRL 2002) measures the complexity
+ * of a time series by computing the Shannon entropy of the distribution of
+ * ordinal patterns (permutation patterns) found in overlapping embedding
+ * windows of dimension EMBED_DIM. The result is normalised to [0, 1].
+ *
+ * This function exemplifies the "accumulate-all-data-then-compute" pattern,
+ * which is required whenever the algorithm cannot produce a partial result
+ * from a single data chunk:
+ *
+ * perm_entropy_start — initialise state in the framework-provided buffer
+ * perm_entropy — append each delivered chunk to a heap-allocated
+ * values array kept inside the intermediate buffer
+ * perm_entropy_finish — compute the final result from all accumulated
+ * values, release the heap array, write the result
+ *
+ * =========================================================================
+ * Memory management rules for TDengine aggregate UDFs
+ * =========================================================================
+ * Rule 1 — Never replace the framework-provided buffer pointer.
+ * The framework calls taosMemoryMalloc(bufSize) before every AGG_PROC
+ * invocation and stores the result in interBuf->buf / newInterBuf->buf.
+ * If the UDF overwrites these pointers with its own malloc the original
+ * allocation leaks (bufSize × number-of-AGG_PROC-calls bytes total).
+ * Always write state into the pre-allocated buffer via memcpy.
+ *
+ * Rule 2 — The UDF owns every heap pointer stored inside the state struct.
+ * freeUdfInterBuf() frees only the container buffer (interBuf->buf), not
+ * any pointers embedded in the state. The UDF must release state->values
+ * in perm_entropy_finish and in every error path that abandons the state.
+ *
+ * =========================================================================
+ * Compile:
+ * gcc -fPIC -shared perm_entropy.c \
+ * -I/usr/local/taos/include \
+ * -lm -o libperm_entropy.so
+ *
+ * Register:
+ * CREATE AGGREGATE FUNCTION perm_entropy
+ * AS '/path/to/libperm_entropy.so'
+ * OUTPUTTYPE DOUBLE
+ * BUFSIZE 256;
+ * =========================================================================
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+#include "taos.h"
+#include "taoserror.h"
+#include "taosudf.h"
+
+#define EMBED_DIM 5
+#define DELAY 1
+#define MAX_EMBED_DIM 8
+
+/* Intermediate state stored inside interBuf->buf.
+ * The 'values' pointer is heap-allocated by the UDF and must be freed
+ * by perm_entropy_finish. */
+typedef struct {
+ int embed_dim;
+ int delay;
+ int64_t values_count;
+ int64_t values_capacity;
+ double *values; /* heap array – owned by the UDF, NOT by the framework */
+} PermEntropyState;
+
+/* ------------------------------------------------------------------ helpers */
+
+static int32_t ensure_capacity(PermEntropyState *state, int64_t required) {
+ if (required <= state->values_capacity) return TSDB_CODE_SUCCESS;
+
+ int64_t new_cap = state->values_capacity > 0 ? state->values_capacity : 1024;
+ while (new_cap < required) {
+ if (new_cap > INT64_MAX / 2) { new_cap = required; break; }
+ new_cap *= 2;
+ }
+ if (new_cap > (int64_t)(SIZE_MAX / sizeof(double))) return TSDB_CODE_OUT_OF_MEMORY;
+
+ double *p = (double *)realloc(state->values, (size_t)new_cap * sizeof(double));
+ if (p == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+ state->values = p;
+ state->values_capacity = new_cap;
+ return TSDB_CODE_SUCCESS;
+}
+
+static double compute_perm_entropy(const double *data, int n, int embed_dim, int delay) {
+ if (data == NULL || n < embed_dim || embed_dim <= 1 || embed_dim > MAX_EMBED_DIM)
+ return 0.0;
+
+ int n_windows = n - (embed_dim - 1) * delay;
+ if (n_windows <= 0) return 0.0;
+ int n_patterns = 1;
+ for (int i = 2; i <= embed_dim; i++) n_patterns *= i;
+
+ int *counts = (int *)calloc(n_patterns, sizeof(int));
+ if (counts == NULL) return NAN;
+ for (int w = 0; w < n_windows; w++) {
+ double v[MAX_EMBED_DIM];
+ int idx[MAX_EMBED_DIM];
+ int rank[MAX_EMBED_DIM];
+
+ for (int j = 0; j < embed_dim; j++) { v[j] = data[w + j * delay]; idx[j] = j; }
+
+ /* insertion sort by value (stable: tie-break by index) */
+ for (int j = 0; j < embed_dim - 1; j++)
+ for (int k = j + 1; k < embed_dim; k++)
+ if (v[idx[j]] > v[idx[k]] ||
+ (v[idx[j]] == v[idx[k]] && idx[j] > idx[k])) {
+ int t = idx[j]; idx[j] = idx[k]; idx[k] = t;
+ }
+
+ for (int j = 0; j < embed_dim; j++) rank[idx[j]] = j;
+
+ /* Lehmer code → pattern index */
+ int pat = 0;
+ for (int j = 0; j < embed_dim; j++) {
+ int c = 0;
+ for (int k = j + 1; k < embed_dim; k++) if (rank[k] < rank[j]) c++;
+ pat = pat * (embed_dim - j) + c;
+ }
+ counts[pat]++;
+ }
+
+ double entropy = 0.0;
+ for (int i = 0; i < n_patterns; i++) {
+ if (counts[i] > 0) {
+ double p = (double)counts[i] / n_windows;
+ entropy -= p * log2(p);
+ }
+ }
+ free(counts);
+
+ double max_entropy = log2((double)n_patterns);
+ return max_entropy > 0 ? entropy / max_entropy : 0.0;
+}
+
+/* ------------------------------------------------------------------ UDF API */
+
+DLL_EXPORT int32_t perm_entropy_init() { return TSDB_CODE_SUCCESS; }
+DLL_EXPORT int32_t perm_entropy_destroy() { return TSDB_CODE_SUCCESS; }
+
+DLL_EXPORT int32_t perm_entropy_start(SUdfInterBuf *interBuf) {
+ if (interBuf->bufLen < (int32_t)sizeof(PermEntropyState)) {
+ udfError("perm_entropy_start: bufLen %d < required %d",
+ interBuf->bufLen, (int32_t)sizeof(PermEntropyState));
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ }
+ /* Write directly into the framework-provided buffer – do NOT malloc. */
+ PermEntropyState *state = (PermEntropyState *)interBuf->buf;
+ memset(state, 0, sizeof(PermEntropyState));
+ state->embed_dim = EMBED_DIM;
+ state->delay = DELAY;
+ return TSDB_CODE_SUCCESS;
+}
+
+DLL_EXPORT int32_t perm_entropy(SUdfDataBlock *block, SUdfInterBuf *interBuf,
+ SUdfInterBuf *newInterBuf) {
+ if (block->numOfCols != 1) return TSDB_CODE_UDF_INVALID_INPUT;
+ SUdfColumn *col = block->udfCols[0];
+
+ /* Reject non-numeric column types up front. */
+ switch (col->colMeta.type) {
+ case TSDB_DATA_TYPE_TINYINT:
+ case TSDB_DATA_TYPE_SMALLINT:
+ case TSDB_DATA_TYPE_INT:
+ case TSDB_DATA_TYPE_BIGINT:
+ case TSDB_DATA_TYPE_FLOAT:
+ case TSDB_DATA_TYPE_DOUBLE:
+ break;
+ default:
+ return TSDB_CODE_UDF_INVALID_INPUT;
+ }
+
+ /* Count valid (non-NULL) rows in this chunk. */
+ int64_t valid = 0;
+ for (int32_t i = 0; i < block->numOfRows; i++)
+ if (!udfColDataIsNull(col, i)) valid++;
+
+ /* Work on a value copy of the current state. At the end we commit it
+ * back into the framework container newInterBuf->buf via memcpy.
+ */
+ PermEntropyState newState = *(PermEntropyState *)interBuf->buf;
+
+ if (valid > 0) {
+ int32_t code = ensure_capacity(&newState, newState.values_count + valid);
+ if (code != TSDB_CODE_SUCCESS) {
+ /* realloc leaves the original pointer intact on failure, so
+       * newState.values still aliases the values pointer stored inside interBuf->buf.
+ * Free the heap array through the framework buffer;
+ * freeUdfInterBuf() only frees the container, not this pointer. */
+ PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
+ free(orig->values);
+ orig->values = NULL;
+ return code;
+ }
+ for (int32_t i = 0; i < block->numOfRows; i++) {
+ if (udfColDataIsNull(col, i)) continue;
+ char *raw = udfColDataGetData(col, i);
+ double v = 0.0;
+ switch (col->colMeta.type) {
+ case TSDB_DATA_TYPE_TINYINT: v = (double)(*(int8_t *)raw); break;
+ case TSDB_DATA_TYPE_SMALLINT: v = (double)(*(int16_t *)raw); break;
+ case TSDB_DATA_TYPE_INT: v = (double)(*(int32_t *)raw); break;
+ case TSDB_DATA_TYPE_BIGINT: v = (double)(*(int64_t *)raw); break;
+ case TSDB_DATA_TYPE_FLOAT: v = (double)(*(float *)raw); break;
+ case TSDB_DATA_TYPE_DOUBLE: v = *(double *)raw; break;
+ default: continue;
+ }
+ newState.values[newState.values_count++] = v;
+ }
+ }
+
+ /*
+ * Commit the updated state into the framework's pre-allocated
+ * newInterBuf->buf via memcpy. The framework owns this buffer;
+ * never replace the pointer with a new allocation.
+ */
+ if (newInterBuf->buf == NULL ||
+ newInterBuf->bufLen < (int32_t)sizeof(PermEntropyState)) {
+ udfError("perm_entropy: newInterBuf too small or NULL (bufLen=%d, required=%d)",
+ newInterBuf->bufLen, (int32_t)sizeof(PermEntropyState));
+ /* Free the heap array unconditionally and clear orig->values:
+ * - realloc moved array: newState.values is the new block;
+ * orig->values is already dangling (freed internally by realloc).
+ * - realloc in-place or no realloc: newState.values == orig->values;
+ * freeing once via newState.values is correct.
+ * Clear orig->values in both cases so freeUdfInterBuf() cannot double-free. */
+ PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
+ free(newState.values);
+ newState.values = NULL;
+ if (orig != NULL) orig->values = NULL;
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ }
+ memcpy(newInterBuf->buf, &newState, sizeof(PermEntropyState));
+ newInterBuf->bufLen = sizeof(PermEntropyState);
+ newInterBuf->numOfResult = 0;
+ return TSDB_CODE_SUCCESS;
+}
+
+DLL_EXPORT int32_t perm_entropy_finish(SUdfInterBuf *interBuf,
+ SUdfInterBuf *resultData) {
+ if (interBuf->buf == NULL) { resultData->numOfResult = 0; return TSDB_CODE_SUCCESS; }
+
+ PermEntropyState *state = (PermEntropyState *)interBuf->buf;
+
+ if (state->values_count < state->embed_dim || state->values == NULL) {
+ /* Free heap array before returning an insufficient-data result. */
+ if (state->values != NULL) { free(state->values); state->values = NULL; }
+ resultData->numOfResult = 0;
+ return TSDB_CODE_SUCCESS;
+ }
+
+ double entropy = compute_perm_entropy(state->values, (int)state->values_count,
+ state->embed_dim, state->delay);
+
+ /* resultData->buf is also pre-allocated by the framework. */
+ if (resultData->buf == NULL ||
+ resultData->bufLen < (int32_t)sizeof(double)) {
+ udfError("perm_entropy_finish: resultData buf too small or NULL");
+ free(state->values); state->values = NULL;
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ }
+ *(double *)resultData->buf = entropy;
+ resultData->bufLen = sizeof(double);
+ resultData->numOfResult = 1;
+
+ /* Free the heap array now that computation is complete. */
+ free(state->values);
+ state->values = NULL;
+ state->values_count = 0;
+ state->values_capacity = 0;
+ return TSDB_CODE_SUCCESS;
+}
diff --git a/docs/zh/07-develop/09-udf.md b/docs/zh/07-develop/09-udf.md
index 78d791589ce9..e9ccf9d202ed 100644
--- a/docs/zh/07-develop/09-udf.md
+++ b/docs/zh/07-develop/09-udf.md
@@ -343,6 +343,78 @@ gcc -g -O0 -fPIC -shared extract_vag.c -o libextract_avg.so
+#### 聚合函数示例 4 全量累积后计算——排列熵
+
+排列熵(Permutation Entropy)由 Bandt 和 Pompe 于 2002 年提出,通过统计时间序列中各种有序排列模式的概率分布来度量序列的复杂度,广泛应用于故障检测、生理信号分析等领域。
+
+`perm_entropy` 是一种**全量累积型**聚合函数:其计算算法要求在获取窗口内全部数据后才能开始,因此各次 AGG_PROC 调用仅完成数据积累,在 `perm_entropy_finish` 回调中统一执行排列熵计算。这与 `l2norm` 等可逐行累进计算的聚合函数有本质区别。
+
+该模式涉及**两层独立的内存**,必须分清所有权:
+
+| 内存层 | 持有者 | 典型变量 | 分配/释放方 |
+|--------|--------|----------|-------------|
+| **框架容器**(固定大小,等于 BUFSIZE) | 框架 | `interBuf->buf`、`newInterBuf->buf`、`resultData->buf` | 框架在每次回调前 `malloc`,回调后 `freeUdfInterBuf()` 释放;UDF 只能写入,**不得替换指针** |
+| **UDF 堆内容**(动态大小) | UDF | `state->values`(嵌入在容器内的指针) | UDF 用 `realloc` 按需增长;框架的 `freeUdfInterBuf()` 只释放容器本身,**不感知**内部指针;UDF 必须在 `finish` 及所有错误路径中显式释放 |
+
+各回调的职责如下:
+
+- `perm_entropy_start`:将 `PermEntropyState` 以 `memset` 初始化方式写入框架提供的 `interBuf->buf`,`values` 指针置为 `NULL`(尚未分配堆内容)。
+- `perm_entropy`(AGG_PROC):
+ 1. 以值拷贝的方式将 `interBuf->buf` 的状态复制到栈变量 `newState`;
+ 2. 若本批有效行数 > 0,通过 `realloc` 扩展 UDF 堆内容(`newState.values`)并追加数据;
+ 3. 将 `newState`(含更新后的 `values` 指针)以 `memcpy` 写入框架预分配的 `newInterBuf->buf`,**绝不**用新的 `malloc` 替换 `newInterBuf->buf`,否则框架原有的 BUFSIZE 字节分配在每次 AGG_PROC 调用后丢失;
+ 4. 若 `realloc` 失败,需通过 `interBuf->buf` 释放原有的 UDF 堆内容并将指针清零,因为框架的 `freeUdfInterBuf()` 仅释放容器,不会释放其内部的 `values` 指针。
+- `perm_entropy_finish`:使用全部累积数据计算排列熵,**释放 `state->values`** 并将结果写入框架预分配的 `resultData->buf`。
+
+创建表:
+
+```sql
+-- 普通表,用于全表聚合和时间窗口查询
+CREATE TABLE vibration (ts TIMESTAMP, val DOUBLE);
+
+-- 超级表,用于按子表分组查询
+CREATE STABLE vibration_stb (ts TIMESTAMP, val DOUBLE) TAGS (device_id INT);
+CREATE TABLE vibration_d1 USING vibration_stb TAGS (1);
+CREATE TABLE vibration_d2 USING vibration_stb TAGS (2);
+```
+
+生成 `.so` 文件:
+
+```bash
+gcc -g -O0 -fPIC -shared perm_entropy.c -o libperm_entropy.so -lm
+```
+
+创建自定义函数:
+
+```sql
+CREATE AGGREGATE FUNCTION perm_entropy
+ AS '/path/to/libperm_entropy.so'
+ OUTPUTTYPE DOUBLE
+ BUFSIZE 256;
+```
+
+使用自定义函数:
+
+```sql
+-- 全表聚合,计算整张表的排列熵
+SELECT perm_entropy(val) FROM vibration;
+
+-- 时间窗口聚合,对每个窗口独立计算排列熵
+SELECT perm_entropy(val) FROM vibration INTERVAL(10s);
+
+-- 按子表分组,分别计算每个设备的排列熵
+SELECT perm_entropy(val) FROM vibration_stb PARTITION BY tbname;
+```
+
+
+perm_entropy.c
+
+```c
+{{#include docs/examples/udf/perm_entropy.c}}
+```
+
+
+
## 用 Python 语言开发 UDF
### 准备环境
diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt
index c088113c76e6..643c6bd029f5 100644
--- a/source/libs/function/CMakeLists.txt
+++ b/source/libs/function/CMakeLists.txt
@@ -68,8 +68,6 @@ if(UNIX AND NOT APPLE)
)
endif()
-if(${BUILD_TEST})
- add_subdirectory(test)
-endif()
+add_subdirectory(test)
endif(${BUILD_WITH_UDF})
diff --git a/source/libs/function/test/CMakeLists.txt b/source/libs/function/test/CMakeLists.txt
index 2456b72b9da0..0173cc4fba51 100644
--- a/source/libs/function/test/CMakeLists.txt
+++ b/source/libs/function/test/CMakeLists.txt
@@ -5,15 +5,25 @@ include_directories("${TD_SOURCE_DIR}/include/client")
include_directories("${TD_SOURCE_DIR}/include/os")
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../inc")
+if(${BUILD_TEST})
add_executable(runUdf runUdf.c)
target_link_libraries(
runUdf
PRIVATE os util common nodes function
)
+endif(${BUILD_TEST})
add_library(udf1 STATIC MODULE udf1.c)
target_link_libraries(udf1 PUBLIC os)
+# perm_entropy: aggregate UDF example demonstrating the
+# "accumulate-all-data-then-compute" pattern (permutation entropy).
+add_library(perm_entropy STATIC MODULE perm_entropy.c)
+target_link_libraries(perm_entropy PUBLIC os)
+if(NOT TD_WINDOWS)
+ target_link_libraries(perm_entropy PRIVATE m)
+endif()
+
add_library(udf2 STATIC MODULE udf2.c)
target_link_libraries(udf2 PUBLIC os)
diff --git a/source/libs/function/test/perm_entropy.c b/source/libs/function/test/perm_entropy.c
new file mode 100644
index 000000000000..8205e5d87d16
--- /dev/null
+++ b/source/libs/function/test/perm_entropy.c
@@ -0,0 +1,205 @@
+/*
+ * perm_entropy — TDengine C UDF: aggregate permutation entropy
+ *
+ * Permutation entropy (Bandt & Pompe, PRL 2002) measures the complexity
+ * of a time series by computing the Shannon entropy of the distribution of
+ * ordinal patterns found in overlapping embedding windows of dimension
+ * EMBED_DIM. The result is normalised to [0, 1].
+ *
+ * This file is the CI build copy used by the function-test suite.
+ * The canonical documented version lives at docs/examples/udf/perm_entropy.c.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+#include "taosudf.h"
+
+#define EMBED_DIM 5
+#define DELAY 1
+#define MAX_EMBED_DIM 8
+
+typedef struct {
+ int embed_dim;
+ int delay;
+ int64_t values_count;
+ int64_t values_capacity;
+ double *values;
+} PermEntropyState;
+
+static int32_t ensure_capacity(PermEntropyState *state, int64_t required) {
+ if (required <= state->values_capacity) return TSDB_CODE_SUCCESS;
+
+ int64_t new_cap = state->values_capacity > 0 ? state->values_capacity : 1024;
+ while (new_cap < required) {
+ if (new_cap > INT64_MAX / 2) { new_cap = required; break; }
+ new_cap *= 2;
+ }
+ if (new_cap > (int64_t)(SIZE_MAX / sizeof(double))) return TSDB_CODE_OUT_OF_MEMORY;
+
+ double *p = (double *)realloc(state->values, (size_t)new_cap * sizeof(double));
+ if (p == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+ state->values = p;
+ state->values_capacity = new_cap;
+ return TSDB_CODE_SUCCESS;
+}
+
+static double compute_perm_entropy(const double *data, int n, int embed_dim, int delay) {
+ if (data == NULL || n < embed_dim || embed_dim <= 1 || embed_dim > MAX_EMBED_DIM) return 0.0;
+
+ int n_windows = n - (embed_dim - 1) * delay;
+ if (n_windows <= 0) return 0.0;
+ int n_patterns = 1;
+ for (int i = 2; i <= embed_dim; i++) n_patterns *= i;
+
+ int *counts = (int *)calloc(n_patterns, sizeof(int));
+ if (counts == NULL) return NAN;
+ for (int w = 0; w < n_windows; w++) {
+ double v[MAX_EMBED_DIM];
+ int idx[MAX_EMBED_DIM];
+ int rank[MAX_EMBED_DIM];
+ for (int j = 0; j < embed_dim; j++) {
+ v[j] = data[w + j * delay];
+ idx[j] = j;
+ }
+ for (int j = 0; j < embed_dim - 1; j++)
+ for (int k = j + 1; k < embed_dim; k++)
+ if (v[idx[j]] > v[idx[k]] || (v[idx[j]] == v[idx[k]] && idx[j] > idx[k])) {
+ int t = idx[j];
+ idx[j] = idx[k];
+ idx[k] = t;
+ }
+ for (int j = 0; j < embed_dim; j++) rank[idx[j]] = j;
+ int pat = 0;
+ for (int j = 0; j < embed_dim; j++) {
+ int c = 0;
+ for (int k = j + 1; k < embed_dim; k++)
+ if (rank[k] < rank[j]) c++;
+ pat = pat * (embed_dim - j) + c;
+ }
+ counts[pat]++;
+ }
+
+ double entropy = 0.0;
+ for (int i = 0; i < n_patterns; i++) {
+ if (counts[i] > 0) {
+ double p = (double)counts[i] / n_windows;
+ entropy -= p * log2(p);
+ }
+ }
+ free(counts);
+
+ double max_entropy = log2((double)n_patterns);
+ return max_entropy > 0 ? entropy / max_entropy : 0.0;
+}
+
+DLL_EXPORT int32_t perm_entropy_init() { return TSDB_CODE_SUCCESS; }
+DLL_EXPORT int32_t perm_entropy_destroy() { return TSDB_CODE_SUCCESS; }
+
+DLL_EXPORT int32_t perm_entropy_start(SUdfInterBuf *interBuf) {
+ if (interBuf->bufLen < (int32_t)sizeof(PermEntropyState))
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ PermEntropyState *state = (PermEntropyState *)interBuf->buf;
+ memset(state, 0, sizeof(PermEntropyState));
+ state->embed_dim = EMBED_DIM;
+ state->delay = DELAY;
+ return TSDB_CODE_SUCCESS;
+}
+
+DLL_EXPORT int32_t perm_entropy(SUdfDataBlock *block, SUdfInterBuf *interBuf,
+ SUdfInterBuf *newInterBuf) {
+ if (block->numOfCols != 1) return TSDB_CODE_UDF_INVALID_INPUT;
+ SUdfColumn *col = block->udfCols[0];
+
+ switch (col->colMeta.type) {
+ case TSDB_DATA_TYPE_TINYINT:
+ case TSDB_DATA_TYPE_SMALLINT:
+ case TSDB_DATA_TYPE_INT:
+ case TSDB_DATA_TYPE_BIGINT:
+ case TSDB_DATA_TYPE_FLOAT:
+ case TSDB_DATA_TYPE_DOUBLE:
+ break;
+ default:
+ return TSDB_CODE_UDF_INVALID_INPUT;
+ }
+
+ int64_t valid = 0;
+ for (int32_t i = 0; i < block->numOfRows; i++)
+ if (!udfColDataIsNull(col, i)) valid++;
+
+ PermEntropyState newState = *(PermEntropyState *)interBuf->buf;
+
+ if (valid > 0) {
+ int32_t code = ensure_capacity(&newState, newState.values_count + valid);
+ if (code != TSDB_CODE_SUCCESS) {
+ PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
+ free(orig->values);
+ orig->values = NULL;
+ return code;
+ }
+ for (int32_t i = 0; i < block->numOfRows; i++) {
+ if (udfColDataIsNull(col, i)) continue;
+ char *raw = udfColDataGetData(col, i);
+ double v = 0.0;
+ switch (col->colMeta.type) {
+ case TSDB_DATA_TYPE_TINYINT: v = (double)(*(int8_t *)raw); break;
+ case TSDB_DATA_TYPE_SMALLINT: v = (double)(*(int16_t *)raw); break;
+ case TSDB_DATA_TYPE_INT: v = (double)(*(int32_t *)raw); break;
+ case TSDB_DATA_TYPE_BIGINT: v = (double)(*(int64_t *)raw); break;
+ case TSDB_DATA_TYPE_FLOAT: v = (double)(*(float *)raw); break;
+ case TSDB_DATA_TYPE_DOUBLE: v = *(double *)raw; break;
+ default: continue;
+ }
+ newState.values[newState.values_count++] = v;
+ }
+ }
+
+ if (newInterBuf->buf == NULL ||
+ newInterBuf->bufLen < (int32_t)sizeof(PermEntropyState)) {
+ /* Free the heap array unconditionally and clear orig->values:
+ * - realloc moved array: newState.values is the new block;
+ * orig->values is already dangling (freed internally by realloc).
+ * - realloc in-place or no realloc: newState.values == orig->values;
+ * freeing once via newState.values is correct.
+ * Clear orig->values so freeUdfInterBuf() cannot double-free. */
+ PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
+ free(newState.values);
+ newState.values = NULL;
+ if (orig != NULL) orig->values = NULL;
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ }
+ memcpy(newInterBuf->buf, &newState, sizeof(PermEntropyState));
+ newInterBuf->bufLen = sizeof(PermEntropyState);
+ newInterBuf->numOfResult = 0;
+ return TSDB_CODE_SUCCESS;
+}
+
+DLL_EXPORT int32_t perm_entropy_finish(SUdfInterBuf *interBuf,
+ SUdfInterBuf *resultData) {
+ if (interBuf->buf == NULL) { resultData->numOfResult = 0; return TSDB_CODE_SUCCESS; }
+
+ PermEntropyState *state = (PermEntropyState *)interBuf->buf;
+
+ if (state->values_count < state->embed_dim || state->values == NULL) {
+ if (state->values != NULL) { free(state->values); state->values = NULL; }
+ resultData->numOfResult = 0;
+ return TSDB_CODE_SUCCESS;
+ }
+
+ double entropy = compute_perm_entropy(state->values, (int)state->values_count,
+ state->embed_dim, state->delay);
+
+ if (resultData->buf == NULL || resultData->bufLen < (int32_t)sizeof(double)) {
+ free(state->values); state->values = NULL;
+ return TSDB_CODE_UDF_INVALID_BUFSIZE;
+ }
+ *(double *)resultData->buf = entropy;
+ resultData->bufLen = sizeof(double);
+ resultData->numOfResult = 1;
+
+ free(state->values);
+ state->values = NULL;
+ state->values_count = 0;
+ state->values_capacity = 0;
+ return TSDB_CODE_SUCCESS;
+}
diff --git a/test/cases/12-UDFs/test_udf_restart_taosd.py b/test/cases/12-UDFs/test_udf_restart_taosd.py
index dd400b4768cf..db973953a858 100644
--- a/test/cases/12-UDFs/test_udf_restart_taosd.py
+++ b/test/cases/12-UDFs/test_udf_restart_taosd.py
@@ -4,6 +4,8 @@
import time
import os
import platform
+import math
+import random
import subprocess
@@ -37,6 +39,31 @@ def prepare_udf_so(self):
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
self.libudf2 = self.libudf2.replace('\r','').replace('\n','')
+ def prepare_perm_entropy_so(self):
+ """Locate libperm_entropy.so in the build tree (same approach as libudf1/libudf2)."""
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+ projPath = selfPath[:selfPath.find("community")] if "community" in selfPath else selfPath[:selfPath.find("tests")]
+ if platform.system().lower() == 'windows':
+ self.libperm_entropy = subprocess.Popen(
+ '(for /r %s %%i in ("perm_entropy.d*") do @echo %%i)|grep lib|head -n1' % projPath,
+ shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ ).stdout.read().decode("utf-8")
+ if not tdDnodes.dnodes[0].remoteIP == "":
+ tdDnodes.dnodes[0].remote_conn.get(
+ tdDnodes.dnodes[0].config["path"] + '/debug/build/lib/libperm_entropy.so',
+ projPath + "\\debug\\build\\lib\\"
+ )
+ self.libperm_entropy = self.libperm_entropy.replace('perm_entropy.dll', 'libperm_entropy.so')
+ else:
+ self.libperm_entropy = subprocess.Popen(
+ 'find %s -name "libperm_entropy.so" | grep lib | head -n1' % projPath,
+ shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ ).stdout.read().decode("utf-8")
+ self.libperm_entropy = self.libperm_entropy.replace('\r', '').replace('\n', '')
+ if not self.libperm_entropy:
+ tdLog.exit("libperm_entropy.so not found under %s. Build it first." % projPath)
+ tdLog.info("libperm_entropy path: %s" % self.libperm_entropy)
+
def prepare_data(self):
@@ -639,7 +666,232 @@ def test_udf_restart_taosd(self):
self.multi_cols_udf()
self.restart_taosd_query_udf()
+ # ------------------------------------------------------------------ perm_entropy tests
+
+ def test_perm_entropy(self):
+ """perm_entropy aggregate UDF – accumulate-all-data-then-compute pattern
+
+ perm_entropy is a special aggregate UDF: it cannot produce any result
+ until ALL rows in the window have been delivered. During processing
+ (AGG_PROC calls) it only accumulates values into a heap-allocated array
+ carried inside the intermediate buffer. The actual permutation-entropy
+ computation happens entirely inside perm_entropy_finish after all data
+ has been collected.
+
+ Tests:
+ 1. Locate libperm_entropy.so in the build tree.
+ 2. Create a database with monotonically increasing and mixed data.
+ 3. Register the perm_entropy aggregate UDF (BUFSIZE 256).
+ 4. Verify correctness:
+ - Monotonically increasing data → entropy = 0.0 (single permutation pattern).
+ - Fewer rows than embed_dim (5) → NULL result (no result).
+ - Interval window partitioning → each window returns its own entropy.
+ 5. Verify the UDF survives a taosd restart and returns the same results.
+
+ Since: v3.0.0.0
+
+ Labels: common,ci
+
+ Jira: None
+
+ History:
+ - 2026-04-21 Created for perm_entropy accumulate-all-data UDF CI coverage
+ """
+ self.prepare_perm_entropy_so()
+
+ # Use a unique database name per run to avoid "Database in dropping status"
+ # when a previous test run was terminated before DROP completed.
+ db_name = "perm_db_%d" % random.randint(10000, 99999)
+
+ # ---- create a dedicated database
+ tdSql.execute("create database %s duration 100" % db_name)
+ tdSql.execute("use %s" % db_name)
+
+ # ---- monotonically increasing data: 20 rows, values 0.0 … 19.0
+ # Expected perm_entropy = 0.0 (all windows have the same ascending rank
+ # pattern → single permutation → p=1 → -log2(1)=0 → normalised 0.0)
+ tdSql.execute("create table mono_t (ts timestamp, val double)")
+ # align ts_base to a 10-second boundary so INTERVAL(10s) yields
+ # exactly the expected number of windows without overflow rows
+ ts_base = 1652517450000 # 1652517450s, divisible by 10
+ for i in range(20):
+ tdSql.execute("insert into mono_t values (%d, %f)" % (ts_base + i * 1000, float(i)))
+
+ # ---- mixed sinusoidal data split across two child tables of a supertable
+ tdSql.execute(
+ "create stable perm_stb (ts timestamp, val double) tags (grp int)"
+ )
+ tdSql.execute("create table perm_t0 using perm_stb tags(0)")
+ tdSql.execute("create table perm_t1 using perm_stb tags(1)")
+ for i in range(30):
+ v = math.sin(i * 0.3)
+ tdSql.execute("insert into perm_t0 values (%d, %f)" % (ts_base + i * 1000, v))
+ tdSql.execute("insert into perm_t1 values (%d, %f)" % (ts_base + i * 1000, -v))
+
+ # ---- register perm_entropy as an aggregate UDF
+ # BUFSIZE only needs to hold the fixed-size PermEntropyState container;
+ # the dynamic values array lives on the UDF heap, so 256 bytes is ample.
+
+ tdSql.execute(
+ "create aggregate function perm_entropy as '%s' "
+ "outputtype double bufsize 256" % self.libperm_entropy
+ )
+ tdLog.info("perm_entropy UDF registered from %s" % self.libperm_entropy)
+
+ # ---- test 1: monotonically increasing data → entropy = 0.0
+ tdSql.query("select perm_entropy(val) from mono_t")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0.000000000)
+ tdLog.info("test1 pass: monotonic data → entropy=0.0")
+
+ # ---- test 2: too few rows (< embed_dim=5) → NULL / 0 rows
+ tdSql.execute("create table tiny_t (ts timestamp, val double)")
+ for i in range(3):
+ tdSql.execute("insert into tiny_t values (%d, %f)" % (ts_base + i * 1000, float(i)))
+ tdSql.query("select perm_entropy(val) from tiny_t")
+ # perm_entropy_finish returns numOfResult=0 for values_count < embed_dim.
+ # Depending on the engine version this surfaces as either 0 rows or 1 NULL row.
+ rows = tdSql.queryRows
+ if rows == 0:
+ tdLog.info("test2 pass: too few rows → empty result set")
+ else:
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, None)
+ tdLog.info("test2 pass: too few rows → NULL")
+
+ # ---- test 3: interval window – each window accumulates independently
+ # 30 rows @ 1s interval → three 10s windows; each should return a value
+ tdSql.query(
+ "select perm_entropy(val) from perm_t0 interval(10s)"
+ )
+ tdSql.checkRows(3)
+ tdLog.info("test3 pass: interval window returns 3 rows")
+
+ # ---- test 4: partition by subtable via supertable query
+ tdSql.query(
+ "select perm_entropy(val) from perm_stb partition by tbname"
+ )
+ tdSql.checkRows(2)
+ tdLog.info("test4 pass: partition by tbname returns 2 rows")
+
+ # ---- test 5: taosd restart – perm_entropy must still work after restart
+ tdDnodes.stop(1)
+ tdDnodes.start(1)
+ time.sleep(5) # give taosd enough time to re-load UDF registry
+ tdSql.execute("use %s" % db_name)
+ tdSql.query("select perm_entropy(val) from mono_t")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0.000000000)
+ tdLog.info("test5 pass: perm_entropy works after taosd restart")
+
+ # ---- cleanup: drop the UDF and the scratch database
+ tdSql.execute("drop function perm_entropy")
+ tdSql.execute("drop database %s" % db_name)
+
+ # ------------------------------------------------------------------ helpers
+
+ def _get_taosudf_rss_kb(self):
+ """Return the combined RSS (kB) of all running taosudf processes via /proc."""
+ total = 0
+ try:
+ for entry in os.listdir("/proc"):
+ if not entry.isdigit():
+ continue
+ try:
+ with open("/proc/%s/comm" % entry) as f:
+ if f.read().strip() != "taosudf":
+ continue
+ with open("/proc/%s/status" % entry) as f:
+ for line in f:
+ if line.startswith("VmRSS:"):
+ total += int(line.split()[1])
+ break
+ except (FileNotFoundError, ValueError, PermissionError):
+ pass
+ except OSError:
+ pass
+ return total
+
+ def test_perm_entropy_rss_leak(self):
+ """perm_entropy aggregate UDF – repeated query to exercise accumulate/finish path.
+
+ Runs REPEAT_ROUNDS of diverse aggregate queries (supertable partition,
+ interval window, single-table) and logs taosudf RSS across rounds.
+ Memory leak detection is handled by ASAN on taosd shutdown.
+
+ Tests:
+ 1. Register perm_entropy UDF from the build tree.
+ 2. Insert synthetic data into a supertable with multiple child tables.
+ 3. Run REPEAT_ROUNDS of aggregate queries per round.
+
+ Since: v3.0.0.0
+ Labels: common,ci
+ Jira: None
+
+ History:
+ - 2026-04-21 Created for perm_entropy UDF memory-leak investigation
+ """
+ REPEAT_ROUNDS = 5
+ ROWS_PER_TABLE = 80
+ CHILD_TABLES = 2
+
+ self.prepare_perm_entropy_so()
+
+ db_name = "perm_leak_%d" % random.randint(10000, 99999)
+ stable = "vibration"
+ tdSql.execute("create database %s vgroups 2" % db_name)
+ tdSql.execute("use %s" % db_name)
+ tdSql.execute(
+ "create stable %s (ts timestamp, val double) tags (tid int)" % stable
+ )
+ for i in range(CHILD_TABLES):
+ tdSql.execute("create table t%d using %s tags(%d)" % (i, stable, i))
+
+ base_ts = 1700000000000
+ for i in range(CHILD_TABLES):
+ rows_sql = ",".join(
+ "(%d, %f)" % (base_ts + j * 1000, float(j % 100) + 0.1 * i)
+ for j in range(ROWS_PER_TABLE)
+ )
+ tdSql.execute("insert into t%d values %s" % (i, rows_sql))
+ tdLog.info("inserted %d rows across %d tables" % (CHILD_TABLES * ROWS_PER_TABLE, CHILD_TABLES))
+
+ tdSql.execute(
+ "create aggregate function perm_entropy as '%s' outputtype double bufsize 256"
+ % self.libperm_entropy
+ )
+ tdLog.info("perm_entropy UDF registered from %s" % self.libperm_entropy)
+
+ queries = [
+ "select perm_entropy(val) from %s partition by tbname" % stable,
+ "select perm_entropy(val) from %s interval(10s)" % stable,
+ "select perm_entropy(val) from t0" ,
+ ]
+
+ rss_samples = []
+ for r in range(REPEAT_ROUNDS):
+ rss_before = self._get_taosudf_rss_kb()
+ for sql in queries:
+ t0 = time.time()
+ tdSql.query(sql)
+ elapsed = time.time() - t0
+ tdLog.info("[round %d] rows=%d elapsed=%.3fs %s" % (r, tdSql.getRows(), elapsed, sql))
+ time.sleep(0.5)
+ rss_after = self._get_taosudf_rss_kb()
+ rss_samples.append(rss_after)
+ tdLog.info("[round %d] taosudf RSS before=%d KB after=%d KB" % (r, rss_before, rss_after))
+
+ if len(rss_samples) >= 2:
+ first = next((v for v in rss_samples if v > 0), 0)
+ last = rss_samples[-1]
+ tdLog.info("taosudf RSS: start=%d KB end=%d KB growth=%d KB (%.1f MB)"
+ % (first, last, last - first, (last - first) / 1024.0))
+ else:
+ tdLog.info("taosudf not observed via /proc – skipping RSS log")
+ # ---- cleanup
+ tdSql.execute("drop function perm_entropy")
+ tdSql.execute("drop database %s" % db_name)