diff --git a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html index df9032e4e79cf..3b82439dde078 100644 --- a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html +++ b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html @@ -56,5 +56,17 @@ String service.version passed to OpenTelemetry Reporters. + +
metrics.reporter.OpenTelemetry.transform.attribute-value-length-limits.<attribute-name>
+ (none) + Integer + Limits of the exported attribute values length. Only applies to the metric reporter; ignored by the trace and event reporters. Configuration is prefix based, for example to limit `task_name` attribute set `metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60` in the config for OTel reporter. A special key '*' can be used to define a global limit for all attributes not explicitly listed. For example `metrics.reporter.otel.transform.attribute-value-length-limits.*: 1024` will limit all attributes to attributeValue.substring(0, 1024). Global limit defaults to Integer.MAX_VALUE if not set. Individual attribute limits always override the global limit and are verified by exact match on the attribute name. 0 can be used to drop an attribute. Negative values are interpreted as no limit for the attribute (can be used for global limit overrides). + + +
metrics.reporter.OpenTelemetry.transform.collision-tracking-max-slots
+ 50000 + Integer + Maximum number of distinct (metric name, transformed attributes) slots the truncation-collision tracker retains. Only applies to the metric reporter; ignored by the trace and event reporters. When the cap is reached the least-recently-touched slot is evicted (LRU); a previously warned slot that later gets evicted may fire its warning again on re-entry. Only consulted when attribute-value length limits are configured — if no truncation is happening, there is nothing to track and this option has no effect. Set to 0 to disable collision tracking entirely. Malformed or negative values fall back to the default with a warning in the logs. + diff --git a/flink-metrics/flink-metrics-otel/pom.xml b/flink-metrics/flink-metrics-otel/pom.xml index 33117e1f5b808..ee2b5d984da3c 100644 --- a/flink-metrics/flink-metrics-otel/pom.xml +++ b/flink-metrics/flink-metrics-otel/pom.xml @@ -128,6 +128,7 @@ under the License. org.apache.flink flink-test-utils-junit ${project.version} + test diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java new file mode 100644 index 0000000000000..3248d08d94070 --- /dev/null +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.MetricConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY; +import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS; + +/** + * Applies per-attribute and global length limits to metric attribute values before they are + * exported via the OpenTelemetry reporter, and best-effort detects cases where truncation caused + * two distinct raw attribute maps to collapse to the same transformed output for a given metric + * name (which results in ambiguous exported series at the downstream backend). + * + *

Configuration is prefix-based under {@value + * OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY}. The special key {@code *} + * sets a global limit for all attributes not explicitly listed. + * + *

Semantics of individual limit values: + * + *

+ * + *

Collision detection fires at most once per (name, transformed-attributes) slot when two + * distinct raw attribute maps have been observed for that slot. The tracker stores only 64-bit + * hashes, in a bounded LRU so that memory stays contained when attribute cardinality is high (e.g. + * a per-attempt UUID). The cap is configured via {@link + * OpenTelemetryReporterOptions#COLLISION_TRACKING_MAX_SLOTS}; {@code 0} disables tracking entirely. + * On overflow the least-recently-touched slot is evicted, and a previously warned slot that later + * gets evicted may fire its warning again on re-entry. + * + *

This class is not thread-safe. Callers are expected to serialize calls to {@link + * #transform}. + * + * @see OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS + */ +@NotThreadSafe +final class MetricAttributeTransformer { + + private static final Logger LOG = LoggerFactory.getLogger(MetricAttributeTransformer.class); + + static final String GLOBAL_LIMIT_KEY = "*"; + + /** Load factor for the tracking map; matches {@link java.util.HashMap}'s default. */ + private static final float TRACKING_MAP_LOAD_FACTOR = 0.75f; + + /** Upper bound on the tracking map's initial capacity. */ + private static final int TRACKING_MAP_INITIAL_CAPACITY = 1024; + + /** Access-ordered iteration so {@code removeEldestEntry} evicts the true LRU entry. */ + private static final boolean LRU_ACCESS_ORDER = true; + + /** Parsed attribute-to-limit mapping, including {@code *} for the global limit. */ + private final Map attributeValueLimits; + + /** + * Bounded LRU of slot-hash → tracking state, or {@code null} when tracking is disabled. The + * slot-hash encodes (metricName, transformedVariables); {@link SlotState} records the first + * raw-hash observed at that slot and whether the slot has already warned. + */ + @Nullable private final Map slotStates; + + private long collisionCount = 0; + + /** + * Per-slot tracking state. Mutating {@code warned} in place avoids a per-warning allocation; + * the immutable slot-hash key drives LRU ordering regardless. + */ + private static final class SlotState { + final long firstRawHash; + boolean warned; + + SlotState(final long firstRawHash) { + this.firstRawHash = firstRawHash; + } + } + + /** + * Constructs a {@link MetricAttributeTransformer} by reading length-limit entries from the + * supplied {@link MetricConfig}. + */ + MetricAttributeTransformer(final MetricConfig metricConfig) { + this.attributeValueLimits = parseAttributeLimits(metricConfig); + if (!attributeValueLimits.isEmpty()) { + LOG.info( + "Metric attribute transformer is configured with value length limits: {}", + attributeValueLimits); + } + this.slotStates = buildSlotStateMap(metricConfig); + } + + @Nullable + private static Map buildSlotStateMap(final MetricConfig metricConfig) { + final int maxSlots = readCollisionTrackingMaxSlots(metricConfig); + if (maxSlots == 0) { + return null; + } + return new LinkedHashMap<>( + TRACKING_MAP_INITIAL_CAPACITY, TRACKING_MAP_LOAD_FACTOR, LRU_ACCESS_ORDER) { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > maxSlots; + } + }; + } + + private static int readCollisionTrackingMaxSlots(final MetricConfig metricConfig) { + final int defaultValue = COLLISION_TRACKING_MAX_SLOTS.defaultValue(); + final int parsed; + try { + parsed = metricConfig.getInteger(COLLISION_TRACKING_MAX_SLOTS.key(), defaultValue); + } catch (final NumberFormatException e) { + LOG.warn( + "Skipping invalid format for {} with value: {}; falling back to default {}", + COLLISION_TRACKING_MAX_SLOTS.key(), + metricConfig.get(COLLISION_TRACKING_MAX_SLOTS.key()), + defaultValue, + e); + return defaultValue; + } + if (parsed < 0) { + LOG.warn( + "Ignoring negative value {} for {}; falling back to default {}", + parsed, + COLLISION_TRACKING_MAX_SLOTS.key(), + defaultValue); + return defaultValue; + } + return parsed; + } + + /** + * Applies configured length limits to {@code rawVariables} and best-effort tracks whether the + * resulting (name, transformed-attributes) slot has been seen before with a different raw + * input. Emits at most one WARN per slot. + * + *

If no limits are configured, or if truncation produces no changes, {@code rawVariables} is + * returned unchanged (reference-equal) and no tracking state is touched. + * + * @param metricName the fully-qualified metric name + * @param rawVariables the raw metric attribute map + * @return the transformed attribute map to store alongside the metric + */ + Map transform(final String metricName, final Map rawVariables) { + if (attributeValueLimits.isEmpty()) { + return rawVariables; + } + final Map transformed = getTransformedOrNull(rawVariables); + if (transformed == null) { + // No-op truncation can't produce new collisions. Return the input to avoid retaining + // the freshly-allocated transformed map. + return rawVariables; + } + if (slotStates == null) { + return transformed; + } + + final long slotHash = slotHash(metricName, transformed); + final long rawHash = attributesHash(rawVariables); + final SlotState state = slotStates.get(slotHash); + if (state == null) { + slotStates.put(slotHash, new SlotState(rawHash)); + return transformed; + } + if (state.warned || state.firstRawHash == rawHash) { + return transformed; + } + LOG.warn( + "Possible truncation collision at metric '{}': this registration truncated {} and " + + "collapsed onto transformed attributes {} already seen for a " + + "different raw input. Exported series at this slot may be " + + "ambiguous at the downstream backend.", + metricName, + truncatedEntries(rawVariables, transformed), + transformed); + state.warned = true; + collisionCount++; + return transformed; + } + + /** + * Returns the subset of {@code raw} entries whose value was actually truncated. Dropped + * attributes don't contribute to the collision signature for this warning. + */ + private static Map truncatedEntries( + final Map raw, final Map transformed) { + final Map diff = new HashMap<>(); + for (Map.Entry e : raw.entrySet()) { + final String newValue = transformed.get(e.getKey()); + if (newValue != null && !newValue.equals(e.getValue())) { + diff.put(e.getKey(), e.getValue()); + } + } + return diff; + } + + /** + * @return the cumulative number of collisions observed since construction. + */ + @VisibleForTesting + long getCollisionCount() { + return collisionCount; + } + + @VisibleForTesting + int getTrackedSlotCount() { + return slotStates == null ? 0 : slotStates.size(); + } + + void reset() { + if (slotStates != null) { + slotStates.clear(); + } + collisionCount = 0; + } + + @VisibleForTesting + Map getAttributeValueLimits() { + return attributeValueLimits; + } + + // ----------------------------------------------------------------------- + + @Nullable + private Map getTransformedOrNull(final Map variables) { + final int globalLimit = + attributeValueLimits.getOrDefault(GLOBAL_LIMIT_KEY, Integer.MAX_VALUE); + final Map result = new HashMap<>(); + boolean modified = false; + for (Map.Entry entry : variables.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (value == null) { + result.put(key, null); + continue; + } + final int configuredLimit = attributeValueLimits.getOrDefault(key, globalLimit); + if (configuredLimit == 0) { + // Drop the attribute entirely (user-requested via 0). + modified = true; + continue; + } + if (configuredLimit > 0 && value.length() > configuredLimit) { + result.put(key, value.substring(0, configuredLimit)); + modified = true; + } else { + // No truncation needed, or negative limit (disables per-attribute truncation). + result.put(key, value); + } + } + return modified ? result : null; + } + + /** + * Reserved attribute key under which the metric name is folded into {@link #slotHash}, so the + * metric name participates in the slot-hash exactly like any other attribute. + */ + private static final String METRIC_NAME_PSEUDO_ATTRIBUTE = "__metric_name__"; + + /** Bit-mask to extend an {@code int} hash into the low 32 bits of a {@code long}. */ + private static final long LOW_32_BITS_MASK = 0xFFFFFFFFL; + + /** + * Order-independent 64-bit hash of (metricName, transformed). Folds the metric name as a + * reserved pseudo-attribute, so it combines through the same logic as real attributes. + */ + private static long slotHash(final String metricName, final Map transformed) { + return attributesHash(transformed) ^ entryHash(METRIC_NAME_PSEUDO_ATTRIBUTE, metricName); + } + + /** Order-independent 64-bit hash of an attribute map. */ + private static long attributesHash(final Map variables) { + long h = 0; + for (final Map.Entry e : variables.entrySet()) { + h ^= entryHash(e.getKey(), e.getValue()); + } + return h; + } + + /** Packs two 32-bit string hashes into one 64-bit value — key in the high half, value low. */ + private static long entryHash(final String key, @Nullable final String value) { + final int valueHash = value != null ? value.hashCode() : 0; + return (((long) key.hashCode()) << 32) ^ (valueHash & LOW_32_BITS_MASK); + } + + private static Map parseAttributeLimits(final MetricConfig config) { + final Map limits = new HashMap<>(); + for (final Object keyObj : config.keySet()) { + if (!(keyObj instanceof String)) { + continue; + } + final String fullKey = (String) keyObj; + if (!fullKey.startsWith(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY)) { + continue; + } + final String attributeName = + fullKey.substring(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY.length()); + if (attributeName.isEmpty()) { + LOG.warn( + "Ignoring attribute value length limit with empty attribute name for key: {}", + fullKey); + continue; + } + try { + final int limit = config.getInteger(fullKey, Integer.MAX_VALUE); + limits.put(attributeName, limit); + } catch (final NumberFormatException e) { + LOG.warn( + "Skipping invalid format for attribute length limit with key: {} and value: {}", + fullKey, + config.get(fullKey), + e); + } + } + return limits; + } +} diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java index 33215ab367840..3ad30aae0109a 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java @@ -105,6 +105,9 @@ public class OpenTelemetryMetricReporter extends OpenTelemetryReporterBase private long exportCompletionTimeoutMillis = OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.defaultValue(); + @GuardedBy("this") + private @Nullable MetricAttributeTransformer transformer; + public OpenTelemetryMetricReporter() { this(Clock.systemUTC()); } @@ -114,6 +117,12 @@ public OpenTelemetryMetricReporter() { this.clock = clock; } + /** + * Initializes the reporter's exporter, batching, and attribute transformer from {@code + * metricConfig}. Must be called before any of {@link #notifyOfAddedMetric}, {@link + * #notifyOfRemovedMetric}, {@link #report}, or {@link #collectAllMetrics}; violating this + * contract will result in an NPE. + */ @Override public void open(final MetricConfig metricConfig) { LOG.info("Starting OpenTelemetryMetricReporter"); @@ -122,6 +131,7 @@ public void open(final MetricConfig metricConfig) { synchronized (this) { exporter = createExporter(metricConfig); configureBatching(metricConfig); + transformer = new MetricAttributeTransformer(metricConfig); } } @@ -156,6 +166,9 @@ protected MetricExporter createExporter(final MetricConfig metricConfig) { @Override public synchronized void close() { + if (transformer != null) { + transformer.reset(); + } if (exporter != null) { exporter.flush(); waitForLastReportToComplete(); @@ -172,17 +185,19 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr + "." + metricName; - Map variables = + final Map rawVariables = group.getAllVariables().entrySet().stream() .collect( Collectors.toMap( e -> VariableNameUtil.getVariableName(e.getKey()), Entry::getValue)); - LOG.debug("Adding metric {} with variables {}", metricName, variables); - - MetricMetadata metricMetadata = new MetricMetadata(name, variables); synchronized (this) { + final Map variables = transformer.transform(name, rawVariables); + + LOG.debug("Adding metric {} with variables {}", metricName, variables); + + final MetricMetadata metricMetadata = new MetricMetadata(name, variables); switch (metric.getMetricType()) { case COUNTER: this.counters.put((Counter) metric, metricMetadata); diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java index f810750b98570..5b2b571104c38 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java @@ -132,6 +132,71 @@ private OpenTelemetryReporterOptions() {} "Timeout in milliseconds for waiting on async export completion.") .build()); + /** Prefix key used to identify attribute value length limit configuration entries. */ + public static final String ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY = + "transform.attribute-value-length-limits."; + + /** + * Config option for attribute value length limits. Only used for documentation purposes — the + * actual option is prefix-based and parsed by {@link MetricAttributeTransformer}. + * + *

For example, to limit the {@code task_name} attribute to 60 characters set {@code + * metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60} in the Flink + * configuration. The special key {@code *} defines a global limit for all attributes not + * explicitly listed. {@code 0} drops an attribute; negative values disable the limit for that + * attribute (useful to override a global cap). + */ + @PublicEvolving + public static final ConfigOption ATTRIBUTE_VALUE_LENGTH_LIMITS = + ConfigOptions.key(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "") + .intType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Limits of the exported attribute values length. Only applies to the metric " + + "reporter; ignored by the trace and event reporters. " + + "Configuration is prefix based, " + + "for example to limit `task_name` attribute set " + + "`metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60` " + + "in the config for OTel reporter. " + + "A special key '*' can be used to define a global limit for all attributes not " + + "explicitly listed. " + + "For example `metrics.reporter.otel.transform.attribute-value-length-limits.*: 1024` " + + "will limit all attributes to attributeValue.substring(0, 1024). " + + "Global limit defaults to Integer.MAX_VALUE if not set. Individual attribute " + + "limits always override the global limit and are verified by exact match on the " + + "attribute name. " + + "0 can be used to drop an attribute. Negative values are interpreted as no limit " + + "for the attribute (can be used for global limit overrides).") + .build()); + + /** Config key for the collision tracker's maximum size. */ + public static final String COLLISION_TRACKING_MAX_SLOTS_KEY = + "transform.collision-tracking-max-slots"; + + @PublicEvolving + public static final ConfigOption COLLISION_TRACKING_MAX_SLOTS = + ConfigOptions.key(COLLISION_TRACKING_MAX_SLOTS_KEY) + .intType() + .defaultValue(50_000) + .withDescription( + Description.builder() + .text( + "Maximum number of distinct (metric name, transformed attributes) " + + "slots the truncation-collision tracker retains. Only applies to " + + "the metric reporter; ignored by the trace and event reporters. " + + "When the cap is " + + "reached the least-recently-touched slot is evicted (LRU); a " + + "previously warned slot that later gets evicted may fire its warning " + + "again on re-entry. " + + "Only consulted when attribute-value length limits are configured — " + + "if no truncation is happening, there is nothing to track and this " + + "option has no effect. " + + "Set to 0 to disable collision tracking entirely. Malformed or " + + "negative values fall back to the default with a warning in the logs.") + .build()); + @Internal public static void tryConfigureTimeout(MetricConfig metricConfig, Consumer builder) { final String timeoutConfKey = EXPORTER_TIMEOUT.key(); diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java new file mode 100644 index 0000000000000..40c033271c575 --- /dev/null +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.metrics.MetricConfig; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +/** Unit tests for {@link MetricAttributeTransformer}. */ +class MetricAttributeTransformerTest { + + private static final String TEST_VALUE = "test-attribute-value"; + + // Attribute that should be truncated to the global limit + private static final String GLOBAL_TRUNCATED_KEY = "global_truncated_key"; + private static final int GLOBAL_LIMIT = 5; + private static final String GLOBAL_TRUNCATED_EXPECTED = TEST_VALUE.substring(0, GLOBAL_LIMIT); + + // Attribute with a per-attribute limit smaller than the global limit + private static final String CUSTOM_SMALL_KEY = "custom_small_key"; + private static final int CUSTOM_SMALL_LIMIT = 3; + private static final String CUSTOM_SMALL_EXPECTED = TEST_VALUE.substring(0, CUSTOM_SMALL_LIMIT); + + // Attribute with a per-attribute limit larger than the global limit + private static final String CUSTOM_LARGE_KEY = "custom_large_key"; + private static final int CUSTOM_LARGE_LIMIT = GLOBAL_LIMIT + CUSTOM_SMALL_LIMIT; // 8 + private static final String CUSTOM_LARGE_EXPECTED = TEST_VALUE.substring(0, CUSTOM_LARGE_LIMIT); + + // Attribute whose value is already shorter than the global limit + private static final String NON_TRUNCATED_KEY = "non_truncated_key"; + private static final String NON_TRUNCATED_VALUE = TEST_VALUE.substring(0, GLOBAL_LIMIT - 1); + + // Attribute with a zero limit (should be dropped) + private static final String ZERO_KEY = "zero_key"; + + // Attribute with a negative limit (no truncation, overrides global) + private static final String NEGATIVE_KEY = "negative_key"; + private static final int NEGATIVE_LIMIT = -GLOBAL_LIMIT; + + // Attribute with an Integer-typed config value — verifies MetricConfig.getInteger tolerates + // non-String values stored directly in the Properties map. + private static final String NUMERIC_KEY = "numeric_key"; + private static final int NUMERIC_LIMIT = 42; + + // Keys with invalid (non-integer) values that should be skipped + private static final String INVALID_STRING_KEY = "invalid_string"; + private static final String INVALID_DURATION_KEY = "invalid_duration"; + + @Test + void testLimitsParsingFromConfiguration() { + final MetricConfig cfg = buildFullTestConfig(); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + assertThat(transformer.getAttributeValueLimits()) + .containsEntry(MetricAttributeTransformer.GLOBAL_LIMIT_KEY, GLOBAL_LIMIT) + .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_LIMIT) + .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_LIMIT) + .containsEntry(ZERO_KEY, 0) + .containsEntry(NEGATIVE_KEY, NEGATIVE_LIMIT) + .containsEntry(NUMERIC_KEY, NUMERIC_LIMIT) + .doesNotContainKey(INVALID_STRING_KEY) + .doesNotContainKey(INVALID_DURATION_KEY); + } + + @Test + void testAttributeValueLengthLimitsParsingAndTruncation() { + final MetricConfig cfg = buildFullTestConfig(); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + final Map input = new HashMap<>(); + input.put(CUSTOM_SMALL_KEY, TEST_VALUE); + input.put(CUSTOM_LARGE_KEY, TEST_VALUE); + input.put(GLOBAL_TRUNCATED_KEY, TEST_VALUE); + input.put(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE); + input.put(NEGATIVE_KEY, TEST_VALUE); + input.put(ZERO_KEY, TEST_VALUE); + + final Map result = transformer.transform("m", input); + + assertThat(result) + .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED) + .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_EXPECTED) + .containsEntry(GLOBAL_TRUNCATED_KEY, GLOBAL_TRUNCATED_EXPECTED) + .containsEntry(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE) + .containsEntry(NEGATIVE_KEY, TEST_VALUE) + .doesNotContainKey(ZERO_KEY); + + assertNotSame(input, result); + } + + @Test + void testNoTruncationWhenConfigIsEmpty() { + final MetricConfig cfg = new MetricConfig(); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + final Map input = new HashMap<>(); + input.put("k", "v"); + + final Map result = transformer.transform("m", input); + + assertSame(input, result); + } + + @Test + void testTruncationWithoutGlobalLimit() { + final MetricConfig cfg = new MetricConfig(); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + CUSTOM_SMALL_KEY, + String.valueOf(CUSTOM_SMALL_LIMIT)); + + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + final Map input = new HashMap<>(); + input.put(CUSTOM_SMALL_KEY, TEST_VALUE); + input.put(NON_TRUNCATED_KEY, TEST_VALUE); + + final Map result = transformer.transform("m", input); + + assertThat(result) + .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED) + .containsEntry(NON_TRUNCATED_KEY, TEST_VALUE); + } + + @Test + void testEmptyAttributeValueIsPreservedUnderNonZeroLimit() { + // Regression: an empty-string attribute value must survive any non-zero configured limit. + // A naive min(limit, length) computation would collapse to limit=0 for empty values and + // silently drop the attribute, changing exported series identity. + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(100); + final MetricConfig cfgWithPerAttr = new MetricConfig(); + cfgWithPerAttr.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "k", "50"); + final MetricAttributeTransformer perAttrTransformer = + new MetricAttributeTransformer(cfgWithPerAttr); + + final Map input = new HashMap<>(); + input.put("k", ""); + + assertThat(transformer.transform("m", input)).containsEntry("k", ""); + assertThat(perAttrTransformer.transform("m", input)).containsEntry("k", ""); + } + + @Test + void testNullAttributeValueIsPreserved() { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5); + + final Map input = new HashMap<>(); + input.put("null_key", null); + input.put("normal_key", "long_value_to_truncate"); + + final Map result = transformer.transform("m", input); + + assertThat(result).containsEntry("null_key", null).containsEntry("normal_key", "long_"); + assertNotSame(input, result); + } + + @Test + void testNullAttributeValueNoOpWhenOnlyNullsPresent() { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5); + + final Map input = new HashMap<>(); + input.put("null_key", null); + + final Map result = transformer.transform("m", input); + + assertSame(input, result); + } + + @Test + void testEmptyAttributeNameIsRejected() { + final MetricConfig cfg = new MetricConfig(); + // A config key with no attribute name suffix (the dot is the last character) must be + // silently rejected; the parsed limits map must not contain an empty-string key. + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY, "10"); + + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + assertThat(transformer.getAttributeValueLimits()).doesNotContainKey(""); + } + + @ParameterizedTest + @MethodSource("collisionCases") + void testCollisionSemantics( + final String caseName, final List rawTaskNames, final long expectedCollisions) { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(3); + + for (final String taskName : rawTaskNames) { + final Map raw = new HashMap<>(); + raw.put("task_name", taskName); + transformer.transform("m", raw); + } + + assertThat(transformer.getCollisionCount()).as(caseName).isEqualTo(expectedCollisions); + } + + static Stream collisionCases() { + return Stream.of( + Arguments.of("single raw — no collision", Arrays.asList("tsk_AAA"), 0L), + Arguments.of( + "identical raws re-registered — no collision", + Arrays.asList("tsk_AAA", "tsk_AAA", "tsk_AAA"), + 0L), + Arguments.of( + "two distinct raws collapsing — one collision", + Arrays.asList("tsk_AAA", "tsk_BBB"), + 1L), + Arguments.of( + "multiple distinct raws at same slot — sentinel caps at one warning", + Arrays.asList("tsk_AAA", "tsk_BBB", "tsk_CCC", "tsk_DDD", "tsk_EEE"), + 1L)); + } + + @Test + void testNoOpTransformDoesNotOccupyTrackingSlot() { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(100); + + final Map raw = new HashMap<>(); + // Value shorter than the limit — no truncation happens. + raw.put("task_name", "short"); + transformer.transform("m", raw); + + assertThat(transformer.getTrackedSlotCount()) + .as("No-op transformation must not populate the collision tracker") + .isZero(); + } + + @Test + void testSlotTrackingIsBounded() { + // Per-attribute limits: `task_name` truncates; `task_attempt_id` is kept in full (negative + // limit), so its unique per-attempt value survives into the transformed map. Every + // registration therefore lands on a distinct slot — exactly the failover-loop pattern. + final int maxSlots = 32; + final MetricConfig cfg = new MetricConfig(); + final String prefix = OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY; + cfg.setProperty(prefix + "task_name", "3"); + cfg.setProperty(prefix + "task_attempt_id", "-1"); + cfg.setProperty( + OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, + String.valueOf(maxSlots)); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + for (int i = 0; i < maxSlots * 4; i++) { + final Map raw = new HashMap<>(); + raw.put("task_attempt_id", "attempt-" + i); + raw.put("task_name", "long_task_name_that_truncates"); + transformer.transform("m", raw); + } + + assertThat(transformer.getTrackedSlotCount()) + .as("Slot tracker must saturate at the configured maximum under distinct-slot load") + .isEqualTo(maxSlots); + } + + @Test + void testHotSlotSurvivesLruEviction() { + // Confirms the tracker is access-ordered LRU, not insertion-ordered or arbitrary: + // a slot that keeps being re-observed must outlast cold slots under cap pressure. + final int maxSlots = 16; + final MetricConfig cfg = new MetricConfig(); + final String prefix = OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY; + cfg.setProperty(prefix + "task_name", "3"); // truncate task_name to 3 chars + cfg.setProperty(prefix + "task_attempt_id", "-1"); // keep per-attempt UUID in full + cfg.setProperty( + OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, + String.valueOf(maxSlots)); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + // Prime the hot slot — "hot_AAA" truncates to "hot". + final Map hotRawA = new HashMap<>(); + hotRawA.put("task_name", "hot_AAA"); + transformer.transform("m", hotRawA); + + // Fill the tracker past capacity with cold, distinct slots. Between each cold + // registration, re-touch the hot slot so it stays MRU. + for (int i = 0; i < maxSlots * 4; i++) { + final Map coldRaw = new HashMap<>(); + coldRaw.put("task_attempt_id", "attempt-" + i); + coldRaw.put("task_name", "long_task_name_that_truncates"); + transformer.transform("m", coldRaw); + transformer.transform("m", hotRawA); // refresh LRU access order on the hot slot + } + + // Register a second raw that collapses onto the same transformed slot as the hot one. + // If the hot slot is still tracked, this fires a collision WARN. If it was evicted + // (i.e., the map is not access-ordered LRU), no collision is recorded — the re-entry + // would just insert a new SlotState. + final Map hotRawB = new HashMap<>(); + hotRawB.put("task_name", "hot_BBB"); + transformer.transform("m", hotRawB); + + assertThat(transformer.getCollisionCount()) + .as("Hot slot must survive LRU eviction and fire a collision on re-entry") + .isEqualTo(1); + } + + @Test + void testNegativeCollisionTrackingMaxSlotsFallsBackToDefault() { + final MetricConfig cfg = new MetricConfig(); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3"); + cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "-1"); + + // Must not throw — invalid values log a WARN and fall back to the default, same as the + // attribute-limit parser's behavior for malformed entries. + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + // Tracking is enabled with the default cap — two distinct raws collapsing must still WARN. + final Map firstRaw = new HashMap<>(); + firstRaw.put("task_name", "tsk_AAA"); + transformer.transform("m", firstRaw); + final Map secondRaw = new HashMap<>(); + secondRaw.put("task_name", "tsk_BBB"); + transformer.transform("m", secondRaw); + + assertThat(transformer.getCollisionCount()) + .as("Negative maxSlots must fall back to default, leaving tracking enabled") + .isEqualTo(1); + } + + @Test + void testMalformedCollisionTrackingMaxSlotsFallsBackToDefault() { + final MetricConfig cfg = new MetricConfig(); + cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "not-a-num"); + + // Must not throw — malformed integers log a WARN and fall back to the default. + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + assertThat(transformer.getTrackedSlotCount()) + .as("Malformed maxSlots must fall back to default, leaving tracking enabled") + .isZero(); + } + + @Test + void testCollisionTrackingCanBeDisabled() { + final MetricConfig cfg = new MetricConfig(); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3"); + cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "0"); + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + // Two distinct raws collapsing to the same transformed slot would normally WARN; with + // tracking disabled, nothing is recorded. + for (final String taskName : Arrays.asList("tsk_AAA", "tsk_BBB")) { + final Map raw = new HashMap<>(); + raw.put("task_name", taskName); + transformer.transform("m", raw); + } + + assertThat(transformer.getCollisionCount()) + .as("No collisions must be recorded when tracking is disabled") + .isZero(); + assertThat(transformer.getTrackedSlotCount()) + .as("Tracked slot count must be zero when tracking is disabled") + .isZero(); + } + + private static MetricAttributeTransformer buildTransformerWithGlobalLimit(final int limit) { + final MetricConfig cfg = new MetricConfig(); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", + String.valueOf(limit)); + return new MetricAttributeTransformer(cfg); + } + + @Test + void testTruncationWithIntegerConfigValue() { + final MetricConfig cfg = new MetricConfig(); + // Put an Integer object (not a String) to verify that MetricConfig.getInteger handles + // non-String property values correctly. + cfg.put( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + NUMERIC_KEY, + NUMERIC_LIMIT); + + final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg); + + assertThat(transformer.getAttributeValueLimits()).containsEntry(NUMERIC_KEY, NUMERIC_LIMIT); + } + + // ----------------------------------------------------------------------- + + private static MetricConfig buildFullTestConfig() { + final MetricConfig cfg = new MetricConfig(); + + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + MetricAttributeTransformer.GLOBAL_LIMIT_KEY, + String.valueOf(GLOBAL_LIMIT)); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + CUSTOM_SMALL_KEY, + String.valueOf(CUSTOM_SMALL_LIMIT)); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + CUSTOM_LARGE_KEY, + String.valueOf(CUSTOM_LARGE_LIMIT)); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + ZERO_KEY, + String.valueOf(0)); + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + NEGATIVE_KEY, + String.valueOf(NEGATIVE_LIMIT)); + + // Integer-typed value — should be parsed correctly. + cfg.put( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + NUMERIC_KEY, + NUMERIC_LIMIT); + + // Non-numeric string — should be skipped. + cfg.setProperty( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + INVALID_STRING_KEY, + "3.5"); + + // Non-string value — should be skipped. + cfg.put( + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + + INVALID_DURATION_KEY, + Duration.ofSeconds(1)); + + return cfg; + } +} diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java index e7a96940f9533..582a4c31511ac 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java @@ -40,8 +40,13 @@ import java.time.Clock; import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.assertj.core.api.Assertions.assertThat; @@ -321,6 +326,99 @@ public void testReportWithBatching() throws Exception { }); } + /** + * Exercises all four attribute-limit config semantics in a single end-to-end run, proving that + * the reporter routes the transformer through to the exported OTLP payload: + * + *

