Implement pluggable Lineage in Java SDK #36781
    ? (JmsTextMessage message) -> {
      if (message == null) {
        return null;
      }
Unrelated change to fix flaky tests.
  assertTrue(
      String.format("Too many unacknowledged messages: %d", unackRecords),
-     unackRecords < OPTIONS.getNumberOfRecords() * 0.003);
+     unackRecords < OPTIONS.getNumberOfRecords() * 0.005);
Unrelated change to fix flaky tests.
Branch updated from e8b6a7e to 661f5c7.
Branch updated from 23b45a2 to 4065277.
Assigning reviewers: R: @kennknowles for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments).
Codecov Report: ✅ All modified and coverable lines are covered by tests.
@@             Coverage Diff              @@
##             master   #36781      +/-   ##
============================================
- Coverage     56.87%   56.86%   -0.01%
+ Complexity     3417     3414       -3
============================================
  Files          1178     1178
  Lines        187492   187495       +3
  Branches       3590     3590
============================================
- Hits         106628   106620       -8
- Misses        77472    77480       +8
- Partials       3392     3395       +3
Flags with carried forward coverage won't be shown.
Reminder, please take a look at this PR: @kennknowles
Branch updated from 032292b to 6d7a434.
Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
@Abacn or @kennknowles, would you mind taking a look? The change is low risk and introduces some important flexibility.
Reminder, please take a look at this PR: @Abacn
Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
Reminder, please take a look at this PR: @kennknowles
Branch updated from 6d7a434 to f61e592.
// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");

private final LineageBase delegate;
FYI: we are using Lineage as a facade around LineageBase, so we don't expose the latter and we avoid any (breaking) changes in other parts of Beam.
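A minimal sketch of this facade, under stated assumptions: Lineage and LineageBase below are hypothetical, trimmed-down stand-ins for the classes in this PR (the real ones carry more methods and different signatures), and RecordingLineage is an illustrative plugin. It shows why call sites are unaffected: they depend only on Lineage, while the implementation behind it can be swapped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified plugin interface (the real LineageBase may differ).
interface LineageBase {
  void add(Iterable<String> segments);
}

// Hypothetical stand-in for the public facade. Callers only depend on this
// class, so the plugin type behind it can change without touching call sites.
final class Lineage {
  private final LineageBase delegate;

  Lineage(LineageBase delegate) {
    this.delegate = delegate;
  }

  // Stable, caller-facing method; the work is forwarded to the plugin.
  public void add(String... segments) {
    delegate.add(Arrays.asList(segments));
  }
}

// Illustrative plugin that records each call as a "/"-joined string.
final class RecordingLineage implements LineageBase {
  private final List<String> recorded = new ArrayList<>();

  @Override
  public void add(Iterable<String> segments) {
    recorded.add(String.join("/", segments));
  }

  List<String> recorded() {
    return Collections.unmodifiableList(recorded);
  }
}
```

Because LineageBase never appears in the facade's public method signatures here, adding or replacing plugins does not change what callers compile against.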
Branch updated from e116899 to 7bcab65.
@kennknowles Thanks for the feedback! I've updated the PR description with a detailed design section addressing your questions. Selection model: single active plugin (first match wins via ServiceLoader), with fallback to MetricsLineage. I've also added a "Why Pluggable Lineage?" section to the PR description with four key use cases:
Let me know if you have any questions!
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
@rohitsinha54 could you take a look please?
Sorry for the delayed response. I haven't had a chance to look into this, even though I want to, because of other priorities and commitments. I really appreciate the contribution and believe it will be very helpful to many Beam users. So that this isn't blocked on me any further, I've asked @Abacn to help with the review. Thank you @Abacn.
@Abacn could you please take a look?
/gemini review
Code Review
This pull request introduces a new plugin mechanism for lineage implementations in Apache Beam. It refactors the existing Lineage class to act as a facade, delegating to LineageBase implementations discovered via LineageRegistrar. A default MetricsLineage implementation is provided, and the FileSystems class is updated for proper initialization. Comprehensive tests for the new plugin system are also included. A review comment suggests refactoring the constructor logic in MetricsLineage.java for improved readability.
public MetricsLineage(final Lineage.LineageDirection direction) {
  // Derive Metrics-specific Type from LineageDirection
  Lineage.Type type =
      (direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK;

  if (MetricsFlag.lineageRollupEnabled()) {
    this.metric =
        Metrics.boundedTrie(
            Lineage.LINEAGE_NAMESPACE,
            direction == Lineage.LineageDirection.SOURCE
                ? Lineage.Type.SOURCEV2.toString()
                : Lineage.Type.SINKV2.toString());
  } else {
    this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString());
  }
}
The constructor logic can be simplified for better readability. The type variable is only used in the else block but is calculated unconditionally, and the if block re-evaluates direction. A clearer approach is to determine the metric type within each branch of the if statement.
public MetricsLineage(Lineage.LineageDirection direction) {
if (MetricsFlag.lineageRollupEnabled()) {
Lineage.Type metricType =
(direction == Lineage.LineageDirection.SOURCE)
? Lineage.Type.SOURCEV2
: Lineage.Type.SINKV2;
this.metric = Metrics.boundedTrie(Lineage.LINEAGE_NAMESPACE, metricType.toString());
} else {
Lineage.Type metricType =
(direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK;
this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, metricType.toString());
}
}

public void add(final Iterable<String> rollupSegments) {
  ImmutableList<String> segments = ImmutableList.copyOf(rollupSegments);
  if (MetricsFlag.lineageRollupEnabled()) {
    ((BoundedTrie) this.metric).add(segments);
There was an implicit assumption (and coupling) between lineageRollupEnabled and BoundedTrie. With the common interface, we can now avoid the unchecked cast by implementing CounterMetricsLineage and TrieMetricsLineage separately.
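A minimal sketch of this split, under stated assumptions: StringSetMetric and TrieMetric are toy stand-ins for Beam's StringSet and BoundedTrie metrics, not the real classes, and the class names follow the comment (the non-rollup path uses a string set, as in the constructor above). Each implementation holds a field of exactly the metric type it writes to, and the rollup flag is consulted once, when choosing the implementation, rather than on every add() call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified plugin interface.
interface LineageBase {
  void add(Iterable<String> segments);
}

// Toy stand-in for a string-set metric (not Beam's StringSet).
final class StringSetMetric {
  private final Set<String> values = new LinkedHashSet<>();
  void add(String value) { values.add(value); }
  Set<String> values() { return Collections.unmodifiableSet(values); }
}

// Toy stand-in for a bounded-trie metric (not Beam's BoundedTrie).
final class TrieMetric {
  private final List<List<String>> paths = new ArrayList<>();
  void add(List<String> path) { paths.add(Collections.unmodifiableList(new ArrayList<>(path))); }
  List<List<String>> paths() { return Collections.unmodifiableList(paths); }
}

// Non-rollup path: the field already has the right type, so no cast is needed.
final class CounterMetricsLineage implements LineageBase {
  private final StringSetMetric metric = new StringSetMetric();

  @Override
  public void add(Iterable<String> segments) {
    metric.add(String.join(".", segments));
  }

  Set<String> values() { return metric.values(); }
}

// Rollup path: likewise precisely typed, with no instanceof check or cast.
final class TrieMetricsLineage implements LineageBase {
  private final TrieMetric metric = new TrieMetric();

  @Override
  public void add(Iterable<String> segments) {
    List<String> copy = new ArrayList<>();
    for (String segment : segments) {
      copy.add(segment);
    }
    metric.add(copy);
  }

  List<List<String>> paths() { return metric.paths(); }
}

// The flag is read once, at construction time, to pick the implementation.
final class MetricsLineageFactory {
  static LineageBase create(boolean rollupEnabled) {
    return rollupEnabled ? new TrieMetricsLineage() : new CounterMetricsLineage();
  }
}
```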
    PipelineOptions options, Lineage.LineageDirection direction) {
  // Only activate if explicitly enabled via TestLineageOptions
  TestLineageOptions testOptions = options.as(TestLineageOptions.class);
  if (testOptions.getEnableTestLineage()) {
This workaround is needed because an AutoService-registered implementation is loaded whenever it is on the classpath. I'm wondering whether we should use an approach similar to --runner selection instead of a FileSystemRegistrar-style registrar. Using an AutoService-backed registrar to discover supported filesystems makes sense because filesystems don't conflict with each other. Pluggable lineage implementations do conflict: when a registrar is found, the default path (metrics-based lineage) is disabled.
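One way to read the --runner-style suggestion, sketched under assumptions: the implementation is chosen by name from an explicit option value instead of being activated merely by being on the classpath. The option (modeled here as an Optional<String>), the registry map, and the stub classes are hypothetical, not existing Beam APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical, simplified plugin interface.
interface LineageBase {
  String name();
}

// Hypothetical default (metrics-based) implementation.
final class MetricsLineageStub implements LineageBase {
  @Override
  public String name() {
    return "metrics";
  }
}

// Hypothetical alternative plugin.
final class OpenLineageStub implements LineageBase {
  @Override
  public String name() {
    return "openlineage";
  }
}

final class LineageSelector {
  // Name -> factory; a TreeMap keeps the names in the error message sorted.
  private final Map<String, Supplier<LineageBase>> available;

  LineageSelector(Map<String, Supplier<LineageBase>> available) {
    this.available = new TreeMap<>(available);
  }

  // requestedName models a hypothetical pipeline option value;
  // empty means "no explicit choice, keep the metrics-based default".
  LineageBase select(Optional<String> requestedName) {
    if (!requestedName.isPresent()) {
      return new MetricsLineageStub();
    }
    Supplier<LineageBase> factory = available.get(requestedName.get());
    if (factory == null) {
      throw new IllegalArgumentException(
          "Unknown lineage implementation '" + requestedName.get()
              + "'; available: " + available.keySet());
    }
    return factory.get();
  }
}
```

With this shape, merely adding a jar never changes behavior; failing fast on an unknown name also avoids a silent fallback to metrics when a plugin name is misspelled.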
LineageBase reporter = registrar.fromOptions(options, direction);
if (reporter != null) {
  LOG.info("Using {} for lineage direction {}", reporter.getClass().getName(), direction);
  return new Lineage(reporter);
If multiple registrars are on the classpath, the activated one is the alphabetically smallest (because they are collected into a TreeSet). This could be confusing. Consider using a pipeline option to select a Lineage implementation.
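The ordering effect can be reproduced with a small sketch, assuming the TreeSet orders registrars by class name, as the comment implies; the registrar classes are hypothetical. Whatever order discovery returns them in, first() yields the alphabetically smallest class name, so adding an unrelated jar can change which implementation becomes active. Note that two instances of the same class compare as equal and are deduplicated.

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified registrar interface.
interface LineageRegistrar {}

// Two hypothetical registrars that might both be on the classpath.
final class ZetaLineageRegistrar implements LineageRegistrar {}

final class AlphaLineageRegistrar implements LineageRegistrar {}

final class RegistrarOrdering {
  // Orders instances by fully qualified class name (assumed ordering).
  static final Comparator<Object> BY_CLASS_NAME =
      Comparator.comparing((Object o) -> o.getClass().getName());

  // Returns the registrar a "first element of the TreeSet" rule would pick.
  static LineageRegistrar firstMatch(List<LineageRegistrar> discovered) {
    TreeSet<LineageRegistrar> sorted = new TreeSet<>(BY_CLASS_NAME);
    sorted.addAll(discovered);
    return sorted.first();
  }
}
```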
Addresses #36790: "[Feature Request]: Make lineage tracking pluggable"
Changes
- org.apache.beam.sdk.lineage.LineageBase - Plugin interface with single add() method
- org.apache.beam.sdk.metrics.Lineage - Hardcoded metrics → Delegation to LineageBase plugins
- org.apache.beam.sdk.lineage.LineageRegistrar - Plugin discovery interface (ServiceLoader)
- org.apache.beam.sdk.metrics.MetricsLineage - Default metrics-based implementation (implements LineageBase)
- FileSystems.setDefaultPipelineOptions()
Architecture
Before (master):
Lineage was a concrete class hardcoded to use Beam metrics:
After (this PR): Clean separation via composition pattern:
Plugin Selection: ServiceLoader discovery, first match wins, fallback to MetricsLineage.
Backward Compatibility: ✅ All existing code works unchanged (24+ call sites, static utilities, enums).
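A minimal sketch of first-match-wins selection with a metrics fallback, under stated assumptions: options are modeled as a Map<String, String>, and LineageRegistrar, LineageBase, and DefaultMetricsLineage are simplified hypothetical stand-ins (the PR's registrar also receives a lineage direction). A registrar that does not apply returns null and is skipped, matching the reporter != null check in the review snippet.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified plugin interface.
interface LineageBase {
  String describe();
}

// Hypothetical registrar: returns null when it does not apply to the options.
interface LineageRegistrar {
  LineageBase fromOptions(Map<String, String> options);
}

// Hypothetical stand-in for the default metrics-based implementation.
final class DefaultMetricsLineage implements LineageBase {
  @Override
  public String describe() {
    return "metrics";
  }
}

final class LineageResolver {
  // Walks registrars in the given order; the first non-null result wins.
  static LineageBase resolve(List<LineageRegistrar> registrars, Map<String, String> options) {
    for (LineageRegistrar registrar : registrars) {
      LineageBase reporter = registrar.fromOptions(options);
      if (reporter != null) {
        return reporter;
      }
    }
    // No registrar applied: keep the existing metrics-based behavior.
    return new DefaultMetricsLineage();
  }
}
```

The result depends on the iteration order of the registrar list, which is exactly why the ordering of discovered registrars matters.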
Why Pluggable Lineage?
1. Runner Fragmentation
Metrics-based lineage is scattered across runners with inconsistent support:
Impact: Multi-runner organizations must consolidate lineage from different metrics backends, each with different APIs and formats.
Plugin Solution: Single implementation works consistently across all runners.
2. Enterprise Integration
Organizations with existing lineage infrastructure need:
Example: Flyte workflow executing a Beam pipeline needs to tag lineage with Flyte execution ID and cost allocation. This context exists in the orchestrator, not in Beam workers' metrics.
3. Standard Formats
OpenLineage is the industry standard. A plugin enables direct emission instead of the export metrics → parse → transform → send round trip.
Initialization
Lineage.setDefaultPipelineOptions(options) is called from FileSystems.setDefaultPipelineOptions() (same pattern as Metrics).
Rationale: FileSystems.setDefaultPipelineOptions() is called at 48+ locations covering all execution scenarios (pipeline construction, worker startup, deserialization).
Known Limitation: Follows existing FileSystems pattern despite known issues (#18430). Architectural improvements would address all subsystems together.
Thread Safety
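Because setDefaultPipelineOptions() can be reached from many call sites and threads, re-initialization should be idempotent and race-free. A minimal sketch of an AtomicReference/compareAndSet loop, under stated assumptions: FakeOptions is a hypothetical stand-in exposing only an options id and revision, Map.Entry replaces Beam's KV, and LineageHolder is an illustrative name. The instance is rebuilt only when the options identity changes or a newer revision arrives. Like a plain two-reference scheme, it does not update builtFrom and instance as one atomic unit.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stand-in for PipelineOptions: only the identity and
// revision that the pattern relies on are modeled.
final class FakeOptions {
  final long id;
  final int revision;

  FakeOptions(long id, int revision) {
    this.id = id;
    this.revision = revision;
  }
}

final class LineageHolder<T> {
  // (options id, revision) the current instance was built from.
  private final AtomicReference<Map.Entry<Long, Integer>> builtFrom = new AtomicReference<>();
  private final AtomicReference<T> instance = new AtomicReference<>();
  private final Function<FakeOptions, T> factory;

  LineageHolder(Function<FakeOptions, T> factory) {
    this.factory = factory;
  }

  void setDefaultPipelineOptions(FakeOptions options) {
    Map.Entry<Long, Integer> next = new SimpleImmutableEntry<>(options.id, options.revision);
    while (true) {
      Map.Entry<Long, Integer> current = builtFrom.get();
      if (current != null
          && current.getKey() == options.id
          && current.getValue() >= options.revision) {
        return; // Already built from these options (or a newer revision).
      }
      if (builtFrom.compareAndSet(current, next)) {
        // This thread won the race and rebuilds. Readers may still see the
        // previous instance until this set() completes.
        instance.set(factory.apply(options));
        return;
      }
      // Another thread updated builtFrom first; re-read and re-check.
    }
  }

  T get() {
    return instance.get();
  }
}
```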
Uses AtomicReference with compareAndSet loop (same pattern as FileSystems/Metrics):
- AtomicReference<KV<Long, Integer>> tracks PipelineOptions identity
- AtomicReference<Lineage> for SOURCES/SINKS instances
Example: OpenLineage Plugin
For demonstration only (OpenLineage integration out of scope)