diff --git a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index df9032e4e79cf..3b82439dde078 100644
--- a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++ b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -56,5 +56,17 @@
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
new file mode 100644
index 0000000000000..3248d08d94070
--- /dev/null
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS;
+
+/**
+ * Applies per-attribute and global length limits to metric attribute values before they are
+ * exported via the OpenTelemetry reporter, and best-effort detects cases where truncation caused
+ * two distinct raw attribute maps to collapse to the same transformed output for a given metric
+ * name (which results in ambiguous exported series at the downstream backend).
+ *
+ * Configuration is prefix-based under {@value
+ * OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY}. The special key {@code *}
+ * sets a global limit for all attributes not explicitly listed.
+ *
+ *
Semantics of individual limit values:
+ *
+ *
+ * - Positive: truncate the attribute value to that many characters.
+ *
- Zero: drop the attribute entirely.
+ *
- Negative: keep the full value (overrides a global cap).
+ *
+ *
+ * Collision detection fires at most once per (name, transformed-attributes) slot when two
+ * distinct raw attribute maps have been observed for that slot. The tracker stores only 64-bit
+ * hashes, in a bounded LRU so that memory stays contained when attribute cardinality is high (e.g.
+ * a per-attempt UUID). The cap is configured via {@link
+ * OpenTelemetryReporterOptions#COLLISION_TRACKING_MAX_SLOTS}; {@code 0} disables tracking entirely.
+ * On overflow the least-recently-touched slot is evicted, and a previously warned slot that later
+ * gets evicted may fire its warning again on re-entry.
+ *
+ *
This class is not thread-safe. Callers are expected to serialize calls to {@link
+ * #transform}.
+ *
+ * @see OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS
+ */
+@NotThreadSafe
+final class MetricAttributeTransformer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetricAttributeTransformer.class);
+
+ static final String GLOBAL_LIMIT_KEY = "*";
+
+ /** Load factor for the tracking map; matches {@link java.util.HashMap}'s default. */
+ private static final float TRACKING_MAP_LOAD_FACTOR = 0.75f;
+
+ /** Upper bound on the tracking map's initial capacity. */
+ private static final int TRACKING_MAP_INITIAL_CAPACITY = 1024;
+
+ /** Access-ordered iteration so {@code removeEldestEntry} evicts the true LRU entry. */
+ private static final boolean LRU_ACCESS_ORDER = true;
+
+ /** Parsed attribute-to-limit mapping, including {@code *} for the global limit. */
+ private final Map attributeValueLimits;
+
+ /**
+ * Bounded LRU of slot-hash → tracking state, or {@code null} when tracking is disabled. The
+ * slot-hash encodes (metricName, transformedVariables); {@link SlotState} records the first
+ * raw-hash observed at that slot and whether the slot has already warned.
+ */
+ @Nullable private final Map slotStates;
+
+ private long collisionCount = 0;
+
+ /**
+ * Per-slot tracking state. Mutating {@code warned} in place avoids a per-warning allocation;
+ * the immutable slot-hash key drives LRU ordering regardless.
+ */
+ private static final class SlotState {
+ final long firstRawHash;
+ boolean warned;
+
+ SlotState(final long firstRawHash) {
+ this.firstRawHash = firstRawHash;
+ }
+ }
+
+ /**
+ * Constructs a {@link MetricAttributeTransformer} by reading length-limit entries from the
+ * supplied {@link MetricConfig}.
+ */
+ MetricAttributeTransformer(final MetricConfig metricConfig) {
+ this.attributeValueLimits = parseAttributeLimits(metricConfig);
+ if (!attributeValueLimits.isEmpty()) {
+ LOG.info(
+ "Metric attribute transformer is configured with value length limits: {}",
+ attributeValueLimits);
+ }
+ this.slotStates = buildSlotStateMap(metricConfig);
+ }
+
+ @Nullable
+ private static Map buildSlotStateMap(final MetricConfig metricConfig) {
+ final int maxSlots = readCollisionTrackingMaxSlots(metricConfig);
+ if (maxSlots == 0) {
+ return null;
+ }
+ return new LinkedHashMap<>(
+ TRACKING_MAP_INITIAL_CAPACITY, TRACKING_MAP_LOAD_FACTOR, LRU_ACCESS_ORDER) {
+ @Override
+ protected boolean removeEldestEntry(final Map.Entry eldest) {
+ return size() > maxSlots;
+ }
+ };
+ }
+
+ private static int readCollisionTrackingMaxSlots(final MetricConfig metricConfig) {
+ final int defaultValue = COLLISION_TRACKING_MAX_SLOTS.defaultValue();
+ final int parsed;
+ try {
+ parsed = metricConfig.getInteger(COLLISION_TRACKING_MAX_SLOTS.key(), defaultValue);
+ } catch (final NumberFormatException e) {
+ LOG.warn(
+ "Skipping invalid format for {} with value: {}; falling back to default {}",
+ COLLISION_TRACKING_MAX_SLOTS.key(),
+ metricConfig.get(COLLISION_TRACKING_MAX_SLOTS.key()),
+ defaultValue,
+ e);
+ return defaultValue;
+ }
+ if (parsed < 0) {
+ LOG.warn(
+ "Ignoring negative value {} for {}; falling back to default {}",
+ parsed,
+ COLLISION_TRACKING_MAX_SLOTS.key(),
+ defaultValue);
+ return defaultValue;
+ }
+ return parsed;
+ }
+
+ /**
+ * Applies configured length limits to {@code rawVariables} and best-effort tracks whether the
+ * resulting (name, transformed-attributes) slot has been seen before with a different raw
+ * input. Emits at most one WARN per slot.
+ *
+ * If no limits are configured, or if truncation produces no changes, {@code rawVariables} is
+ * returned unchanged (reference-equal) and no tracking state is touched.
+ *
+ * @param metricName the fully-qualified metric name
+ * @param rawVariables the raw metric attribute map
+ * @return the transformed attribute map to store alongside the metric
+ */
+ Map transform(final String metricName, final Map rawVariables) {
+ if (attributeValueLimits.isEmpty()) {
+ return rawVariables;
+ }
+ final Map transformed = getTransformedOrNull(rawVariables);
+ if (transformed == null) {
+ // No-op truncation can't produce new collisions. Return the input to avoid retaining
+ // the freshly-allocated transformed map.
+ return rawVariables;
+ }
+ if (slotStates == null) {
+ return transformed;
+ }
+
+ final long slotHash = slotHash(metricName, transformed);
+ final long rawHash = attributesHash(rawVariables);
+ final SlotState state = slotStates.get(slotHash);
+ if (state == null) {
+ slotStates.put(slotHash, new SlotState(rawHash));
+ return transformed;
+ }
+ if (state.warned || state.firstRawHash == rawHash) {
+ return transformed;
+ }
+ LOG.warn(
+ "Possible truncation collision at metric '{}': this registration truncated {} and "
+ + "collapsed onto transformed attributes {} already seen for a "
+ + "different raw input. Exported series at this slot may be "
+ + "ambiguous at the downstream backend.",
+ metricName,
+ truncatedEntries(rawVariables, transformed),
+ transformed);
+ state.warned = true;
+ collisionCount++;
+ return transformed;
+ }
+
+ /**
+ * Returns the subset of {@code raw} entries whose value was actually truncated. Dropped
+ * attributes don't contribute to the collision signature for this warning.
+ */
+ private static Map truncatedEntries(
+ final Map raw, final Map transformed) {
+ final Map diff = new HashMap<>();
+ for (Map.Entry e : raw.entrySet()) {
+ final String newValue = transformed.get(e.getKey());
+ if (newValue != null && !newValue.equals(e.getValue())) {
+ diff.put(e.getKey(), e.getValue());
+ }
+ }
+ return diff;
+ }
+
+ /**
+ * @return the cumulative number of collisions observed since construction.
+ */
+ @VisibleForTesting
+ long getCollisionCount() {
+ return collisionCount;
+ }
+
+ @VisibleForTesting
+ int getTrackedSlotCount() {
+ return slotStates == null ? 0 : slotStates.size();
+ }
+
+ void reset() {
+ if (slotStates != null) {
+ slotStates.clear();
+ }
+ collisionCount = 0;
+ }
+
+ @VisibleForTesting
+ Map getAttributeValueLimits() {
+ return attributeValueLimits;
+ }
+
+ // -----------------------------------------------------------------------
+
+ @Nullable
+ private Map getTransformedOrNull(final Map variables) {
+ final int globalLimit =
+ attributeValueLimits.getOrDefault(GLOBAL_LIMIT_KEY, Integer.MAX_VALUE);
+ final Map result = new HashMap<>();
+ boolean modified = false;
+ for (Map.Entry entry : variables.entrySet()) {
+ final String key = entry.getKey();
+ final String value = entry.getValue();
+ if (value == null) {
+ result.put(key, null);
+ continue;
+ }
+ final int configuredLimit = attributeValueLimits.getOrDefault(key, globalLimit);
+ if (configuredLimit == 0) {
+ // Drop the attribute entirely (user-requested via 0).
+ modified = true;
+ continue;
+ }
+ if (configuredLimit > 0 && value.length() > configuredLimit) {
+ result.put(key, value.substring(0, configuredLimit));
+ modified = true;
+ } else {
+ // No truncation needed, or negative limit (disables per-attribute truncation).
+ result.put(key, value);
+ }
+ }
+ return modified ? result : null;
+ }
+
+ /**
+ * Reserved attribute key under which the metric name is folded into {@link #slotHash}, so the
+ * metric name participates in the slot-hash exactly like any other attribute.
+ */
+ private static final String METRIC_NAME_PSEUDO_ATTRIBUTE = "__metric_name__";
+
+ /** Bit-mask to extend an {@code int} hash into the low 32 bits of a {@code long}. */
+ private static final long LOW_32_BITS_MASK = 0xFFFFFFFFL;
+
+ /**
+ * Order-independent 64-bit hash of (metricName, transformed). Folds the metric name as a
+ * reserved pseudo-attribute, so it combines through the same logic as real attributes.
+ */
+ private static long slotHash(final String metricName, final Map transformed) {
+ return attributesHash(transformed) ^ entryHash(METRIC_NAME_PSEUDO_ATTRIBUTE, metricName);
+ }
+
+ /** Order-independent 64-bit hash of an attribute map. */
+ private static long attributesHash(final Map variables) {
+ long h = 0;
+ for (final Map.Entry e : variables.entrySet()) {
+ h ^= entryHash(e.getKey(), e.getValue());
+ }
+ return h;
+ }
+
+ /** Packs two 32-bit string hashes into one 64-bit value — key in the high half, value low. */
+ private static long entryHash(final String key, @Nullable final String value) {
+ final int valueHash = value != null ? value.hashCode() : 0;
+ return (((long) key.hashCode()) << 32) ^ (valueHash & LOW_32_BITS_MASK);
+ }
+
+ private static Map parseAttributeLimits(final MetricConfig config) {
+ final Map limits = new HashMap<>();
+ for (final Object keyObj : config.keySet()) {
+ if (!(keyObj instanceof String)) {
+ continue;
+ }
+ final String fullKey = (String) keyObj;
+ if (!fullKey.startsWith(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY)) {
+ continue;
+ }
+ final String attributeName =
+ fullKey.substring(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY.length());
+ if (attributeName.isEmpty()) {
+ LOG.warn(
+ "Ignoring attribute value length limit with empty attribute name for key: {}",
+ fullKey);
+ continue;
+ }
+ try {
+ final int limit = config.getInteger(fullKey, Integer.MAX_VALUE);
+ limits.put(attributeName, limit);
+ } catch (final NumberFormatException e) {
+ LOG.warn(
+ "Skipping invalid format for attribute length limit with key: {} and value: {}",
+ fullKey,
+ config.get(fullKey),
+ e);
+ }
+ }
+ return limits;
+ }
+}
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index 33215ab367840..3ad30aae0109a 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -105,6 +105,9 @@ public class OpenTelemetryMetricReporter extends OpenTelemetryReporterBase
private long exportCompletionTimeoutMillis =
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.defaultValue();
+ @GuardedBy("this")
+ private @Nullable MetricAttributeTransformer transformer;
+
public OpenTelemetryMetricReporter() {
this(Clock.systemUTC());
}
@@ -114,6 +117,12 @@ public OpenTelemetryMetricReporter() {
this.clock = clock;
}
+ /**
+ * Initializes the reporter's exporter, batching, and attribute transformer from {@code
+ * metricConfig}. Must be called before any of {@link #notifyOfAddedMetric}, {@link
+ * #notifyOfRemovedMetric}, {@link #report}, or {@link #collectAllMetrics}; violating this
+ * contract will result in an NPE.
+ */
@Override
public void open(final MetricConfig metricConfig) {
LOG.info("Starting OpenTelemetryMetricReporter");
@@ -122,6 +131,7 @@ public void open(final MetricConfig metricConfig) {
synchronized (this) {
exporter = createExporter(metricConfig);
configureBatching(metricConfig);
+ transformer = new MetricAttributeTransformer(metricConfig);
}
}
@@ -156,6 +166,9 @@ protected MetricExporter createExporter(final MetricConfig metricConfig) {
@Override
public synchronized void close() {
+ if (transformer != null) {
+ transformer.reset();
+ }
if (exporter != null) {
exporter.flush();
waitForLastReportToComplete();
@@ -172,17 +185,19 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
+ "."
+ metricName;
- Map variables =
+ final Map rawVariables =
group.getAllVariables().entrySet().stream()
.collect(
Collectors.toMap(
e -> VariableNameUtil.getVariableName(e.getKey()),
Entry::getValue));
- LOG.debug("Adding metric {} with variables {}", metricName, variables);
-
- MetricMetadata metricMetadata = new MetricMetadata(name, variables);
synchronized (this) {
+ final Map variables = transformer.transform(name, rawVariables);
+
+ LOG.debug("Adding metric {} with variables {}", metricName, variables);
+
+ final MetricMetadata metricMetadata = new MetricMetadata(name, variables);
switch (metric.getMetricType()) {
case COUNTER:
this.counters.put((Counter) metric, metricMetadata);
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index f810750b98570..5b2b571104c38 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -132,6 +132,71 @@ private OpenTelemetryReporterOptions() {}
"Timeout in milliseconds for waiting on async export completion.")
.build());
+ /** Prefix key used to identify attribute value length limit configuration entries. */
+ public static final String ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY =
+ "transform.attribute-value-length-limits.";
+
+ /**
+ * Config option for attribute value length limits. Only used for documentation purposes — the
+ * actual option is prefix-based and parsed by {@link MetricAttributeTransformer}.
+ *
+ * For example, to limit the {@code task_name} attribute to 60 characters set {@code
+ * metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60} in the Flink
+ * configuration. The special key {@code *} defines a global limit for all attributes not
+ * explicitly listed. {@code 0} drops an attribute; negative values disable the limit for that
+ * attribute (useful to override a global cap).
+ */
+ @PublicEvolving
+ public static final ConfigOption ATTRIBUTE_VALUE_LENGTH_LIMITS =
+ ConfigOptions.key(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Limits of the exported attribute values length. Only applies to the metric "
+ + "reporter; ignored by the trace and event reporters. "
+ + "Configuration is prefix based, "
+ + "for example to limit `task_name` attribute set "
+ + "`metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60` "
+ + "in the config for OTel reporter. "
+ + "A special key '*' can be used to define a global limit for all attributes not "
+ + "explicitly listed. "
+ + "For example `metrics.reporter.otel.transform.attribute-value-length-limits.*: 1024` "
+ + "will limit all attributes to attributeValue.substring(0, 1024). "
+ + "Global limit defaults to Integer.MAX_VALUE if not set. Individual attribute "
+ + "limits always override the global limit and are verified by exact match on the "
+ + "attribute name. "
+ + "0 can be used to drop an attribute. Negative values are interpreted as no limit "
+ + "for the attribute (can be used for global limit overrides).")
+ .build());
+
+ /** Config key for the collision tracker's maximum size. */
+ public static final String COLLISION_TRACKING_MAX_SLOTS_KEY =
+ "transform.collision-tracking-max-slots";
+
+ @PublicEvolving
+ public static final ConfigOption COLLISION_TRACKING_MAX_SLOTS =
+ ConfigOptions.key(COLLISION_TRACKING_MAX_SLOTS_KEY)
+ .intType()
+ .defaultValue(50_000)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Maximum number of distinct (metric name, transformed attributes) "
+ + "slots the truncation-collision tracker retains. Only applies to "
+ + "the metric reporter; ignored by the trace and event reporters. "
+ + "When the cap is "
+ + "reached the least-recently-touched slot is evicted (LRU); a "
+ + "previously warned slot that later gets evicted may fire its warning "
+ + "again on re-entry. "
+ + "Only consulted when attribute-value length limits are configured — "
+ + "if no truncation is happening, there is nothing to track and this "
+ + "option has no effect. "
+ + "Set to 0 to disable collision tracking entirely. Malformed or "
+ + "negative values fall back to the default with a warning in the logs.")
+ .build());
+
@Internal
public static void tryConfigureTimeout(MetricConfig metricConfig, Consumer builder) {
final String timeoutConfKey = EXPORTER_TIMEOUT.key();
diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
new file mode 100644
index 0000000000000..40c033271c575
--- /dev/null
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.metrics.MetricConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+/** Unit tests for {@link MetricAttributeTransformer}. */
+class MetricAttributeTransformerTest {
+
+ private static final String TEST_VALUE = "test-attribute-value";
+
+ // Attribute that should be truncated to the global limit
+ private static final String GLOBAL_TRUNCATED_KEY = "global_truncated_key";
+ private static final int GLOBAL_LIMIT = 5;
+ private static final String GLOBAL_TRUNCATED_EXPECTED = TEST_VALUE.substring(0, GLOBAL_LIMIT);
+
+ // Attribute with a per-attribute limit smaller than the global limit
+ private static final String CUSTOM_SMALL_KEY = "custom_small_key";
+ private static final int CUSTOM_SMALL_LIMIT = 3;
+ private static final String CUSTOM_SMALL_EXPECTED = TEST_VALUE.substring(0, CUSTOM_SMALL_LIMIT);
+
+ // Attribute with a per-attribute limit larger than the global limit
+ private static final String CUSTOM_LARGE_KEY = "custom_large_key";
+ private static final int CUSTOM_LARGE_LIMIT = GLOBAL_LIMIT + CUSTOM_SMALL_LIMIT; // 8
+ private static final String CUSTOM_LARGE_EXPECTED = TEST_VALUE.substring(0, CUSTOM_LARGE_LIMIT);
+
+ // Attribute whose value is already shorter than the global limit
+ private static final String NON_TRUNCATED_KEY = "non_truncated_key";
+ private static final String NON_TRUNCATED_VALUE = TEST_VALUE.substring(0, GLOBAL_LIMIT - 1);
+
+ // Attribute with a zero limit (should be dropped)
+ private static final String ZERO_KEY = "zero_key";
+
+ // Attribute with a negative limit (no truncation, overrides global)
+ private static final String NEGATIVE_KEY = "negative_key";
+ private static final int NEGATIVE_LIMIT = -GLOBAL_LIMIT;
+
+ // Attribute with an Integer-typed config value — verifies MetricConfig.getInteger tolerates
+ // non-String values stored directly in the Properties map.
+ private static final String NUMERIC_KEY = "numeric_key";
+ private static final int NUMERIC_LIMIT = 42;
+
+ // Keys with invalid (non-integer) values that should be skipped
+ private static final String INVALID_STRING_KEY = "invalid_string";
+ private static final String INVALID_DURATION_KEY = "invalid_duration";
+
+ @Test
+ void testLimitsParsingFromConfiguration() {
+ final MetricConfig cfg = buildFullTestConfig();
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ assertThat(transformer.getAttributeValueLimits())
+ .containsEntry(MetricAttributeTransformer.GLOBAL_LIMIT_KEY, GLOBAL_LIMIT)
+ .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_LIMIT)
+ .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_LIMIT)
+ .containsEntry(ZERO_KEY, 0)
+ .containsEntry(NEGATIVE_KEY, NEGATIVE_LIMIT)
+ .containsEntry(NUMERIC_KEY, NUMERIC_LIMIT)
+ .doesNotContainKey(INVALID_STRING_KEY)
+ .doesNotContainKey(INVALID_DURATION_KEY);
+ }
+
+ @Test
+ void testAttributeValueLengthLimitsParsingAndTruncation() {
+ final MetricConfig cfg = buildFullTestConfig();
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ final Map input = new HashMap<>();
+ input.put(CUSTOM_SMALL_KEY, TEST_VALUE);
+ input.put(CUSTOM_LARGE_KEY, TEST_VALUE);
+ input.put(GLOBAL_TRUNCATED_KEY, TEST_VALUE);
+ input.put(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE);
+ input.put(NEGATIVE_KEY, TEST_VALUE);
+ input.put(ZERO_KEY, TEST_VALUE);
+
+ final Map result = transformer.transform("m", input);
+
+ assertThat(result)
+ .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED)
+ .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_EXPECTED)
+ .containsEntry(GLOBAL_TRUNCATED_KEY, GLOBAL_TRUNCATED_EXPECTED)
+ .containsEntry(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE)
+ .containsEntry(NEGATIVE_KEY, TEST_VALUE)
+ .doesNotContainKey(ZERO_KEY);
+
+ assertNotSame(input, result);
+ }
+
+ @Test
+ void testNoTruncationWhenConfigIsEmpty() {
+ final MetricConfig cfg = new MetricConfig();
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ final Map input = new HashMap<>();
+ input.put("k", "v");
+
+ final Map result = transformer.transform("m", input);
+
+ assertSame(input, result);
+ }
+
+ @Test
+ void testTruncationWithoutGlobalLimit() {
+ final MetricConfig cfg = new MetricConfig();
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + CUSTOM_SMALL_KEY,
+ String.valueOf(CUSTOM_SMALL_LIMIT));
+
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ final Map input = new HashMap<>();
+ input.put(CUSTOM_SMALL_KEY, TEST_VALUE);
+ input.put(NON_TRUNCATED_KEY, TEST_VALUE);
+
+ final Map result = transformer.transform("m", input);
+
+ assertThat(result)
+ .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED)
+ .containsEntry(NON_TRUNCATED_KEY, TEST_VALUE);
+ }
+
+ @Test
+ void testEmptyAttributeValueIsPreservedUnderNonZeroLimit() {
+ // Regression: an empty-string attribute value must survive any non-zero configured limit.
+ // A naive min(limit, length) computation would collapse to limit=0 for empty values and
+ // silently drop the attribute, changing exported series identity.
+ final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(100);
+ final MetricConfig cfgWithPerAttr = new MetricConfig();
+ cfgWithPerAttr.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "k", "50");
+ final MetricAttributeTransformer perAttrTransformer =
+ new MetricAttributeTransformer(cfgWithPerAttr);
+
+ final Map input = new HashMap<>();
+ input.put("k", "");
+
+ assertThat(transformer.transform("m", input)).containsEntry("k", "");
+ assertThat(perAttrTransformer.transform("m", input)).containsEntry("k", "");
+ }
+
+ @Test
+ void testNullAttributeValueIsPreserved() {
+ final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5);
+
+ final Map input = new HashMap<>();
+ input.put("null_key", null);
+ input.put("normal_key", "long_value_to_truncate");
+
+ final Map result = transformer.transform("m", input);
+
+ assertThat(result).containsEntry("null_key", null).containsEntry("normal_key", "long_");
+ assertNotSame(input, result);
+ }
+
+ @Test
+ void testNullAttributeValueNoOpWhenOnlyNullsPresent() {
+ final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5);
+
+ final Map input = new HashMap<>();
+ input.put("null_key", null);
+
+ final Map result = transformer.transform("m", input);
+
+ assertSame(input, result);
+ }
+
+ @Test
+ void testEmptyAttributeNameIsRejected() {
+ final MetricConfig cfg = new MetricConfig();
+ // A config key with no attribute name suffix (the dot is the last character) must be
+ // silently rejected; the parsed limits map must not contain an empty-string key.
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY, "10");
+
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ assertThat(transformer.getAttributeValueLimits()).doesNotContainKey("");
+ }
+
+ @ParameterizedTest
+ @MethodSource("collisionCases")
+ void testCollisionSemantics(
+ final String caseName, final List rawTaskNames, final long expectedCollisions) {
+ final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(3);
+
+ for (final String taskName : rawTaskNames) {
+ final Map raw = new HashMap<>();
+ raw.put("task_name", taskName);
+ transformer.transform("m", raw);
+ }
+
+ assertThat(transformer.getCollisionCount()).as(caseName).isEqualTo(expectedCollisions);
+ }
+
+ static Stream collisionCases() {
+ return Stream.of(
+ Arguments.of("single raw — no collision", Arrays.asList("tsk_AAA"), 0L),
+ Arguments.of(
+ "identical raws re-registered — no collision",
+ Arrays.asList("tsk_AAA", "tsk_AAA", "tsk_AAA"),
+ 0L),
+ Arguments.of(
+ "two distinct raws collapsing — one collision",
+ Arrays.asList("tsk_AAA", "tsk_BBB"),
+ 1L),
+ Arguments.of(
+ "multiple distinct raws at same slot — sentinel caps at one warning",
+ Arrays.asList("tsk_AAA", "tsk_BBB", "tsk_CCC", "tsk_DDD", "tsk_EEE"),
+ 1L));
+ }
+
+ @Test
+ void testNoOpTransformDoesNotOccupyTrackingSlot() {
+ final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(100);
+
+ final Map raw = new HashMap<>();
+ // Value shorter than the limit — no truncation happens.
+ raw.put("task_name", "short");
+ transformer.transform("m", raw);
+
+ assertThat(transformer.getTrackedSlotCount())
+ .as("No-op transformation must not populate the collision tracker")
+ .isZero();
+ }
+
+ @Test
+ void testSlotTrackingIsBounded() {
+ // Per-attribute limits: `task_name` truncates; `task_attempt_id` is kept in full (negative
+ // limit), so its unique per-attempt value survives into the transformed map. Every
+ // registration therefore lands on a distinct slot — exactly the failover-loop pattern.
+ final int maxSlots = 32;
+ final MetricConfig cfg = new MetricConfig();
+ final String prefix = OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+ cfg.setProperty(prefix + "task_name", "3");
+ cfg.setProperty(prefix + "task_attempt_id", "-1");
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY,
+ String.valueOf(maxSlots));
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ for (int i = 0; i < maxSlots * 4; i++) {
+ final Map raw = new HashMap<>();
+ raw.put("task_attempt_id", "attempt-" + i);
+ raw.put("task_name", "long_task_name_that_truncates");
+ transformer.transform("m", raw);
+ }
+
+ assertThat(transformer.getTrackedSlotCount())
+ .as("Slot tracker must saturate at the configured maximum under distinct-slot load")
+ .isEqualTo(maxSlots);
+ }
+
+ @Test
+ void testHotSlotSurvivesLruEviction() {
+ // Confirms the tracker is access-ordered LRU, not insertion-ordered or arbitrary:
+ // a slot that keeps being re-observed must outlast cold slots under cap pressure.
+ final int maxSlots = 16;
+ final MetricConfig cfg = new MetricConfig();
+ final String prefix = OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+ cfg.setProperty(prefix + "task_name", "3"); // truncate task_name to 3 chars
+ cfg.setProperty(prefix + "task_attempt_id", "-1"); // keep per-attempt UUID in full
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY,
+ String.valueOf(maxSlots));
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ // Prime the hot slot — "hot_AAA" truncates to "hot".
+ final Map hotRawA = new HashMap<>();
+ hotRawA.put("task_name", "hot_AAA");
+ transformer.transform("m", hotRawA);
+
+ // Fill the tracker past capacity with cold, distinct slots. Between each cold
+ // registration, re-touch the hot slot so it stays MRU.
+ for (int i = 0; i < maxSlots * 4; i++) {
+ final Map coldRaw = new HashMap<>();
+ coldRaw.put("task_attempt_id", "attempt-" + i);
+ coldRaw.put("task_name", "long_task_name_that_truncates");
+ transformer.transform("m", coldRaw);
+ transformer.transform("m", hotRawA); // refresh LRU access order on the hot slot
+ }
+
+ // Register a second raw that collapses onto the same transformed slot as the hot one.
+ // If the hot slot is still tracked, this fires a collision WARN. If it was evicted
+ // (i.e., the map is not access-ordered LRU), no collision is recorded — the re-entry
+ // would just insert a new SlotState.
+ final Map hotRawB = new HashMap<>();
+ hotRawB.put("task_name", "hot_BBB");
+ transformer.transform("m", hotRawB);
+
+ assertThat(transformer.getCollisionCount())
+ .as("Hot slot must survive LRU eviction and fire a collision on re-entry")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testNegativeCollisionTrackingMaxSlotsFallsBackToDefault() {
+ final MetricConfig cfg = new MetricConfig();
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3");
+ cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "-1");
+
+ // Must not throw — invalid values log a WARN and fall back to the default, same as the
+ // attribute-limit parser's behavior for malformed entries.
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ // Tracking is enabled with the default cap — two distinct raws collapsing must still WARN.
+ final Map firstRaw = new HashMap<>();
+ firstRaw.put("task_name", "tsk_AAA");
+ transformer.transform("m", firstRaw);
+ final Map secondRaw = new HashMap<>();
+ secondRaw.put("task_name", "tsk_BBB");
+ transformer.transform("m", secondRaw);
+
+ assertThat(transformer.getCollisionCount())
+ .as("Negative maxSlots must fall back to default, leaving tracking enabled")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testMalformedCollisionTrackingMaxSlotsFallsBackToDefault() {
+ final MetricConfig cfg = new MetricConfig();
+ cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "not-a-num");
+
+ // Must not throw — malformed integers log a WARN and fall back to the default.
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ assertThat(transformer.getTrackedSlotCount())
+ .as("Malformed maxSlots must fall back to default, leaving tracking enabled")
+ .isZero();
+ }
+
+ @Test
+ void testCollisionTrackingCanBeDisabled() {
+ final MetricConfig cfg = new MetricConfig();
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3");
+ cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "0");
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ // Two distinct raws collapsing to the same transformed slot would normally WARN; with
+ // tracking disabled, nothing is recorded.
+ for (final String taskName : Arrays.asList("tsk_AAA", "tsk_BBB")) {
+ final Map raw = new HashMap<>();
+ raw.put("task_name", taskName);
+ transformer.transform("m", raw);
+ }
+
+ assertThat(transformer.getCollisionCount())
+ .as("No collisions must be recorded when tracking is disabled")
+ .isZero();
+ assertThat(transformer.getTrackedSlotCount())
+ .as("Tracked slot count must be zero when tracking is disabled")
+ .isZero();
+ }
+
+ private static MetricAttributeTransformer buildTransformerWithGlobalLimit(final int limit) {
+ final MetricConfig cfg = new MetricConfig();
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*",
+ String.valueOf(limit));
+ return new MetricAttributeTransformer(cfg);
+ }
+
+ @Test
+ void testTruncationWithIntegerConfigValue() {
+ final MetricConfig cfg = new MetricConfig();
+ // Put an Integer object (not a String) to verify that MetricConfig.getInteger handles
+ // non-String property values correctly.
+ cfg.put(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + NUMERIC_KEY,
+ NUMERIC_LIMIT);
+
+ final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+ assertThat(transformer.getAttributeValueLimits()).containsEntry(NUMERIC_KEY, NUMERIC_LIMIT);
+ }
+
+ // -----------------------------------------------------------------------
+
+ private static MetricConfig buildFullTestConfig() {
+ final MetricConfig cfg = new MetricConfig();
+
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + MetricAttributeTransformer.GLOBAL_LIMIT_KEY,
+ String.valueOf(GLOBAL_LIMIT));
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + CUSTOM_SMALL_KEY,
+ String.valueOf(CUSTOM_SMALL_LIMIT));
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + CUSTOM_LARGE_KEY,
+ String.valueOf(CUSTOM_LARGE_LIMIT));
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + ZERO_KEY,
+ String.valueOf(0));
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + NEGATIVE_KEY,
+ String.valueOf(NEGATIVE_LIMIT));
+
+ // Integer-typed value — should be parsed correctly.
+ cfg.put(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + NUMERIC_KEY,
+ NUMERIC_LIMIT);
+
+ // Non-numeric string — should be skipped.
+ cfg.setProperty(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + INVALID_STRING_KEY,
+ "3.5");
+
+ // Non-string value — should be skipped.
+ cfg.put(
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+ + INVALID_DURATION_KEY,
+ Duration.ofSeconds(1));
+
+ return cfg;
+ }
+}
diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
index e7a96940f9533..582a4c31511ac 100644
--- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
@@ -40,8 +40,13 @@
import java.time.Clock;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.assertj.core.api.Assertions.assertThat;
@@ -321,6 +326,99 @@ public void testReportWithBatching() throws Exception {
});
}
+ /**
+ * Exercises all four attribute-limit config semantics in a single end-to-end run, proving that
+ * the reporter routes the transformer through to the exported OTLP payload:
+ *
+ *
+ * - {@code task_name} — truncated by the global limit (5).
+ *
- {@code operator_name} — truncated by a per-attribute limit (3) overriding the global.
+ *
- {@code job_id} — dropped entirely via limit {@code 0}.
+ *
- {@code job_name} — kept at full length via limit {@code -1}, overriding the global.
+ *
+ */
+ @Test
+ public void testAttributeValueTruncation() throws Exception {
+ final MetricConfig metricConfig = createMetricConfig();
+ final String limitsPrefix =
+ OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+ metricConfig.setProperty(limitsPrefix + "*", "5");
+ metricConfig.setProperty(limitsPrefix + "operator_name", "3");
+ metricConfig.setProperty(limitsPrefix + "job_id", "0");
+ metricConfig.setProperty(limitsPrefix + "job_name", "-1");
+
+ final String longTaskName = "task_name_longer_than_five";
+ final String longOperatorName = "operator_name_longer_than_three";
+ final String longJobId = "job_id_dropped_entirely";
+ final String longJobName = "job_name_kept_despite_global_limit";
+
+ final Map rawVariables = new HashMap<>();
+ rawVariables.put("", longTaskName);
+ rawVariables.put("", longOperatorName);
+ rawVariables.put("", longJobId);
+ rawVariables.put("", longJobName);
+ final MetricGroup group = new TestMetricGroupWithVariables(rawVariables);
+
+ reporter.open(metricConfig);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "trunc.counter", group);
+ reporter.report();
+ reporter.close();
+
+ final String metricName = "flink.logical.scope.trunc.counter";
+ eventuallyConsumeJson(
+ json -> {
+ assertThat(extractMetricNames(json)).contains(metricName);
+
+ assertThat(
+ collectAttributeValues(
+ json, metricName, "/sum/dataPoints", "task_name"))
+ .as("task_name must be truncated to the global limit (5)")
+ .containsExactly(longTaskName.substring(0, 5));
+
+ assertThat(
+ collectAttributeValues(
+ json, metricName, "/sum/dataPoints", "operator_name"))
+ .as("operator_name must be truncated by per-attribute limit (3)")
+ .containsExactly(longOperatorName.substring(0, 3));
+
+ assertThat(
+ collectAttributeValues(
+ json, metricName, "/sum/dataPoints", "job_id"))
+ .as("job_id must be dropped entirely (limit 0)")
+ .isEmpty();
+
+ assertThat(
+ collectAttributeValues(
+ json, metricName, "/sum/dataPoints", "job_name"))
+ .as(
+ "job_name must be kept at full length (negative limit overrides global)")
+ .containsExactly(longJobName);
+ });
+ }
+
+ /**
+ * Collects the string values of attribute {@code attrKey} on every data point of the first
+ * metric in {@code json} matching {@code metricName}.
+ */
+ private static List collectAttributeValues(
+ final JsonNode json,
+ final String metricName,
+ final String dataPointsPath,
+ final String attrKey) {
+ final List values = new ArrayList<>();
+ streamOf(json.findPath("resourceMetrics").findPath("scopeMetrics").findPath("metrics"))
+ .filter(metric -> metricName.equals(metric.findPath("name").asText()))
+ .flatMap(metric -> streamOf(metric.at(dataPointsPath)))
+ .flatMap(dp -> streamOf(dp.findPath("attributes")))
+ .filter(attr -> attrKey.equals(attr.findPath("key").asText()))
+ .forEach(attr -> values.add(attr.at("/value/stringValue").asText()));
+ return values;
+ }
+
+ private static Stream streamOf(final JsonNode node) {
+ return StreamSupport.stream(node.spliterator(), false);
+ }
+
static class TestMetricGroup extends UnregisteredMetricsGroup implements LogicalScopeProvider {
@Override
@@ -338,4 +436,22 @@ public MetricGroup getWrappedMetricGroup() {
return this;
}
}
+
+ /**
+ * A {@link TestMetricGroup} that additionally exposes a fixed set of variables via {@link
+ * #getAllVariables()}, enabling tests to verify attribute-level behaviour (e.g. truncation).
+ */
+ static class TestMetricGroupWithVariables extends TestMetricGroup {
+
+ private final Map variables;
+
+ TestMetricGroupWithVariables(final Map variables) {
+ this.variables = variables;
+ }
+
+ @Override
+ public Map getAllVariables() {
+ return variables;
+ }
+ }
}