Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.lang.reflect.Parameter;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -38,6 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.NamingStrategy;
import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver;
Expand Down Expand Up @@ -78,6 +82,9 @@
import org.apache.beam.sdk.schemas.FieldValueHaver;
import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -120,6 +127,65 @@ public class ByteBuddyUtils {
// Type descriptions for classes that the code generation in this file looks members up on.
private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class);
private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE =
new ForLoadedType(ByteBuddyUtils.class);
// Used to locate LogicalType#toBaseType / LogicalType#toInputType when emitting conversions.
private static final ForLoadedType LOGICAL_TYPE_TYPE = new ForLoadedType(LogicalType.class);

// Shared LogicalType instances used by codegen for JSR-310 and UUID POJO/Bean fields. The
// generated bytecode reads these constants via FieldAccess and then invokes toBaseType /
// toInputType on them, so the fields must be public: generated classes may live in user
// packages and still need to access them.
// The Java type -> constant name mapping lives in javaTimeLogicalTypeFieldName(...); keep the
// two in sync when adding a constant here.
// java.time.LocalDate -> SqlTypes.DATE (base type Long).
public static final LogicalType<LocalDate, Long> JAVA_LOCAL_DATE_LOGICAL_TYPE = SqlTypes.DATE;
// java.time.LocalTime -> SqlTypes.TIME (base type Long).
public static final LogicalType<LocalTime, Long> JAVA_LOCAL_TIME_LOGICAL_TYPE = SqlTypes.TIME;
// java.time.LocalDateTime -> SqlTypes.DATETIME (base type Row).
public static final LogicalType<LocalDateTime, org.apache.beam.sdk.values.Row>
JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE = SqlTypes.DATETIME;
// java.time.Instant -> a NanosInstant instance created once here (base type Row).
public static final LogicalType<java.time.Instant, org.apache.beam.sdk.values.Row>
JAVA_INSTANT_LOGICAL_TYPE = new NanosInstant();
// java.util.UUID -> SqlTypes.UUID (base type Row).
public static final LogicalType<UUID, org.apache.beam.sdk.values.Row> JAVA_UUID_LOGICAL_TYPE =
SqlTypes.UUID;

/**
 * Looks up the shared {@link LogicalType} constant that corresponds to a JSR-310 or {@link UUID}
 * field type, mirroring what {@code StaticSchemaInference} infers for the same Java type.
 *
 * <p>Only exact class matches are recognised: {@link LocalDate}, {@link LocalTime}, {@link
 * LocalDateTime}, {@link java.time.Instant} and {@link UUID}.
 *
 * @param rawType the raw Java class of the field being converted
 * @return the matching {@code JAVA_*_LOGICAL_TYPE} constant, or {@code null} if {@code rawType}
 *     is not one of the recognised classes
 */
static @Nullable LogicalType<?, ?> javaTimeLogicalTypeFor(Class<?> rawType) {
  if (LocalDate.class.equals(rawType)) {
    return JAVA_LOCAL_DATE_LOGICAL_TYPE;
  }
  if (LocalTime.class.equals(rawType)) {
    return JAVA_LOCAL_TIME_LOGICAL_TYPE;
  }
  if (LocalDateTime.class.equals(rawType)) {
    return JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE;
  }
  if (java.time.Instant.class.equals(rawType)) {
    return JAVA_INSTANT_LOGICAL_TYPE;
  }
  if (UUID.class.equals(rawType)) {
    return JAVA_UUID_LOGICAL_TYPE;
  }
  // Not a type covered by the JSR-310 / UUID inference.
  return null;
}

/**
 * Resolves the name of the public static constant on {@link ByteBuddyUtils} that holds the
 * {@link LogicalType} for the given Java type.
 *
 * @param rawType the raw Java class of the field being converted
 * @return the constant's field name, e.g. {@code "JAVA_LOCAL_DATE_LOGICAL_TYPE"}
 * @throws IllegalArgumentException if {@code rawType} is not one of the supported JSR-310 / UUID
 *     classes
 */
private static String javaTimeLogicalTypeFieldName(Class<?> rawType) {
  if (LocalDate.class.equals(rawType)) {
    return "JAVA_LOCAL_DATE_LOGICAL_TYPE";
  }
  if (LocalTime.class.equals(rawType)) {
    return "JAVA_LOCAL_TIME_LOGICAL_TYPE";
  }
  if (LocalDateTime.class.equals(rawType)) {
    return "JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE";
  }
  if (java.time.Instant.class.equals(rawType)) {
    return "JAVA_INSTANT_LOGICAL_TYPE";
  }
  if (UUID.class.equals(rawType)) {
    return "JAVA_UUID_LOGICAL_TYPE";
  }
  // Callers are expected to have checked the type first; anything else is a programming error.
  throw new IllegalArgumentException("Not a JSR-310/UUID inferred type: " + rawType);
}

