Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the analytical capabilities by introducing the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces the LAG and LEAD functions, a significant feature enhancement. The implementation is extensive, touching multiple components from parsing to execution, and is accompanied by a very thorough test suite. The overall quality is high. However, I have identified a potential memory safety issue in operator.c due to the reordering of cleanup function calls, which I've flagged as high severity. Additionally, I've suggested a performance improvement for the function grouping logic in projectoperator.c to handle queries with a large number of lag/lead calls more efficiently.
| cleanupExprSuppWithoutFilter(pExprSupp); | ||
| cleanupAggSup(pSup); |
There was a problem hiding this comment.
The order of cleanupExprSuppWithoutFilter and cleanupAggSup has been swapped. This looks suspicious. Typically, a 'supporter' object should be cleaned up before the object it depends on. SAggSupporter likely depends on SExprSupp. If so, cleanupAggSup should be called before cleanupExprSuppWithoutFilter to avoid potential use-after-free bugs. Please verify if this change is correct and intended. If the new order is correct, please add a comment explaining the dependency.
cleanupAggSup(pSup);
cleanupExprSuppWithoutFilter(pExprSupp);| if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) { | ||
| SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0); | ||
| TSDB_CHECK_NULL(pfCtx, code, lino, _exit, terrno); | ||
| int32_t processByRowSize = taosArrayGetSize(processByRowFunctionCtx); | ||
| pProcessedFuncIds = taosArrayInit(4, sizeof(int32_t)); | ||
| TSDB_CHECK_NULL(pProcessedFuncIds, code, lino, _exit, terrno); | ||
|
|
||
| for (int32_t i = 0; i < processByRowSize; ++i) { | ||
| SqlFunctionCtx** ppCurrCtx = taosArrayGet(processByRowFunctionCtx, i); | ||
| TSDB_CHECK_NULL(ppCurrCtx, code, lino, _exit, terrno); | ||
| TSDB_CHECK_NULL(*ppCurrCtx, code, lino, _exit, terrno); | ||
|
|
||
| bool processed = false; | ||
| int32_t processedNum = taosArrayGetSize(pProcessedFuncIds); | ||
| for (int32_t j = 0; j < processedNum; ++j) { | ||
| int32_t* pFuncId = taosArrayGet(pProcessedFuncIds, j); | ||
| TSDB_CHECK_NULL(pFuncId, code, lino, _exit, terrno); | ||
| if (*pFuncId == (*ppCurrCtx)->functionId) { | ||
| processed = true; | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| if (processed) { | ||
| continue; | ||
| } | ||
|
|
||
| pGroupedCtxArray = taosArrayInit(2, sizeof(SqlFunctionCtx*)); | ||
| TSDB_CHECK_NULL(pGroupedCtxArray, code, lino, _exit, terrno); | ||
|
|
||
| for (int32_t j = i; j < processByRowSize; ++j) { | ||
| SqlFunctionCtx** ppTmpCtx = taosArrayGet(processByRowFunctionCtx, j); | ||
| TSDB_CHECK_NULL(ppTmpCtx, code, lino, _exit, terrno); | ||
| TSDB_CHECK_NULL(*ppTmpCtx, code, lino, _exit, terrno); | ||
|
|
||
| if ((*ppTmpCtx)->functionId == (*ppCurrCtx)->functionId) { | ||
| void* px = taosArrayPush(pGroupedCtxArray, ppTmpCtx); | ||
| TSDB_CHECK_NULL(px, code, lino, _exit, terrno); | ||
| } | ||
| } | ||
|
|
||
| TAOS_CHECK_EXIT((*ppCurrCtx)->fpSet.processFuncByRow(pGroupedCtxArray)); | ||
| taosArrayDestroy(pGroupedCtxArray); | ||
| pGroupedCtxArray = NULL; | ||
|
|
||
| TAOS_CHECK_EXIT((*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx)); | ||
| numOfRows = (*pfCtx)->resultInfo->numOfRes; | ||
| void* px = taosArrayPush(pProcessedFuncIds, &(*ppCurrCtx)->functionId); | ||
| TSDB_CHECK_NULL(px, code, lino, _exit, terrno); | ||
|
|
||
| numOfRows = (*ppCurrCtx)->resultInfo->numOfRes; | ||
| } | ||
|
|
||
| taosArrayDestroy(pProcessedFuncIds); | ||
| pProcessedFuncIds = NULL; | ||
| } |
There was a problem hiding this comment.
The new logic to group process by row functions has a time complexity of O(N^2), where N is the number of such functions in the query. While N is typically small, this could become a performance bottleneck for queries with many lag/lead calls.
Consider refactoring this to use a hash map for grouping, which would reduce the complexity to O(N).
Example logic:
- Create a hash map where keys are
functionIdand values are arrays ofSqlFunctionCtx*. - Iterate through
processByRowFunctionCtxonce, populating the hash map. - Iterate through the hash map and call
processFuncByRowfor each group.
There was a problem hiding this comment.
Pull request overview
Adds built-in lag() / lead() timeline selection functions with process-by-row execution support, plus CI coverage to validate correctness across common query shapes (partitioning, subqueries, streams, large offsets, null handling).
Changes:
- Register
lag/leadas built-in indefinite-rows, process-by-row functions with type validation for optional defaults. - Implement row-by-row execution logic (including cross-block state) and adjust executor handling for process-by-row functions.
- Add a dedicated Python regression test and wire it into CI.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| test/ci/cases.task | Adds the new lag/lead test to the CI task list. |
| test/cases/11-Functions/03-Selection/test_fun_select_lag_lead.py | New end-to-end test coverage for lag/lead across many scenarios. |
| source/libs/parser/src/parTranslater.c | Allows coexistence of certain process-by-row indefinite functions (lag/lead). |
| source/libs/function/src/functionMgt.c | Adds coexistence check helper for indefinite-rows functions. |
| source/libs/function/src/builtinsimpl.c | Implements lag/lead execution state, by-row processing, and cleanup. |
| source/libs/function/src/builtins.c | Registers lag/lead built-ins and validates default parameter type compatibility. |
| source/libs/function/inc/builtinsimpl.h | Declares lag/lead builtin implementation hooks. |
| source/libs/executor/src/projectoperator.c | Adjusts indefinite function execution behavior and process-by-row dispatching. |
| source/libs/executor/src/operator.c | Minor cleanup ordering change. |
| source/libs/executor/src/executil.c | Ensures exec function pointers are loaded for process-by-row functions. |
| include/libs/function/functionMgt.h | Adds FUNCTION_TYPE_LEAD and exposes coexistence helper declaration. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| SArray* pRemain = taosArrayInit(TMAX(pendingSize, 1), sizeof(SLeadPendingItem)); | ||
| if (pRemain == NULL) { | ||
| return terrno; | ||
| } | ||
|
|
||
| for (int32_t i = 0; i < pendingSize; ++i) { | ||
| SLeadPendingItem* pItem = taosArrayGet(pState->pLeadPending, i); | ||
| if (pItem == NULL) { | ||
| return terrno; | ||
| } | ||
|
|
||
| if (pItem->needIdx < currSize) { | ||
| SLagLeadRowValue* pTarget = taosArrayGet(pCurrValues, pItem->needIdx); | ||
| if (pTarget == NULL) { | ||
| return terrno; | ||
| } | ||
|
|
||
| int32_t code = setLagLeadOutputFromValue(pCtx, pItem->outputPos, pTarget); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| return code; | ||
| } | ||
| } else { | ||
| pItem->needIdx -= currSize; | ||
| if (NULL == taosArrayPush(pRemain, pItem)) { | ||
| taosArrayDestroy(pRemain); | ||
| return terrno; | ||
| } | ||
| } | ||
| } | ||
|
|
| for (int32_t i = 0; i < pSup->numOfExprs; ++i) { | ||
| EFunctionType type = fmGetFuncTypeFromId(pSup->pCtx[i].functionId); | ||
| if (type == FUNCTION_TYPE_LAG || type == FUNCTION_TYPE_LEAD) { | ||
| return true; | ||
| } |
|
|
||
| doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo); | ||
| if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) { | ||
| if (!noSplitOutput && pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) { |
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.