Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b40e2a4
test(stream): add extractScanColsFromPlanJson helper for scan plan as…
wangmm0220 Apr 24, 2026
2744217
test(stream): add Task 2 red test for client AST scan-col pruning
wangmm0220 Apr 27, 2026
ec9b62c
fix(stream): drop calc-only cols leaked into trigger scan plan
wangmm0220 Apr 27, 2026
f07ff2f
test(stream): add Task 4 red test for pre_filter calc compensation
wangmm0220 Apr 27, 2026
bd48c48
feat(stream): inject pre_filter into calc when using %%trows
wangmm0220 Apr 27, 2026
a3271fb
test(stream): add T2-T8 variants for scan col pruning
wangmm0220 Apr 27, 2026
ddae16d
test(stream): add failing T3 for virtual table + %%trows + pre_filter…
wangmm0220 Apr 27, 2026
9efa419
feat(stream): unblock virtual table + pre_filter + %%trows
wangmm0220 Apr 27, 2026
f11782b
Merge branch '3.0' into feat/6490635370-new
wangmm0220 Apr 28, 2026
419878d
test(stream/c): remove StreamReaderTsdbV6Remap fixture
wangmm0220 Apr 28, 2026
b184e2b
feat(stream/c): TSDB reader 接口重构 + 虚拟表 slotId 映射
wangmm0220 Apr 28, 2026
91183b0
refactor(stream): 合并 WAL data 与 WAL calc data handler
wangmm0220 Apr 28, 2026
100a524
feat: stream mnode old plan dual-mode compatibility
wangmm0220 Apr 28, 2026
4b5b785
feat: [TS-6490635370] vnode isOldPlan dual-mode
wangmm0220 Apr 28, 2026
8054491
fix(stream/c): processSlotInfo move metaReader off W lock
wangmm0220 Apr 29, 2026
aec06ae
refactor(stream): address review feedback for TSDB v6 reader
wangmm0220 Apr 30, 2026
dae4bf0
fix(stream): docs & compile error
wangmm0220 Apr 30, 2026
8778800
fix(stream): docs & compile error
wangmm0220 Apr 30, 2026
1d2c4c0
fix(stream): plug parser leak and tidy slot/option helpers
wangmm0220 Apr 30, 2026
dd083b1
test(stream): redesign PR #35233 test suite
wangmm0220 Apr 30, 2026
51a84b5
test(stream): add C5/C6 source-grep guards for vnodeStream helpers
wangmm0220 Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/scripts/check_enum_append_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def check_enum_values_preserved(old_list, new_list, ignore_list):
"EDriverType": {"DRIVER_MAX"},
"EGrantState": {"GRANT_STATE_MAX"},
"EOperType": {"MND_OPER_MAX"},
"ESTriggerPullType": {"*"},
"TSFormatKeywordId": {"*"},
}

