[feature](RoutineLoad) Support the Amazon Kinesis #61325

Open
0AyanamiRei wants to merge 53 commits into apache:master from 0AyanamiRei:feature-routineload-AWS_Kinesis

Contributor

@0AyanamiRei 0AyanamiRei commented Mar 14, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: this PR should be merged after #62184

Problem Summary:

Add Amazon Kinesis as a data source for routine load.

CREATE ROUTINE LOAD [db_name.]job_name ON table_name
[load_properties]
[job_properties]
FROM KINESIS
(
    "aws.region" = "your_region",
    "kinesis_stream" = "your_stream_name",
    "aws.access_key" = "your_access_key",
    "aws.secret_key" = "your_secret_key"
);

How AWS Kinesis concepts map to Kafka:

Kinesis stream <=> Kafka topic
Kinesis shard <=> Kafka partition

Doc PR: apache/doris-website#3521

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@0AyanamiRei
Contributor Author

run buildall

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

| Category | Coverage |
| --- | --- |
| Function Coverage | 79.24% (1798/2269) |
| Line Coverage | 64.56% (32298/50026) |
| Region Coverage | 65.44% (16166/24702) |
| Branch Coverage | 55.88% (8615/15416) |

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 1.15% (10/866) 🎉
Increment coverage report
Complete coverage report

Comment thread gensrc/thrift/BackendService.thrift Outdated
@liaoxin01
Contributor

/review

Contributor

@github-actions github-actions bot left a comment

I found 3 correctness issues in this PR.

  1. be/src/load/stream_load/stream_load_executor.cpp: the Kinesis error path does not reset ctx->kinesis_info->cmt_sequence_number when plan execution fails. KinesisDataConsumerGroup::start_all() has already copied the last consumed sequence numbers into the context before the fragment/txn result is known, so a failed attempt can leave advanced progress in memory for the retried task. Kafka explicitly rewinds here; Kinesis needs the same protection to avoid skipping records after an aborted batch.

  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: cloud mode is not blocked, but the cloud routine-load transaction path is still Kafka-only. RoutineLoadManager can create Kinesis jobs in cloud mode, KinesisTaskInfo sets cloudCluster, and then TxnUtil.rlTaskTxnCommitAttachmentToPb() still casts attachment.getProgress() to KafkaProgress. The first cloud Kinesis commit will therefore fail with a ClassCastException instead of persisting progress.

  3. regression-test/conf/regression-conf.groovy: the new load_p0/kinesis_routine_load directory is added to excludeDirectories, so the entire new regression suite is skipped by default. That leaves the feature effectively untested in CI even though the PR adds many Kinesis cases.

Critical checkpoint conclusions:

  • Goal of the task / correctness / proof: The PR clearly aims to add end-to-end Kinesis routine-load support, but the current code does not fully achieve that because retry safety and cloud-mode behavior are still broken. Tests were added, but the regression config currently excludes them.
  • Small / clear / focused: Not fully. This is a broad cross-layer feature (FE, BE, thrift/proto, persistence, tests), so the risk is naturally high and missing parity with existing Kafka paths matters.
  • Concurrency: The feature introduces new consumer-group and concurrent progress-tracking paths. I did not find a primary lock-order bug in the reviewed hunks, but the retry/reset issue shows the lifecycle between consumer progress and transaction failure is not yet safe.
  • Lifecycle / initialization: No static initialization issue identified in the reviewed code.
  • Configuration items: New Kinesis properties are added. I did not validate dynamic-config behavior because these are routine-load job properties rather than mutable process configs.
  • Compatibility changes: Yes. New thrift/proto enums and structs are introduced. The most concrete compatibility/runtime gap I found is the cloud transaction attachment path remaining Kafka-specific.
  • Parallel code paths: Yes. Kafka already resets progress on failure and has a complete cloud progress path; Kinesis needs the same treatment.
  • Special conditional checks: The explicit Config.isCloudMode() branches in Kinesis code currently lead to incomplete behavior and should either be implemented fully or rejected early.
  • Test coverage: Functional tests were added, including restart/error scenarios, but they are excluded in regression-conf.groovy, so effective coverage is currently missing.
  • Observability: Basic logs are present; no major observability blocker found in the reviewed paths.
  • Transaction / persistence: Yes, this feature touches transaction commit attachments and persisted progress. The cloud attachment serialization path is currently incorrect for Kinesis.
  • Data write / modification: Yes. Retry safety is not guaranteed because failed attempts can retain advanced Kinesis progress.
  • FE/BE variable passing: The new FE/BE Kinesis progress structures are mostly wired, but cloud-specific passing is incomplete.
  • Performance: I did not identify a primary performance regression worth blocking on relative to the correctness issues above.
  • Other issues: No additional blocker beyond the three findings above.

Because of the issues above, I cannot consider the current implementation correct yet.
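
For the second finding, the failure mode is a cast that assumes every progress object is Kafka progress. A hedged sketch of the alternative, dispatching on the concrete progress type, is shown below in Python. The two dataclasses and the dictionary layout are illustrative assumptions and do not mirror the real `KafkaProgress`, `KinesisProgress`, or protobuf definitions.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class KafkaProgress:
    partition_offsets: Dict[int, int]


@dataclass
class KinesisProgress:
    shard_sequence_numbers: Dict[str, str]


def progress_to_attachment(progress: Union[KafkaProgress, KinesisProgress]) -> dict:
    """Serialize progress by concrete type instead of assuming Kafka."""
    if isinstance(progress, KafkaProgress):
        return {"type": "KAFKA", "offsets": dict(progress.partition_offsets)}
    if isinstance(progress, KinesisProgress):
        return {"type": "KINESIS",
                "sequence_numbers": dict(progress.shard_sequence_numbers)}
    # Reject unknown sources explicitly rather than failing on a blind cast.
    raise TypeError(f"unsupported routine-load progress: {type(progress).__name__}")
```

The same structure also covers the "reject early" option the review mentions: an unsupported source fails with a clear error at the point of serialization.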

Comment thread be/src/load/stream_load/stream_load_executor.cpp Outdated
Comment thread regression-test/conf/regression-conf.groovy Outdated
@0AyanamiRei
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment

I found 2 correctness issues.

  1. be/src/load/stream_load/stream_load_context.h: Kinesis progress is initialized/reset to the requested begin sequence number, but BE later resumes numeric positions with AFTER_SEQUENCE_NUMBER. If a task commits without consuming any records from a shard, the next task will skip the first requested record instead of retrying from it.
  2. be/src/load/routine_load/data_consumer.cpp: shard discovery uses a single ListShards call and ignores pagination. Streams whose shard list spans multiple pages will silently lose later shards from FE scheduling.
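
The first finding comes down to which Kinesis shard iterator type is requested. `AT_SEQUENCE_NUMBER` starts at the given record, while `AFTER_SEQUENCE_NUMBER` starts just past it. A small Python sketch of one way to choose between them follows. The `consumed` flag and the `ShardPosition` type are assumptions made for illustration, and whether the PR tracks this with a flag or by other means is not stated in the review.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ShardPosition:
    sequence_number: str
    consumed: bool  # True once the record at sequence_number has been loaded


def iterator_request(pos: ShardPosition) -> Tuple[str, Optional[str]]:
    """Return (ShardIteratorType, StartingSequenceNumber) for GetShardIterator."""
    if pos.sequence_number in ("TRIM_HORIZON", "LATEST"):
        return pos.sequence_number, None
    if pos.consumed:
        # Resume after a record that was already loaded.
        return "AFTER_SEQUENCE_NUMBER", pos.sequence_number
    # A requested begin position that was never read must be inclusive.
    return "AT_SEQUENCE_NUMBER", pos.sequence_number
```

Under this scheme, a task that commits without reading anything from a shard leaves `consumed` false, so the next task still starts at the requested record.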

Critical checkpoint conclusions:

  • Goal of current task: Adds Kinesis routine load support end to end. The implementation is close, but these two bugs mean it does not yet reliably preserve correct consumption progress and full shard discovery.
  • Modification size/focus: Broad but still centered on routine-load Kinesis support.
  • Concurrency: Reviewed FE job locking and BE consumer-group threading; I did not find a lock-order or thread-safety bug in the touched paths.
  • Lifecycle/static initialization: No static initialization issue found. Consumer/task lifecycle is understandable, but the Kinesis progress lifecycle has the bug above.
  • Configuration items: No new Doris config item requiring dynamic-config review.
  • Incompatible changes: FE/BE thrift/proto additions are additive; no storage-format incompatibility found in the supported shared-nothing path.
  • Functionally parallel paths: Kafka remains the parallel path. Kinesis should mirror Kafka's progress semantics more closely; currently the numeric-sequence resume path diverges incorrectly.
  • Special conditional checks: No extra issue beyond the two reported logic branches.
  • Test coverage: FE unit tests and regression cases were added, but coverage is still missing for explicit numeric start positions with an empty first batch and for paginated shard discovery.
  • Observability: Added metrics/logging are adequate for the new BE path.
  • Transaction/persistence: Shared-nothing routine-load attachment wiring is present for the supported mode; no additional supported-mode edit-log gap was confirmed in this review.
  • Data writes/modifications: Not safe yet because the explicit-sequence resume bug can skip data.
  • FE/BE variable passing: The new variables are wired through the supported FE/BE path.
  • Performance: No major hot-path performance regression identified beyond the correctness issue in shard discovery.
  • Other issues: None beyond the 2 findings above.

Overall opinion: not ready to merge until these correctness issues are fixed.
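
The pagination finding can be illustrated with a short loop over `NextToken`. The sketch below is Python rather than the BE's C++ and takes the `list_shards` call as a parameter, so it makes no claim about how `data_consumer.cpp` is structured. The callable is assumed to behave like the `list_shards` method of boto3's Kinesis client: keyword arguments in, a dict with `Shards` and an optional `NextToken` out.

```python
from typing import Callable, Dict, List, Optional


def list_all_shards(list_shards: Callable[..., Dict], stream_name: str) -> List[Dict]:
    """Collect shards from every ListShards page, not only the first."""
    shards: List[Dict] = []
    next_token: Optional[str] = None
    while True:
        # ListShards does not allow StreamName together with NextToken,
        # so follow-up requests carry only the token.
        if next_token is None:
            page = list_shards(StreamName=stream_name)
        else:
            page = list_shards(NextToken=next_token)
        shards.extend(page.get("Shards", []))
        next_token = page.get("NextToken")
        if not next_token:
            return shards
```

Passing the call in as a parameter keeps the loop testable with a fake client that returns canned pages.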

Comment thread be/src/load/stream_load/stream_load_context.h Outdated
Comment thread be/src/load/routine_load/data_consumer.cpp Outdated
Contributor

@github-actions github-actions bot left a comment

Found 5 issues that should be addressed before this feature is considered correct end-to-end.

  1. SHOW CREATE ROUTINE LOAD prints Kinesis property keys that the analyzer does not accept, so the generated SQL cannot recreate the job.
  2. BE advances committed Kinesis sequence numbers before records are durably handed off, which can skip unread rows at batch boundaries.
  3. BE shard discovery ignores ListShards pagination and can miss shards on larger streams.
  4. Explicit shard start sequence numbers are treated as exclusive (AFTER_SEQUENCE_NUMBER), so the user-specified first record is skipped.
  5. The cloud routine-load txn attachment protobuf path is still Kafka-only, so Kinesis progress is not serialized/replayed correctly there.
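
Finding 2 concerns the order of two steps: handing a record to the load pipeline and advancing committed progress. A minimal Python sketch of the safer ordering follows. `drain_batch`, the in-memory queue, and the `sink` list are hypothetical and stand in for the BE's consumer queue and pipe: progress is recorded only for records that were actually dequeued and handed off.

```python
from collections import deque
from typing import Deque, Dict, Tuple

Record = Tuple[str, str, bytes]  # (shard_id, sequence_number, payload)


def drain_batch(queue: Deque[Record], max_records: int, sink: list) -> Dict[str, str]:
    """Hand records to `sink` and return progress for exactly those records."""
    committed: Dict[str, str] = {}
    for _ in range(min(max_records, len(queue))):
        shard_id, seq, payload = queue.popleft()
        sink.append(payload)
        committed[shard_id] = seq  # advance only after the handoff
    return committed
```

Records that were fetched but are still in the queue when the batch ends do not affect the returned progress, so they are read again by the next task instead of being skipped.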

Critical checkpoint conclusions:

  • Goal of current task: Partially achieved. The PR adds FE/BE Kinesis routine-load plumbing and tests, but the issues above mean core correctness and operability are still incomplete.
  • Small, clear, focused change: No. The change spans FE, BE, thrift/proto, persistence, metrics, thirdparty, and regression, which increases the need for exhaustive path parity checks.
  • Concurrency: Not safe yet. The BE consumer/queue/progress interaction can commit progress ahead of actual ingestion at task stop boundaries.
  • Lifecycle / static initialization: No special lifecycle or static-init issue stood out in the touched code.
  • Configuration items: New Kinesis/AWS properties were added, but property naming is inconsistent between emitted SQL and accepted SQL.
  • Incompatible / protocol changes: Not fully handled. New routine-load protocol/persistence paths were added, but the cloud protobuf attachment path remains Kafka-only.
  • Parallel code paths: Not fully updated. Shared-nothing thrift handling supports Kinesis, while the cloud transaction serialization path does not.
  • Special conditional checks: The new shard/open-shard assumptions rely on BE behavior that still has edge-case gaps such as paginated discovery.
  • Test coverage: Incomplete. The added tests do not cover SHOW CREATE round-trip, explicit sequence-number semantics, batch-boundary progress safety, or cloud attachment serialization.
  • Observability: Acceptable but incomplete for debugging skipped-record scenarios; the new metrics help, but they do not prevent the correctness bug above.
  • Transaction / persistence: Not fully correct because Kinesis routine-load progress is not wired through the cloud protobuf attachment path.
  • Data writes / modifications: Not fully correct because committed progress can move past data that was not actually loaded.
  • New FE-BE variables/path propagation: Not fully covered across all paths; shared-nothing is wired, cloud protobuf is not.
  • Performance review: I did not find a blocking hot-path regression, but missing shard pagination will create functional starvation on larger streams.
  • Other issues: None beyond the findings above.

Residual risk if those are fixed: closed-shard filtering after split/merge still deserves another pass because FE now assumes shard discovery returns only OPEN shards.

Comment thread be/src/load/routine_load/data_consumer.cpp
Comment thread be/src/load/routine_load/data_consumer.cpp Outdated
Comment thread be/src/load/routine_load/kinesis_conf.cpp
### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Fix Kinesis routine load so empty batches do not advance numeric shard sequence progress, and shard discovery walks every ListShards page instead of only the first response.

### Release note

None

### Check List (For Author)

- Test: No need to test (with reason)
    - Per local agent.md constraints for this turn, no tests or UTs were run.
- Behavior changed: Yes (with explanation)
    - Kinesis routine load now keeps numeric begin positions stable across empty batches and discovers shards across paginated ListShards responses.
- Does this need documentation: No
### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Align the Kinesis reset_sequence_numbers helper with the formatter output to avoid style-only CI diffs.

### Release note

None

### Check List (For Author)

- Test: No need to test (with reason)
    - Formatting-only change; per local agent.md constraints, no tests were run.
- Behavior changed: No
- Does this need documentation: No
@0AyanamiRei
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment

Found 2 issues.

  1. be/src/load/routine_load/data_consumer.cpp: ListShards still appends every shard returned by AWS, but the comment and FE logic assume this list contains only OPEN shards. Kinesis ListShards also returns non-expired CLOSED parent shards after split/merge, so the next reschedule will re-add those retired parents to openKinesisShards, initialize missing progress from TRIM_HORIZON, and re-consume old data.
  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: lag cache updates use Math::max, which keeps the historical worst MillisBehindLatest forever. Once a shard has ever lagged, SHOW ROUTINE LOAD / getStatistic() can keep reporting inflated totalMillisBehindLatest and maxMillisBehindLatest even after the consumer has caught up, which breaks observability and any operator decisions based on these values.
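
For the first finding, a closed shard can be recognized from the ListShards response itself: its `SequenceNumberRange` carries an `EndingSequenceNumber`, while an open shard has none. The Python sketch below filters on that field. The function name is hypothetical, and the input is assumed to be the list of shard dicts as returned by the AWS API.

```python
from typing import Dict, List


def open_shard_ids(shards: List[Dict]) -> List[str]:
    """Keep shards whose sequence-number range has no ending sequence number."""
    return [
        s["ShardId"]
        for s in shards
        if s.get("SequenceNumberRange", {}).get("EndingSequenceNumber") is None
    ]
```

This only covers discovery of new shards. A closed parent that is already being tracked may still hold unread records, so a real implementation would likely need a separate rule for when such a shard counts as fully drained.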

Critical checkpoints:

  • Goal / correctness: The PR adds Kinesis routine load end-to-end, but the current code does not fully meet the goal because resharding can trigger duplicate consumption and lag reporting becomes stale. Existing tests do not cover either case.
  • Change size / focus: The feature is large and touches FE, BE, thrift/proto, metrics, and regression tests; the risky parts are concentrated in shard lifecycle and lag bookkeeping.
  • Concurrency: I did not find a primary locking/deadlock bug in the reviewed paths, but the shared state assumptions between FE shard tracking and BE ListShards output are incorrect.
  • Lifecycle / persistence: FE gson registration is present. I did notice cloud PB conversion is still Kafka-only, but Kinesis is explicitly blocked in cloud mode here, so that is not a blocking issue for this PR.
  • Config / compatibility: New FE-BE protocol fields are added on both sides; no immediate mixed-path omission found in the shared-nothing flow reviewed.
  • Parallel code paths: Kafka has analogous logic, but Kinesis-specific shard lifecycle differs enough that it needs its own closed-shard filtering and lag cache semantics.
  • Special checks: The OPEN-shard assumption needs to be enforced in code, not only in comments.
  • Test coverage: FE unit tests exist, but I did not see coverage for split/merge parent-shard filtering or for lag cache freshness after catching up.
  • Observability: Metrics were added, but the lag values exposed to users are currently wrong after any transient backlog.
  • Transaction / persistence / writes: No additional blocking issue found in commit attachment handling for the non-cloud path.
  • FE/BE variable passing: The new Kinesis thrift/proto fields used in the shared-nothing path appear wired through.
  • Performance: No primary performance blocker found beyond the correctness issues above.
  • Other issues: None beyond the two findings above.
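
The lag-cache finding can be shown with a few lines. The sketch below is Python with a hypothetical `LagCache` class, not the FE's Java code: each update overwrites the shard's previous value instead of taking the maximum, so the totals fall again once the consumer catches up.

```python
from typing import Dict


class LagCache:
    """Hypothetical per-shard cache of the latest MillisBehindLatest value."""

    def __init__(self) -> None:
        self._lag: Dict[str, int] = {}

    def update(self, shard_id: str, millis_behind_latest: int) -> None:
        # Overwrite rather than max(): the newest report reflects current lag.
        self._lag[shard_id] = millis_behind_latest

    def total(self) -> int:
        return sum(self._lag.values())

    def maximum(self) -> int:
        return max(self._lag.values(), default=0)
```

With a max-based merge, the second call to `update` for a shard could never lower its value, which is the stale-reporting behavior the review describes.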

Comment thread be/src/load/routine_load/data_consumer.cpp
### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Make the Kinesis SHOW ROUTINE LOAD regression case align with actual Doris behavior by avoiding Groovy GString/String assertion mismatches, waiting for open shard display fields to be populated, and using supported SHOW ROUTINE LOAD syntax.

### Release note

None

### Check List (For Author)

- Test: Regression test
    - ./run-regression-test.sh --run -d load_p0/kinesis_routine_load -s test_kinesis_show_routine_load
- Behavior changed: No
- Does this need documentation: No
@0AyanamiRei
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment

Found 2 issues.

  1. High: fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java sends Kinesis metadata requests to any load-available BE, but this PR adds a brand-new kinesis_meta_request protobuf field and BE handler. During a rolling upgrade, an old BE will ignore the unknown field and still return OK, so FE gets an empty shard list and the job can stay unscheduled or pause even though the stream is healthy. This path needs a capability/version gate or a fail-fast check instead of silently accepting pre-Kinesis BEs.
  2. Medium: fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java emits "kinesis_region" from getDataSourceProperties(), but the analyzer only accepts aws.region. SHOW CREATE ROUTINE LOAD therefore produces SQL that cannot recreate the same Kinesis job.
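
The first finding hinges on telling "the BE answered with zero shards" apart from "the BE did not understand the request". One possible fail-fast check is sketched below in Python. `MetaResponse` and its fields are assumptions made for illustration and do not describe the actual protobuf response: the idea is that an OK status with no Kinesis result at all is treated as an unsupported backend rather than as an empty stream.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MetaResponse:
    ok: bool
    shard_ids: Optional[List[str]]  # None when the BE did not fill a Kinesis result


class UnsupportedBackendError(RuntimeError):
    """Raised when a backend appears to predate Kinesis metadata support."""


def shards_from_response(resp: MetaResponse) -> List[str]:
    if not resp.ok:
        raise RuntimeError("metadata request failed")
    if resp.shard_ids is None:
        # OK status without a Kinesis result: likely a BE without the handler.
        raise UnsupportedBackendError(
            "backend did not answer the Kinesis metadata request")
    return resp.shard_ids
```

A caller could catch `UnsupportedBackendError` and retry on another backend. A version or capability check before sending the request, as the review also suggests, would avoid the round trip entirely.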

Critical checkpoint conclusions:

  • Goal of current task: Partially met. The PR wires Kinesis routine load through FE/BE and adds tests, but the mixed-version FE/BE path and SHOW CREATE round-trip are still broken.
  • Small, clear, focused change: Reasonably focused for a new data source, but protocol compatibility handling is missing for the new FE/BE surface.
  • Concurrency review: No blocking-under-lock or obvious lock-order issue stood out in the reviewed FE/BE Kinesis paths.
  • Lifecycle/static initialization: No special lifecycle or static-init-order issue found in reviewed code.
  • Configuration items: No critical dynamic-config issue found in the reviewed Kinesis path.
  • Incompatible changes / compatibility: Not satisfied. New thrift/proto/enum protocol additions are not guarded for mixed-version clusters.
  • Parallel code paths: Kafka and Kinesis separation is fine, but the Kinesis metadata path should preserve the same operational safety expectations as existing routine-load metadata fetches.
  • Special conditional checks: The shard open/closed handling is documented well enough in the reviewed code.
  • Test coverage: There is useful FE UT and regression coverage for shard lifecycle, but no coverage for mixed-version routing or SHOW CREATE round-trip.
  • Observability: Added logging/metrics are adequate for the reviewed path.
  • Transaction / persistence: No additional edit-log or replay correctness issue found beyond the points above.
  • Data writes / modifications: No additional data-write atomicity issue found in the reviewed path beyond the points above.
  • FE/BE variable passing: The new Kinesis fields are wired through the reviewed thrift/proto paths.
  • Performance: No critical hot-path regression identified from the reviewed snippets.
  • Other issues: The two findings above are the main blocking risks I found in this review.
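
The SHOW CREATE round-trip finding lends itself to a simple check: every property key that is emitted must also be accepted on input. A Python sketch of such a check follows. The accepted-key set is taken from the CREATE ROUTINE LOAD example in the PR description and is an assumption, since the real analyzer may accept more keys, and `rejected_keys` is a hypothetical helper.

```python
from typing import Dict, List

# Keys from the CREATE ROUTINE LOAD example in the PR description; the real
# analyzer may accept additional keys.
ACCEPTED_KEYS = frozenset(
    {"aws.region", "kinesis_stream", "aws.access_key", "aws.secret_key"})


def rejected_keys(emitted: Dict[str, str]) -> List[str]:
    """Return emitted property keys that the analyzer would not accept."""
    return sorted(k for k in emitted if k not in ACCEPTED_KEYS)
```

A regression case built on this idea would feed the output of SHOW CREATE ROUTINE LOAD back through the analyzer and expect an empty result.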

DCHECK(_queue.get_size() == 0);
}

Status KinesisDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
Contributor

Reuse the KafkaDataConsumerGroup logic here.

Contributor Author

done

* - LATEST: Start from the newest record (records arriving after the iterator is created)
* - Specific sequence number: Start from or after a specific sequence number
*/
public class KinesisProgress extends RoutineLoadProgress {
Contributor

Move this class to the org.apache.doris.load.routineload.kinesis package.

Contributor Author

done

0AyanamiRei and others added 6 commits April 8, 2026 18:00
Move Kafka and Kinesis specific files into their respective subdirectories:
- Move KafkaProgress, KafkaRoutineLoadJob, KafkaTaskInfo to routineload/kafka/
- Move KinesisProgress, KinesisRoutineLoadJob, KinesisTaskInfo to routineload/kinesis/
- Update package declarations and imports across all affected files

This improves code organization by grouping data source specific implementations together.
@0AyanamiRei 0AyanamiRei requested a review from w41ter as a code owner April 8, 2026 21:47
…rganization

Fix test file imports to use new package paths:
- kafka.KafkaProgress, kafka.KafkaRoutineLoadJob, kafka.KafkaTaskInfo
- kinesis.KinesisProgress, kinesis.KinesisRoutineLoadJob

This fixes compilation errors in test files after moving Kafka and Kinesis
classes to their respective subdirectories.
### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Fix FE compilation failures introduced by the routine load package migration (kafka/kinesis subpackages) by aligning method visibility for cross-package overrides and adding the missing imports in routine load sources and related FE tests.

### Release note

None

### Check List (For Author)

- Test: Manual test
    - Manual test: FE build `FE_MAVEN_THREADS=16 FE_MAVEN_RETRY_THREADS=4 DISABLE_BUILD_UI=ON DISABLE_BUILD_HIVE_UDF=ON DISABLE_BE_JAVA_EXTENSIONS=ON DISABLE_JAVA_CHECK_STYLE=ON MAVEN_OPTS='-Xms1g -Xmx4g -XX:+UseG1GC -XX:ActiveProcessorCount=16' MVN_OPT='-U -s /tmp/doris-maven-settings-doris.xml' bash build.sh --fe -j 16` (BUILD SUCCESS)
- Behavior changed: No (compile and test-compile compatibility fix)
- Does this need documentation: No
### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: FE checkstyle failed with import ordering, unused import, and redundant same-package import errors in routine load and related tests.

### Release note

None

### Check List (For Author)

- Test: Manual test

    - Manual test (user run): ./build.sh --fe -j16

- Behavior changed: No

- Does this need documentation: No
@sollhui
Contributor

sollhui commented Apr 10, 2026

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 33.59% (303/902) 🎉
Increment coverage report
Complete coverage report

4 participants