/**
 * Builds the stack manipulation that reads the static {@link LogicalType} constant declared on
 * {@link ByteBuddyUtils} for the given Java type, leaving it on the operand stack.
 *
 * @param rawType the raw Java class of the field being converted
 * @return a read of the matching {@code JAVA_*_LOGICAL_TYPE} static field
 */
private static StackManipulation loadJavaTimeLogicalType(Class<?> rawType) {
  String constantName = javaTimeLogicalTypeFieldName(rawType);
  return FieldAccess.forField(
          BYTE_BUDDY_UTILS_TYPE
              .getDeclaredFields()
              .filter(ElementMatchers.named(constantName))
              .getOnly())
      .read();
}

/**
* A naming strategy for ByteBuddy classes.
Expand Down Expand Up @@ -286,6 +352,8 @@ public T convert(TypeDescriptor<?> typeDescriptor) {
return convertDateTime(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) {
return convertDateTime(typeDescriptor);
} else if (javaTimeLogicalTypeFor(typeDescriptor.getRawType()) != null) {
return convertJavaTimeLogicalType(typeDescriptor);
Comment on lines +355 to +356
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we broaden the naming of this logic so that it can cover other LogicalTypes in the future, rather than restricting it to just the time types? It already includes UUID, so we might as well open it up to other LogicalTypes.

} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
return convertByteBuffer(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
Expand Down Expand Up @@ -324,6 +392,14 @@ protected StackManipulation shortCircuitReturnNull(

protected abstract T convertDateTime(TypeDescriptor<?> type);

/**
 * Handles JSR-310 ({@link LocalDate}, {@link LocalTime}, {@link LocalDateTime}, {@link
 * java.time.Instant}) and {@link UUID} fields, which {@link StaticSchemaInference} infers as
 * Beam {@link LogicalType}s. Subclasses emit code that round-trips through the corresponding
 * static {@link LogicalType} instance ({@link ByteBuddyUtils#JAVA_LOCAL_DATE_LOGICAL_TYPE}
 * etc.).
 *
 * @param type descriptor of the field's Java type; its raw type is one of the classes listed
 *     above
 * @return the conversion result for this type, whose kind depends on the concrete subclass
 */
protected abstract T convertJavaTimeLogicalType(TypeDescriptor<?> type);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The addition of the abstract method convertJavaTimeLogicalType to the TypeConversion class requires an implementation in all of its subclasses to avoid compilation errors. While you have provided implementations for GetterTypeConversion, GetterPropertyConversion, and SetterPropertyConversion, you appear to have missed SetterTypeConversion and CreatorParameterTypeConversion. Both of these subclasses also extend TypeConversion and must implement this new method.

For SetterTypeConversion and CreatorParameterTypeConversion, returning Object.class (similar to GetterTypeConversion) should be appropriate as the generated code handles the specific casting in the property conversion path.


protected abstract T convertByteBuffer(TypeDescriptor<?> type);

protected abstract T convertCharSequence(TypeDescriptor<?> type);
Expand Down Expand Up @@ -401,6 +477,15 @@ protected Type convertDateTime(TypeDescriptor<?> type) {
return Instant.class;
}

@Override
protected Type convertJavaTimeLogicalType(TypeDescriptor<?> type) {
// Getter type reported for JSR-310 / UUID fields. The codegen-generated getter returns the
// LogicalType's base value (Long for Date/Time, Row for DateTime/NanosInstant/UUID), so
// Object.class is used as a common upper bound for the FieldValueGetter signature.
// NOTE(review): the conversion back to the input type is said to be done by the framework's
// GetLogicalInputType wrapper before user code sees the value; that wrapper is not visible
// here, so confirm against it.
return Object.class;
}

@Override
protected Type convertByteBuffer(TypeDescriptor<?> type) {
return byte[].class;
Expand Down Expand Up @@ -915,6 +1000,26 @@ protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
return new ShortCircuitReturnNull(readValue, stackManipulation);
}