Expand Down
40 changes: 39 additions & 1 deletion include/common/streamMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ typedef struct {
void* triggerFilterCols; // nodelist of SColumnNode
void* triggerCols; // nodelist of SColumnNode
void* partitionCols; // nodelist of SColumnNode
// In-memory only. Set in mndStreamActionDecode/tDecodeSStreamObj when SDB
// sver indicates a legacy (sver=8) record so that downstream reader can
// pick the legacy plan-execution path (dual-mode runtime).
bool isOldPlan;
SArray* outCols; // array of SFieldWithOptions
SArray* outTags; // array of SFieldWithOptions
int64_t maxDelay; // precision is ms
Expand Down Expand Up @@ -435,6 +439,9 @@ typedef struct {
// void* triggerPrevFilter;
void* triggerScanPlan;
void* calcCacheScanPlan;
// Propagated from SCMCreateStreamReq.isOldPlan; tells reader to use the
// legacy plan-execution path for streams persisted under sver=8 (dual-mode).
int8_t isOldPlan;
} SStreamReaderDeployFromTrigger;

typedef struct {
Expand Down Expand Up @@ -692,6 +699,17 @@ typedef enum ESTriggerPullType {
STRIGGER_PULL_WAL_DATA_NEW,
STRIGGER_PULL_WAL_META_DATA_NEW,
STRIGGER_PULL_WAL_CALC_DATA_NEW,
STRIGGER_PULL_TSDB_DATA_NEW, // F5 first pull (non-vtable trigger)
STRIGGER_PULL_TSDB_DATA_NEW_NEXT, // F5 continuation
STRIGGER_PULL_TSDB_DATA_NEW_CALC, // F6 first pull (non-vtable calc)
STRIGGER_PULL_TSDB_DATA_NEW_CALC_NEXT, // F6 continuation
STRIGGER_PULL_TSDB_DATA_VTABLE_NEW, // F7 first pull (vtable trigger)
STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_NEXT, // F7 continuation
STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC, // F8 first pull (vtable calc)
STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC_NEXT, // F8 continuation
// F9 history vtable list swap; unrelated to cache, performs an atomic swap
// of the three history maps (see DS §6.4).
STRIGGER_PULL_SET_TABLE_HISTORY,
STRIGGER_PULL_TYPE_MAX,
} ESTriggerPullType;

Expand Down Expand Up @@ -801,6 +819,23 @@ typedef struct SSTriggerWalDataNewRequest {
SSHashObj* ranges; // SSHash<gid, {skey, ekey}>
} SSTriggerWalDataNewRequest;

typedef struct SSTriggerTsdbDataNewRequest {
SSTriggerPullRequest base;
int64_t gid; // gid==0 means cross-uid full table
int64_t skey;
int64_t ekey;
int8_t order; // 1 asc / 2 desc
} SSTriggerTsdbDataNewRequest;

typedef struct SSTriggerTsdbDataVTableNewRequest {
SSTriggerPullRequest base;
int64_t suid;
int64_t uid;
int64_t skey;
int64_t ekey;
int8_t order;
} SSTriggerTsdbDataVTableNewRequest;

typedef struct SSTriggerWalMetaDataNewRequest {
SSTriggerPullRequest base;
int64_t lastVer;
Expand Down Expand Up @@ -852,7 +887,7 @@ void tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pReq);

typedef union SSTriggerPullRequestUnion {
SSTriggerPullRequest base;
SSTriggerSetTableRequest setTableReq;
SSTriggerSetTableRequest setTableReq; // also F9 SET_TABLE_HISTORY
SSTriggerLastTsRequest lastTsReq;
SSTriggerFirstTsRequest firstTsReq;
SSTriggerTsdbMetaRequest tsdbMetaReq;
Expand All @@ -867,6 +902,9 @@ typedef union SSTriggerPullRequestUnion {
SSTriggerVirTableInfoRequest virTableInfoReq;
SSTriggerVirTablePseudoColRequest virTablePseudoColReq;
SSTriggerOrigTableInfoRequest origTableInfoReq;
// v3.4.2 DS v6.1 §6.1.2 new request members.
SSTriggerTsdbDataNewRequest tsdbDataNewReq; // F5/F6
SSTriggerTsdbDataVTableNewRequest tsdbDataVTableNewReq; // F7/F8
} SSTriggerPullRequestUnion;

int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq);
Expand Down
5 changes: 5 additions & 0 deletions include/libs/new-stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ extern "C" {
#define STREAM_MAX_THREAD_NUM 5
#define STREAM_RETURN_ROWS_NUM 4096
#define STREAM_RETURN_BLOCK_NUM 4096
// v3.4.2 sub-project C DS v6.1 §6.1.4 / §7.3 - TSDB-only batch threshold (F13).
// WAL keeps 4096 (latency); TSDB scales ~12x (throughput). Only TSDB-data-new helpers
// reference this constant; WAL paths must keep STREAM_RETURN_ROWS_NUM.
#define STREAM_RETURN_ROWS_TSDB_NUM 50000
// #define STREAM_RETURN_ROWS_NUM_NEW 1000000
#define STREAM_READER_MAX_VTABLE_INNERS_PER_TASK 1000000

#define STREAM_ACT_MIN_DELAY_MSEC (STREAM_MAX_GROUP_NUM * STREAM_HB_INTERVAL_MS)

Expand Down
78 changes: 68 additions & 10 deletions include/libs/new-stream/streamReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "plannodes.h"
#include "stream.h"
#include "streamMsg.h"
#include "tarray.h"
#include "tdatablock.h"
#include "thash.h"

Expand Down Expand Up @@ -41,23 +42,28 @@ typedef struct StreamTableListInfo {
int64_t version;
} StreamTableListInfo;

typedef struct slotInfo{
SArray* schemas;
int32_t* slotIdList;
} SlotInfo;

typedef struct SStreamTriggerReaderInfo {
void* pTask;
int32_t order;
STimeWindow twindows;
uint64_t suid;
uint64_t uid;
int8_t isOldPlan;
int8_t tableType;
int8_t isVtableStream; // whether is virtual table stream
int8_t isVtableOnlyTs;
int8_t deleteReCalc;
int8_t deleteOutTbl;
SNode* pTagCond;
SNode* pTagIndexCond;
SNode* pConditions;
SNodeList* partitionCols;
SNodeList* triggerCols;
SNodeList* triggerPseudoCols;
SNodeList* calcCols;
SHashObj* streamTaskMap;
SHashObj* groupIdMap;
SSubplan* triggerAst;
Expand All @@ -73,10 +79,8 @@ typedef struct SStreamTriggerReaderInfo {
int32_t numOfExprTriggerTag;
SExprInfo* pExprInfoCalcTag;
int32_t numOfExprCalcTag;
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >
void* historyTableList;
SFilterInfo* pFilterInfo;
SFilterInfo* pFilterInfoTrigger;
SFilterInfo* pFilterInfoCalc;
SHashObj* pTableMetaCacheTrigger;
SHashObj* pTableMetaCacheCalc;
SHashObj* triggerTableSchemaMapVTable; // key: uid, value: STSchema*
Expand All @@ -87,10 +91,60 @@ typedef struct SStreamTriggerReaderInfo {
SRWLatch lock;

StreamTableListInfo tableList;
StreamTableListInfo vSetTableList;

StreamTableListInfo vSetTableList;
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >

// ===== v3.4.2 sub-project C DS v6.1 §6.1.3 =====
// F7/F8 virtual-table two-layer cache; outer key = getSessionKey(sessionId, firstType),
// value = SHashObj* uidTaskMap (inner key = uid, value = SStreamReaderTaskInner*).
SHashObj* vtableTaskMap;

// F9 history-side triple (replaced atomically by STRIGGER_PULL_SET_TABLE_HISTORY; see DS §6.4).
StreamTableListInfo vSetTableListHistory;
SSHashObj* uidHashTriggerHistory; // <(suid,uid)[2] -> SHashObj<slotId,colId>>
SSHashObj* uidHashCalcHistory; // <(suid,uid)[2] -> SHashObj<slotId,colId>>
SSHashObj* uidHashTriggerHistorySlotInfo; // <uid -> SlotInfo>
SSHashObj* uidHashCalcHistorySlotInfo; // <uid -> SlotInfo>
} SStreamTriggerReaderInfo;

// v3.4.2 DS v6.1 §6.1.3 - normalize NEXT type to the FIRST pull type used as cache key.
// Cache key always derives from the first type, regardless of whether the request is a
// first pull or a continuation pull (see DS §6.2.1 / §6.3.2).
static inline ESTriggerPullType getFirstTypeFromNext(ESTriggerPullType t) {
switch (t) {
case STRIGGER_PULL_TSDB_DATA_NEW_NEXT: return STRIGGER_PULL_TSDB_DATA_NEW;
case STRIGGER_PULL_TSDB_DATA_NEW_CALC_NEXT: return STRIGGER_PULL_TSDB_DATA_NEW_CALC;
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_NEXT: return STRIGGER_PULL_TSDB_DATA_VTABLE_NEW;
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC_NEXT: return STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC;
default: return t;
}
}

static inline bool isFirstPullType(ESTriggerPullType t) {
return getFirstTypeFromNext(t) == t;
}

// dual-mode helper: only true plan reuses a calc-side filter; old plans share trigger filter
static inline bool isNewCalc(SStreamTriggerReaderInfo* pInfo, bool isCalc) {
return !pInfo->isOldPlan && isCalc;
}

// Resource-selection helpers: pick calc-side or trigger-side resource depending on
// dual-mode state. Eliminates repeated verbose ternary chains across TSDB handlers.
static inline SSDataBlock* getResBlock(SStreamTriggerReaderInfo* pInfo, bool isCalc) {
return isNewCalc(pInfo, isCalc) ? pInfo->calcResBlock : pInfo->triggerResBlock;
}

static inline SNodeList* getScanCols(SStreamTriggerReaderInfo* pInfo, bool isCalc) {
return isNewCalc(pInfo, isCalc) ? pInfo->calcCols : pInfo->triggerCols;
}

static inline SFilterInfo* getFilterInfo(SStreamTriggerReaderInfo* pInfo, bool isCalc) {
return isNewCalc(pInfo, isCalc) ? pInfo->pFilterInfoCalc : pInfo->pFilterInfoTrigger;
}

typedef struct SStreamTriggerReaderCalcInfo {
void* pTask;
void* pFilterInfo;
Expand Down Expand Up @@ -139,7 +193,12 @@ void* qStreamGetReaderInfo(int64_t streamId, int64_t taskId, void** taskAddr);
void qStreamSetTaskRunning(int64_t streamId, int64_t taskId);
int32_t streamBuildFetchRsp(SArray* pResList, bool hasNext, void** data, size_t* size, int8_t precision);

int32_t qBuildVTableList(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qBuildVTableList(SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst,
SSHashObj** uidInfoTrigger, SSHashObj** uidInfoCalc);

int32_t qBuildVTableListInto(SStreamTriggerReaderInfo* sStreamReaderInfo,
StreamTableListInfo* dst,
SSHashObj* uidInfoTrigger);

int32_t createStreamTask(void* pVnode, SStreamOptions* options, SStreamReaderTaskInner** ppTask,
SSDataBlock* pResBlock, STableKeyInfo* pList, int32_t pNum, SStorageAPI* storageApi);
Expand All @@ -156,8 +215,7 @@ int32_t qStreamGetTableListGroupNum(SStreamTriggerReaderInfo* sStreamReaderInfo
int32_t qStreamGetTableListNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
SArray* qStreamGetTableArrayList(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamIterTableList(StreamTableListInfo* sStreamReaderInfo, STableKeyInfo** pKeyInfo, int32_t* size, int64_t* suid);
uint64_t qStreamGetGroupIdFromOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
uint64_t qStreamGetGroupIdFromSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
uint64_t qStreamGetGroupId(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, bool lock);
int32_t qStreamRemoveTableList(StreamTableListInfo* pTableListInfo, int64_t uid);

#ifdef __cplusplus
Expand Down
80 changes: 77 additions & 3 deletions source/common/src/msg/streamMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamR
//TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, pMsg->triggerPrevFilter == NULL ? 0 : (int32_t)strlen(pMsg->triggerPrevFilter) + 1));
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1));
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isOldPlan));

_exit:

Expand Down Expand Up @@ -1112,6 +1113,7 @@ int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderD
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isOldPlan));

_exit:

Expand Down Expand Up @@ -3095,7 +3097,11 @@ void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
taosArrayDestroy(pRequest->cols);
pRequest->cols = NULL;
}
} else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
} else if (pReq->base.type == STRIGGER_PULL_SET_TABLE ||
pReq->base.type == STRIGGER_PULL_SET_TABLE_HISTORY) {
// v3.4.2 DS v6.1 §6.4 - SET_TABLE and SET_TABLE_HISTORY share the same
// request struct; on success the maps are TSWAP'd into the reader info,
// otherwise we free them here.
SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
tSimpleHashCleanup(pRequest->uidInfoTrigger);
tSimpleHashCleanup(pRequest->uidInfoCalc);
Expand Down Expand Up @@ -3182,7 +3188,8 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));

