Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
dcfcc0c
up-0
0AyanamiRei Mar 14, 2026
9065803
Merge branch 'apache:master' into feature-routineload-AWS_Kinesis
0AyanamiRei Mar 14, 2026
dc34235
up-1
0AyanamiRei Mar 14, 2026
c1b0f0a
code format
0AyanamiRei Mar 14, 2026
6204af6
Merge branch 'master' into feature-routineload-AWS_Kinesis
0AyanamiRei Mar 14, 2026
1b55843
fe format
0AyanamiRei Mar 15, 2026
2523ae9
fix some bug and add fast end (EOF)
0AyanamiRei Mar 16, 2026
acedce7
be code format
0AyanamiRei Mar 16, 2026
0ca1e29
add KinesisConf
0AyanamiRei Mar 17, 2026
e8fe283
fix prop name and fix bug
0AyanamiRei Mar 17, 2026
6071570
fix shards merge/split problem
0AyanamiRei Mar 18, 2026
6d85dc7
add case temp
0AyanamiRei Mar 26, 2026
2c0b7dc
code format
0AyanamiRei Mar 31, 2026
1c37522
[fix](routine-load) address kinesis review feedback
0AyanamiRei Mar 31, 2026
1a760b3
[fix](kinesis) validate timeout properties
0AyanamiRei Mar 31, 2026
492582f
[fix](routine-load) remove unused workload_groups field
0AyanamiRei Mar 31, 2026
88acb02
Merge branch 'master' into feature-routineload-AWS_Kinesis
0AyanamiRei Mar 31, 2026
4ee23c0
add metrics and fix
0AyanamiRei Apr 1, 2026
86ea842
case passed fix
0AyanamiRei Apr 1, 2026
ad3b6c1
fixcase : test_kinesis_routine_load_pause_resume
0AyanamiRei Apr 1, 2026
064a576
[test](regression) Improve Kinesis routine load property case
0AyanamiRei Apr 1, 2026
0a813f1
[test](regression) Strengthen Kinesis routine load restart cases
0AyanamiRei Apr 1, 2026
4ff1263
rewrite some case and fix bug
0AyanamiRei Apr 2, 2026
7787172
rewrite case : test_kinesis_routine_load_pause_resume
0AyanamiRei Apr 2, 2026
9c1063a
(fixcase) test_kinesis_routine_load_pause_resume
0AyanamiRei Apr 2, 2026
15c1b0b
[fix](regression) Use history-aware state check after stopping kinesi…
0AyanamiRei Apr 2, 2026
4fe68c9
remove some case
0AyanamiRei Apr 6, 2026
9d8d2b2
[fix](regression) fe restart case fix
0AyanamiRei Apr 6, 2026
de7a8ee
[fix](regression) fix case test_kinesis_routine_load_shard_change
0AyanamiRei Apr 6, 2026
4cefe80
[fix](regression) fix test_kinesis_routine_load_property
0AyanamiRei Apr 6, 2026
a349716
[fix](regression) fix test_kinesis_routine_load_data_quality_error
0AyanamiRei Apr 6, 2026
b346996
[fix](regression) Rewrite case and fix
0AyanamiRei Apr 6, 2026
37d8322
[case](regression) remove useless case and add case for show load
0AyanamiRei Apr 6, 2026
3d8da70
add endpoint
0AyanamiRei Apr 7, 2026
90da0ba
fix p2 problem
0AyanamiRei Apr 7, 2026
36bcbd8
Merge branch 'master' into feature-routineload-AWS_Kinesis
0AyanamiRei Apr 7, 2026
e574203
fix p2 problem
0AyanamiRei Apr 7, 2026
8c0c83a
remove doc
0AyanamiRei Apr 7, 2026
0488c1d
[fix](be) Fix Kinesis progress reset and shard pagination
0AyanamiRei Apr 7, 2026
cbcd39f
[chore](be) Format Kinesis reset helper
0AyanamiRei Apr 7, 2026
b290ff1
[test](regression) Fix Kinesis show routine load assertions
0AyanamiRei Apr 7, 2026
3ff6ccd
[fix](regression) Fix the shards life style
0AyanamiRei Apr 7, 2026
449040e
fix name
0AyanamiRei Apr 8, 2026
3ec5dc1
rewrite
0AyanamiRei Apr 8, 2026
519e207
fix style
0AyanamiRei Apr 8, 2026
160094f
[refactor](code) DataConsumer refactor
0AyanamiRei Apr 8, 2026
2ad4a0f
Merge branch 'master' into feature-routineload-AWS_Kinesis
0AyanamiRei Apr 8, 2026
c49fa30
[refactor](code) DataConsumerGroup refactor
0AyanamiRei Apr 8, 2026
e4c3673
[refactor](routine-load) Reorganize FE routine load file structure
0AyanamiRei Apr 8, 2026
168548b
[format](code)
0AyanamiRei Apr 8, 2026
04e6538
[fix](test) Update test imports after routine load file structure reo…
0AyanamiRei Apr 8, 2026
4f744b6
[fix](fe) Fix routine load package migration compile failures
0AyanamiRei Apr 9, 2026
1f63e65
[fix](fe) Fix import order and redundant imports for checkstyle
0AyanamiRei Apr 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ add_thirdparty(aws-c-s3 LIB64)
add_thirdparty(aws-c-sdkutils LIB64)
add_thirdparty(aws-cpp-sdk-identity-management LIB64)
add_thirdparty(aws-cpp-sdk-sts LIB64)
add_thirdparty(aws-cpp-sdk-kinesis LIB64)
if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()
Expand Down
12 changes: 12 additions & 0 deletions be/src/common/metrics/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_latency, MetricUnit::M
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_kinesis_get_records_latency,
MetricUnit::MILLISECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_kinesis_get_records_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_kinesis_throttle_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_kinesis_retriable_error_count,
MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_kinesis_closed_shard_count, MetricUnit::NOUNIT);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);
Expand Down Expand Up @@ -353,6 +360,11 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_get_msg_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_consume_rows);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_kinesis_get_records_latency);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_kinesis_get_records_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_kinesis_throttle_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_kinesis_retriable_error_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, routine_load_kinesis_closed_shard_count);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us);
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/metrics/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class DorisMetrics {
IntCounter* routine_load_get_msg_count = nullptr;
IntCounter* routine_load_consume_bytes = nullptr;
IntCounter* routine_load_consume_rows = nullptr;
IntCounter* routine_load_kinesis_get_records_latency = nullptr;
IntCounter* routine_load_kinesis_get_records_count = nullptr;
IntCounter* routine_load_kinesis_throttle_count = nullptr;
IntCounter* routine_load_kinesis_retriable_error_count = nullptr;
IntCounter* routine_load_kinesis_closed_shard_count = nullptr;

IntCounter* memtable_flush_total = nullptr;
IntCounter* memtable_flush_duration_us = nullptr;
Expand Down
15 changes: 0 additions & 15 deletions be/src/io/fs/kafka_consumer_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,6 @@ class KafkaConsumerPipe : public StreamLoadPipe {
: StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}

~KafkaConsumerPipe() override = default;

virtual Status append_with_line_delimiter(const char* data, size_t size) {
Status st = append(data, size);
if (!st.ok()) {
return st;
}

// append the line delimiter
st = append("\n", 1);
return st;
}

virtual Status append_json(const char* data, size_t size) {
return append_and_flush(data, size);
}
};
} // namespace io
} // end namespace doris
34 changes: 34 additions & 0 deletions be/src/io/fs/kinesis_consumer_pipe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "io/fs/stream_load_pipe.h"

