diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java index 832090926919..2b7bb1ca0df6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java @@ -28,6 +28,9 @@ import java.lang.reflect.Parameter; import java.lang.reflect.Type; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedMap; +import java.util.UUID; import net.bytebuddy.ByteBuddy; import net.bytebuddy.NamingStrategy; import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver; @@ -78,6 +82,9 @@ import org.apache.beam.sdk.schemas.FieldValueHaver; import org.apache.beam.sdk.schemas.FieldValueSetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.TypeDescriptor; @@ -120,6 +127,65 @@ public class ByteBuddyUtils { private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class); private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE = new ForLoadedType(ByteBuddyUtils.class); + private static final ForLoadedType LOGICAL_TYPE_TYPE = new ForLoadedType(LogicalType.class); + + // Static LogicalType instances used by codegen for JSR-310 and UUID POJO/Bean fields. The + // generated bytecode loads these via FieldAccess and invokes toBaseType / toInputType, so the + // fields must be public so generated classes in user packages can access them. + // See logicalTypeFieldName(...) for the type → field name mapping. + public static final LogicalType JAVA_LOCAL_DATE_LOGICAL_TYPE = SqlTypes.DATE; + public static final LogicalType JAVA_LOCAL_TIME_LOGICAL_TYPE = SqlTypes.TIME; + public static final LogicalType + JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE = SqlTypes.DATETIME; + public static final LogicalType + JAVA_INSTANT_LOGICAL_TYPE = new NanosInstant(); + public static final LogicalType JAVA_UUID_LOGICAL_TYPE = + SqlTypes.UUID; + + /** + * Returns the {@link Schema.LogicalType} that {@link StaticSchemaInference} infers for the given + * Java raw type, or {@code null} if no JSR-310 / UUID inference applies. + */ + static @Nullable LogicalType inferredLogicalTypeFor(Class rawType) { + if (LocalDate.class.equals(rawType)) { + return JAVA_LOCAL_DATE_LOGICAL_TYPE; + } else if (LocalTime.class.equals(rawType)) { + return JAVA_LOCAL_TIME_LOGICAL_TYPE; + } else if (LocalDateTime.class.equals(rawType)) { + return JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE; + } else if (java.time.Instant.class.equals(rawType)) { + return JAVA_INSTANT_LOGICAL_TYPE; + } else if (UUID.class.equals(rawType)) { + return JAVA_UUID_LOGICAL_TYPE; + } + return null; + } + + /** Maps a Java raw type to the static field name in {@link ByteBuddyUtils} that holds it. */ + private static String logicalTypeFieldName(Class rawType) { + if (LocalDate.class.equals(rawType)) { + return "JAVA_LOCAL_DATE_LOGICAL_TYPE"; + } else if (LocalTime.class.equals(rawType)) { + return "JAVA_LOCAL_TIME_LOGICAL_TYPE"; + } else if (LocalDateTime.class.equals(rawType)) { + return "JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE"; + } else if (java.time.Instant.class.equals(rawType)) { + return "JAVA_INSTANT_LOGICAL_TYPE"; + } else if (UUID.class.equals(rawType)) { + return "JAVA_UUID_LOGICAL_TYPE"; + } + throw new IllegalArgumentException("Not an inferred logical type: " + rawType); + } + + /** Stack manipulation that pushes the static {@link LogicalType} for the given Java type. */ + private static StackManipulation loadLogicalType(Class rawType) { + return FieldAccess.forField( + BYTE_BUDDY_UTILS_TYPE + .getDeclaredFields() + .filter(ElementMatchers.named(logicalTypeFieldName(rawType))) + .getOnly()) + .read(); + } /** * A naming strategy for ByteBuddy classes. @@ -286,6 +352,8 @@ public T convert(TypeDescriptor typeDescriptor) { return convertDateTime(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) { return convertDateTime(typeDescriptor); + } else if (inferredLogicalTypeFor(typeDescriptor.getRawType()) != null) { + return convertLogicalType(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { return convertByteBuffer(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) { @@ -324,6 +392,14 @@ protected StackManipulation shortCircuitReturnNull( protected abstract T convertDateTime(TypeDescriptor type); + /** + * Handles JSR-310 ({@link LocalDate}, {@link LocalTime}, {@link LocalDateTime}, {@link + * java.time.Instant}) and {@link UUID} fields, which {@link StaticSchemaInference} infers as + * Beam {@link LogicalType}s. Subclasses emit code that round-trips through the corresponding + * static {@link LogicalType} instance ({@link #JAVA_LOCAL_DATE_LOGICAL_TYPE} etc.). + */ + protected abstract T convertLogicalType(TypeDescriptor type); + protected abstract T convertByteBuffer(TypeDescriptor type); protected abstract T convertCharSequence(TypeDescriptor type); @@ -401,6 +477,15 @@ protected Type convertDateTime(TypeDescriptor type) { return Instant.class; } + @Override + protected Type convertLogicalType(TypeDescriptor type) { + // The codegen-generated getter returns the LogicalType's base value (Long for + // Date/Time, Row for DateTime/NanosInstant/UUID). Object.class is a safe upper bound + // for the FieldValueGetter signature; the framework's GetLogicalInputType wrapper + // converts back to the input type before exposing the value to user code. + return Object.class; + } + @Override protected Type convertByteBuffer(TypeDescriptor type) { return byte[].class; @@ -915,6 +1000,26 @@ protected StackManipulation convertDateTime(TypeDescriptor type) { return new ShortCircuitReturnNull(readValue, stackManipulation); } + @Override + protected StackManipulation convertLogicalType(TypeDescriptor type) { + // Equivalent code: return STATIC_LOGICAL_TYPE.toBaseType(value); + // where STATIC_LOGICAL_TYPE is one of the JAVA_*_LOGICAL_TYPE static fields on + // ByteBuddyUtils. The base type is Long (for LocalDate, LocalTime) or Row (for the + // others); both are reference types so no boxing/casting is needed beyond the invoke. + StackManipulation stackManipulation = + new Compound( + loadLogicalType(type.getRawType()), + readValue, + MethodInvocation.invoke( + LOGICAL_TYPE_TYPE + .getDeclaredMethods() + .filter( + ElementMatchers.named("toBaseType") + .and(ElementMatchers.takesArguments(1))) + .getOnly())); + return new ShortCircuitReturnNull(readValue, stackManipulation); + } + @Override protected StackManipulation convertByteBuffer(TypeDescriptor type) { // Generate the following code: @@ -1361,6 +1466,29 @@ protected StackManipulation convertEnum(TypeDescriptor type) { return new ShortCircuitReturnNull(readValue, stackManipulation); } + @Override + protected StackManipulation convertLogicalType(TypeDescriptor type) { + // Equivalent code: return (JavaType) STATIC_LOGICAL_TYPE.toInputType(value); + // FromRowUsingCreator already converted the row's input-type value (e.g. LocalDate) + // to the LogicalType's base value (e.g. Long) before invoking the generated creator, + // so we receive the base type here and need to project back to the POJO field's + // Java type. + ForLoadedType loadedType = new ForLoadedType(type.getRawType()); + StackManipulation stackManipulation = + new Compound( + loadLogicalType(type.getRawType()), + readValue, + MethodInvocation.invoke( + LOGICAL_TYPE_TYPE + .getDeclaredMethods() + .filter( + ElementMatchers.named("toInputType") + .and(ElementMatchers.takesArguments(1))) + .getOnly()), + TypeCasting.to(loadedType)); + return new ShortCircuitReturnNull(readValue, stackManipulation); + } + @Override protected StackManipulation convertDefault(TypeDescriptor type) { return readValue; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 196ee6f86593..ffd07071679d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -22,17 +22,23 @@ import java.lang.reflect.ParameterizedType; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.ReadableInstant; @@ -127,7 +133,8 @@ public static Schema.FieldType fieldFromType( return fieldFromType(type, fieldValueTypeSupplier, new HashMap<>()); } - // TODO(https://github.com/apache/beam/issues/21567): support type inference for logical types + // TODO(https://github.com/apache/beam/issues/21567): support inference for additional/custom + // logical types private static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier, @@ -177,6 +184,16 @@ private static Schema.FieldType fieldFromType( return FieldType.STRING; } else if (type.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) { return FieldType.DATETIME; + } else if (type.getRawType().equals(LocalDate.class)) { + return FieldType.logicalType(SqlTypes.DATE); + } else if (type.getRawType().equals(LocalTime.class)) { + return FieldType.logicalType(SqlTypes.TIME); + } else if (type.getRawType().equals(LocalDateTime.class)) { + return FieldType.logicalType(SqlTypes.DATETIME); + } else if (type.getRawType().equals(java.time.Instant.class)) { + return FieldType.logicalType(new NanosInstant()); + } else if (type.getRawType().equals(UUID.class)) { + return FieldType.logicalType(SqlTypes.UUID); } else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { return FieldType.BYTES; } else if (type.isSubtypeOf(TypeDescriptor.of(Iterable.class))) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java index df6c9cf18e5a..de0953a0a088 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java @@ -24,6 +24,7 @@ import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.CASE_FORMAT_BEAM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.FIELD_WITH_DESCRIPTION_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ITERABLE_BEAM_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAYS_BEAM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA; @@ -46,9 +47,15 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.SchemaTestUtils; import org.apache.beam.sdk.schemas.utils.TestJavaBeans; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.AllNullableBean; @@ -57,6 +64,7 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean; +import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean; @@ -179,6 +187,81 @@ public void testFromRow() throws NoSuchSchemaException { assertEquals("stringbuilder", bean.getStringBuilder().toString()); } + @Test + public void testJavaTimeSchema() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(JavaTimeBean.class); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema); + Schema icebergStyleSchema = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema)); + } + + @Test + public void testJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimeBean bean = new JavaTimeBean(); + bean.setLocalDate(LocalDate.of(2024, 1, 15)); + bean.setLocalTime(LocalTime.of(10, 30, 45)); + bean.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45)); + bean.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L)); + bean.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimeBean.class).apply(bean); + + assertEquals(5, row.getFieldCount()); + assertEquals(bean.getLocalDate(), row.getLogicalTypeValue("localDate", LocalDate.class)); + assertEquals(bean.getLocalTime(), row.getLogicalTypeValue("localTime", LocalTime.class)); + assertEquals( + bean.getLocalDateTime(), row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertEquals(bean.getInstant(), row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertEquals(bean.getUuid(), row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + Row row = + Row.withSchema(JAVA_TIME_BEAN_SCHEMA) + .addValues(localDate, localTime, localDateTime, instant, uuid) + .build(); + + JavaTimeBean bean = registry.getFromRowFunction(JavaTimeBean.class).apply(row); + + assertEquals(localDate, bean.getLocalDate()); + assertEquals(localTime, bean.getLocalTime()); + assertEquals(localDateTime, bean.getLocalDateTime()); + assertEquals(instant, bean.getInstant()); + assertEquals(uuid, bean.getUuid()); + } + + @Test + public void testJavaTimeRoundTrip() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimeBean original = new JavaTimeBean(); + original.setLocalDate(LocalDate.of(2024, 1, 15)); + original.setLocalTime(LocalTime.of(10, 30, 45)); + original.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45)); + original.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L)); + original.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimeBean.class).apply(original); + JavaTimeBean roundTripped = registry.getFromRowFunction(JavaTimeBean.class).apply(row); + + assertEquals(original, roundTripped); + } + @Test public void testNullableToRow() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java index 7a66decde017..c80b758adc31 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java @@ -21,12 +21,14 @@ import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ENUMERATION; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAYS_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_NULLABLE_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLES_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_JAVA_TIME_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ENUM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ITERABLE; @@ -46,20 +48,28 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.SchemaTestUtils; import org.apache.beam.sdk.schemas.utils.TestPOJOs; import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo; import org.apache.beam.sdk.schemas.utils.TestPOJOs.FirstCircularNestedPOJO; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedPOJO; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.NullableJavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NullablePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNestedNullable; import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNullables; @@ -228,6 +238,118 @@ public void testFromRow() throws NoSuchSchemaException { assertEquals("stringbuilder", pojo.stringBuilder.toString()); } + @Test + public void testJavaTimeSchema() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(JavaTimePOJO.class); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_POJO_SCHEMA, schema); + // Reproduces the failure mode in #37524: a POJO inferred from JSR-310 fields must be + // assignable to a row schema (e.g. one produced by IcebergIO) that uses the same logical + // types. + Schema icebergStyleSchema = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema)); + } + + @Test + public void testJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + JavaTimePOJO pojo = new JavaTimePOJO(localDate, localTime, localDateTime, instant, uuid); + + Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(pojo); + + assertEquals(5, row.getFieldCount()); + assertEquals(localDate, row.getLogicalTypeValue("localDate", LocalDate.class)); + assertEquals(localTime, row.getLogicalTypeValue("localTime", LocalTime.class)); + assertEquals(localDateTime, row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertEquals(instant, row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertEquals(uuid, row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + LocalDate localDate = LocalDate.of(2024, 1, 15); + LocalTime localTime = LocalTime.of(10, 30, 45); + LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45); + java.time.Instant instant = java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + Row row = + Row.withSchema(JAVA_TIME_POJO_SCHEMA) + .addValues(localDate, localTime, localDateTime, instant, uuid) + .build(); + + JavaTimePOJO pojo = registry.getFromRowFunction(JavaTimePOJO.class).apply(row); + + assertEquals(localDate, pojo.localDate); + assertEquals(localTime, pojo.localTime); + assertEquals(localDateTime, pojo.localDateTime); + assertEquals(instant, pojo.instant); + assertEquals(uuid, pojo.uuid); + } + + @Test + public void testJavaTimeRoundTrip() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + JavaTimePOJO original = + new JavaTimePOJO( + LocalDate.of(2024, 1, 15), + LocalTime.of(10, 30, 45), + LocalDateTime.of(2024, 1, 15, 10, 30, 45), + java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L), + UUID.fromString("11111111-2222-3333-4444-555555555555")); + + Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(original); + JavaTimePOJO roundTripped = registry.getFromRowFunction(JavaTimePOJO.class).apply(row); + + assertEquals(original, roundTripped); + } + + @Test + public void testNullableJavaTimeSchema() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(NullableJavaTimePOJO.class); + SchemaTestUtils.assertSchemaEquivalent(NULLABLE_JAVA_TIME_POJO_SCHEMA, schema); + } + + @Test + public void testNullableJavaTimeToRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + NullableJavaTimePOJO pojo = new NullableJavaTimePOJO(); + Row row = registry.getToRowFunction(NullableJavaTimePOJO.class).apply(pojo); + + assertEquals(5, row.getFieldCount()); + assertNull(row.getLogicalTypeValue("localDate", LocalDate.class)); + assertNull(row.getLogicalTypeValue("localTime", LocalTime.class)); + assertNull(row.getLogicalTypeValue("localDateTime", LocalDateTime.class)); + assertNull(row.getLogicalTypeValue("instant", java.time.Instant.class)); + assertNull(row.getLogicalTypeValue("uuid", UUID.class)); + } + + @Test + public void testNullableJavaTimeFromRow() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Row row = Row.nullRow(NULLABLE_JAVA_TIME_POJO_SCHEMA); + + NullableJavaTimePOJO pojo = registry.getFromRowFunction(NullableJavaTimePOJO.class).apply(row); + assertNull(pojo.localDate); + assertNull(pojo.localTime); + assertNull(pojo.localDateTime); + assertNull(pojo.instant); + assertNull(pojo.uuid); + } + @Test public void testNullableSchema() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java index 93d6984d47e2..c206bd8f61fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.transforms; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.Map; import java.util.Objects; @@ -24,6 +25,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -211,6 +213,103 @@ public void testFromRows() { pipeline.run(); } + /** Reproducer for #37524: a Row with a logical-type DateTime field should convert to a POJO. */ + @DefaultSchema(JavaFieldSchema.class) + public static class TimePOJO { + public LocalDateTime time; + + public TimePOJO() {} + + public TimePOJO(LocalDateTime time) { + this.time = time; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TimePOJO)) { + return false; + } + TimePOJO that = (TimePOJO) o; + return Objects.equals(time, that.time); + } + + @Override + public int hashCode() { + return Objects.hash(time); + } + } + + @Test + @Category(NeedsRunner.class) + public void testFromRowsWithLogicalTypeDateTime() { + Schema icebergStyleSchema = + Schema.builder().addLogicalTypeField("time", SqlTypes.DATETIME).build(); + LocalDateTime expected = LocalDateTime.of(2024, 1, 15, 10, 30, 0); + Row inputRow = Row.withSchema(icebergStyleSchema).addValue(expected).build(); + + PCollection pojos = + pipeline + .apply(Create.of(inputRow).withRowSchema(icebergStyleSchema)) + .apply(Convert.fromRows(TimePOJO.class)); + + PAssert.that(pojos).containsInAnyOrder(new TimePOJO(expected)); + pipeline.run(); + } + + /** POJO mixing a logical-type field with a primitive field. */ + @DefaultSchema(JavaFieldSchema.class) + public static class MixedTimePOJO { + public LocalDateTime time; + public String name; + + public MixedTimePOJO() {} + + public MixedTimePOJO(LocalDateTime time, String name) { + this.time = time; + this.name = name; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MixedTimePOJO)) { + return false; + } + MixedTimePOJO that = (MixedTimePOJO) o; + return Objects.equals(time, that.time) && Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(time, name); + } + } + + @Test + @Category(NeedsRunner.class) + public void testFromRowsWithMixedLogicalAndPrimitiveTypes() { + Schema mixedSchema = + Schema.builder() + .addLogicalTypeField("time", SqlTypes.DATETIME) + .addStringField("name") + .build(); + LocalDateTime expectedTime = LocalDateTime.of(2024, 1, 15, 10, 30, 0); + Row inputRow = Row.withSchema(mixedSchema).addValues(expectedTime, "hello").build(); + + PCollection pojos = + pipeline + .apply(Create.of(inputRow).withRowSchema(mixedSchema)) + .apply(Convert.fromRows(MixedTimePOJO.class)); + + PAssert.that(pojos).containsInAnyOrder(new MixedTimePOJO(expectedTime, "hello")); + pipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testGeneralConvert() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 7e9cf9a894b9..57d4f905e6ec 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BOXED_FIELDS_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BYTE_ARRAY_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_COLLECTION_BEAN_SCHEMA; @@ -44,6 +45,7 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray; +import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean; @@ -79,6 +81,14 @@ public void testSimpleBean() { SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } + @Test + public void testJavaTimeBean() { + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); + SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema); + } + @Test public void testNestedBean() { Schema schema = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java index 6b9fbcd30a27..72407d965da4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.utils; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_COLLECTION_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA; @@ -38,7 +39,11 @@ import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO; @@ -82,6 +87,33 @@ public void testSimplePOJO() { assertEquals(SIMPLE_POJO_SCHEMA, schema); } + @Test + public void testJavaTimePOJO() { + Schema schema = + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); + assertEquals(JAVA_TIME_POJO_SCHEMA, schema); + } + + /** + * Regression test for #37524: Joda {@link Instant} must continue to map to {@link + * FieldType#DATETIME}, not the new {@code java.time.Instant} logical type. This guards the branch + * ordering in {@code StaticSchemaInference.fieldFromType}. + */ + @Test + public void testJodaInstantStillInfersAsDatetime() { + FieldType jodaInferred = + StaticSchemaInference.fieldFromType( + TypeDescriptor.of(Instant.class), JavaFieldTypeSupplier.INSTANCE); + assertEquals(FieldType.DATETIME, jodaInferred); + + FieldType javaTimeInferred = + StaticSchemaInference.fieldFromType( + TypeDescriptor.of(java.time.Instant.class), JavaFieldTypeSupplier.INSTANCE); + assertEquals(TypeName.LOGICAL_TYPE, javaTimeInferred.getTypeName()); + assertEquals(NanosInstant.IDENTIFIER, javaTimeInferred.getLogicalType().getIdentifier()); + } + @Test public void testNestedPOJO() { Schema schema = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java index 694db05b0918..d8ed86f2b552 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java @@ -19,10 +19,14 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.beam.sdk.schemas.JavaBeanSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -33,6 +37,8 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -1398,4 +1404,87 @@ public void setValue(@Nullable Float value) { Schema.Field.nullable("value", FieldType.FLOAT) .withDescription("This value is the value stored in the object as a float.")) .build(); + + /** A Bean containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. */ + @DefaultSchema(JavaBeanSchema.class) + public static class JavaTimeBean { + private LocalDate localDate; + private LocalTime localTime; + private LocalDateTime localDateTime; + private java.time.Instant instant; + private UUID uuid; + + public JavaTimeBean() {} + + public LocalDate getLocalDate() { + return localDate; + } + + public void setLocalDate(LocalDate localDate) { + this.localDate = localDate; + } + + public LocalTime getLocalTime() { + return localTime; + } + + public void setLocalTime(LocalTime localTime) { + this.localTime = localTime; + } + + public LocalDateTime getLocalDateTime() { + return localDateTime; + } + + public void setLocalDateTime(LocalDateTime localDateTime) { + this.localDateTime = localDateTime; + } + + public java.time.Instant getInstant() { + return instant; + } + + public void setInstant(java.time.Instant instant) { + this.instant = instant; + } + + public UUID getUuid() { + return uuid; + } + + public void setUuid(UUID uuid) { + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof JavaTimeBean)) { + return false; + } + JavaTimeBean that = (JavaTimeBean) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link JavaTimeBean}. */ + public static final Schema JAVA_TIME_BEAN_SCHEMA = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java index b82c4dc0e7e6..38ec507480f7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java @@ -19,10 +19,14 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -34,6 +38,8 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -1281,4 +1287,116 @@ public int hashCode() { Schema.Field.nullable("str", FieldType.STRING) .withDescription("a simple string that is part of this field")) .build(); + + /** A POJO containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. */ + @DefaultSchema(JavaFieldSchema.class) + public static class JavaTimePOJO { + public LocalDate localDate; + public LocalTime localTime; + public LocalDateTime localDateTime; + public java.time.Instant instant; + public UUID uuid; + + public JavaTimePOJO() {} + + public JavaTimePOJO( + LocalDate localDate, + LocalTime localTime, + LocalDateTime localDateTime, + java.time.Instant instant, + UUID uuid) { + this.localDate = localDate; + this.localTime = localTime; + this.localDateTime = localDateTime; + this.instant = instant; + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof JavaTimePOJO)) { + return false; + } + JavaTimePOJO that = (JavaTimePOJO) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link JavaTimePOJO}. */ + public static final Schema JAVA_TIME_POJO_SCHEMA = + Schema.builder() + .addLogicalTypeField("localDate", SqlTypes.DATE) + .addLogicalTypeField("localTime", SqlTypes.TIME) + .addLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addLogicalTypeField("instant", new NanosInstant()) + .addLogicalTypeField("uuid", SqlTypes.UUID) + .build(); + + /** A POJO with nullable JSR-310 and UUID fields. */ + @DefaultSchema(JavaFieldSchema.class) + public static class NullableJavaTimePOJO { + public @Nullable LocalDate localDate; + public @Nullable LocalTime localTime; + public @Nullable LocalDateTime localDateTime; + public java.time.@Nullable Instant instant; + public @Nullable UUID uuid; + + public NullableJavaTimePOJO() {} + + public NullableJavaTimePOJO( + LocalDate localDate, + LocalTime localTime, + LocalDateTime localDateTime, + java.time.Instant instant, + UUID uuid) { + this.localDate = localDate; + this.localTime = localTime; + this.localDateTime = localDateTime; + this.instant = instant; + this.uuid = uuid; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof NullableJavaTimePOJO)) { + return false; + } + NullableJavaTimePOJO that = (NullableJavaTimePOJO) o; + return Objects.equals(localDate, that.localDate) + && Objects.equals(localTime, that.localTime) + && Objects.equals(localDateTime, that.localDateTime) + && Objects.equals(instant, that.instant) + && Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(localDate, localTime, localDateTime, instant, uuid); + } + } + + /** The schema for {@link NullableJavaTimePOJO}. */ + public static final Schema NULLABLE_JAVA_TIME_POJO_SCHEMA = + Schema.builder() + .addNullableLogicalTypeField("localDate", SqlTypes.DATE) + .addNullableLogicalTypeField("localTime", SqlTypes.TIME) + .addNullableLogicalTypeField("localDateTime", SqlTypes.DATETIME) + .addNullableLogicalTypeField("instant", new NanosInstant()) + .addNullableLogicalTypeField("uuid", SqlTypes.UUID) + .build(); } diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 457265646420..44a39f3c099a 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -333,15 +333,20 @@ public AvroConvertType(boolean returnRawType) { } @Override - protected java.lang.reflect.Type convertDefault(TypeDescriptor type) { + protected java.lang.reflect.Type convertLogicalType(TypeDescriptor type) { if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class)) || type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) { return convertDateTime(type); - } else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + } + return super.convertLogicalType(type); + } + + @Override + protected java.lang.reflect.Type convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { return byte[].class; - } else { - return super.convertDefault(type); } + return super.convertDefault(type); } } @@ -356,18 +361,8 @@ protected TypeConversionsFactory getFactory() { } @Override - protected StackManipulation convertDefault(TypeDescriptor type) { - if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { - // Generate the following code: - // return value.bytes(); - return new Compound( - readValue, - MethodInvocation.invoke( - new ForLoadedType(GenericFixed.class) - .getDeclaredMethods() - .filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES))) - .getOnly())); - } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + protected StackManipulation convertLogicalType(TypeDescriptor type) { + if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { // Generates the following code: // return Instant.ofEpochMilli(value.toEpochMilli()) StackManipulation onNotNull = @@ -406,6 +401,22 @@ protected StackManipulation convertDefault(TypeDescriptor type) { .getOnly())); return shortCircuitReturnNull(readValue, onNotNull); } + return super.convertLogicalType(type); + } + + @Override + protected StackManipulation convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + // Generate the following code: + // return value.bytes(); + return new Compound( + readValue, + MethodInvocation.invoke( + new ForLoadedType(GenericFixed.class) + .getDeclaredMethods() + .filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES))) + .getOnly())); + } return super.convertDefault(type); } } @@ -421,25 +432,8 @@ protected TypeConversionsFactory getFactory() { } @Override - protected StackManipulation convertDefault(TypeDescriptor type) { - if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { - // Generate the following code: - // return new T((byte[]) value); - ForLoadedType loadedType = new ForLoadedType(type.getRawType()); - return new Compound( - TypeCreation.of(loadedType), - Duplication.SINGLE, - // Load the parameter and cast it to a byte[]. - readValue, - TypeCasting.to(BYTES), - // Create a new instance that wraps this byte[]. - MethodInvocation.invoke( - loadedType - .getDeclaredMethods() - .filter( - ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES))) - .getOnly())); - } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + protected StackManipulation convertLogicalType(TypeDescriptor type) { + if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { // Generates the following code: // return java.time.Instant.ofEpochMilli(value.getMillis()) StackManipulation onNotNull = @@ -477,6 +471,29 @@ protected StackManipulation convertDefault(TypeDescriptor type) { .getOnly())); return shortCircuitReturnNull(readValue, onNotNull); } + return super.convertLogicalType(type); + } + + @Override + protected StackManipulation convertDefault(TypeDescriptor type) { + if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + // Generate the following code: + // return new T((byte[]) value); + ForLoadedType loadedType = new ForLoadedType(type.getRawType()); + return new Compound( + TypeCreation.of(loadedType), + Duplication.SINGLE, + // Load the parameter and cast it to a byte[]. + readValue, + TypeCasting.to(BYTES), + // Create a new instance that wraps this byte[]. + MethodInvocation.invoke( + loadedType + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES))) + .getOnly())); + } return super.convertDefault(type); } }