@Override
protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor<?> type) {
  // Generated code is equivalent to:
  //   return JAVA_*_LOGICAL_TYPE.toBaseType(value);
  // The receiver is one of the JAVA_*_LOGICAL_TYPE static fields on ByteBuddyUtils. The base
  // type is Long (LocalDate, LocalTime) or Row (the others); both are reference types, so
  // nothing beyond the invocation itself (no boxing, no cast) is required.
  StackManipulation pushLogicalType = loadJavaTimeLogicalType(type.getRawType());
  StackManipulation callToBaseType =
      MethodInvocation.invoke(
          LOGICAL_TYPE_TYPE
              .getDeclaredMethods()
              .filter(
                  ElementMatchers.named("toBaseType").and(ElementMatchers.takesArguments(1)))
              .getOnly());
  // Stack order: logical type (receiver), then the field value (argument), then the call.
  StackManipulation conversion = new Compound(pushLogicalType, readValue, callToBaseType);
  return new ShortCircuitReturnNull(readValue, conversion);
}

@Override
protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
// Generate the following code:
Expand Down Expand Up @@ -1361,6 +1466,29 @@ protected StackManipulation convertEnum(TypeDescriptor<?> type) {
return new ShortCircuitReturnNull(readValue, stackManipulation);
}

@Override
protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor<?> type) {
  // Generated code is equivalent to:
  //   return (JavaType) JAVA_*_LOGICAL_TYPE.toInputType(value);
  // By the time the generated creator runs, FromRowUsingCreator has already turned the row's
  // input-type value (e.g. LocalDate) into the LogicalType's base value (e.g. Long). The value
  // read here is therefore the base type and must be projected back to the Java type declared
  // on the POJO field.
  Class<?> fieldClass = type.getRawType();
  StackManipulation pushLogicalType = loadJavaTimeLogicalType(fieldClass);
  StackManipulation callToInputType =
      MethodInvocation.invoke(
          LOGICAL_TYPE_TYPE
              .getDeclaredMethods()
              .filter(
                  ElementMatchers.named("toInputType").and(ElementMatchers.takesArguments(1)))
              .getOnly());
  // toInputType is looked up on the LogicalType interface, so cast the result down to the
  // field's declared class.
  StackManipulation castToFieldType = TypeCasting.to(new ForLoadedType(fieldClass));
  StackManipulation conversion =
      new Compound(pushLogicalType, readValue, callToInputType, castToFieldType);
  return new ShortCircuitReturnNull(readValue, conversion);
}

