Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,22 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return timestamp();
}

@Override
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
return currentRecordId();
}

@Override
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
return currentRecordOffset();
}

@Override
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access fire timestamp outside of @OnTimer method.");
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return elem.causedByDrain();
Expand Down Expand Up @@ -846,6 +862,23 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return timestamp();
}

@Override
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access record id outside of @ProcessElement method.");
}

@Override
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access record offset outside of @ProcessElement method.");
}

@Override
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
return fireTimestamp();
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return causedByDrain;
Expand Down Expand Up @@ -1144,6 +1177,24 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
}

@Override
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access record id outside of @ProcessElement method.");
}

@Override
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access record offset outside of @ProcessElement method.");
}

@Override
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access fire timestamp outside of @OnTimer method.");
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("Timer parameters are not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,10 @@ public interface MultiOutputReceiver {
*
* <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic
* as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses,
* and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and
* {@link TimerId} respectively.
* {@link Timer}, {@link FireTimestamp}, {@link Timestamp}, {@link Key}, {@link TimeDomain},
* {@link PipelineOptions}, {@link OutputReceiver}, {@link MultiOutputReceiver}, and {@link
* org.apache.beam.sdk.values.CausedByDrain CausedByDrain}. State and timer parameters must be
* annotated with their {@link StateId} and {@link TimerId} respectively.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand All @@ -608,8 +610,10 @@ public interface MultiOutputReceiver {
*
* <p>The method annotated with {@code @OnTimerFamily} may have parameters according to the same
* logic as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State}
* subclasses, and {@link org.apache.beam.sdk.state.TimerMap}. State and timer parameters must be
* annotated with their {@link StateId} and {@link TimerId} respectively.
* subclasses, {@link org.apache.beam.sdk.state.TimerMap}, {@link Timestamp}, {@link Key}, {@link
* TimeDomain}, {@link PipelineOptions}, {@link OutputReceiver}, {@link MultiOutputReceiver}, and
* {@link org.apache.beam.sdk.values.CausedByDrain CausedByDrain}. State and timer parameters must
* be annotated with their {@link StateId} and {@link TimerId} respectively.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -700,6 +704,12 @@ public interface MultiOutputReceiver {
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
* passed the current element being processed. The argument type must match the input type
* of this DoFn exactly, or both types must have equivalent schemas registered.
* <li>If one of its arguments is tagged with the {@link RecordId} annotation, then it will be
* passed the record id of the current element being processed; the argument must be of type
* {@link String}.
* <li>If one of its arguments is tagged with the {@link RecordOffset} annotation, then it will
* be passed the record offset of the current element being processed; the argument must be
* of type {@link Long}.
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
Expand Down Expand Up @@ -790,6 +800,12 @@ public interface MultiOutputReceiver {
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is tagged with the {@link CurrentRecordId} annotation, then it
* will be passed the record ID of the current element being processed; the argument must be
* of type {@link String}.
* <li>If one of its arguments is tagged with the {@link CurrentRecordOffset} annotation, then
* it will be passed the record offset of the current element being processed; the argument
* must be of type {@link Long}.
* <li>If one of its arguments is of the type {@link WatermarkEstimator}, then it will be passed
* the watermark estimator.
* <li>If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be
Expand Down Expand Up @@ -833,6 +849,18 @@ public interface MultiOutputReceiver {
@Target(ElementType.PARAMETER)
public @interface Element {}

/** Parameter annotation for the input element record id for {@link ProcessElement}. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface CurrentRecordId {}

/** Parameter annotation for the input element record offset for {@link ProcessElement}. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface CurrentRecordOffset {}

/**
* Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, {@link
* GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
Expand All @@ -855,6 +883,12 @@ public interface MultiOutputReceiver {
@Target(ElementType.PARAMETER)
public @interface Timestamp {}

/** Parameter annotation for the firing timestamp for {@link OnTimer}. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface FireTimestamp {}

/** Parameter annotation for the SideInput for a {@link ProcessElement} method. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordIdParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordOffsetParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FireTimestampParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter;
Expand Down Expand Up @@ -128,6 +131,9 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement";
public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain";
public static final String CURRENT_RECORD_ID_PARAMETER_METHOD = "currentRecordId";
public static final String CURRENT_RECORD_OFFSET_PARAMETER_METHOD = "currentRecordOffset";
public static final String FIRE_TIMESTAMP_PARAMETER_METHOD = "fireTimestamp";
public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer";
public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver";
public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
Expand Down Expand Up @@ -1265,6 +1271,33 @@ public StackManipulation dispatch(DoFnSignature.Parameter.TimerIdParameter p) {
TIMER_ID_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(CurrentRecordIdParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
CURRENT_RECORD_ID_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(CurrentRecordOffsetParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
CURRENT_RECORD_OFFSET_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(FireTimestampParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
FIRE_TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(DoFnSignature.Parameter.KeyParameter p) {
return new StackManipulation.Compound(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,17 @@ interface ArgumentProvider<InputT, OutputT> {
/** Provide a reference to the input element timestamp. */
Instant timestamp(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the record id of the current element. */
@Nullable
String currentRecordId(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the record offset of the current element. */
@Nullable
Long currentRecordOffset(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the firing timestamp of the current timer. */
Instant fireTimestamp(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the caused by drain. */
CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn);

Expand Down Expand Up @@ -329,6 +340,24 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
String.format("Timestamp unsupported in %s", getErrorContext()));
}

@Override
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
String.format("RecordId unsupported in %s", getErrorContext()));
}

@Override
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
String.format("RecordOffset unsupported in %s", getErrorContext()));
}

@Override
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
String.format("FireTimestamp unsupported in %s", getErrorContext()));
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -524,6 +553,21 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return delegate.timestamp(doFn);
}