switch (pReq->type) {
case STRIGGER_PULL_SET_TABLE: {
case STRIGGER_PULL_SET_TABLE:
case STRIGGER_PULL_SET_TABLE_HISTORY: {
SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
Expand Down Expand Up @@ -3327,6 +3334,42 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
break;
}
// v3.4.2 DS v6.1 §6.1.2 - F5/F6 non-virtual-table TSDB data first/continuation pulls.
// Continuation (_NEXT) carries no payload (cache hit by sessionId+firstType only).
case STRIGGER_PULL_TSDB_DATA_NEW:
case STRIGGER_PULL_TSDB_DATA_NEW_CALC: {
SSTriggerTsdbDataNewRequest* pRequest = (SSTriggerTsdbDataNewRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
break;
}
case STRIGGER_PULL_TSDB_DATA_NEW_NEXT:
case STRIGGER_PULL_TSDB_DATA_NEW_CALC_NEXT: {
break;
}
// v3.4.2 DS v6.1 §6.1.2 - F7/F8 virtual-table TSDB data first/continuation pulls.
// First-pull carries ver+uid+skey+ekey+order; continuation only needs uid.
// (uid is globally unique, so suid is not transmitted; reader recovers it
// from its (suid, uid)-keyed slotColMap maps via uid-only scan.)
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW:
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC: {
SSTriggerTsdbDataVTableNewRequest* pRequest = (SSTriggerTsdbDataVTableNewRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
break;
}
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_NEXT:
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC_NEXT: {
SSTriggerTsdbDataVTableNewRequest* pRequest = (SSTriggerTsdbDataVTableNewRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
break;
}

default: {
uError("unknown pull type %d", pReq->type);
code = TSDB_CODE_INVALID_PARA;
Expand Down Expand Up @@ -3409,7 +3452,8 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));

switch (type) {
case STRIGGER_PULL_SET_TABLE: {
case STRIGGER_PULL_SET_TABLE:
case STRIGGER_PULL_SET_TABLE_HISTORY: {
SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
Expand Down Expand Up @@ -3563,6 +3607,36 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull

break;
}
// v3.4.2 DS v6.1 §6.1.2 - F5/F6 deserialization mirrors serialization above.
case STRIGGER_PULL_TSDB_DATA_NEW:
case STRIGGER_PULL_TSDB_DATA_NEW_CALC: {
SSTriggerTsdbDataNewRequest* pRequest = &(pReq->tsdbDataNewReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
break;
}
case STRIGGER_PULL_TSDB_DATA_NEW_NEXT:
case STRIGGER_PULL_TSDB_DATA_NEW_CALC_NEXT: {
break;
}
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW:
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC: {
SSTriggerTsdbDataVTableNewRequest* pRequest = &(pReq->tsdbDataVTableNewReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
break;
}
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_NEXT:
case STRIGGER_PULL_TSDB_DATA_VTABLE_NEW_CALC_NEXT: {
SSTriggerTsdbDataVTableNewRequest* pRequest = &(pReq->tsdbDataVTableNewReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
break;
}
default: {
uError("unknown pull type %d", type);
code = TSDB_CODE_INVALID_PARA;
Expand Down
7 changes: 6 additions & 1 deletion source/dnode/mnode/impl/inc/mndStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,13 @@ static const char* gMndStreamState[] = {"X", "W", "N"};
#define STREAM_ACT_RECALC (1 << 4)

#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 8
#define MND_STREAM_VER_NUMBER 9
#define MND_STREAM_COMPATIBLE_VER_NUMBER 7
// sver value at which the JSON-encoded SCMCreateStreamReq still carries the
// legacy plan-execution form. SDB records persisted under this sver are
// loaded via the JSON path but flagged isOldPlan=true so reader picks the
// legacy execution path (dual-mode runtime). See PR#35196 / project B.
#define MND_STREAM_OLD_TRIGGER_COLS 8
#define MND_STREAM_TRIGGER_NAME_SIZE 20
#define MND_STREAM_DEFAULT_NUM 100
#define MND_STREAM_DEFAULT_TASK_NUM 200
Expand Down
5 changes: 4 additions & 1 deletion source/dnode/mnode/impl/src/mndDef.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
TAOS_CHECK_EXIT(terrno);
}

if (MND_STREAM_VER_NUMBER == sver) {
if (sver >= MND_STREAM_OLD_TRIGGER_COLS) {
TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
// Legacy sver=8 records share the JSON wire format with sver=9 but their
// serialized plan bytes are the old form; mark so reader runs dual-mode.
pObj->pCreate->isOldPlan = (sver == MND_STREAM_OLD_TRIGGER_COLS);
} else {
TAOS_CHECK_RETURN(
tDeserializeSCMCreateStreamReqImplOld(pDecoder, pObj->pCreate, 21));
Expand Down
Loading
Loading