@Override
protected StackManipulation convertDefault(TypeDescriptor<?> type) {
return readValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@
import java.lang.reflect.ParameterizedType;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.ReadableInstant;
Expand Down Expand Up @@ -127,7 +133,8 @@ public static Schema.FieldType fieldFromType(
return fieldFromType(type, fieldValueTypeSupplier, new HashMap<>());
}

// TODO(https://github.com/apache/beam/issues/21567): support type inference for logical types
// TODO(https://github.com/apache/beam/issues/21567): support inference for additional/custom
// logical types
private static Schema.FieldType fieldFromType(
TypeDescriptor type,
FieldValueTypeSupplier fieldValueTypeSupplier,
Expand Down Expand Up @@ -177,6 +184,16 @@ private static Schema.FieldType fieldFromType(
return FieldType.STRING;
} else if (type.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
return FieldType.DATETIME;
} else if (type.getRawType().equals(LocalDate.class)) {
return FieldType.logicalType(SqlTypes.DATE);
} else if (type.getRawType().equals(LocalTime.class)) {
return FieldType.logicalType(SqlTypes.TIME);
} else if (type.getRawType().equals(LocalDateTime.class)) {
return FieldType.logicalType(SqlTypes.DATETIME);
} else if (type.getRawType().equals(java.time.Instant.class)) {
return FieldType.logicalType(new NanosInstant());
} else if (type.getRawType().equals(UUID.class)) {
return FieldType.logicalType(SqlTypes.UUID);
} else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
return FieldType.BYTES;
} else if (type.isSubtypeOf(TypeDescriptor.of(Iterable.class))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.CASE_FORMAT_BEAM_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.FIELD_WITH_DESCRIPTION_BEAN_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ITERABLE_BEAM_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAYS_BEAM_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA;
Expand All @@ -46,9 +47,15 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.AllNullableBean;
Expand All @@ -57,6 +64,7 @@
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean;
Expand Down Expand Up @@ -179,6 +187,81 @@ public void testFromRow() throws NoSuchSchemaException {
assertEquals("stringbuilder", bean.getStringBuilder().toString());
}

/**
 * Verifies that the schema inferred for {@link JavaTimeBean} matches the expected bean schema and
 * is assignable (ignoring nullability) to a hand-built schema using the same logical types.
 */
@Test
public void testJavaTimeSchema() throws NoSuchSchemaException {
  Schema inferred = SchemaRegistry.createDefault().getSchema(JavaTimeBean.class);
  SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, inferred);

  // Hand-built schema declaring each field with its logical type explicitly.
  Schema handBuilt =
      Schema.builder()
          .addLogicalTypeField("localDate", SqlTypes.DATE)
          .addLogicalTypeField("localTime", SqlTypes.TIME)
          .addLogicalTypeField("localDateTime", SqlTypes.DATETIME)
          .addLogicalTypeField("instant", new NanosInstant())
          .addLogicalTypeField("uuid", SqlTypes.UUID)
          .build();
  boolean assignable = inferred.assignableToIgnoreNullable(handBuilt);
  assertTrue(assignable);
}

/**
 * Verifies that converting a populated {@link JavaTimeBean} to a {@link Row} exposes every
 * JSR-310 / UUID field through its logical-type value.
 */
@Test
public void testJavaTimeToRow() throws NoSuchSchemaException {
  SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();

  JavaTimeBean input = new JavaTimeBean();
  input.setLocalDate(LocalDate.of(2024, 1, 15));
  input.setLocalTime(LocalTime.of(10, 30, 45));
  input.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45));
  input.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L));
  input.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555"));

  Row converted = schemaRegistry.getToRowFunction(JavaTimeBean.class).apply(input);

  assertEquals(5, converted.getFieldCount());
  assertEquals(
      input.getLocalDate(), converted.getLogicalTypeValue("localDate", LocalDate.class));
  assertEquals(
      input.getLocalTime(), converted.getLogicalTypeValue("localTime", LocalTime.class));
  assertEquals(
      input.getLocalDateTime(),
      converted.getLogicalTypeValue("localDateTime", LocalDateTime.class));
  assertEquals(
      input.getInstant(), converted.getLogicalTypeValue("instant", java.time.Instant.class));
  assertEquals(input.getUuid(), converted.getLogicalTypeValue("uuid", UUID.class));
}

/**
 * Verifies that a {@link Row} built against {@code JAVA_TIME_BEAN_SCHEMA} converts back into a
 * {@link JavaTimeBean} whose JSR-310 / UUID properties equal the values placed in the row.
 */
@Test
public void testJavaTimeFromRow() throws NoSuchSchemaException {
  SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();

  LocalDate expectedDate = LocalDate.of(2024, 1, 15);
  LocalTime expectedTime = LocalTime.of(10, 30, 45);
  LocalDateTime expectedDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45);
  java.time.Instant expectedInstant =
      java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L);
  UUID expectedUuid = UUID.fromString("11111111-2222-3333-4444-555555555555");

  // Values are added in the field order of JAVA_TIME_BEAN_SCHEMA.
  Row input =
      Row.withSchema(JAVA_TIME_BEAN_SCHEMA)
          .addValues(expectedDate, expectedTime, expectedDateTime, expectedInstant, expectedUuid)
          .build();

  JavaTimeBean converted = schemaRegistry.getFromRowFunction(JavaTimeBean.class).apply(input);

  assertEquals(expectedDate, converted.getLocalDate());
  assertEquals(expectedTime, converted.getLocalTime());
  assertEquals(expectedDateTime, converted.getLocalDateTime());
  assertEquals(expectedInstant, converted.getInstant());
  assertEquals(expectedUuid, converted.getUuid());
}

/**
 * Verifies that a {@link JavaTimeBean} survives a bean -> {@link Row} -> bean round trip
 * unchanged, as judged by {@code equals}.
 */
@Test
public void testJavaTimeRoundTrip() throws NoSuchSchemaException {
  SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();

  JavaTimeBean source = new JavaTimeBean();
  source.setLocalDate(LocalDate.of(2024, 1, 15));
  source.setLocalTime(LocalTime.of(10, 30, 45));
  source.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45));
  source.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L));
  source.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555"));

  Row asRow = schemaRegistry.getToRowFunction(JavaTimeBean.class).apply(source);
  JavaTimeBean restored = schemaRegistry.getFromRowFunction(JavaTimeBean.class).apply(asRow);

  assertEquals(source, restored);
}

@Test
public void testNullableToRow() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
Expand Down
Loading
Loading