diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index a89f7adb4ce8..e9d869cc508c 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 3, + "modification": 1, } - + diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..0c41d2bcf2fe 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 6, } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 74f5a4d09001..db24215e32c7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -311,9 +311,14 @@ public DoFn.StartBundleContext startBundleContext(DoFn { /** A concrete implementation of {@link DoFn.FinishBundleContext}. */ @@ -356,6 +361,11 @@ public DoFn.FinishBundleContext finishBundleContext( public String getErrorContext() { return "SimpleDoFnRunner/FinishBundle"; } + + @Override + public BundleFinalizer bundleFinalizer() { + return stepContext.bundleFinalizer(); + } } /** @@ -1030,7 +1040,7 @@ public void outputWindowedValue( @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + "Bundle finalization is not supported in OnTimer calls."); } } @@ -1289,7 +1299,7 @@ public void outputWindowedValue( @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + "Bundle finalization is not supported in OnWindowExpiration calls."); } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 49de59fac32c..652c72c323ef 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesMetricsPusher', - 'org.apache.beam.sdk.testing.UsesBundleFinalizer', 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result. ] @@ -456,7 +455,9 @@ task validatesRunner { 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful', - ] + ], + // Batch legacy worker does not support bundle finalization. + excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], )) } @@ -490,6 +491,8 @@ task validatesRunnerStreaming { description "Validates Dataflow runner forcing streaming mode" dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [ name: 'validatesRunnerLegacyWorkerTestStreaming', + // Streaming appliance currently fails bundle finalizer tests. + excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], ])) } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9d963af0ecb5..8bda9c20763a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2737,11 +2737,11 @@ static void verifyDoFnSupported( DataflowRunner.class.getSimpleName())); } boolean isUnifiedWorker = useUnifiedWorker(options); - if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) { + if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker && !streaming) { throw new UnsupportedOperationException( String.format( - "%s does not currently support %s when not using unified worker because it uses " - + "BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " + "%s does not currently support %s in batch mode when not using unified worker because it " + + "uses BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " + "option to use this DoFn.", DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName())); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index 4473dff8e94a..93c288fea9ea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -156,10 +156,7 @@ public DoFnRunner>, OutputT> crea // in the event of a crash. 10000, Duration.standardSeconds(10), - () -> { - throw new UnsupportedOperationException( - "BundleFinalizer unsupported by non-portable Dataflow."); - })); + stepContext::bundleFinalizer)); DoFnRunner>, OutputT> simpleRunner = new SimpleDoFnRunner<>( options, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index b45688771437..f75d452b211b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; @@ -73,6 +74,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.PCollectionView; @@ -82,8 +84,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -446,11 +450,27 @@ public void invalidateCache() { } } - public Map flushState() { - Map callbacks = new HashMap<>(); + public Map> flushState() { + Map> callbacks = new HashMap<>(); for (StepContext stepContext : getAllStepContexts()) { stepContext.flushState(); + for (Pair bundleFinalizer : + stepContext.flushBundleFinalizerCallbacks()) { + long id = ThreadLocalRandom.current().nextLong(); + callbacks.put( + id, + Pair.of( + bundleFinalizer.getLeft(), + () -> { + try { + bundleFinalizer.getRight().onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException("Exception while running bundle finalizer", e); + } + })); + outputBuilder.addFinalizeIds(id); + } } if (activeReader != null) { @@ -462,13 +482,15 @@ public Map flushState() { sourceStateBuilder.addFinalizeIds(id); callbacks.put( id, - () -> { - try { - checkpointMark.finalizeCheckpoint(); - } catch (IOException e) { - throw new RuntimeException("Exception while finalizing checkpoint", e); - } - }); + Pair.of( + Instant.now().plus(Duration.standardMinutes(5)), + () -> { + try { + checkpointMark.finalizeCheckpoint(); + } catch (IOException e) { + throw new RuntimeException("Exception while finalizing checkpoint", e); + } + })); @SuppressWarnings("unchecked") Coder checkpointCoder = @@ -699,6 +721,11 @@ public void setStateCleanupTimer( public DataflowStepContext namespacedToUser() { return this; } + + @Override + public BundleFinalizer bundleFinalizer() { + return wrapped.bundleFinalizer(); + } } /** A {@link SideInputReader} that fetches side inputs from the streaming worker's cache. */ @@ -771,6 +798,7 @@ class StepContext extends DataflowExecutionContext.DataflowStepContext // A list of timer keys that were modified by user processing earlier in this bundle. This // serves a tombstone, so that we know not to fire any bundle timers that were modified. private Table modifiedUserTimerKeys = null; + private final WindmillBundleFinalizer bundleFinalizer = new WindmillBundleFinalizer(); public StepContext(DataflowOperationContext operationContext) { super(operationContext.nameContext()); @@ -1043,9 +1071,37 @@ public TimerInternals timerInternals() { return checkNotNull(systemTimerInternals); } + @Override + public BundleFinalizer bundleFinalizer() { + return bundleFinalizer; + } + public TimerInternals userTimerInternals() { ensureStateful("Tried to access user timers"); return checkNotNull(userTimerInternals); } + + public ImmutableList> flushBundleFinalizerCallbacks() { + return bundleFinalizer.flushCallbacks(); + } + } + + private static class WindmillBundleFinalizer implements BundleFinalizer { + private ImmutableList.Builder> callbacks = ImmutableList.builder(); + + private WindmillBundleFinalizer() {} + + private ImmutableList> flushCallbacks() { + ImmutableList> flushedCallbacks = callbacks.build(); + if (!Iterables.isEmpty(flushedCallbacks)) { + callbacks = ImmutableList.builder(); + } + return flushedCallbacks; + } + + @Override + public void afterBundleCommit(Instant callbackExpiry, Callback callback) { + callbacks.add(Pair.of(callbackExpiry, callback)); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java index 3608bd1ccacd..4afee8a70df1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -54,6 +54,7 @@ final class GetWorkResponseChunkAssembler { private final WorkItem.Builder workItemBuilder; // Reused to reduce GC overhead. private ByteString data; private long bufferedSize; + private final List appliedFinalizeIds; GetWorkResponseChunkAssembler() { workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis); @@ -61,10 +62,11 @@ final class GetWorkResponseChunkAssembler { bufferedSize = 0; metadata = null; workItemBuilder = WorkItem.newBuilder(); + appliedFinalizeIds = new ArrayList<>(); } /** - * Appends the response chunk bytes to the {@link #data }byte buffer. Return the assembled + * Appends the response chunk bytes to the {@link #data} byte buffer. Return the assembled * WorkItem if all response chunks for a WorkItem have been received. */ List append(Windmill.StreamingGetWorkResponseChunk chunk) { @@ -72,6 +74,7 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata()); } workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList()); + appliedFinalizeIds.addAll(chunk.getAppliedFinalizeIdsList()); List response = new ArrayList<>(); for (int i = 0; i < chunk.getSerializedWorkItemList().size(); i++) { @@ -90,13 +93,14 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { } /** - * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's metadata. Resets the + * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ its metadata. Resets the * data byte string and tracking metadata afterwards, whether the {@link WorkItem} deserialization * was successful or not. */ private Optional flushToWorkItem() { try { workItemBuilder.mergeFrom(data); + workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds); return Optional.of( AssembledWorkItem.create( workItemBuilder.build(), @@ -110,6 +114,7 @@ private Optional flushToWorkItem() { workTimingInfosTracker.reset(); data = ByteString.EMPTY; bufferedSize = 0; + appliedFinalizeIds.clear(); } return Optional.empty(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 7116aed3a2ba..4266f11f50c9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -17,14 +17,28 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; -import java.time.Duration; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,28 +46,110 @@ @Internal final class StreamingCommitFinalizer { private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class); - private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L); - private final Cache commitFinalizerCache; + + /** A {@link Runnable} and expiry time pair. */ + @AutoValue + public abstract static class FinalizationInfo { + public abstract long getId(); + + public abstract Instant getExpiryTime(); + + public abstract Runnable getCallback(); + + public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) { + return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback); + } + } + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition queueMinChanged = lock.newCondition(); + + @GuardedBy("lock") + private final HashMap commitFinalizationCallbacks = new HashMap<>(); + + @GuardedBy("lock") + private final PriorityQueue cleanUpQueue = + new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); + + @GuardedBy("lock") + private boolean cleanUpThreadStarted = false; + private final BoundedQueueExecutor finalizationExecutor; - private StreamingCommitFinalizer( - Cache commitFinalizerCache, BoundedQueueExecutor finalizationExecutor) { - this.commitFinalizerCache = commitFinalizerCache; - this.finalizationExecutor = finalizationExecutor; + private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { + finalizationExecutor = finalizationCleanupExecutor; + } + + private void cleanupThreadBody() { + lock.lock(); + try { + while (true) { + final @Nullable FinalizationInfo minValue = cleanUpQueue.peek(); + if (minValue == null) { + // Wait for an element to be added and loop to re-examine the min. + queueMinChanged.await(); + continue; + } + + Instant now = Instant.now(); + Duration timeDifference = new Duration(now, minValue.getExpiryTime()); + if (timeDifference.getMillis() < 0 + || (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS) + && cleanUpQueue.peek() == minValue)) { + // The minimum element has an expiry time before now, either because it had elapsed when + // we pulled it or because we awaited it, and it is still the minimum. + checkState(minValue == cleanUpQueue.poll()); + checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue); + } + } + } catch (InterruptedException e) { + // We're being shutdown. + } finally { + lock.unlock(); + } } static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { - return new StreamingCommitFinalizer( - CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(), - workExecutor); + return new StreamingCommitFinalizer(workExecutor); } /** * Stores a map of user worker generated finalization ids and callbacks to execute once a commit * has been successfully committed to the backing state store. */ - void cacheCommitFinalizers(Map commitCallbacks) { - commitFinalizerCache.putAll(commitCallbacks); + public void cacheCommitFinalizers(Map> callbacks) { + for (Map.Entry> entry : callbacks.entrySet()) { + Long finalizeId = entry.getKey(); + final FinalizationInfo info = + FinalizationInfo.create( + finalizeId, entry.getValue().getLeft(), entry.getValue().getRight()); + + lock.lock(); + try { + FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info); + if (existingInfo != null) { + throw new IllegalStateException( + "Expected to not have any past callbacks for bundle " + + finalizeId + + " but had " + + existingInfo); + } + if (!cleanUpThreadStarted) { + // Start the cleanup thread lazily for pipelines that don't use finalization callbacks + // and some tests. + cleanUpThreadStarted = true; + finalizationExecutor.execute(this::cleanupThreadBody, 0); + } + cleanUpQueue.add(info); + @SuppressWarnings("ReferenceEquality") + boolean newMin = cleanUpQueue.peek() == info; + if (newMin) { + queueMinChanged.signal(); + } + } finally { + lock.unlock(); + } + } } /** @@ -61,27 +157,41 @@ void cacheCommitFinalizers(Map commitCallbacks) { * successfully persisted in the backing state store. If the commitCallback for the finalizationId * is still cached it is invoked. */ - void finalizeCommits(Iterable finalizeIds) { - for (long finalizeId : finalizeIds) { - @Nullable Runnable finalizeCommit = commitFinalizerCache.getIfPresent(finalizeId); - // NOTE: It is possible the same callback id may be removed twice if - // windmill restarts. - // TODO: It is also possible for an earlier finalized id to be lost. - // We should automatically discard all older callbacks for the same computation and key. - if (finalizeCommit != null) { - commitFinalizerCache.invalidate(finalizeId); - finalizationExecutor.forceExecute( - () -> { - try { - finalizeCommit.run(); - } catch (OutOfMemoryError oom) { - throw oom; - } catch (Throwable t) { - LOG.error("Source checkpoint finalization failed:", t); - } - }, - 0); + public void finalizeCommits(Iterable finalizeIds) { + if (Iterables.isEmpty(finalizeIds)) { + return; + } + List callbacksToExecute = new ArrayList<>(); + lock.lock(); + try { + for (long finalizeId : finalizeIds) { + @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId); + if (info != null) { + checkState(cleanUpQueue.remove(info)); + callbacksToExecute.add(info.getCallback()); + } + } + } finally { + lock.unlock(); + } + for (Runnable callback : callbacksToExecute) { + try { + finalizationExecutor.forceExecute(callback, 0); + } catch (OutOfMemoryError oom) { + throw oom; + } catch (Throwable t) { + LOG.error("Commit finalization failed:", t); } } } + + @VisibleForTesting + int cleanupQueueSize() { + lock.lock(); + try { + return cleanUpQueue.size(); + } finally { + lock.unlock(); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index bb936831d6ea..b0f3a2899023 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -246,6 +246,7 @@ private void processWork(ComputationState computationState, Work work) { // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires // cleanup should be done before this, since we might exit early here. commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); + commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList()); if (workItem.getSourceState().getOnlyFinalize()) { Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem); outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java index 9e45425562a3..9c9f5386f443 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java @@ -36,20 +36,24 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -62,16 +66,20 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; /** Tests for {@link SimpleParDoFn}. */ @RunWith(JUnit4.class) public class SimpleParDoFnTest { + @Rule public ExpectedException thrown = ExpectedException.none(); private PipelineOptions options; @@ -95,6 +103,7 @@ public void setUp() { // TODO: Replace TestDoFn usages with a mock DoFn to reduce boilerplate. static class TestDoFn extends DoFn { + enum State { UNSTARTED, SET_UP, @@ -156,6 +165,7 @@ public void teardown() { } static class TestErrorDoFn extends DoFn { + // Used to test nested stack traces. private void nestedFunctionBeta(String s) { throw new RuntimeException(s); @@ -182,6 +192,7 @@ public void finishBundle() { } static class TestReceiver implements Receiver { + List receivedElems = new ArrayList<>(); @Override @@ -563,10 +574,10 @@ public void testOutputsPerElementCounterDisabledViaExperiment() throws Exception * @param inputData Input elements to process. For each element X, the DoFn will output a string * repeated X times. * @return Delta counter updates extracted after execution. - * @throws Exception */ private List executeParDoFnCounterTest(int... inputData) throws Exception { class RepeaterDoFn extends DoFn { + /** Takes as input the number of times to output a message. */ @ProcessElement public void processElement(ProcessContext c) { @@ -616,6 +627,7 @@ public void processElement(ProcessContext c) { * conversion according to the {@link PCollectionView} and projection to a particular window. */ private static class EmptySideInputReader implements SideInputReader { + private EmptySideInputReader() {} @Override @@ -633,4 +645,101 @@ public T get(PCollectionView view, final BoundedWindow window) { throw new IllegalArgumentException("calling getSideInput() with unknown view"); } } + + @Test + public void testBundleFinalizer() throws Exception { + WithBundleFinalizerDoFn.startBundleCount.set(0); + WithBundleFinalizerDoFn.processElementCount.set(0); + WithBundleFinalizerDoFn.finishBundleCount.set(0); + DoFnInfo fnInfo = + DoFnInfo.forFn( + new WithBundleFinalizerDoFn(), + WindowingStrategy.globalDefault(), + null /* side input views */, + VarLongCoder.of(), + MAIN_OUTPUT, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + DataflowExecutionContext.DataflowStepContext userStepContext = + Mockito.mock( + DataflowExecutionContext.DataflowStepContext.class, + invocation -> { + if (invocation.getMethod().getName().equals("bundleFinalizer")) { + return new BundleFinalizer() { + @Override + public void afterBundleCommit(Instant expiry, Callback callback) { + try { + callback.onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + return invocation.getMethod().invoke(stepContext, invocation.getArguments()); + }); + + DataflowStepContext stepContextWithBundleFinalizer = + Mockito.mock( + DataflowStepContext.class, + invocation -> { + if (invocation.getMethod().getName().equals("namespacedToUser")) { + return userStepContext; + } + return invocation.getMethod().invoke(stepContext, invocation.getArguments()); + }); + + ParDoFn parDoFn = + new SimpleParDoFn<>( + options, + DoFnInstanceManagers.singleInstance(fnInfo), + new EmptySideInputReader(), + MAIN_OUTPUT, + ImmutableMap.of(MAIN_OUTPUT, 0), + stepContextWithBundleFinalizer, + operationContext, + DoFnSchemaInformation.create(), + Collections.emptyMap(), + SimpleDoFnRunnerFactory.INSTANCE); + + parDoFn.startBundle(new TestReceiver()); + + // Process a few elements + for (int i = 0; i < 5; i++) { + parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L)); + } + + parDoFn.finishBundle(); + + assertThat(WithBundleFinalizerDoFn.startBundleCount.get(), equalTo(1)); + assertThat(WithBundleFinalizerDoFn.processElementCount.get(), equalTo(5)); + assertThat(WithBundleFinalizerDoFn.finishBundleCount.get(), equalTo(1)); + } + + static class WithBundleFinalizerDoFn extends DoFn { + private static final AtomicInteger startBundleCount = new AtomicInteger(0); + private static final AtomicInteger processElementCount = new AtomicInteger(0); + private static final AtomicInteger finishBundleCount = new AtomicInteger(0); + + @StartBundle + public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + () -> startBundleCount.incrementAndGet()); + } + + @ProcessElement + public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + () -> processElementCount.incrementAndGet()); + } + + @FinishBundle + public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + () -> finishBundleCount.incrementAndGet()); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java new file mode 100644 index 000000000000..7361d0be2cd0 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.processing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class StreamingCommitFinalizerTest { + + private StreamingCommitFinalizer finalizer; + private BoundedQueueExecutor executor; + + @Before + public void setUp() { + executor = + new BoundedQueueExecutor( + 10, + 60, + TimeUnit.SECONDS, + 10, + 10000000, + new ThreadFactoryBuilder() + .setNameFormat("FinalizationCallback-%d") + .setDaemon(true) + .build(), + /*useFairMonitor=*/ false); + finalizer = StreamingCommitFinalizer.create(executor); + } + + @Test + public void testCreateAndInit() { + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testCacheCommitFinalizer() { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + assertEquals(1, finalizer.cleanupQueueSize()); + verify(callback, never()).run(); + } + + @Test + public void testThrowErrorOnDuplicateIds() { + Runnable callback1 = mock(Runnable.class); + Instant expiry = Instant.now().plus(Duration.standardHours(1)); + finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry, callback1))); + + Runnable callback2 = mock(Runnable.class); + Map> duplicateCallback = + ImmutableMap.of(1L, Pair.of(expiry, callback2)); + assertThrows( + IllegalStateException.class, () -> finalizer.cacheCommitFinalizers(duplicateCallback)); + } + + @Test + public void testFinalizeCommits() throws Exception { + CountDownLatch callbackExecuted = new CountDownLatch(1); + finalizer.cacheCommitFinalizers( + ImmutableMap.of( + 1L, + Pair.of( + Instant.now().plus(Duration.standardHours(1)), + () -> callbackExecuted.countDown()))); + finalizer.finalizeCommits(Collections.singletonList(1L)); + assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS)); + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testMultipleCommits() throws Exception { + CountDownLatch callback1Executed = new CountDownLatch(1); + CountDownLatch callback2Executed = new CountDownLatch(1); + CountDownLatch callback3Executed = new CountDownLatch(1); + + Instant expiryTime = Instant.now().plus(Duration.standardHours(1)); + finalizer.cacheCommitFinalizers( + ImmutableMap.>builder() + .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown())) + .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown())) + .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown())) + .build()); + // Finalize commits one at a time (in different order from added). + finalizer.finalizeCommits(Collections.singletonList(2L)); + assertTrue(callback2Executed.await(30, TimeUnit.SECONDS)); + + finalizer.finalizeCommits(Collections.singletonList(3L)); + assertTrue(callback3Executed.await(30, TimeUnit.SECONDS)); + + finalizer.finalizeCommits(Collections.singletonList(1L)); + assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); + + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testIgnoresUnknownIds() throws Exception { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + finalizer.finalizeCommits(Collections.singletonList(2L)); + assertEquals(1, executor.elementsOutstanding()); + verify(callback, never()).run(); + assertEquals(1, finalizer.cleanupQueueSize()); + } + + @Test + public void testCleanupOnExpiration() throws Exception { + CountDownLatch callback1Executed = new CountDownLatch(1); + finalizer.cacheCommitFinalizers( + ImmutableMap.of( + 1L, + Pair.of( + Instant.now().plus(Duration.standardHours(1)), + () -> callback1Executed.countDown()))); + assertEquals(1, finalizer.cleanupQueueSize()); + + Runnable callback2 = mock(Runnable.class); + Runnable callback3 = mock(Runnable.class); + Instant shortTimeout = Instant.now().plus(Duration.millis(100)); + finalizer.cacheCommitFinalizers( + ImmutableMap.>builder() + .put(2L, Pair.of(shortTimeout, callback2)) + .put(3L, Pair.of(shortTimeout, callback3)) + .build()); + + while (finalizer.cleanupQueueSize() > 1) { + // Wait until the two 100ms timeouts expire. + Thread.sleep(200); + } + assertEquals(1, executor.elementsOutstanding()); + finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); + verify(callback2, never()).run(); + verify(callback3, never()).run(); + + finalizer.finalizeCommits(Collections.singletonList(1L)); + assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); + assertEquals(0, finalizer.cleanupQueueSize()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index 0b179c4d3386..ca650907ffe5 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -447,6 +447,10 @@ message WorkItem { // elements mapped to a single key to impact pipeline performance. When // present, this field includes metadata associated with any hot key. optional HotKeyInfo hot_key_info = 11; + + repeated int64 applied_finalize_ids = 16; + + reserved 12, 13, 14, 15; } message ComputationWorkItems { @@ -653,10 +657,16 @@ message WorkItemCommitRequest { // Collected work item processing state durations. repeated LatencyAttribution per_work_item_latency_attributions = 27; + // Ids that will be passed back as applied_finalize_ids in a subsequent + // GetWorkResponse once the state in this request has been persisted to disk + // successfully. This is best-effort; it is possible that state is persisted + // but the finalize ids are not sent to the worker. + repeated int64 finalize_ids = 19 [packed = true]; + // DEPRECATED repeated GlobalDataId global_data_id_requests = 9; - reserved 6, 19, 23; + reserved 6, 23; } message ComputationCommitWorkRequest { @@ -791,11 +801,14 @@ message StreamingGetWorkResponseChunk { // from other stream_ids may be interleaved on the physical stream. optional fixed64 stream_id = 4; + // Finalize ids associated with successfully applied work from this worker + repeated int64 applied_finalize_ids = 6 [packed = true]; + // Timing infos for the work item. Windmill Dispatcher and user worker should // propagate critical event timings if the list is not empty. repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8; - // reserved field 5 + reserved 5, 7; } message ComputationWorkItemMetadata {