diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java index 5f47e2ee2e5..7a0e9145f32 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java @@ -55,12 +55,12 @@ import io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider; import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.TableDescriptionSupplier; -import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; import io.trino.spi.type.TypeManager; import jakarta.annotation.PreDestroy; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -70,7 +70,6 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.inject.Scopes.SINGLETON; import static com.google.inject.multibindings.MapBinder.newMapBinder; import static com.google.inject.multibindings.Multibinder.newSetBinder; @@ -112,15 +111,16 @@ protected void setup(Binder binder) newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON); - // Bind the appropriate ConfluentSchemaRegistryAuth implementation based on configuration - ConfluentSchemaRegistryConfig schemaRegistryConfig = buildConfigObject(ConfluentSchemaRegistryConfig.class); - if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH) { - configBinder(binder).bindConfig(BasicAuthConfig.class); - binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON); - } - else { - binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON); - } + configBinder(binder).bindConfig(BasicAuthConfig.class); + install(conditionalModule( + ConfluentSchemaRegistryConfig.class, + schemaRegistryConfig -> schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH, + authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class) + .to(ConfluentSchemaRegistryBasicAuth.class) + .in(Scopes.SINGLETON), + authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class) + .to(ConfluentSchemaRegistryNoAuth.class) + .in(Scopes.SINGLETON))); } @Provides @@ -135,23 +135,42 @@ public static SchemaRegistryClient createSchemaRegistryClient( requireNonNull(propertiesProviders, "propertiesProviders is null"); List baseUrl = confluentConfig.getConfluentSchemaRegistryUrls().stream() - .map(HostAddress::getHostText) .collect(toImmutableList()); - Map schemaRegistryClientProperties = propertiesProviders.stream() - .map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties) - .flatMap(properties -> properties.entrySet().stream()) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - return new ClassLoaderSafeSchemaRegistryClient( new CachedSchemaRegistryClient( baseUrl, confluentConfig.getConfluentSchemaRegistryClientCacheSize(), ImmutableList.copyOf(schemaProviders), - schemaRegistryClientProperties), + buildSchemaRegistryClientProperties(confluentConfig, propertiesProviders)), classLoader); } + static Map buildSchemaRegistryClientProperties( + ConfluentSchemaRegistryConfig confluentConfig, + Set propertiesProviders) + { + Map schemaRegistryClientProperties = propertiesProviders.stream() + .map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties) + .flatMap(properties -> properties.entrySet().stream()) + .collect(java.util.stream.Collectors.toMap( + Map.Entry::getKey, + entry -> (Object) entry.getValue(), + (left, right) -> right, + HashMap::new)); + + if (confluentConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH) { + Object userInfo = schemaRegistryClientProperties.get("basic.auth.user.info"); + if (userInfo instanceof String) { + schemaRegistryClientProperties.putIfAbsent("basic.auth.credentials.source", "USER_INFO"); + schemaRegistryClientProperties.putIfAbsent("schema.registry.basic.auth.credentials.source", "USER_INFO"); + schemaRegistryClientProperties.putIfAbsent("schema.registry.basic.auth.user.info", userInfo); + } + } + + return schemaRegistryClientProperties; + } + @PreDestroy public void destroy(SchemaRegistryClient client) throws IOException diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java index c2810d9f422..1db777d624e 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java @@ -15,14 +15,12 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.units.Duration; import io.airlift.units.MaxDuration; import io.airlift.units.MinDuration; import io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy; -import io.trino.spi.HostAddress; import io.trino.spi.connector.SchemaTableName; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; @@ -30,10 +28,9 @@ import java.util.List; import java.util.Map; -import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Streams.stream; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; import static java.util.Objects.requireNonNull; @@ -47,7 +44,7 @@ public enum ConfluentSchemaRegistryAuthType BASIC_AUTH, } - private Set confluentSchemaRegistryUrls; + private List confluentSchemaRegistryUrls; private ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType = ConfluentSchemaRegistryAuthType.NONE; private int confluentSchemaRegistryClientCacheSize = 1000; private EmptyFieldStrategy emptyFieldStrategy = IGNORE; @@ -55,7 +52,7 @@ public enum ConfluentSchemaRegistryAuthType private Map confluentSchemaRegistrySubjectMapping = ImmutableMap.of(); @Size(min = 1) - public Set getConfluentSchemaRegistryUrls() + public List getConfluentSchemaRegistryUrls() { return confluentSchemaRegistryUrls; } @@ -137,17 +134,20 @@ public ConfluentSchemaRegistryConfig setConfluentSchemaRegistrySubjectMapping(St return this; } - private static ImmutableSet parseNodes(String nodes) + private static List parseNodes(String nodes) { Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults(); return stream(splitter.split(nodes)) - .map(ConfluentSchemaRegistryConfig::toHostAddress) - .collect(toImmutableSet()); + .map(ConfluentSchemaRegistryConfig::normalizeUrl) + .collect(toImmutableList()); } - private static HostAddress toHostAddress(String value) + private static String normalizeUrl(String value) { - return HostAddress.fromString(value); + if (value.contains("://")) { + return value; + } + return "https://" + value; } private static ImmutableMap parseSubjectMapping(String mapping) diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentModule.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentModule.java new file mode 100644 index 00000000000..373942361e9 --- /dev/null +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentModule.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider; +import org.junit.jupiter.api.Test; + +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestConfluentModule +{ + @Test + public void testBasicAuthPropertiesIncludedInSchemaRegistryClientProperties() + { + ConfluentSchemaRegistryConfig config = new ConfluentSchemaRegistryConfig() + .setConfluentSchemaRegistryUrls("schema-registry.example.com:8081") + .setConfluentSchemaRegistryAuthType(ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH); + BasicAuthConfig basicAuthConfig = new BasicAuthConfig() + .setConfluentSchemaRegistryUsername("user1") + .setConfluentSchemaRegistryPassword("secret1"); + + Set providers = ImmutableSet.of( + new KafkaSchemaRegistryClientPropertiesProvider(new ConfluentSchemaRegistryBasicAuth(basicAuthConfig))); + java.util.Map merged = ConfluentModule.buildSchemaRegistryClientProperties(config, providers); + + assertThat(merged) + .containsEntry("basic.auth.credentials.source", "USER_INFO") + .containsEntry("basic.auth.user.info", "user1:secret1") + .containsEntry("schema.registry.basic.auth.credentials.source", "USER_INFO") + .containsEntry("schema.registry.basic.auth.user.info", "user1:secret1"); + } +} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java index f24bf2b1575..c4b0604ad26 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.Duration; +import io.trino.spi.connector.SchemaTableName; import org.junit.jupiter.api.Test; import java.util.Map; @@ -27,6 +28,7 @@ import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; public class TestConfluentSchemaRegistryConfig { @@ -38,7 +40,8 @@ public void testDefaults() .setConfluentSchemaRegistryAuthType(NONE) .setConfluentSchemaRegistryClientCacheSize(1000) .setEmptyFieldStrategy(IGNORE) - .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))); + .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS)) + .setConfluentSchemaRegistrySubjectMapping(null)); } @Test @@ -50,6 +53,7 @@ public void testExplicitPropertyMappings() .put("kafka.confluent-schema-registry-client-cache-size", "1500") .put("kafka.empty-field-strategy", "MARK") .put("kafka.confluent-subjects-cache-refresh-interval", "2s") + .put("kafka.confluent-schema-registry-subject-mapping", "default.orders:orders-value") .buildOrThrow(); ConfluentSchemaRegistryConfig expected = new ConfluentSchemaRegistryConfig() @@ -57,8 +61,30 @@ public void testExplicitPropertyMappings() .setConfluentSchemaRegistryAuthType(BASIC_AUTH) .setConfluentSchemaRegistryClientCacheSize(1500) .setEmptyFieldStrategy(MARK) - .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS)); + .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS)) + .setConfluentSchemaRegistrySubjectMapping("default.orders:orders-value"); assertFullMapping(properties, expected); } + + @Test + public void testHostPortSchemaRegistryUrlDefaultsToHttps() + { + ConfluentSchemaRegistryConfig config = new ConfluentSchemaRegistryConfig() + .setConfluentSchemaRegistryUrls("schema-registry-a:8081, schema-registry-b:8082"); + + assertThat(config.getConfluentSchemaRegistryUrls()) + .containsExactly("https://schema-registry-a:8081", "https://schema-registry-b:8082"); + } + + @Test + public void testSubjectMappingParsing() + { + ConfluentSchemaRegistryConfig config = new ConfluentSchemaRegistryConfig() + .setConfluentSchemaRegistrySubjectMapping("default.orders:orders-value,analytics.users:users-value"); + + assertThat(config.getConfluentSchemaRegistrySubjectMapping()) + .containsEntry(new SchemaTableName("default", "orders"), "orders-value") + .containsEntry(new SchemaTableName("analytics", "users"), "users-value"); + } }