Subscription: support table-model column-filter for topics#17936
Subscription: support table-model column-filter for topics#17936VGalaxies wants to merge 5 commits into
Conversation
Independent ColumnFilter.g4 grammar; parser/validator/evaluator/binder/matcher/TabletColumnPruner; node-commons ColumnMetadata; column pruning in consensus WAL + pipe tablet + TsFile reseal paths; legacy 'column' regex removed; timeSelected flag wired end-to-end (per-table); CREATE/ALTER TOPIC validation + DataNode refreshColumnFilter cache hot-refresh; UT + IT coverage. Implements V2.0 subscription column-filter design. NOT YET VERIFIED: end-to-end datanode/confignode compile and full IT run.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17936 +/- ##
============================================
+ Coverage 41.07% 41.18% +0.11%
Complexity 318 318
============================================
Files 5257 5265 +8
Lines 365010 365873 +863
Branches 47180 47337 +157
============================================
+ Hits 149918 150697 +779
- Misses 215092 215176 +84 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR adds a table-model column-filter topic attribute for subscriptions and wires it through DataNode runtime so emitted tablets/TsFiles are pruned to only the allowed columns (with TAG retention and ATTRIBUTE exclusion). It also propagates timeSelected (global + per-table) through poll responses to let consumers optionally hide the time column while remaining backward-compatible with older wire formats.
Changes:
- Introduces a dedicated
ColumnFilter.g4grammar plus parser/validator/evaluator, and a binder that snapshots allowed columns per (database, table) from the current schema. - Applies column pruning across IoTConsensus WAL conversion, pipe tablet batching/TsFile reseal, and TsFile pass-through decisions.
- Extends poll responses and client-side record handling to carry
timeSelected(and per-table overrides) and conditionally expose/hide the time column.
Reviewed changes
Copilot reviewed 51 out of 51 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/ColumnFilter.g4 | Adds standalone column-filter grammar. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java | Includes column-filter in extractor attributes for table topics. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/columnfilter/ColumnMetadata.java | Introduces metadata model used for filter evaluation. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatchTest.java | Tests empty-inner-batch behavior for tsfile batching. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/columnfilter/TabletColumnPrunerTest.java | Adds unit tests for tablet pruning behavior. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterParserTest.java | Adds parser/validator/evaluator tests and rejection cases. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterBinderTest.java | Tests binding snapshot logic and time selection behavior. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverterTest.java | Updates consensus WAL converter tests to new matcher + null/bitmap semantics. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgentTest.java | Tests when column-filter hot-refresh should trigger. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatchTest.java | Tests that fully-pruned tablets are released (not retained). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java | Threads timeSelected + per-table map through event construction. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java | Adds timeSelected/timeSelectedByTable to FILE_* poll responses. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java | Adds timeSelected/timeSelectedByTable to TABLETS poll responses. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java | Adds constructors carrying timeSelected/timeSelectedByTable. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java | Adds caching constructors for new poll response fields. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java | Adds table-model tablet pruning while building TsFiles. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java | Adds pruning for table-model tablets in tablet-batch path. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/TabletColumnPruner.java | Implements column-level pruning for table-model tablets. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterValidator.java | Restricts supported AST constructs/fields and validates patterns. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterParser.java | Implements parsing + syntax prechecks and AST building. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterMatcher.java | Adds matchers for bound snapshots / selected names / expressions + timeSelected APIs. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterEvaluator.java | Evaluates validated AST against ColumnMetadata. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/ColumnFilterBinder.java | Binds expression to per-table selected-column snapshots from schema. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/columnfilter/BoundColumnFilter.java | Holds bound per-table selected columns + timeSelected flags. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java | Adds topicName getter and blocks tsfile pass-through when filter is non-trivial. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionSetupHandler.java | Refreshes column filter during consensus queue/converter setup. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java | Propagates timeSelected + per-table map on TABLETS responses. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java | Switches table column selection to ColumnFilterMatcher + TabletColumnPruner. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java | Hot-refreshes/drops cached matchers on topic meta changes. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java | Caches and refreshes per-topic ColumnFilterMatcher via schema snapshots. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java | Exposes a safe table snapshot for binding. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java | Rejects column-filter attribute on tree topics. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java | Validates/normalizes column-filter for table topics (SQL path). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java | Adds optional pruner hook and avoids retaining fully-pruned tablets. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java | Treats “consumed but no payload” events by immediately releasing references. |
| iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTopicValidationTest.java | Updates validation tests from legacy column to column-filter. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java | Validates column-filter presence/uniqueness/emptiness and table-only constraint. |
| iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandlerTest.java | Tests time hiding and per-table timeSelected behavior. |
| iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseTest.java | Tests wire-format round-trip + old-format defaulting. |
| iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/config/TopicConfigTest.java | Tests case-insensitive column-filter key and trivial defaults. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java | Adds timeSelected/per-table handling and hides time column when needed. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java | Carries timeSelected through message construction and equality/hash. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java | Aggregates timeSelected + per-table maps across multi-part tablet responses. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java | Extends poll response serialization with timeSelected + per-table map. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java | Replaces legacy column with column-filter constants/default. |
| iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java | Adds column-filter accessors/trivial checks and source-attr export. |
| integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionPermissionIT.java | Adds permission coverage for creating/alting topics with column-filter. |
| integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java | Updates/extends consensus ITs for column-filter semantics and alter/rebind. |
| integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java | Adds cluster IT for owner transfer + alter + rebind behavior. |
| integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java | Updates helper APIs and polling/drain logic for new column-filter behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| final String[] measurements = node.getMeasurements(); | ||
| final TSDataType[] dataTypes = node.getDataTypes(); | ||
| final long[] times = node.getTimes(); | ||
| final Object[] columns = node.getColumns(); | ||
| final BitMap[] bitMaps = node.getBitMaps(); | ||
| final int rowCount = node.getRowCount(); | ||
| final List<Integer> matchedColumnIndices = | ||
| getMatchedTableColumnIndices( | ||
| measurements, dataTypes, columns, node.getColumnCategories(), true); | ||
| if (matchedColumnIndices.isEmpty()) { | ||
| if (Objects.isNull(measurements) | ||
| || Objects.isNull(dataTypes) | ||
| || Objects.isNull(node.getColumns())) { |
|



Purpose
Add a
column-filtertopic attribute for table-model subscriptions so a topic canrestrict which columns consumers receive, based on column metadata, applied uniformly
across historical, real-time pipe, and IoTConsensus WAL paths.
What
column-filter(table-model only); unset ==true.Restricted boolean predicate over column-metadata fields aligned with
information_schema.columns:database,table_name,column_name,datatype,category. Supports=/!=,IN,LIKE,REGEXP,IS [NOT] NULL,AND/OR/NOT.ColumnFilter.g4grammar (decoupled from the query grammar) producing thereused relational AST nodes; a whitelist validator rejects unsupported syntax/fields.
current schema, cached per topic on the DataNode and used for O(1) runtime pruning.
TAG columns are retained; ATTRIBUTE columns are not transmittable.
(prune at tablet granularity; raw TsFile passthrough kept only when the filter is trivial).
timeSelectedpropagated as an optional, backward-compatible per-table flag on the pollresponse (defaults to exposing time for old consumers).
columnregex attribute.Tests
realtime consistency, ALTER rebind, empty-result progress, snapshot semantics, custom time
column, altered data type, tree-view source-column binding, attribute drop, 1C3D cluster
ALTER-after-owner-transfer, DataNode restart re-snapshot, invalid-expression rejection, permission.
Notes
from the current schema on CREATE/ALTER/restart/owner-transfer; not frozen for the topic lifetime.