+ */ + @Test + public void testAttributeValueTruncation() throws Exception { + final MetricConfig metricConfig = createMetricConfig(); + final String limitsPrefix = + OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY; + metricConfig.setProperty(limitsPrefix + "*", "5"); + metricConfig.setProperty(limitsPrefix + "operator_name", "3"); + metricConfig.setProperty(limitsPrefix + "job_id", "0"); + metricConfig.setProperty(limitsPrefix + "job_name", "-1"); + + final String longTaskName = "task_name_longer_than_five"; + final String longOperatorName = "operator_name_longer_than_three"; + final String longJobId = "job_id_dropped_entirely"; + final String longJobName = "job_name_kept_despite_global_limit"; + + final Map rawVariables = new HashMap<>(); + rawVariables.put("", longTaskName); + rawVariables.put("", longOperatorName); + rawVariables.put("", longJobId); + rawVariables.put("", longJobName); + final MetricGroup group = new TestMetricGroupWithVariables(rawVariables); + + reporter.open(metricConfig); + reporter.notifyOfAddedMetric(new SimpleCounter(), "trunc.counter", group); + reporter.report(); + reporter.close(); + + final String metricName = "flink.logical.scope.trunc.counter"; + eventuallyConsumeJson( + json -> { + assertThat(extractMetricNames(json)).contains(metricName); + + assertThat( + collectAttributeValues( + json, metricName, "/sum/dataPoints", "task_name")) + .as("task_name must be truncated to the global limit (5)") + .containsExactly(longTaskName.substring(0, 5)); + + assertThat( + collectAttributeValues( + json, metricName, "/sum/dataPoints", "operator_name")) + .as("operator_name must be truncated by per-attribute limit (3)") + .containsExactly(longOperatorName.substring(0, 3)); + + assertThat( + collectAttributeValues( + json, metricName, "/sum/dataPoints", "job_id")) + .as("job_id must be dropped entirely (limit 0)") + .isEmpty(); + + assertThat( + collectAttributeValues( + json, metricName, "/sum/dataPoints", "job_name")) + .as( + "job_name must be kept at full length (negative limit overrides global)") + .containsExactly(longJobName); + }); + } + + /** + * Collects the string values of attribute {@code attrKey} on every data point of the first + * metric in {@code json} matching {@code metricName}. + */ + private static List collectAttributeValues( + final JsonNode json, + final String metricName, + final String dataPointsPath, + final String attrKey) { + final List values = new ArrayList<>(); + streamOf(json.findPath("resourceMetrics").findPath("scopeMetrics").findPath("metrics")) + .filter(metric -> metricName.equals(metric.findPath("name").asText())) + .flatMap(metric -> streamOf(metric.at(dataPointsPath))) + .flatMap(dp -> streamOf(dp.findPath("attributes"))) + .filter(attr -> attrKey.equals(attr.findPath("key").asText())) + .forEach(attr -> values.add(attr.at("/value/stringValue").asText())); + return values; + } + + private static Stream streamOf(final JsonNode node) { + return StreamSupport.stream(node.spliterator(), false); + } + static class TestMetricGroup extends UnregisteredMetricsGroup implements LogicalScopeProvider { @Override @@ -338,4 +436,22 @@ public MetricGroup getWrappedMetricGroup() { return this; } } + + /** + * A {@link TestMetricGroup} that additionally exposes a fixed set of variables via {@link + * #getAllVariables()}, enabling tests to verify attribute-level behaviour (e.g. truncation). + */ + static class TestMetricGroupWithVariables extends TestMetricGroup { + + private final Map variables; + + TestMetricGroupWithVariables(final Map variables) { + this.variables = variables; + } + + @Override + public Map getAllVariables() { + return variables; + } + } }