namespace doris {
namespace io {

class KinesisConsumerPipe : public StreamLoadPipe {
public:
KinesisConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024)
: StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}

~KinesisConsumerPipe() override = default;
};

} // namespace io
} // namespace doris
9 changes: 9 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
Status append(const char* data, size_t size) override;
Status append(const ByteBufferPtr& buf) override;

virtual Status append_with_line_delimiter(const char* data, size_t size) {
RETURN_IF_ERROR(append(data, size));
return append("\n", 1);
}

virtual Status append_json(const char* data, size_t size) {
return append_and_flush(data, size);
}

const Path& path() const override { return _path; }

size_t size() const override { return 0; }
Expand Down
96 changes: 96 additions & 0 deletions be/src/load/routine_load/consumer_group_helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>
#include <string>
#include <vector>

namespace doris {

// Helper class for partitioning work items (partitions/shards) across consumers
template <typename KeyType, typename ValueType>
class WorkPartitioner {
public:
// Divide work items equally across N consumers using round-robin
static std::vector<std::map<KeyType, ValueType>> partition_round_robin(
const std::map<KeyType, ValueType>& work_items, int consumer_count) {
std::vector<std::map<KeyType, ValueType>> result(consumer_count);
int i = 0;
for (const auto& [key, value] : work_items) {
int idx = i % consumer_count;
result[idx].emplace(key, value);
i++;
}
return result;
}
};

// Helper class for managing format-based append operations
class FormatAppender {
public:
// Get the appropriate append function pointer based on format type
template <typename PipeType>
static auto get_append_function(TFileFormatType::type format)
-> Status (PipeType::*)(const char*, size_t) {
return (format == TFileFormatType::FORMAT_JSON) ? &PipeType::append_json
: &PipeType::append_with_line_delimiter;
}
};

// Helper class for tracking consumption progress
class ConsumptionProgress {
public:
ConsumptionProgress(int64_t max_time_ms, int64_t max_rows, int64_t max_bytes)
: _initial_time(max_time_ms),
_initial_rows(max_rows),
_initial_bytes(max_bytes),
_left_time(max_time_ms),
_left_rows(max_rows),
_left_bytes(max_bytes) {}

// Check if any limit is reached
bool is_limit_reached() const { return _left_time <= 0 || _left_rows <= 0 || _left_bytes <= 0; }

// Update progress after consuming one item
void consume_item(int64_t bytes) {
_left_rows--;
_left_bytes -= bytes;
}

// Update time progress
void update_time(int64_t elapsed_us) { _left_time = _initial_time - elapsed_us / 1000; }

// Getters
int64_t left_time() const { return _left_time; }
int64_t left_rows() const { return _left_rows; }
int64_t left_bytes() const { return _left_bytes; }
int64_t consumed_rows() const { return _initial_rows - _left_rows; }
int64_t consumed_bytes() const { return _initial_bytes - _left_bytes; }
int64_t consumed_time() const { return _initial_time - _left_time; }

private:
int64_t _initial_time;
int64_t _initial_rows;
int64_t _initial_bytes;
int64_t _left_time;
int64_t _left_rows;
int64_t _left_bytes;
};

} // namespace doris
61 changes: 61 additions & 0 deletions be/src/load/routine_load/consumer_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "load/routine_load/consumer_helpers.h"

