From 5e412185dd31a1fe35b424d308db55d48ba426fb Mon Sep 17 00:00:00 2001 From: Sachin <52783123+sachinnn99@users.noreply.github.com> Date: Tue, 14 Apr 2026 23:54:51 +0530 Subject: [PATCH 1/2] [Java SDK] Infer Beam logical types for JSR-310 and UUID fields in JavaFieldSchema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StaticSchemaInference.fieldFromType() now maps java.time.LocalDate, LocalTime, LocalDateTime, java.time.Instant, and java.util.UUID to their corresponding Beam logical types (SqlTypes.DATE, TIME, DATETIME, NanosInstant, and SqlTypes.UUID). ByteBuddyUtils gains convertJavaTimeLogicalType() in both getter and setter paths so that POJO/Bean ↔ Row round-trips go through LogicalType.toBaseType / toInputType. Fixes #37524 --- .../sdk/schemas/utils/ByteBuddyUtils.java | 128 ++++++++++++++++++ .../schemas/utils/StaticSchemaInference.java | 19 ++- .../beam/sdk/schemas/JavaBeanSchemaTest.java | 83 ++++++++++++ .../beam/sdk/schemas/JavaFieldSchemaTest.java | 122 +++++++++++++++++ .../sdk/schemas/transforms/ConvertTest.java | 99 ++++++++++++++ .../sdk/schemas/utils/JavaBeanUtilsTest.java | 10 ++ .../beam/sdk/schemas/utils/POJOUtilsTest.java | 32 +++++ .../beam/sdk/schemas/utils/TestJavaBeans.java | 89 ++++++++++++ .../beam/sdk/schemas/utils/TestPOJOs.java | 118 ++++++++++++++++ 9 files changed, 699 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java index 832090926919..5e671627ee35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java @@ -28,6 +28,9 @@ import java.lang.reflect.Parameter; import java.lang.reflect.Type; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; 
import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedMap; +import java.util.UUID; import net.bytebuddy.ByteBuddy; import net.bytebuddy.NamingStrategy; import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver; @@ -78,6 +82,9 @@ import org.apache.beam.sdk.schemas.FieldValueHaver; import org.apache.beam.sdk.schemas.FieldValueSetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.TypeDescriptor; @@ -120,6 +127,65 @@ public class ByteBuddyUtils { private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class); private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE = new ForLoadedType(ByteBuddyUtils.class); + private static final ForLoadedType LOGICAL_TYPE_TYPE = new ForLoadedType(LogicalType.class); + + // Static LogicalType instances used by codegen for JSR-310 and UUID POJO/Bean fields. The + // generated bytecode loads these via FieldAccess and invokes toBaseType / toInputType, so the + // fields must be public so generated classes in user packages can access them. + // See javaTimeLogicalTypeFieldName(...) for the type → field name mapping. 
+ public static final LogicalType JAVA_LOCAL_DATE_LOGICAL_TYPE = SqlTypes.DATE; + public static final LogicalType JAVA_LOCAL_TIME_LOGICAL_TYPE = SqlTypes.TIME; + public static final LogicalType + JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE = SqlTypes.DATETIME; + public static final LogicalType + JAVA_INSTANT_LOGICAL_TYPE = new NanosInstant(); + public static final LogicalType JAVA_UUID_LOGICAL_TYPE = + SqlTypes.UUID; + + /** + * Returns the {@link Schema.LogicalType} that {@link StaticSchemaInference} infers for the given + * Java raw type, or {@code null} if no JSR-310 / UUID inference applies. + */ + static @Nullable LogicalType javaTimeLogicalTypeFor(Class rawType) { + if (LocalDate.class.equals(rawType)) { + return JAVA_LOCAL_DATE_LOGICAL_TYPE; + } else if (LocalTime.class.equals(rawType)) { + return JAVA_LOCAL_TIME_LOGICAL_TYPE; + } else if (LocalDateTime.class.equals(rawType)) { + return JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE; + } else if (java.time.Instant.class.equals(rawType)) { + return JAVA_INSTANT_LOGICAL_TYPE; + } else if (UUID.class.equals(rawType)) { + return JAVA_UUID_LOGICAL_TYPE; + } + return null; + } + + /** Maps a Java raw type to the static field name in {@link ByteBuddyUtils} that holds it. */ + private static String javaTimeLogicalTypeFieldName(Class rawType) { + if (LocalDate.class.equals(rawType)) { + return "JAVA_LOCAL_DATE_LOGICAL_TYPE"; + } else if (LocalTime.class.equals(rawType)) { + return "JAVA_LOCAL_TIME_LOGICAL_TYPE"; + } else if (LocalDateTime.class.equals(rawType)) { + return "JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE"; + } else if (java.time.Instant.class.equals(rawType)) { + return "JAVA_INSTANT_LOGICAL_TYPE"; + } else if (UUID.class.equals(rawType)) { + return "JAVA_UUID_LOGICAL_TYPE"; + } + throw new IllegalArgumentException("Not a JSR-310/UUID inferred type: " + rawType); + } + + /** Stack manipulation that pushes the static {@link LogicalType} for the given Java type. 
*/ + private static StackManipulation loadJavaTimeLogicalType(Class rawType) { + return FieldAccess.forField( + BYTE_BUDDY_UTILS_TYPE + .getDeclaredFields() + .filter(ElementMatchers.named(javaTimeLogicalTypeFieldName(rawType))) + .getOnly()) + .read(); + } /** * A naming strategy for ByteBuddy classes. @@ -286,6 +352,8 @@ public T convert(TypeDescriptor typeDescriptor) { return convertDateTime(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) { return convertDateTime(typeDescriptor); + } else if (javaTimeLogicalTypeFor(typeDescriptor.getRawType()) != null) { + return convertJavaTimeLogicalType(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { return convertByteBuffer(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) { @@ -324,6 +392,14 @@ protected StackManipulation shortCircuitReturnNull( protected abstract T convertDateTime(TypeDescriptor type); + /** + * Handles JSR-310 ({@link LocalDate}, {@link LocalTime}, {@link LocalDateTime}, {@link + * java.time.Instant}) and {@link UUID} fields, which {@link StaticSchemaInference} infers as + * Beam {@link LogicalType}s. Subclasses emit code that round-trips through the corresponding + * static {@link LogicalType} instance ({@link #JAVA_LOCAL_DATE_LOGICAL_TYPE} etc.). + */ + protected abstract T convertJavaTimeLogicalType(TypeDescriptor type); + protected abstract T convertByteBuffer(TypeDescriptor type); protected abstract T convertCharSequence(TypeDescriptor type); @@ -401,6 +477,15 @@ protected Type convertDateTime(TypeDescriptor type) { return Instant.class; } + @Override + protected Type convertJavaTimeLogicalType(TypeDescriptor type) { + // The codegen-generated getter returns the LogicalType's base value (Long for + // Date/Time, Row for DateTime/NanosInstant/UUID). 
Object.class is a safe upper bound + // for the FieldValueGetter signature; the framework's GetLogicalInputType wrapper + // converts back to the input type before exposing the value to user code. + return Object.class; + } + @Override protected Type convertByteBuffer(TypeDescriptor type) { return byte[].class; @@ -915,6 +1000,26 @@ protected StackManipulation convertDateTime(TypeDescriptor type) { return new ShortCircuitReturnNull(readValue, stackManipulation); } + @Override + protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor type) { + // Equivalent code: return STATIC_LOGICAL_TYPE.toBaseType(value); + // where STATIC_LOGICAL_TYPE is one of the JAVA_*_LOGICAL_TYPE static fields on + // ByteBuddyUtils. The base type is Long (for LocalDate, LocalTime) or Row (for the + // others); both are reference types so no boxing/casting is needed beyond the invoke. + StackManipulation stackManipulation = + new Compound( + loadJavaTimeLogicalType(type.getRawType()), + readValue, + MethodInvocation.invoke( + LOGICAL_TYPE_TYPE + .getDeclaredMethods() + .filter( + ElementMatchers.named("toBaseType") + .and(ElementMatchers.takesArguments(1))) + .getOnly())); + return new ShortCircuitReturnNull(readValue, stackManipulation); + } + @Override protected StackManipulation convertByteBuffer(TypeDescriptor type) { // Generate the following code: @@ -1361,6 +1466,29 @@ protected StackManipulation convertEnum(TypeDescriptor type) { return new ShortCircuitReturnNull(readValue, stackManipulation); } + @Override + protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor type) { + // Equivalent code: return (JavaType) STATIC_LOGICAL_TYPE.toInputType(value); + // FromRowUsingCreator already converted the row's input-type value (e.g. LocalDate) + // to the LogicalType's base value (e.g. Long) before invoking the generated creator, + // so we receive the base type here and need to project back to the POJO field's + // Java type. 
+ ForLoadedType loadedType = new ForLoadedType(type.getRawType()); + StackManipulation stackManipulation = + new Compound( + loadJavaTimeLogicalType(type.getRawType()), + readValue, + MethodInvocation.invoke( + LOGICAL_TYPE_TYPE + .getDeclaredMethods() + .filter( + ElementMatchers.named("toInputType") + .and(ElementMatchers.takesArguments(1))) + .getOnly()), + TypeCasting.to(loadedType)); + return new ShortCircuitReturnNull(readValue, stackManipulation); + } + @Override protected StackManipulation convertDefault(TypeDescriptor type) { return readValue; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 196ee6f86593..ffd07071679d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -22,17 +22,23 @@ import java.lang.reflect.ParameterizedType; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.ReadableInstant; @@ -127,7 +133,8 @@ public static Schema.FieldType 
fieldFromType( return fieldFromType(type, fieldValueTypeSupplier, new HashMap<>()); } - // TODO(https://github.com/apache/beam/issues/21567): support type inference for logical types + // TODO(https://github.com/apache/beam/issues/21567): support inference for additional/custom + // logical types private static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier, @@ -177,6 +184,16 @@ private static Schema.FieldType fieldFromType( return FieldType.STRING; } else if (type.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) { return FieldType.DATETIME; + } else if (type.getRawType().equals(LocalDate.class)) { + return FieldType.logicalType(SqlTypes.DATE); + } else if (type.getRawType().equals(LocalTime.class)) { + return FieldType.logicalType(SqlTypes.TIME); + } else if (type.getRawType().equals(LocalDateTime.class)) { + return FieldType.logicalType(SqlTypes.DATETIME); + } else if (type.getRawType().equals(java.time.Instant.class)) { + return FieldType.logicalType(new NanosInstant()); + } else if (type.getRawType().equals(UUID.class)) { + return FieldType.logicalType(SqlTypes.UUID); } else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { return FieldType.BYTES; } else if (type.isSubtypeOf(TypeDescriptor.of(Iterable.class))) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java index df6c9cf18e5a..de0953a0a088 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java @@ -24,6 +24,7 @@ import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.CASE_FORMAT_BEAM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.FIELD_WITH_DESCRIPTION_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ITERABLE_BEAM_SCHEMA; +import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAYS_BEAM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA; @@ -46,9 +47,15 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.SchemaTestUtils; import org.apache.beam.sdk.schemas.utils.TestJavaBeans; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.AllNullableBean; @@ -57,6 +64,7 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean; +import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean; @@ -179,6 +187,81 @@ public void testFromRow() throws NoSuchSchemaException { assertEquals("stringbuilder", bean.getStringBuilder().toString()); } + @Test + public void testJavaTimeSchema() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(JavaTimeBean.class); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema); + Schema icebergStyleSchema = + Schema.builder() + .addLogicalTypeField("localDate", 
SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema)); + } + + @Test + public void testJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimeBean bean = new JavaTimeBean(); + bean.setLocalDate(LocalDate.of(2024, 1, 15)); + bean.setLocalTime(LocalTime.of(10, 30, 45)); + bean.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45)); + bean.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L)); + bean.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimeBean.class).apply(bean); + + assertEquals(5, row.getFieldCount()); + assertEquals(bean.getLocalDate(), row.getLogicalTypeValue("localDate", LocalDate.class)); + assertEquals(bean.getLocalTime(), row.getLogicalTypeValue("localTime", LocalTime.class)); + assertEquals( + bean.getLocalDateTime(), row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertEquals(bean.getInstant(), row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertEquals(bean.getUuid(), row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + Row row = + Row.withSchema(JAVA_TIME_BEAN_SCHEMA) + .addValues(localDate, localTime, localDateTime, instant, uuid) + .build(); + + 
JavaTimeBean bean = registry.getFromRowFunction(JavaTimeBean.class).apply(row); + + assertEquals(localDate, bean.getLocalDate()); + assertEquals(localTime, bean.getLocalTime()); + assertEquals(localDateTime, bean.getLocalDateTime()); + assertEquals(instant, bean.getInstant()); + assertEquals(uuid, bean.getUuid()); + } + + @Test + public void testJavaTimeRoundTrip() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimeBean original = new JavaTimeBean(); + original.setLocalDate(LocalDate.of(2024, 1, 15)); + original.setLocalTime(LocalTime.of(10, 30, 45)); + original.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45)); + original.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L)); + original.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimeBean.class).apply(original); + JavaTimeBean roundTripped = registry.getFromRowFunction(JavaTimeBean.class).apply(row); + + assertEquals(original, roundTripped); + } + @Test public void testNullableToRow() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java index 7a66decde017..c80b758adc31 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java @@ -21,12 +21,14 @@ import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ENUMERATION; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA; import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAYS_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_NULLABLE_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLES_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_JAVA_TIME_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ENUM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ITERABLE; @@ -46,20 +48,28 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.SchemaTestUtils; import org.apache.beam.sdk.schemas.utils.TestPOJOs; import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo; import org.apache.beam.sdk.schemas.utils.TestPOJOs.FirstCircularNestedPOJO; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedPOJO; +import 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NullableJavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NullablePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNestedNullable; import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNullables; @@ -228,6 +238,118 @@ public void testFromRow() throws NoSuchSchemaException { assertEquals("stringbuilder", pojo.stringBuilder.toString()); } + @Test + public void testJavaTimeSchema() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(JavaTimePOJO.class); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_POJO_SCHEMA, schema); + // Reproduces the failure mode in #37524: a POJO inferred from JSR-310 fields must be + // assignable to a row schema (e.g. one produced by IcebergIO) that uses the same logical + // types. + Schema icebergStyleSchema = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema)); + } + + @Test + public void testJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + JavaTimePOJO pojo = new JavaTimePOJO(localDate, localTime, localDateTime, instant, uuid); + + Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(pojo); + + assertEquals(5, row.getFieldCount()); + assertEquals(localDate, 
row.getLogicalTypeValue("localDate", LocalDate.class)); + assertEquals(localTime, row.getLogicalTypeValue("localTime", LocalTime.class)); + assertEquals(localDateTime, row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertEquals(instant, row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertEquals(uuid, row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + Row row = + Row.withSchema(JAVA_TIME_POJO_SCHEMA) + .addValues(localDate, localTime, localDateTime, instant, uuid) + .build(); + + JavaTimePOJO pojo = registry.getFromRowFunction(JavaTimePOJO.class).apply(row); + + assertEquals(localDate, pojo.localDate); + assertEquals(localTime, pojo.localTime); + assertEquals(localDateTime, pojo.localDateTime); + assertEquals(instant, pojo.instant); + assertEquals(uuid, pojo.uuid); + } + + @Test + public void testJavaTimeRoundTrip() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimePOJO original = + new JavaTimePOJO( + LocalDate.of(2024, 1, 15), + LocalTime.of(10, 30, 45), + LocalDateTime.of(2024, 1, 15, 10, 30, 45), + java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L), + UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(original); + JavaTimePOJO roundTripped = registry.getFromRowFunction(JavaTimePOJO.class).apply(row); + + assertEquals(original, roundTripped); + } + + @Test + public void testNullableJavaTimeSchema() throws 
NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(NullableJavaTimePOJO.class); + SchemaTestUtils.assertSchemaEquivalent(NULLABLE_JAVA_TIME_POJO_SCHEMA, schema); + } + + @Test + public void testNullableJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + NullableJavaTimePOJO pojo = new NullableJavaTimePOJO(); + Row row = registry.getToRowFunction(NullableJavaTimePOJO.class).apply(pojo); + + assertEquals(5, row.getFieldCount()); + assertNull(row.getLogicalTypeValue("localDate", LocalDate.class)); + assertNull(row.getLogicalTypeValue("localTime", LocalTime.class)); + assertNull(row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertNull(row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertNull(row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testNullableJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Row row = Row.nullRow(NULLABLE_JAVA_TIME_POJO_SCHEMA); + + NullableJavaTimePOJO pojo = registry.getFromRowFunction(NullableJavaTimePOJO.class).apply(row); + assertNull(pojo.localDate); + assertNull(pojo.localTime); + assertNull(pojo.localDateTime); + assertNull(pojo.instant); + assertNull(pojo.uuid); + } + @Test public void testNullableSchema() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java index 93d6984d47e2..c206bd8f61fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.transforms; +import 
java.time.LocalDateTime; import java.util.Arrays; import java.util.Map; import java.util.Objects; @@ -24,6 +25,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -211,6 +213,103 @@ public void testFromRows() { pipeline.run(); } + /** Reproducer for #37524: a Row with a logical-type DateTime field should convert to a POJO. */ + @DefaultSchema(JavaFieldSchema.class) + public static class TimePOJO { + public LocalDateTime time; + + public TimePOJO() {} + + public TimePOJO(LocalDateTime time) { + this.time = time; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TimePOJO)) { + return false; + } + TimePOJO that = (TimePOJO) o; + return Objects.equals(time, that.time); + } + + @Override + public int hashCode() { + return Objects.hash(time); + } + } + + @Test + @Category(NeedsRunner.class) + public void testFromRowsWithLogicalTypeDateTime() { + Schema icebergStyleSchema = + Schema.builder().addLogicalTypeField("time", SqlTypes.DATETIME).build(); + LocalDateTime expected = LocalDateTime.of(2024, 1, 15, 10, 30, 0); + Row inputRow = Row.withSchema(icebergStyleSchema).addValue(expected).build(); + + PCollection<TimePOJO> pojos = + pipeline + .apply(Create.of(inputRow).withRowSchema(icebergStyleSchema)) + .apply(Convert.fromRows(TimePOJO.class)); + + PAssert.that(pojos).containsInAnyOrder(new TimePOJO(expected)); + pipeline.run(); + } + + /** POJO mixing a logical-type field with a primitive field. 
*/ + @DefaultSchema(JavaFieldSchema.class) + public static class MixedTimePOJO { + public LocalDateTime time; + public String name; + + public MixedTimePOJO() {} + + public MixedTimePOJO(LocalDateTime time, String name) { + this.time = time; + this.name = name; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MixedTimePOJO)) { + return false; + } + MixedTimePOJO that = (MixedTimePOJO) o; + return Objects.equals(time, that.time) && Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(time, name); + } + } + + @Test + @Category(NeedsRunner.class) + public void testFromRowsWithMixedLogicalAndPrimitiveTypes() { + Schema mixedSchema = + Schema.builder() + .addLogicalTypeField("time", SqlTypes.DATETIME) + .addStringField("name") + .build(); + LocalDateTime expectedTime = LocalDateTime.of(2024, 1, 15, 10, 30, 0); + Row inputRow = Row.withSchema(mixedSchema).addValues(expectedTime, "hello").build(); + + PCollection<MixedTimePOJO> pojos = + pipeline + .apply(Create.of(inputRow).withRowSchema(mixedSchema)) + .apply(Convert.fromRows(MixedTimePOJO.class)); + + PAssert.that(pojos).containsInAnyOrder(new MixedTimePOJO(expectedTime, "hello")); + pipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testGeneralConvert() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 7e9cf9a894b9..57d4f905e6ec 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BOXED_FIELDS_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BYTE_ARRAY_SCHEMA; +import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_COLLECTION_BEAN_SCHEMA; @@ -44,6 +45,7 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray; +import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean; @@ -79,6 +81,14 @@ public void testSimpleBean() { SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } + @Test + public void testJavaTimeBean() { + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor<JavaTimeBean>() {}, GetterTypeSupplier.INSTANCE); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema); + } + @Test public void testNestedBean() { Schema schema = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java index 6b9fbcd30a27..72407d965da4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.utils; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_COLLECTION_POJO_SCHEMA; 
import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA; @@ -38,7 +39,11 @@ import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO; @@ -82,6 +87,33 @@ public void testSimplePOJO() { assertEquals(SIMPLE_POJO_SCHEMA, schema); } + @Test + public void testJavaTimePOJO() { + Schema schema = + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); + assertEquals(JAVA_TIME_POJO_SCHEMA, schema); + } + + /** + * Regression test for #37524: Joda {@link Instant} must continue to map to {@link + * FieldType#DATETIME}, not the new {@code java.time.Instant} logical type. This guards the branch + * ordering in {@code StaticSchemaInference.fieldFromType}. 
+ */ + @Test + public void testJodaInstantStillInfersAsDatetime() { + FieldType jodaInferred = + StaticSchemaInference.fieldFromType( + TypeDescriptor.of(Instant.class), JavaFieldTypeSupplier.INSTANCE); + assertEquals(FieldType.DATETIME, jodaInferred); + + FieldType javaTimeInferred = + StaticSchemaInference.fieldFromType( + TypeDescriptor.of(java.time.Instant.class), JavaFieldTypeSupplier.INSTANCE); + assertEquals(TypeName.LOGICAL_TYPE, javaTimeInferred.getTypeName()); + assertEquals(NanosInstant.IDENTIFIER, javaTimeInferred.getLogicalType().getIdentifier()); + } + @Test public void testNestedPOJO() { Schema schema = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java index 694db05b0918..d8ed86f2b552 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java @@ -19,10 +19,14 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.beam.sdk.schemas.JavaBeanSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -33,6 +37,8 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import 
org.checkerframework.checker.nullness.qual.Nullable; @@ -1398,4 +1404,87 @@ public void setValue(@Nullable Float value) { Schema.Field.nullable("value", FieldType.FLOAT) .withDescription("This value is the value stored in the object as a float.")) .build(); + + /** A Bean containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. */ + @DefaultSchema(JavaBeanSchema.class) + public static class JavaTimeBean { + private LocalDate localDate; + private LocalTime localTime; + private LocalDateTime localDateTime; + private java.time.Instant instant; + private UUID uuid; + + public JavaTimeBean() {} + + public LocalDate getLocalDate() { + return localDate; + } + + public void setLocalDate(LocalDate localDate) { + this.localDate = localDate; + } + + public LocalTime getLocalTime() { + return localTime; + } + + public void setLocalTime(LocalTime localTime) { + this.localTime = localTime; + } + + public LocalDateTime getLocalDateTime() { + return localDateTime; + } + + public void setLocalDateTime(LocalDateTime localDateTime) { + this.localDateTime = localDateTime; + } + + public java.time.Instant getInstant() { + return instant; + } + + public void setInstant(java.time.Instant instant) { + this.instant = instant; + } + + public UUID getUuid() { + return uuid; + } + + public void setUuid(UUID uuid) { + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof JavaTimeBean)) { + return false; + } + JavaTimeBean that = (JavaTimeBean) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link JavaTimeBean}. 
*/ + public static final Schema JAVA_TIME_BEAN_SCHEMA = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java index b82c4dc0e7e6..38ec507480f7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java @@ -19,10 +19,14 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -34,6 +38,8 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -1281,4 +1287,116 @@ public int hashCode() { Schema.Field.nullable("str", FieldType.STRING) .withDescription("a simple string that is part of this field")) .build(); + + /** A POJO containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. 
*/ + @DefaultSchema(JavaFieldSchema.class) + public static class JavaTimePOJO { + public LocalDate localDate; + public LocalTime localTime; + public LocalDateTime localDateTime; + public java.time.Instant instant; + public UUID uuid; + + public JavaTimePOJO() {} + + public JavaTimePOJO( + LocalDate localDate, + LocalTime localTime, + LocalDateTime localDateTime, + java.time.Instant instant, + UUID uuid) { + this.localDate = localDate; + this.localTime = localTime; + this.localDateTime = localDateTime; + this.instant = instant; + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof JavaTimePOJO)) { + return false; + } + JavaTimePOJO that = (JavaTimePOJO) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link JavaTimePOJO}. */ + public static final Schema JAVA_TIME_POJO_SCHEMA = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + + /** A POJO with nullable JSR-310 and UUID fields. 
*/ + @DefaultSchema(JavaFieldSchema.class) + public static class NullableJavaTimePOJO { + public @Nullable LocalDate localDate; + public @Nullable LocalTime localTime; + public @Nullable LocalDateTime localDateTime; + public java.time.@Nullable Instant instant; + public @Nullable UUID uuid; + + public NullableJavaTimePOJO() {} + + public NullableJavaTimePOJO( + LocalDate localDate, + LocalTime localTime, + LocalDateTime localDateTime, + java.time.Instant instant, + UUID uuid) { + this.localDate = localDate; + this.localTime = localTime; + this.localDateTime = localDateTime; + this.instant = instant; + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof NullableJavaTimePOJO)) { + return false; + } + NullableJavaTimePOJO that = (NullableJavaTimePOJO) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link NullableJavaTimePOJO}. 
*/ + public static final Schema NULLABLE_JAVA_TIME_POJO_SCHEMA = + Schema.builder() + .addNullableLogicalTypeField("localDate", SqlTypes.DATE) + .addNullableLogicalTypeField("localTime", SqlTypes.TIME) + .addNullableLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addNullableLogicalTypeField("instant", new NanosInstant()) + .addNullableLogicalTypeField("uuid", SqlTypes.UUID) + .build(); } From 0efc53649f3b670aa61f4adac921668a66f1d187 Mon Sep 17 00:00:00 2001 From: Sachin <52783123+sachinnn99@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:42:04 +0530 Subject: [PATCH 2/2] [Avro] Override convertJavaTimeLogicalType in Avro TypeConversion subclasses The new convertJavaTimeLogicalType dispatch in TypeConversion.convert() intercepts java.time.Instant and java.time.LocalDate before they reach convertDefault(), where the Avro subclasses had their Joda-bridging logic. This caused AvroSchemaTest.testSpecificRecordToRow (AssertionError) and testRowToSpecificRecord (ClassCastException) to fail. Move the Joda-bridging bytecode from convertDefault() into convertJavaTimeLogicalType() overrides in AvroConvertType, AvroConvertValueForGetter, and AvroConvertValueForSetter. Other JSR-310/UUID types delegate to the base class LogicalType path. 
--- .../avro/schemas/utils/AvroUtils.java | 87 +++++++++++-------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 457265646420..09cf9ee1be84 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -333,15 +333,20 @@ public AvroConvertType(boolean returnRawType) { } @Override - protected java.lang.reflect.Type convertDefault(TypeDescriptor type) { + protected java.lang.reflect.Type convertJavaTimeLogicalType(TypeDescriptor type) { if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class)) || type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) { return convertDateTime(type); - } else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + } + return super.convertJavaTimeLogicalType(type); + } + + @Override + protected java.lang.reflect.Type convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { return byte[].class; - } else { - return super.convertDefault(type); } + return super.convertDefault(type); } } @@ -356,18 +361,8 @@ protected TypeConversionsFactory getFactory() { } @Override - protected StackManipulation convertDefault(TypeDescriptor type) { - if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { - // Generate the following code: - // return value.bytes(); - return new Compound( - readValue, - MethodInvocation.invoke( - new ForLoadedType(GenericFixed.class) - .getDeclaredMethods() - .filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES))) - .getOnly())); - } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + protected 
StackManipulation convertJavaTimeLogicalType(TypeDescriptor type) { + if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { // Generates the following code: // return Instant.ofEpochMilli(value.toEpochMilli()) StackManipulation onNotNull = @@ -406,6 +401,22 @@ protected StackManipulation convertDefault(TypeDescriptor type) { .getOnly())); return shortCircuitReturnNull(readValue, onNotNull); } + return super.convertJavaTimeLogicalType(type); + } + + @Override + protected StackManipulation convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + // Generate the following code: + // return value.bytes(); + return new Compound( + readValue, + MethodInvocation.invoke( + new ForLoadedType(GenericFixed.class) + .getDeclaredMethods() + .filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES))) + .getOnly())); + } return super.convertDefault(type); } } @@ -421,25 +432,8 @@ protected TypeConversionsFactory getFactory() { } @Override - protected StackManipulation convertDefault(TypeDescriptor type) { - if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { - // Generate the following code: - // return new T((byte[]) value); - ForLoadedType loadedType = new ForLoadedType(type.getRawType()); - return new Compound( - TypeCreation.of(loadedType), - Duplication.SINGLE, - // Load the parameter and cast it to a byte[]. - readValue, - TypeCasting.to(BYTES), - // Create a new instance that wraps this byte[]. 
- MethodInvocation.invoke( - loadedType - .getDeclaredMethods() - .filter( - ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES))) - .getOnly())); - } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + protected StackManipulation convertJavaTimeLogicalType(TypeDescriptor type) { + if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { // Generates the following code: // return java.time.Instant.ofEpochMilli(value.getMillis()) StackManipulation onNotNull = @@ -477,6 +471,29 @@ protected StackManipulation convertDefault(TypeDescriptor type) { .getOnly())); return shortCircuitReturnNull(readValue, onNotNull); } + return super.convertJavaTimeLogicalType(type); + } + + @Override + protected StackManipulation convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + // Generate the following code: + // return new T((byte[]) value); + ForLoadedType loadedType = new ForLoadedType(type.getRawType()); + return new Compound( + TypeCreation.of(loadedType), + Duplication.SINGLE, + // Load the parameter and cast it to a byte[]. + readValue, + TypeCasting.to(BYTES), + // Create a new instance that wraps this byte[]. + MethodInvocation.invoke( + loadedType + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES))) + .getOnly())); + } return super.convertDefault(type); } }