@Override
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
return delegate.currentRecordId(doFn);
}

@Override
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
return delegate.currentRecordOffset(doFn);
}

@Override
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
return delegate.fireTimestamp(doFn);
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return delegate.causedByDrain(doFn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatch((BundleFinalizerParameter) this);
} else if (this instanceof CausedByDrainParameter) {
return cases.dispatch((CausedByDrainParameter) this);
} else if (this instanceof CurrentRecordIdParameter) {
return cases.dispatch((CurrentRecordIdParameter) this);
} else if (this instanceof CurrentRecordOffsetParameter) {
return cases.dispatch((CurrentRecordOffsetParameter) this);
} else if (this instanceof FireTimestampParameter) {
return cases.dispatch((FireTimestampParameter) this);
} else if (this instanceof KeyParameter) {
return cases.dispatch((KeyParameter) this);
} else {
Expand Down Expand Up @@ -374,6 +380,12 @@ public interface Cases<ResultT> {

ResultT dispatch(TaggedOutputReceiverParameter p);

ResultT dispatch(CurrentRecordIdParameter p);

ResultT dispatch(CurrentRecordOffsetParameter p);

ResultT dispatch(FireTimestampParameter p);

ResultT dispatch(OnTimerContextParameter p);

ResultT dispatch(WindowParameter p);
Expand Down Expand Up @@ -461,6 +473,21 @@ public ResultT dispatch(TimeDomainParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(CurrentRecordIdParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(CurrentRecordOffsetParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(FireTimestampParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(OnTimerContextParameter p) {
return dispatchDefault(p);
Expand Down Expand Up @@ -565,6 +592,12 @@ public ResultT dispatch(KeyParameter p) {
new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter();
private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter();
private static final CurrentRecordIdParameter CURRENT_RECORD_ID_PARAMETER =
new AutoValue_DoFnSignature_Parameter_CurrentRecordIdParameter();
private static final CurrentRecordOffsetParameter CURRENT_RECORD_OFFSET_PARAMETER =
new AutoValue_DoFnSignature_Parameter_CurrentRecordOffsetParameter();
private static final FireTimestampParameter FIRE_TIMESTAMP_PARAMETER =
new AutoValue_DoFnSignature_Parameter_FireTimestampParameter();

/** Returns a {@link ProcessContextParameter}. */
public static ProcessContextParameter processContext() {
Expand All @@ -591,6 +624,21 @@ public static CausedByDrainParameter causedByDrainParameter() {
return CAUSED_BY_DRAIN_PARAMETER;
}

/** Returns a {@link CurrentRecordIdParameter}. */
public static CurrentRecordIdParameter currentRecordIdParameter() {
return CURRENT_RECORD_ID_PARAMETER;
}

/** Returns a {@link CurrentRecordOffsetParameter}. */
public static CurrentRecordOffsetParameter currentRecordOffsetParameter() {
return CURRENT_RECORD_OFFSET_PARAMETER;
}

/** Returns a {@link FireTimestampParameter}. */
public static FireTimestampParameter fireTimestampParameter() {
return FIRE_TIMESTAMP_PARAMETER;
}

public static ElementParameter elementParameter(TypeDescriptor<?> elementT) {
return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
}
Expand Down Expand Up @@ -753,6 +801,36 @@ public abstract static class CausedByDrainParameter extends Parameter {
CausedByDrainParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link DoFn.RecordId}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class CurrentRecordIdParameter extends Parameter {
CurrentRecordIdParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link DoFn.RecordOffset}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class CurrentRecordOffsetParameter extends Parameter {
CurrentRecordOffsetParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link DoFn.FireTimestamp}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class FireTimestampParameter extends Parameter {
FireTimestampParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link DoFn.Element}.
*
Expand Down
Loading
Loading