#include <algorithm>
#include <thread>

#include "common/metrics/doris_metrics.h"

namespace doris {

// ConsumerMetrics implementation
void ConsumerMetrics::track_get_msg(int64_t latency_us) {
_get_msg_count++;
DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
DorisMetrics::instance()->routine_load_get_msg_latency->increment(latency_us / 1000);
}

void ConsumerMetrics::track_consume_bytes(int64_t bytes) {
_consume_bytes += bytes;
DorisMetrics::instance()->routine_load_consume_bytes->increment(bytes);
}

void ConsumerMetrics::track_consume_rows(int64_t rows) {
_consume_rows += rows;
DorisMetrics::instance()->routine_load_consume_rows->increment(rows);
}

// RetryPolicy implementation
void RetryPolicy::retry_with_backoff() {
_retry_count++;
if (_retry_count <= _max_retries) {
std::this_thread::sleep_for(std::chrono::milliseconds(_backoff_ms));
}
}

// ThrottleBackoff implementation
void ThrottleBackoff::backoff_and_sleep() {
_throttle_count++;
// Exponential backoff: initial_ms * 2^(count-1), capped at max_ms
int backoff_ms = std::min(_initial_backoff_ms * (1 << std::min(_throttle_count - 1, 3)),
_max_backoff_ms);
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
}

} // namespace doris
119 changes: 119 additions & 0 deletions be/src/load/routine_load/consumer_helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <chrono>
#include <string>
#include <unordered_map>

namespace doris {

// Helper class for tracking metrics in data consumers
class ConsumerMetrics {
public:
ConsumerMetrics() = default;

// Track a single message fetch operation
void track_get_msg(int64_t latency_us);

// Track consumed bytes
void track_consume_bytes(int64_t bytes);

// Track consumed rows
void track_consume_rows(int64_t rows);

// Get accumulated metrics
int64_t get_msg_count() const { return _get_msg_count; }
int64_t consume_bytes() const { return _consume_bytes; }
int64_t consume_rows() const { return _consume_rows; }

private:
int64_t _get_msg_count = 0;
int64_t _consume_bytes = 0;
int64_t _consume_rows = 0;
};

// Helper class for retry logic with configurable backoff
class RetryPolicy {
public:
explicit RetryPolicy(int max_retries = 3, int backoff_ms = 200)
: _max_retries(max_retries), _backoff_ms(backoff_ms), _retry_count(0) {}

// Check if should retry
bool should_retry() const { return _retry_count < _max_retries; }

// Increment retry count and sleep with backoff
void retry_with_backoff();

// Reset retry counter (call on success)
void reset() { _retry_count = 0; }

// Get current retry count
int retry_count() const { return _retry_count; }

private:
int _max_retries;
int _backoff_ms;
int _retry_count;
};

// Helper class for exponential backoff (used for throttling)
class ThrottleBackoff {
public:
explicit ThrottleBackoff(int initial_backoff_ms = 1000, int max_backoff_ms = 10000)
: _initial_backoff_ms(initial_backoff_ms),
_max_backoff_ms(max_backoff_ms),
_throttle_count(0) {}

// Increment throttle count and sleep with exponential backoff
void backoff_and_sleep();

// Reset throttle counter (call on success)
void reset() { _throttle_count = 0; }

// Get current throttle count
int throttle_count() const { return _throttle_count; }

private:
int _initial_backoff_ms;
int _max_backoff_ms;
int _throttle_count;
};

// Helper class for comparing custom properties between consumers
class PropertyMatcher {
public:
// Check if two property maps match exactly
template <typename MapType1, typename MapType2>
static bool properties_match(const MapType1& props1, const MapType2& props2) {
if (props1.size() != props2.size()) {
return false;
}

for (const auto& [key, value] : props1) {
auto it = props2.find(key);
if (it == props2.end() || it->second != value) {
return false;
}
}

return true;
}
};

} // namespace doris
Loading
Loading