diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index db24215e32c7..51038957d871 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -565,6 +565,22 @@ public Instant timestamp(DoFn doFn) { return timestamp(); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + return currentRecordId(); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + return currentRecordOffset(); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access fire timestamp outside of @OnTimer method."); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { return elem.causedByDrain(); @@ -846,6 +862,23 @@ public Instant timestamp(DoFn doFn) { return timestamp(); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access record id outside of @ProcessElement method."); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access record offset outside of @ProcessElement method."); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + return fireTimestamp(); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { return causedByDrain; @@ -1144,6 +1177,24 @@ public CausedByDrain causedByDrain(DoFn doFn) { throw new UnsupportedOperationException("CausedByDrain parameters are not supported."); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access record id outside of @ProcessElement method."); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access record offset outside of @ProcessElement method."); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access fire timestamp outside of @OnTimer method."); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException("Timer parameters are not supported."); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 41beb93a5cbe..c83179a9953e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -590,8 +590,10 @@ public interface MultiOutputReceiver { * *

The method annotated with {@code @OnTimer} may have parameters according to the same logic * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses, - * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and - * {@link TimerId} respectively. + * {@link Timer}, {@link FireTimestamp}, {@link Timestamp}, {@link Key}, {@link TimeDomain}, + * {@link PipelineOptions}, {@link OutputReceiver}, {@link MultiOutputReceiver}, and {@link + * org.apache.beam.sdk.values.CausedByDrain CausedByDrain}. State and timer parameters must be + * annotated with their {@link StateId} and {@link TimerId} respectively. */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -608,8 +610,10 @@ public interface MultiOutputReceiver { * *

The method annotated with {@code @OnTimerFamily} may have parameters according to the same * logic as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} - * subclasses, and {@link org.apache.beam.sdk.state.TimerMap}. State and timer parameters must be - * annotated with their {@link StateId} and {@link TimerId} respectively. + * subclasses, {@link org.apache.beam.sdk.state.TimerMap}, {@link Timestamp}, {@link Key}, {@link + * TimeDomain}, {@link PipelineOptions}, {@link OutputReceiver}, {@link MultiOutputReceiver}, and + * {@link org.apache.beam.sdk.values.CausedByDrain CausedByDrain}. State and timer parameters must + * be annotated with their {@link StateId} and {@link TimerId} respectively. */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -700,6 +704,12 @@ public interface MultiOutputReceiver { *

  • If one of its arguments is tagged with the {@link Element} annotation, then it will be * passed the current element being processed. The argument type must match the input type * of this DoFn exactly, or both types must have equivalent schemas registered. + *
  • If one of its arguments is tagged with the {@link RecordId} annotation, then it will be + * passed the record id of the current element being processed; the argument must be of type + * {@link String}. + *
  • If one of its arguments is tagged with the {@link RecordOffset} annotation, then it will + * be passed the record offset of the current element being processed; the argument must be + * of type {@link Long}. *
  • If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be * passed the timestamp of the current element being processed; the argument must be of type * {@link Instant}. @@ -790,6 +800,12 @@ public interface MultiOutputReceiver { *
  • If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be * passed the timestamp of the current element being processed; the argument must be of type * {@link Instant}. + *
  • If one of its arguments is tagged with the {@link CurrentRecordId} annotation, then it + * will be passed the record ID of the current element being processed; the argument must be + * of type {@link String}. + *
  • If one of its arguments is tagged with the {@link CurrentRecordOffset} annotation, then + * it will be passed the record offset of the current element being processed; the argument + * must be of type {@link Long}. *
  • If one of its arguments is of the type {@link WatermarkEstimator}, then it will be passed * the watermark estimator. *
  • If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be @@ -833,6 +849,18 @@ public interface MultiOutputReceiver { @Target(ElementType.PARAMETER) public @interface Element {} + /** Parameter annotation for the input element record id for {@link ProcessElement}. */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.PARAMETER) + public @interface CurrentRecordId {} + + /** Parameter annotation for the input element record offset for {@link ProcessElement}. */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.PARAMETER) + public @interface CurrentRecordOffset {} + /** * Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, {@link * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker} @@ -855,6 +883,12 @@ public interface MultiOutputReceiver { @Target(ElementType.PARAMETER) public @interface Timestamp {} + /** Parameter annotation for the firing timestamp for {@link OnTimer}. */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.PARAMETER) + public @interface FireTimestamp {} + /** Parameter annotation for the SideInput for a {@link ProcessElement} method. */ @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 54d630d92fe4..9adbe3a12cf4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -77,8 +77,11 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordIdParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordOffsetParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FireTimestampParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter; @@ -128,6 +131,9 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement"; public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp"; public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain"; + public static final String CURRENT_RECORD_ID_PARAMETER_METHOD = "currentRecordId"; + public static final String CURRENT_RECORD_OFFSET_PARAMETER_METHOD = "currentRecordOffset"; + public static final String FIRE_TIMESTAMP_PARAMETER_METHOD = "fireTimestamp"; public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer"; public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver"; public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain"; @@ -1265,6 +1271,33 @@ public StackManipulation dispatch(DoFnSignature.Parameter.TimerIdParameter p) { TIMER_ID_PARAMETER_METHOD, DoFn.class))); } + @Override + public StackManipulation dispatch(CurrentRecordIdParameter p) { + return new StackManipulation.Compound( + pushDelegate, + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription( + CURRENT_RECORD_ID_PARAMETER_METHOD, DoFn.class))); + } + + @Override + public StackManipulation dispatch(CurrentRecordOffsetParameter p) { + return new StackManipulation.Compound( + pushDelegate, + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription( + CURRENT_RECORD_OFFSET_PARAMETER_METHOD, DoFn.class))); + } + + @Override + public StackManipulation dispatch(FireTimestampParameter p) { + return new StackManipulation.Compound( + pushDelegate, + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription( + FIRE_TIMESTAMP_PARAMETER_METHOD, DoFn.class))); + } + @Override public StackManipulation dispatch(DoFnSignature.Parameter.KeyParameter p) { return new StackManipulation.Compound( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index a615761292aa..1f122f1bf661 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -218,6 +218,17 @@ interface ArgumentProvider { /** Provide a reference to the input element timestamp. */ Instant timestamp(DoFn doFn); + /** Provide a reference to the record id of the current element. */ + @Nullable + String currentRecordId(DoFn doFn); + + /** Provide a reference to the record offset of the current element. */ + @Nullable + Long currentRecordOffset(DoFn doFn); + + /** Provide a reference to the firing timestamp of the current timer. */ + Instant fireTimestamp(DoFn doFn); + /** Provide a reference to the caused by drain. */ CausedByDrain causedByDrain(DoFn doFn); @@ -329,6 +340,24 @@ public Instant timestamp(DoFn doFn) { String.format("Timestamp unsupported in %s", getErrorContext())); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + throw new UnsupportedOperationException( + String.format("RecordId unsupported in %s", getErrorContext())); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + throw new UnsupportedOperationException( + String.format("RecordOffset unsupported in %s", getErrorContext())); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + throw new UnsupportedOperationException( + String.format("FireTimestamp unsupported in %s", getErrorContext())); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { throw new UnsupportedOperationException( @@ -524,6 +553,21 @@ public Instant timestamp(DoFn doFn) { return delegate.timestamp(doFn); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + return delegate.currentRecordId(doFn); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + return delegate.currentRecordOffset(doFn); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + return delegate.fireTimestamp(doFn); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { return delegate.causedByDrain(doFn); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index af0353c902a6..6262c55cce4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -344,6 +344,12 @@ public ResultT match(Cases cases) { return cases.dispatch((BundleFinalizerParameter) this); } else if (this instanceof CausedByDrainParameter) { return cases.dispatch((CausedByDrainParameter) this); + } else if (this instanceof CurrentRecordIdParameter) { + return cases.dispatch((CurrentRecordIdParameter) this); + } else if (this instanceof CurrentRecordOffsetParameter) { + return cases.dispatch((CurrentRecordOffsetParameter) this); + } else if (this instanceof FireTimestampParameter) { + return cases.dispatch((FireTimestampParameter) this); } else if (this instanceof KeyParameter) { return cases.dispatch((KeyParameter) this); } else { @@ -374,6 +380,12 @@ public interface Cases { ResultT dispatch(TaggedOutputReceiverParameter p); + ResultT dispatch(CurrentRecordIdParameter p); + + ResultT dispatch(CurrentRecordOffsetParameter p); + + ResultT dispatch(FireTimestampParameter p); + ResultT dispatch(OnTimerContextParameter p); ResultT dispatch(WindowParameter p); @@ -461,6 +473,21 @@ public ResultT dispatch(TimeDomainParameter p) { return dispatchDefault(p); } + @Override + public ResultT dispatch(CurrentRecordIdParameter p) { + return dispatchDefault(p); + } + + @Override + public ResultT dispatch(CurrentRecordOffsetParameter p) { + return dispatchDefault(p); + } + + @Override + public ResultT dispatch(FireTimestampParameter p) { + return dispatchDefault(p); + } + @Override public ResultT dispatch(OnTimerContextParameter p) { return dispatchDefault(p); @@ -565,6 +592,12 @@ public ResultT dispatch(KeyParameter p) { new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter(); private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER = new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter(); + private static final CurrentRecordIdParameter CURRENT_RECORD_ID_PARAMETER = + new AutoValue_DoFnSignature_Parameter_CurrentRecordIdParameter(); + private static final CurrentRecordOffsetParameter CURRENT_RECORD_OFFSET_PARAMETER = + new AutoValue_DoFnSignature_Parameter_CurrentRecordOffsetParameter(); + private static final FireTimestampParameter FIRE_TIMESTAMP_PARAMETER = + new AutoValue_DoFnSignature_Parameter_FireTimestampParameter(); /** Returns a {@link ProcessContextParameter}. */ public static ProcessContextParameter processContext() { @@ -591,6 +624,21 @@ public static CausedByDrainParameter causedByDrainParameter() { return CAUSED_BY_DRAIN_PARAMETER; } + /** Returns a {@link CurrentRecordIdParameter}. */ + public static CurrentRecordIdParameter currentRecordIdParameter() { + return CURRENT_RECORD_ID_PARAMETER; + } + + /** Returns a {@link CurrentRecordOffsetParameter}. */ + public static CurrentRecordOffsetParameter currentRecordOffsetParameter() { + return CURRENT_RECORD_OFFSET_PARAMETER; + } + + /** Returns a {@link FireTimestampParameter}. */ + public static FireTimestampParameter fireTimestampParameter() { + return FIRE_TIMESTAMP_PARAMETER; + } + public static ElementParameter elementParameter(TypeDescriptor elementT) { return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT); } @@ -753,6 +801,36 @@ public abstract static class CausedByDrainParameter extends Parameter { CausedByDrainParameter() {} } + /** + * Descriptor for a {@link Parameter} of type {@link DoFn.RecordId}. + * + *

    All such descriptors are equal. + */ + @AutoValue + public abstract static class CurrentRecordIdParameter extends Parameter { + CurrentRecordIdParameter() {} + } + + /** + * Descriptor for a {@link Parameter} of type {@link DoFn.RecordOffset}. + * + *

    All such descriptors are equal. + */ + @AutoValue + public abstract static class CurrentRecordOffsetParameter extends Parameter { + CurrentRecordOffsetParameter() {} + } + + /** + * Descriptor for a {@link Parameter} of type {@link DoFn.FireTimestamp}. + * + *

    All such descriptors are equal. + */ + @AutoValue + public abstract static class FireTimestampParameter extends Parameter { + FireTimestampParameter() {} + } + /** * Descriptor for a {@link Parameter} of type {@link DoFn.Element}. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 3dcf7ff1f9d0..ec624696fc7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -140,6 +140,8 @@ private DoFnSignatures() {} Parameter.StateParameter.class, Parameter.SideInputParameter.class, Parameter.TimerFamilyParameter.class, + Parameter.CurrentRecordIdParameter.class, + Parameter.CurrentRecordOffsetParameter.class, Parameter.CausedByDrainParameter.class, Parameter.BundleFinalizerParameter.class); @@ -157,6 +159,8 @@ private DoFnSignatures() {} Parameter.RestrictionTrackerParameter.class, Parameter.WatermarkEstimatorParameter.class, Parameter.SideInputParameter.class, + Parameter.CurrentRecordIdParameter.class, + Parameter.CurrentRecordOffsetParameter.class, Parameter.CausedByDrainParameter.class, Parameter.BundleFinalizerParameter.class); @@ -188,6 +192,7 @@ private DoFnSignatures() {} Parameter.StateParameter.class, Parameter.TimerFamilyParameter.class, Parameter.TimerIdParameter.class, + Parameter.FireTimestampParameter.class, Parameter.CausedByDrainParameter.class, Parameter.KeyParameter.class); @@ -205,6 +210,7 @@ private DoFnSignatures() {} Parameter.StateParameter.class, Parameter.TimerFamilyParameter.class, Parameter.TimerIdParameter.class, + Parameter.FireTimestampParameter.class, Parameter.CausedByDrainParameter.class, Parameter.KeyParameter.class); @@ -1348,6 +1354,19 @@ private static Parameter analyzeExtraParameter( rawType.equals(Instant.class), "@Timestamp argument must have type org.joda.time.Instant."); return Parameter.timestampParameter(); + } else if (hasAnnotation(DoFn.CurrentRecordId.class, param.getAnnotations())) { + methodErrors.checkArgument( + rawType.equals(String.class), "@CurrentRecordId argument must have type String."); + return Parameter.currentRecordIdParameter(); + } else if (hasAnnotation(DoFn.CurrentRecordOffset.class, param.getAnnotations())) { + methodErrors.checkArgument( + rawType.equals(Long.class), "@CurrentRecordOffset argument must have type Long."); + return Parameter.currentRecordOffsetParameter(); + } else if (hasAnnotation(DoFn.FireTimestamp.class, param.getAnnotations())) { + methodErrors.checkArgument( + rawType.equals(Instant.class), + "@FireTimestamp argument must have type org.joda.time.Instant."); + return Parameter.fireTimestampParameter(); } else if (hasAnnotation(DoFn.Key.class, param.getAnnotations())) { methodErrors.checkArgument( KV.class.equals(inputT.getRawType()), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 6d058b3b6ada..1e7083629843 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -689,6 +689,21 @@ public Long currentRecordOffset() { return outerContext.currentRecordOffset(); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + return outerContext.currentRecordId(); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + return outerContext.currentRecordOffset(); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + throw new IllegalStateException(); + } + @Override public Object watermarkEstimatorState() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 3369e18519ba..5a5353482c95 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -57,8 +57,11 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordIdParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordOffsetParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FireTimestampParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PipelineOptionsParameter; @@ -101,6 +104,37 @@ @SuppressWarnings("unused") public class DoFnSignaturesTest { + @Test + public void testRecordIdOnTimerError() throws Exception { + final String timerId = "some-timer-id"; + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Illegal parameter type"); + thrown.expectMessage("CurrentRecordIdParameter"); + DoFnSignatures.getSignature( + new DoFn() { + @TimerId(timerId) + private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(timerId) + public void onTimer(@DoFn.CurrentRecordId String id) {} + }.getClass()); + } + + @Test + public void testFireTimestampOnProcessElementError() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Illegal parameter type"); + thrown.expectMessage("FireTimestampParameter"); + DoFnSignatures.getSignature( + new DoFn() { + @ProcessElement + public void process(@FireTimestamp Instant ts) {} + }.getClass()); + } + @Rule public ExpectedException thrown = ExpectedException.none(); @Test @@ -153,6 +187,24 @@ public void process( sig.processElement().extraParameters().get(9), instanceOf(CausedByDrainParameter.class)); } + @Test + public void testProcessElementRecordIdAndOffset() throws Exception { + DoFnSignature sig = + DoFnSignatures.getSignature( + new DoFn() { + @ProcessElement + public void process( + @DoFn.CurrentRecordId String id, @DoFn.CurrentRecordOffset Long offset) {} + }.getClass()); + + assertThat(sig.processElement().extraParameters().size(), equalTo(2)); + assertThat( + sig.processElement().extraParameters().get(0), instanceOf(CurrentRecordIdParameter.class)); + assertThat( + sig.processElement().extraParameters().get(1), + instanceOf(CurrentRecordOffsetParameter.class)); + } + @Test public void testBasicDoFnMultiOutputReceiver() throws Exception { DoFnSignature sig = @@ -647,6 +699,30 @@ public void onTimer( instanceOf(WindowParameter.class)); } + @Test + public void testFireTimestampOnTimer() throws Exception { + final String timerId = "some-timer-id"; + final String timerDeclarationId = TimerDeclaration.PREFIX + timerId; + + DoFnSignature sig = + DoFnSignatures.getSignature( + new DoFn() { + @TimerId(timerId) + private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(timerId) + public void onTimer(@FireTimestamp Instant fireTimestamp) {} + }.getClass()); + + assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1)); + assertThat( + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0), + instanceOf(FireTimestampParameter.class)); + } + @Test public void testPipelineOptionsParameter() throws Exception { DoFnSignature sig = diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index de69f49ecc3c..894ea2a1251d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2041,6 +2041,22 @@ public Instant timestamp(DoFn doFn) { return timestamp(); } + @Override + public @Nullable String currentRecordId(DoFn doFn) { + return currentRecordId(); + } + + @Override + public @Nullable Long currentRecordOffset(DoFn doFn) { + return currentRecordOffset(); + } + + @Override + public Instant fireTimestamp(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access fire timestamp outside of @OnTimer method."); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException( @@ -2711,6 +2727,11 @@ public TimeDomain timeDomain(DoFn doFn) { return currentTimeDomain; } + @Override + public Instant fireTimestamp(DoFn doFn) { + return currentTimer.getFireTimestamp(); + } + @Override public K key() { return (K) currentTimer.getUserKey();