-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[Java SDK] Infer Beam logical types for JSR-310 and UUID fields #38194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,9 @@ | |
| import java.lang.reflect.Parameter; | ||
| import java.lang.reflect.Type; | ||
| import java.nio.ByteBuffer; | ||
| import java.time.LocalDate; | ||
| import java.time.LocalDateTime; | ||
| import java.time.LocalTime; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
|
|
@@ -38,6 +41,7 @@ | |
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.SortedMap; | ||
| import java.util.UUID; | ||
| import net.bytebuddy.ByteBuddy; | ||
| import net.bytebuddy.NamingStrategy; | ||
| import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver; | ||
|
|
@@ -78,6 +82,9 @@ | |
| import org.apache.beam.sdk.schemas.FieldValueHaver; | ||
| import org.apache.beam.sdk.schemas.FieldValueSetter; | ||
| import org.apache.beam.sdk.schemas.FieldValueTypeInformation; | ||
| import org.apache.beam.sdk.schemas.Schema.LogicalType; | ||
| import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; | ||
| import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; | ||
| import org.apache.beam.sdk.util.Preconditions; | ||
| import org.apache.beam.sdk.util.common.ReflectHelpers; | ||
| import org.apache.beam.sdk.values.TypeDescriptor; | ||
|
|
@@ -120,6 +127,65 @@ public class ByteBuddyUtils { | |
| private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class); | ||
| private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE = | ||
| new ForLoadedType(ByteBuddyUtils.class); | ||
| private static final ForLoadedType LOGICAL_TYPE_TYPE = new ForLoadedType(LogicalType.class); | ||
|
|
||
| // Static LogicalType instances used by codegen for JSR-310 and UUID POJO/Bean fields. The | ||
| // generated bytecode loads these via FieldAccess and invokes toBaseType / toInputType, so the | ||
| // fields must be public so generated classes in user packages can access them. | ||
| // See javaTimeLogicalTypeFieldName(...) for the type → field name mapping. | ||
| public static final LogicalType<LocalDate, Long> JAVA_LOCAL_DATE_LOGICAL_TYPE = SqlTypes.DATE; | ||
| public static final LogicalType<LocalTime, Long> JAVA_LOCAL_TIME_LOGICAL_TYPE = SqlTypes.TIME; | ||
| public static final LogicalType<LocalDateTime, org.apache.beam.sdk.values.Row> | ||
| JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE = SqlTypes.DATETIME; | ||
| public static final LogicalType<java.time.Instant, org.apache.beam.sdk.values.Row> | ||
| JAVA_INSTANT_LOGICAL_TYPE = new NanosInstant(); | ||
| public static final LogicalType<UUID, org.apache.beam.sdk.values.Row> JAVA_UUID_LOGICAL_TYPE = | ||
| SqlTypes.UUID; | ||
|
|
||
| /** | ||
| * Returns the {@link Schema.LogicalType} that {@link StaticSchemaInference} infers for the given | ||
| * Java raw type, or {@code null} if no JSR-310 / UUID inference applies. | ||
| */ | ||
| static @Nullable LogicalType<?, ?> javaTimeLogicalTypeFor(Class<?> rawType) { | ||
| if (LocalDate.class.equals(rawType)) { | ||
| return JAVA_LOCAL_DATE_LOGICAL_TYPE; | ||
| } else if (LocalTime.class.equals(rawType)) { | ||
| return JAVA_LOCAL_TIME_LOGICAL_TYPE; | ||
| } else if (LocalDateTime.class.equals(rawType)) { | ||
| return JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE; | ||
| } else if (java.time.Instant.class.equals(rawType)) { | ||
| return JAVA_INSTANT_LOGICAL_TYPE; | ||
| } else if (UUID.class.equals(rawType)) { | ||
| return JAVA_UUID_LOGICAL_TYPE; | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** Maps a Java raw type to the static field name in {@link ByteBuddyUtils} that holds it. */ | ||
| private static String javaTimeLogicalTypeFieldName(Class<?> rawType) { | ||
| if (LocalDate.class.equals(rawType)) { | ||
| return "JAVA_LOCAL_DATE_LOGICAL_TYPE"; | ||
| } else if (LocalTime.class.equals(rawType)) { | ||
| return "JAVA_LOCAL_TIME_LOGICAL_TYPE"; | ||
| } else if (LocalDateTime.class.equals(rawType)) { | ||
| return "JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE"; | ||
| } else if (java.time.Instant.class.equals(rawType)) { | ||
| return "JAVA_INSTANT_LOGICAL_TYPE"; | ||
| } else if (UUID.class.equals(rawType)) { | ||
| return "JAVA_UUID_LOGICAL_TYPE"; | ||
| } | ||
| throw new IllegalArgumentException("Not a JSR-310/UUID inferred type: " + rawType); | ||
| } | ||
|
|
||
| /** Stack manipulation that pushes the static {@link LogicalType} for the given Java type. */ | ||
| private static StackManipulation loadJavaTimeLogicalType(Class<?> rawType) { | ||
| return FieldAccess.forField( | ||
| BYTE_BUDDY_UTILS_TYPE | ||
| .getDeclaredFields() | ||
| .filter(ElementMatchers.named(javaTimeLogicalTypeFieldName(rawType))) | ||
| .getOnly()) | ||
| .read(); | ||
| } | ||
|
|
||
| /** | ||
| * A naming strategy for ByteBuddy classes. | ||
|
|
@@ -286,6 +352,8 @@ public T convert(TypeDescriptor<?> typeDescriptor) { | |
| return convertDateTime(typeDescriptor); | ||
| } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) { | ||
| return convertDateTime(typeDescriptor); | ||
| } else if (javaTimeLogicalTypeFor(typeDescriptor.getRawType()) != null) { | ||
| return convertJavaTimeLogicalType(typeDescriptor); | ||
| } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { | ||
| return convertByteBuffer(typeDescriptor); | ||
| } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) { | ||
|
|
@@ -324,6 +392,14 @@ protected StackManipulation shortCircuitReturnNull( | |
|
|
||
| protected abstract T convertDateTime(TypeDescriptor<?> type); | ||
|
|
||
| /** | ||
| * Handles JSR-310 ({@link LocalDate}, {@link LocalTime}, {@link LocalDateTime}, {@link | ||
| * java.time.Instant}) and {@link UUID} fields, which {@link StaticSchemaInference} infers as | ||
| * Beam {@link LogicalType}s. Subclasses emit code that round-trips through the corresponding | ||
| * static {@link LogicalType} instance ({@link #JAVA_LOCAL_DATE_LOGICAL_TYPE} etc.). | ||
| */ | ||
| protected abstract T convertJavaTimeLogicalType(TypeDescriptor<?> type); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The addition of the abstract method For |
||
|
|
||
| protected abstract T convertByteBuffer(TypeDescriptor<?> type); | ||
|
|
||
| protected abstract T convertCharSequence(TypeDescriptor<?> type); | ||
|
|
@@ -401,6 +477,15 @@ protected Type convertDateTime(TypeDescriptor<?> type) { | |
| return Instant.class; | ||
| } | ||
|
|
||
| @Override | ||
| protected Type convertJavaTimeLogicalType(TypeDescriptor<?> type) { | ||
| // The codegen-generated getter returns the LogicalType's base value (Long for | ||
| // Date/Time, Row for DateTime/NanosInstant/UUID). Object.class is a safe upper bound | ||
| // for the FieldValueGetter signature; the framework's GetLogicalInputType wrapper | ||
| // converts back to the input type before exposing the value to user code. | ||
| return Object.class; | ||
| } | ||
|
|
||
| @Override | ||
| protected Type convertByteBuffer(TypeDescriptor<?> type) { | ||
| return byte[].class; | ||
|
|
@@ -915,6 +1000,26 @@ protected StackManipulation convertDateTime(TypeDescriptor<?> type) { | |
| return new ShortCircuitReturnNull(readValue, stackManipulation); | ||
| } | ||
|
|
||
| @Override | ||
| protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor<?> type) { | ||
| // Equivalent code: return STATIC_LOGICAL_TYPE.toBaseType(value); | ||
| // where STATIC_LOGICAL_TYPE is one of the JAVA_*_LOGICAL_TYPE static fields on | ||
| // ByteBuddyUtils. The base type is Long (for LocalDate, LocalTime) or Row (for the | ||
| // others); both are reference types so no boxing/casting is needed beyond the invoke. | ||
| StackManipulation stackManipulation = | ||
| new Compound( | ||
| loadJavaTimeLogicalType(type.getRawType()), | ||
| readValue, | ||
| MethodInvocation.invoke( | ||
| LOGICAL_TYPE_TYPE | ||
| .getDeclaredMethods() | ||
| .filter( | ||
| ElementMatchers.named("toBaseType") | ||
| .and(ElementMatchers.takesArguments(1))) | ||
| .getOnly())); | ||
| return new ShortCircuitReturnNull(readValue, stackManipulation); | ||
| } | ||
|
|
||
| @Override | ||
| protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) { | ||
| // Generate the following code: | ||
|
|
@@ -1361,6 +1466,29 @@ protected StackManipulation convertEnum(TypeDescriptor<?> type) { | |
| return new ShortCircuitReturnNull(readValue, stackManipulation); | ||
| } | ||
|
|
||
| @Override | ||
| protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor<?> type) { | ||
| // Equivalent code: return (JavaType) STATIC_LOGICAL_TYPE.toInputType(value); | ||
| // FromRowUsingCreator already converted the row's input-type value (e.g. LocalDate) | ||
| // to the LogicalType's base value (e.g. Long) before invoking the generated creator, | ||
| // so we receive the base type here and need to project back to the POJO field's | ||
| // Java type. | ||
| ForLoadedType loadedType = new ForLoadedType(type.getRawType()); | ||
| StackManipulation stackManipulation = | ||
| new Compound( | ||
| loadJavaTimeLogicalType(type.getRawType()), | ||
| readValue, | ||
| MethodInvocation.invoke( | ||
| LOGICAL_TYPE_TYPE | ||
| .getDeclaredMethods() | ||
| .filter( | ||
| ElementMatchers.named("toInputType") | ||
| .and(ElementMatchers.takesArguments(1))) | ||
| .getOnly()), | ||
| TypeCasting.to(loadedType)); | ||
| return new ShortCircuitReturnNull(readValue, stackManipulation); | ||
| } | ||
|
|
||
| @Override | ||
| protected StackManipulation convertDefault(TypeDescriptor<?> type) { | ||
| return readValue; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we can expand the naming of this logic to include other LogicalTypes in the future? Instead of restricting it to just Time types. We already include UUID, might as well open it up to other LogicalTypes?