diff --git a/plugins/out_opentelemetry/CMakeLists.txt b/plugins/out_opentelemetry/CMakeLists.txt index f3a050c9c9d..bc9f299cf91 100644 --- a/plugins/out_opentelemetry/CMakeLists.txt +++ b/plugins/out_opentelemetry/CMakeLists.txt @@ -3,6 +3,7 @@ set(src opentelemetry_logs.c opentelemetry_utils.c opentelemetry_conf.c + opentelemetry_metadata.c ) FLB_PLUGIN(out_opentelemetry "${src}" "") diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 0b795215eab..36ab6d1a18b 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -54,6 +54,7 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text); #include "opentelemetry.h" #include "opentelemetry_conf.h" +#include "opentelemetry_metadata.h" #include "opentelemetry_utils.h" static int is_http_status_code_retrayable(int http_code) @@ -208,6 +209,8 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, struct flb_config_map_val *mv; struct flb_http_client *c; flb_sds_t signature = NULL; + flb_sds_t meta_token = NULL; + flb_sds_t meta_token_view = NULL; compressed = FLB_FALSE; @@ -340,7 +343,41 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, /* Map debug callbacks */ flb_http_client_debug(c, ctx->ins->callback); - ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx); + /* Metadata token: inject Bearer directly, handle 401 via invalidate+retry + * (the metadata endpoint is GET-only, so oauth2 POST retry would fail) */ + if (ctx->metadata_token_url && ctx->oauth2_ctx != NULL) { + /* Hold mutex across get+copy to prevent concurrent refresh free */ + pthread_mutex_lock(&ctx->metadata_mutex); + if (flb_oauth2_get_access_token(ctx->oauth2_ctx, + &meta_token_view, FLB_FALSE) != 0 + || meta_token_view == NULL) { + pthread_mutex_unlock(&ctx->metadata_mutex); + flb_plg_error(ctx->ins, "metadata: failed to get access token"); + out_ret = FLB_RETRY; + goto cleanup; + } + meta_token = flb_sds_create(meta_token_view); + pthread_mutex_unlock(&ctx->metadata_mutex); + if (!meta_token) { + flb_plg_error(ctx->ins, "metadata: out of memory for access token"); + out_ret = FLB_RETRY; + goto cleanup; + } + if (flb_http_bearer_auth(c, meta_token) != 0) { + flb_plg_error(ctx->ins, "metadata: failed to set bearer auth"); + out_ret = FLB_RETRY; + goto cleanup; + } + ret = flb_http_do(c, &b_sent); + if (ret == 0 && c->resp.status == 401) { + flb_oauth2_invalidate_token(ctx->oauth2_ctx); + out_ret = FLB_RETRY; + goto cleanup; + } + } + else { + ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx); + } if (ret == 0) { /* @@ -406,6 +443,10 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, flb_free(final_body); } + if (meta_token) { + flb_sds_destroy(meta_token); + } + /* Destroy HTTP client context */ flb_http_client_destroy(c); @@ -422,6 +463,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, const char *grpc_uri) { flb_sds_t oauth2_token; + flb_sds_t oauth2_token_view; const char *compression_algorithm; uint32_t wire_message_length; size_t grpc_body_length; @@ -433,6 +475,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, int result; oauth2_token = NULL; + oauth2_token_view = NULL; if (!ctx->enable_http2_flag) { return opentelemetry_legacy_post(ctx, @@ -556,18 +599,37 @@ int opentelemetry_post(struct opentelemetry_context *ctx, } if (ctx->oauth2_ctx != NULL && ctx->oauth2_config.enabled == FLB_TRUE) { + if (ctx->metadata_token_url) { + pthread_mutex_lock(&ctx->metadata_mutex); + } result = flb_oauth2_get_access_token(ctx->oauth2_ctx, - &oauth2_token, + &oauth2_token_view, FLB_FALSE); - if (result != 0 || oauth2_token == NULL) { + if (result != 0 || oauth2_token_view == NULL) { + if (ctx->metadata_token_url) { + pthread_mutex_unlock(&ctx->metadata_mutex); + } flb_plg_error(ctx->ins, "failed to obtain oauth2 access token"); flb_http_client_request_destroy(request, FLB_TRUE); return FLB_RETRY; } + oauth2_token = flb_sds_create(oauth2_token_view); + if (ctx->metadata_token_url) { + pthread_mutex_unlock(&ctx->metadata_mutex); + } + if (!oauth2_token) { + flb_plg_error(ctx->ins, "failed to copy oauth2 access token"); + flb_http_client_request_destroy(request, FLB_TRUE); + + return FLB_RETRY; + } + result = flb_http_request_set_parameters(request, FLB_HTTP_CLIENT_ARGUMENT_BEARER_TOKEN(oauth2_token)); + flb_sds_destroy(oauth2_token); + oauth2_token = NULL; if (result != 0) { flb_plg_error(ctx->ins, "error setting oauth2 authorization data"); @@ -1041,6 +1103,14 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk, struct flb_config *config) { int result = FLB_RETRY; + struct opentelemetry_context *ctx = out_context; + + /* Refresh the metadata token before dispatching; no-op when not configured */ + if (ctx->metadata_token_url) { + if (flb_otel_metadata_token_refresh(ctx) != 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } if (event_chunk->type == FLB_INPUT_METRICS){ result = process_metrics(event_chunk, out_flush, ins, out_context, config); @@ -1279,6 +1349,37 @@ static struct flb_config_map config_map[] = { }, + /* + * Metadata token auth + * ------------------- + */ + { + FLB_CONFIG_MAP_STR, "metadata_token_url", NULL, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_url), + "URL to fetch the IAM token from a cloud metadata endpoint via HTTP GET" + }, + { + FLB_CONFIG_MAP_STR, "metadata_token_header", NULL, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_header), + "Optional HTTP header to include in the metadata token request " + "(e.g. \"Metadata-Flavor: Google\")" + }, + { + FLB_CONFIG_MAP_INT, "metadata_token_refresh", "3600", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_refresh), + "Maximum token refresh interval in seconds (default: 3600, must be > 60)" + }, + { + FLB_CONFIG_MAP_STR, "metadata_token_scope", NULL, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_scope), + "Scope value appended as ?scopes= to metadata token GET request" + }, + { + FLB_CONFIG_MAP_STR, "metadata_token_audience", NULL, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_audience), + "Audience value appended as ?audience= to metadata token GET request" + }, + /* EOF */ {0} }; diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index b45b97c92e6..d372902f6eb 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -20,6 +20,8 @@ #ifndef FLB_OUT_OPENTELEMETRY_H #define FLB_OUT_OPENTELEMETRY_H +#include + #include #include #include @@ -64,6 +66,17 @@ struct opentelemetry_context { struct flb_oauth2 *oauth2_ctx; const char *oauth2_auth_method; + /* Metadata token Auth */ + const char *metadata_token_url; + const char *metadata_token_header; + int metadata_token_refresh; + const char *metadata_token_scope; /* appended as ?scopes= */ + const char *metadata_token_audience; /* appended as ?audience= */ + flb_sds_t metadata_token_path; + struct flb_upstream *metadata_u; + pthread_mutex_t metadata_mutex; + int metadata_mutex_initialized; + /* AWS Auth */ #ifdef FLB_HAVE_SIGNV4 #ifdef FLB_HAVE_AWS @@ -97,6 +110,7 @@ struct opentelemetry_context { /* HTTP client */ struct flb_http_client_ng http_client; + int http_client_initialized; /* record metadata parsing */ flb_sds_t logs_metadata_key; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 8535110557e..1f980312d41 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -32,6 +32,7 @@ #include "opentelemetry.h" #include "opentelemetry_conf.h" +#include "opentelemetry_metadata.h" /* create a single entry of log_body_key */ static int log_body_key_create(struct opentelemetry_context *ctx, char *ra_pattern) @@ -303,6 +304,14 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output } } + /* metadata_token_url and standard OAuth2 are mutually exclusive */ + if (ctx->metadata_token_url && ctx->oauth2_config.enabled == FLB_TRUE) { + flb_plg_error(ins, + "metadata_token_url and oauth2 cannot be used at the same time"); + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + if (ctx->max_resources < 0) { flb_plg_error(ins, "max_resources must be greater than or equal to zero"); flb_opentelemetry_context_destroy(ctx); @@ -463,6 +472,45 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output } } + /* metadata token mode: create a minimal oauth2 context for Bearer injection */ + if (ctx->metadata_token_url) { + /* + * metadata_token_refresh must exceed FLB_OAUTH2_DEFAULT_SKEW_SECS. + * Lower values cause the oauth2 layer to treat every freshly-fetched + * token as expired, triggering a POST refresh that always fails - + * no data is ever delivered. + */ + if (ctx->metadata_token_refresh <= FLB_OAUTH2_DEFAULT_SKEW_SECS) { + flb_plg_error(ctx->ins, + "metadata_token_refresh must be > %d seconds", + FLB_OAUTH2_DEFAULT_SKEW_SECS); + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + + ctx->oauth2_ctx = flb_oauth2_create(config, + ctx->metadata_token_url, + ctx->metadata_token_refresh); + if (!ctx->oauth2_ctx) { + flb_plg_error(ctx->ins, + "failed to create oauth2 context for metadata token auth"); + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + ctx->oauth2_config.enabled = FLB_TRUE; + /* + * oauth2_apply_defaults() hard-sets cfg.enabled = FLB_FALSE; override it + * so that flb_oauth2_get_access_token() returns the pre-fetched token + * instead of bailing out immediately with -1. + */ + ctx->oauth2_ctx->cfg.enabled = FLB_TRUE; + + if (flb_otel_metadata_token_create(ctx, config) != 0) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + } + ctx->logs_uri_sanitized = sanitize_uri(ctx->logs_uri); ctx->traces_uri_sanitized = sanitize_uri(ctx->traces_uri); ctx->metrics_uri_sanitized = sanitize_uri(ctx->metrics_uri); @@ -758,6 +806,9 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output ctx = NULL; } + else { + ctx->http_client_initialized = FLB_TRUE; + } return ctx; } @@ -768,7 +819,9 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) return; } - flb_http_client_ng_destroy(&ctx->http_client); + if (ctx->http_client_initialized) { + flb_http_client_ng_destroy(&ctx->http_client); + } flb_kv_release(&ctx->kv_labels); @@ -903,10 +956,11 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) #endif #endif + flb_otel_metadata_token_destroy(ctx); + if (ctx->oauth2_ctx) { flb_oauth2_destroy(ctx->oauth2_ctx); } - flb_oauth2_config_destroy(&ctx->oauth2_config); flb_free(ctx->proxy_host); flb_free(ctx); diff --git a/plugins/out_opentelemetry/opentelemetry_metadata.c b/plugins/out_opentelemetry/opentelemetry_metadata.c new file mode 100644 index 00000000000..c2d7819f399 --- /dev/null +++ b/plugins/out_opentelemetry/opentelemetry_metadata.c @@ -0,0 +1,463 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "opentelemetry.h" +#include "opentelemetry_metadata.h" + +/* Maximum size of the token JSON response body */ +#define FLB_OTEL_METADATA_TOKEN_SIZE_MAX 16384 + +/* + * Parse access_token and expires_in from a metadata token JSON response. + * Sets *expires_in_out to 0 when the field is absent; caller must apply a + * default. Returns 0 on success, -1 on invalid JSON or missing access_token. + */ +static int metadata_parse_token_json(const char *json_data, + size_t json_size, + flb_sds_t *access_token_out, + uint64_t *expires_in_out) +{ + int i; + int ret; + int key_len; + int val_len; + const char *key; + const char *val; + jsmn_parser parser; + jsmntok_t *t; + jsmntok_t *tokens; + char tmp_num[32]; + unsigned long long parsed_val; + char *end; + flb_sds_t access_token = NULL; + uint64_t expires_in = 0; + int tokens_size = 32; + + jsmn_init(&parser); + tokens = flb_calloc(1, sizeof(jsmntok_t) * tokens_size); + if (!tokens) { + flb_errno(); + return -1; + } + + ret = jsmn_parse(&parser, json_data, json_size, tokens, tokens_size); + if (ret <= 0 || tokens[0].type != JSMN_OBJECT) { + flb_free(tokens); + return -1; + } + + for (i = 1; i < ret; i++) { + t = &tokens[i]; + + if (t->type != JSMN_STRING || t->start == -1 || t->end == -1) { + continue; + } + + key = json_data + t->start; + key_len = t->end - t->start; + + if (i + 1 >= ret) { + break; + } + + i++; + t = &tokens[i]; + val = json_data + t->start; + val_len = t->end - t->start; + + if (key_len == 12 && strncmp(key, "access_token", 12) == 0) { + if (access_token) { + flb_sds_destroy(access_token); + } + access_token = flb_sds_create_len(val, val_len); + if (!access_token) { + flb_free(tokens); + return -1; + } + } + else if (key_len == 10 && strncmp(key, "expires_in", 10) == 0) { + if (val_len <= 0 || val_len >= (int) sizeof(tmp_num)) { + continue; + } + strncpy(tmp_num, val, val_len); + tmp_num[val_len] = '\0'; + if (tmp_num[0] == '-') { + continue; + } + errno = 0; + parsed_val = strtoull(tmp_num, &end, 10); + if (errno == 0 && end != tmp_num && *end == '\0') { + expires_in = (uint64_t) parsed_val; + } + } + } + + flb_free(tokens); + + if (!access_token) { + return -1; + } + + *access_token_out = access_token; + *expires_in_out = expires_in; + return 0; +} + +int flb_otel_metadata_token_create(struct opentelemetry_context *ctx, + struct flb_config *config) +{ + int ret; + char *protocol = NULL; + char *host = NULL; + char *port = NULL; + char *uri = NULL; + const char *sep; + flb_sds_t tmp; + + ret = flb_utils_url_split(ctx->metadata_token_url, + &protocol, &host, &port, &uri); + if (ret != 0) { + flb_plg_error(ctx->ins, "metadata: failed to parse URL '%s'", + ctx->metadata_token_url); + return -1; + } + + /* Only plain HTTP is supported (metadata endpoint is link-local) */ + if (!protocol || strcasecmp(protocol, "http") != 0) { + flb_plg_error(ctx->ins, + "metadata_token_url only supports http:// URLs"); + flb_free(protocol); + flb_free(host); + flb_free(port); + flb_free(uri); + return -1; + } + + ctx->metadata_token_path = flb_sds_create(uri ? uri : "/"); + + flb_free(protocol); + flb_free(host); + flb_free(port); + flb_free(uri); + + if (!ctx->metadata_token_path) { + return -1; + } + + /* Append optional scope/audience query parameters */ + if ((ctx->metadata_token_scope && ctx->metadata_token_scope[0] != '\0') || + (ctx->metadata_token_audience && ctx->metadata_token_audience[0] != '\0')) { + sep = strchr(ctx->metadata_token_path, '?') ? "&" : "?"; + + if (ctx->metadata_token_scope && ctx->metadata_token_scope[0] != '\0') { + tmp = flb_sds_cat(ctx->metadata_token_path, + sep, 1); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + + tmp = flb_sds_cat(ctx->metadata_token_path, + "scopes=", sizeof("scopes=") - 1); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + + tmp = flb_sds_cat(ctx->metadata_token_path, + ctx->metadata_token_scope, + strlen(ctx->metadata_token_scope)); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + sep = "&"; + } + + if (ctx->metadata_token_audience && ctx->metadata_token_audience[0] != '\0') { + tmp = flb_sds_cat(ctx->metadata_token_path, + sep, 1); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + + tmp = flb_sds_cat(ctx->metadata_token_path, + "audience=", sizeof("audience=") - 1); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + + tmp = flb_sds_cat(ctx->metadata_token_path, + ctx->metadata_token_audience, + strlen(ctx->metadata_token_audience)); + if (!tmp) { + return -1; + } + ctx->metadata_token_path = tmp; + } + } + + /* Create synchronous upstream for the metadata endpoint */ + ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_token_url, + FLB_IO_TCP, NULL); + if (!ctx->metadata_u) { + flb_plg_error(ctx->ins, "metadata: failed to create upstream"); + flb_sds_destroy(ctx->metadata_token_path); + ctx->metadata_token_path = NULL; + return -1; + } + + flb_stream_disable_async_mode(&ctx->metadata_u->base); + + ret = pthread_mutex_init(&ctx->metadata_mutex, NULL); + if (ret != 0) { + flb_plg_error(ctx->ins, "metadata: failed to init mutex"); + flb_upstream_destroy(ctx->metadata_u); + ctx->metadata_u = NULL; + flb_sds_destroy(ctx->metadata_token_path); + ctx->metadata_token_path = NULL; + return -1; + } + ctx->metadata_mutex_initialized = FLB_TRUE; + + return 0; +} + +int flb_otel_metadata_token_refresh(struct opentelemetry_context *ctx) +{ + int ret; + size_t b_sent; + struct flb_connection *conn; + struct flb_http_client *c; + const char *sep; + time_t now; + time_t effective_ttl; + flb_sds_t payload; + flb_sds_t new_token = NULL; + flb_sds_t new_token_type = NULL; + uint64_t raw_expires_in; + size_t name_len; + size_t val_len; + + if (!ctx->metadata_token_url) { + return 0; + } + + pthread_mutex_lock(&ctx->metadata_mutex); + + /* Check expiry under both locks (metadata_mutex + oauth2 lock) */ + ret = flb_lock_acquire(&ctx->oauth2_ctx->lock, + FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + if (ret != 0) { + flb_plg_error(ctx->ins, "metadata: failed to acquire oauth2 lock"); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + now = time(NULL); + if (ctx->oauth2_ctx->expires_at > 0 && + now < ctx->oauth2_ctx->expires_at - FLB_OAUTH2_DEFAULT_SKEW_SECS) { + flb_lock_release(&ctx->oauth2_ctx->lock, + FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + flb_plg_debug(ctx->ins, "metadata: token still valid, skipping refresh"); + pthread_mutex_unlock(&ctx->metadata_mutex); + return 0; + } + flb_lock_release(&ctx->oauth2_ctx->lock, + FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + + conn = flb_upstream_conn_get(ctx->metadata_u); + if (!conn) { + flb_plg_error(ctx->ins, "metadata: failed to connect to endpoint"); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + c = flb_http_client(conn, FLB_HTTP_GET, ctx->metadata_token_path, + NULL, 0, NULL, 0, NULL, 0); + if (!c) { + flb_upstream_conn_release(conn); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + flb_http_buffer_size(c, FLB_OTEL_METADATA_TOKEN_SIZE_MAX); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + /* Add optional custom header (e.g. "Metadata-Flavor: Google") */ + if (ctx->metadata_token_header) { + sep = strstr(ctx->metadata_token_header, ": "); + if (sep) { + name_len = (size_t)(sep - ctx->metadata_token_header); + val_len = strlen(sep + 2); + flb_http_add_header(c, + ctx->metadata_token_header, name_len, + sep + 2, val_len); + } + else { + flb_plg_warn(ctx->ins, + "metadata: metadata_token_header '%s' is not in " + "'Name: Value' format; header will not be sent", + ctx->metadata_token_header); + } + } + + ret = flb_http_do(c, &b_sent); + if (ret != 0 || c->resp.status != 200) { + if (ret != 0) { + flb_plg_warn(ctx->ins, + "metadata: HTTP GET failed (ret=%d)", ret); + } + else { + flb_plg_warn(ctx->ins, + "metadata: HTTP GET returned status=%d", + c->resp.status); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(conn); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + payload = flb_sds_create_len(c->resp.payload, c->resp.payload_size); + flb_http_client_destroy(c); + flb_upstream_conn_release(conn); + + if (!payload) { + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + /* Parse token JSON; raw_expires_in is 0 when the field is absent */ + ret = metadata_parse_token_json(payload, flb_sds_len(payload), + &new_token, &raw_expires_in); + flb_sds_destroy(payload); + + if (ret != 0) { + flb_plg_error(ctx->ins, "metadata: failed to parse token JSON response"); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + if (raw_expires_in == 0) { + effective_ttl = (time_t) FLB_OAUTH2_DEFAULT_EXPIRES; + } + else { + effective_ttl = (time_t) raw_expires_in; + } + + /* Cap the effective TTL to metadata_token_refresh if configured */ + if (ctx->metadata_token_refresh > 0 && + ctx->metadata_token_refresh < (int) effective_ttl) { + effective_ttl = (time_t) ctx->metadata_token_refresh; + } + + /* Clamp TTL above skew to prevent oauth2 POST refresh (no credentials) */ + if (effective_ttl <= FLB_OAUTH2_DEFAULT_SKEW_SECS) { + flb_plg_warn(ctx->ins, + "metadata: server expires_in %llu is <= skew (%d); " + "clamping to %d", + (unsigned long long) raw_expires_in, + FLB_OAUTH2_DEFAULT_SKEW_SECS, + FLB_OAUTH2_DEFAULT_SKEW_SECS + 1); + effective_ttl = FLB_OAUTH2_DEFAULT_SKEW_SECS + 1; + } + + new_token_type = flb_sds_create("Bearer"); + if (!new_token_type) { + flb_sds_destroy(new_token); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + /* Update the oauth2 context under its lock */ + ret = flb_lock_acquire(&ctx->oauth2_ctx->lock, + FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + if (ret != 0) { + flb_plg_error(ctx->ins, "metadata: failed to acquire oauth2 lock"); + flb_sds_destroy(new_token); + flb_sds_destroy(new_token_type); + pthread_mutex_unlock(&ctx->metadata_mutex); + return -1; + } + + if (ctx->oauth2_ctx->access_token) { + flb_sds_destroy(ctx->oauth2_ctx->access_token); + } + if (ctx->oauth2_ctx->token_type) { + flb_sds_destroy(ctx->oauth2_ctx->token_type); + } + ctx->oauth2_ctx->access_token = new_token; + ctx->oauth2_ctx->token_type = new_token_type; + ctx->oauth2_ctx->expires_in = (uint64_t) effective_ttl; + ctx->oauth2_ctx->expires_at = time(NULL) + effective_ttl; + + flb_lock_release(&ctx->oauth2_ctx->lock, + FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + + flb_plg_debug(ctx->ins, + "metadata: token refreshed, expires in %ld seconds", + (long) effective_ttl); + pthread_mutex_unlock(&ctx->metadata_mutex); + + return 0; +} + +void flb_otel_metadata_token_destroy(struct opentelemetry_context *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->metadata_mutex_initialized) { + pthread_mutex_destroy(&ctx->metadata_mutex); + ctx->metadata_mutex_initialized = FLB_FALSE; + } + + if (ctx->metadata_u) { + flb_upstream_destroy(ctx->metadata_u); + ctx->metadata_u = NULL; + } + + if (ctx->metadata_token_path) { + flb_sds_destroy(ctx->metadata_token_path); + ctx->metadata_token_path = NULL; + } +} diff --git a/plugins/out_opentelemetry/opentelemetry_metadata.h b/plugins/out_opentelemetry/opentelemetry_metadata.h new file mode 100644 index 00000000000..23a8f3fe30b --- /dev/null +++ b/plugins/out_opentelemetry/opentelemetry_metadata.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_OPENTELEMETRY_METADATA_H +#define FLB_OUT_OPENTELEMETRY_METADATA_H + +#include + +struct opentelemetry_context; + +int flb_otel_metadata_token_create(struct opentelemetry_context *ctx, + struct flb_config *config); +int flb_otel_metadata_token_refresh(struct opentelemetry_context *ctx); +void flb_otel_metadata_token_destroy(struct opentelemetry_context *ctx); + +#endif /* FLB_OUT_OPENTELEMETRY_METADATA_H */ diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 7bc6a7d7153..831439a73c0 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -260,6 +260,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_TD "out_td.c") FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") FLB_RT_TEST(FLB_OUT_CHRONICLE "out_chronicle.c") + FLB_RT_TEST(FLB_OUT_OPENTELEMETRY "out_opentelemetry.c") endif() diff --git a/tests/runtime/out_opentelemetry.c b/tests/runtime/out_opentelemetry.c new file mode 100644 index 00000000000..0ad925acae6 --- /dev/null +++ b/tests/runtime/out_opentelemetry.c @@ -0,0 +1,1801 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/out_opentelemetry/opentelemetry.h" + +/* Test function declarations */ +void flb_test_otel_default_config(void); +void flb_test_metadata_token_url_sets_context(void); +void flb_test_metadata_token_default_refresh(void); +void flb_test_metadata_token_custom_refresh(void); +void flb_test_metadata_token_mutual_exclusion(void); +void flb_test_metadata_token_https_rejected(void); +void flb_test_metadata_token_low_refresh_rejected(void); +void flb_test_no_metadata_token_backward_compat(void); +void flb_test_metadata_token_fetch_on_first_flush(void); +void flb_test_metadata_token_refresh_on_expiry(void); +void flb_test_metadata_token_custom_header(void); +void flb_test_metadata_token_fetch_failure(void); +void flb_test_metadata_token_legacy_post(void); +void flb_test_metadata_token_401_recovery(void); +void flb_test_metadata_token_refresh_interval_override(void); +void flb_test_metadata_token_missing_expires_in(void); +void flb_test_metadata_token_short_expires_in(void); +void flb_test_metadata_token_scope_query_param(void); +void flb_test_metadata_token_audience_query_param(void); +void flb_test_metadata_token_both_query_params(void); +void flb_test_metadata_token_scope_without_url_ignored(void); +void flb_test_metadata_token_scope_url_with_existing_query(void); +void flb_test_metadata_token_audience_url_with_existing_query(void); +void flb_test_metadata_token_empty_scope_ignored(void); +void flb_test_metadata_token_empty_audience_ignored(void); + +/* Test list */ +TEST_LIST = { + {"default_config", flb_test_otel_default_config}, + {"metadata_token_url_sets_context", flb_test_metadata_token_url_sets_context}, + {"metadata_token_default_refresh", flb_test_metadata_token_default_refresh}, + {"metadata_token_custom_refresh", flb_test_metadata_token_custom_refresh}, + {"metadata_token_mutual_exclusion", flb_test_metadata_token_mutual_exclusion}, + {"metadata_token_https_rejected", flb_test_metadata_token_https_rejected}, + {"metadata_token_low_refresh_rejected", + flb_test_metadata_token_low_refresh_rejected}, + {"no_metadata_token_backward_compat", flb_test_no_metadata_token_backward_compat}, + {"metadata_token_fetch_on_first_flush", flb_test_metadata_token_fetch_on_first_flush}, + {"metadata_token_refresh_on_expiry", flb_test_metadata_token_refresh_on_expiry}, + {"metadata_token_custom_header", flb_test_metadata_token_custom_header}, + {"metadata_token_fetch_failure", flb_test_metadata_token_fetch_failure}, + {"metadata_token_legacy_post", flb_test_metadata_token_legacy_post}, + {"metadata_token_401_recovery", flb_test_metadata_token_401_recovery}, + {"metadata_token_refresh_interval_override", + flb_test_metadata_token_refresh_interval_override}, + {"metadata_token_missing_expires_in", + flb_test_metadata_token_missing_expires_in}, + {"metadata_token_short_expires_in", + flb_test_metadata_token_short_expires_in}, + {"metadata_token_scope_query_param", + flb_test_metadata_token_scope_query_param}, + {"metadata_token_audience_query_param", + flb_test_metadata_token_audience_query_param}, + {"metadata_token_both_query_params", + flb_test_metadata_token_both_query_params}, + {"metadata_token_scope_without_url_ignored", + flb_test_metadata_token_scope_without_url_ignored}, + {"metadata_token_scope_url_with_existing_query", + flb_test_metadata_token_scope_url_with_existing_query}, + {"metadata_token_audience_url_with_existing_query", + flb_test_metadata_token_audience_url_with_existing_query}, + {"metadata_token_empty_scope_ignored", + flb_test_metadata_token_empty_scope_ignored}, + {"metadata_token_empty_audience_ignored", + flb_test_metadata_token_empty_audience_ignored}, + {NULL, NULL} +}; + +/* Helper: return the opentelemetry plugin context from a running output instance. */ +static struct opentelemetry_context *get_otel_ctx(flb_ctx_t *ctx, int out_ffd) +{ + struct flb_output_instance *ins; + + ins = flb_output_get_instance(ctx->config, out_ffd); + if (!ins) { + return NULL; + } + return (struct opentelemetry_context *) ins->context; +} + +void flb_test_otel_default_config(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_url_sets_context(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + TEST_CHECK(otel_ctx->oauth2_config.enabled == FLB_TRUE); + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_default_refresh(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->metadata_token_refresh == 3600); + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_custom_refresh(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_refresh", "1800", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->metadata_token_refresh == 1800); + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_mutual_exclusion(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "oauth2.enable", "true", + "oauth2.token_url", "http://localhost:19999/token", + "oauth2.client_id", "test-client", + "oauth2.client_secret", "test-secret", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret != 0); + + flb_destroy(ctx); +} + +void flb_test_metadata_token_https_rejected(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "https://169.254.169.254/metadata/token", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret != 0); + + flb_destroy(ctx); +} + +void flb_test_metadata_token_low_refresh_rejected(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_refresh", "60", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret != 0); + + flb_destroy(ctx); +} + +void flb_test_no_metadata_token_backward_compat(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->metadata_token_url == NULL); + TEST_CHECK(otel_ctx->oauth2_ctx == NULL); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Port for the mock metadata HTTP server used in Task 4 tests. */ +#define MOCK_METADATA_PORT 18901 + +/* + * JSON responses returned by the mock metadata endpoint. + * The short-expiry variant is used to force a token refresh in tests. + */ +#define MOCK_TOKEN_RESPONSE \ + "{\"access_token\":\"test-token-123\"," \ + "\"token_type\":\"Bearer\"," \ + "\"expires_in\":3600}" + +#define MOCK_TOKEN_SHORT_EXPIRY \ + "{\"access_token\":\"test-token-123\"," \ + "\"token_type\":\"Bearer\"," \ + "\"expires_in\":1}" + +/* Shared state updated by the mock server callback. */ +static pthread_mutex_t g_meta_lock = PTHREAD_MUTEX_INITIALIZER; +static int g_meta_calls = 0; /* times the endpoint was hit */ +static int g_short_expiry = 0; /* if set, return expires_in:1 */ + +static void meta_state_reset(void) +{ + pthread_mutex_lock(&g_meta_lock); + g_meta_calls = 0; + g_short_expiry = 0; + pthread_mutex_unlock(&g_meta_lock); +} + +/* Monkey server callback for the mock metadata endpoint. */ +static void cb_mock_metadata(mk_request_t *request, void *data) +{ + const char *resp; + (void) data; + (void) request; + + pthread_mutex_lock(&g_meta_lock); + g_meta_calls++; + resp = g_short_expiry ? MOCK_TOKEN_SHORT_EXPIRY : MOCK_TOKEN_RESPONSE; + pthread_mutex_unlock(&g_meta_lock); + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", 12, "application/json", 16); + mk_http_send(request, (char *) resp, strlen(resp), NULL); + mk_http_done(request); +} + +/* Start a Monkey HTTP server at 127.0.0.1:port serving the mock metadata JSON. */ +static mk_ctx_t *mock_meta_start(int port) +{ + char addr[32]; + mk_ctx_t *mk; + int vid; + + mk = mk_create(); + if (!mk) { + return NULL; + } + + snprintf(addr, sizeof(addr), "127.0.0.1:%d", port); + mk_config_set(mk, "Listen", addr, NULL); + + vid = mk_vhost_create(mk, NULL); + mk_vhost_set(mk, vid, "Name", "mock-metadata", NULL); + mk_vhost_handler(mk, vid, "/", cb_mock_metadata, NULL); + + mk_start(mk); + return mk; +} + +/* Stop and destroy a Monkey server started by mock_meta_start(). */ +static void mock_meta_stop(mk_ctx_t *mk) +{ + if (mk) { + mk_stop(mk); + mk_destroy(mk); + } +} + +/* + * Variant mock server for flb_test_metadata_token_custom_header. + * This callback verifies that the client sent the expected custom header + * ("Metadata-Flavor: Google") in the metadata GET request. + */ +#define MOCK_METADATA_PORT_CH 18904 + +static pthread_mutex_t g_ch_lock = PTHREAD_MUTEX_INITIALIZER; +static int g_ch_header_seen = 0; + +static void ch_state_reset(void) +{ + pthread_mutex_lock(&g_ch_lock); + g_ch_header_seen = 0; + pthread_mutex_unlock(&g_ch_lock); +} + +static void cb_mock_metadata_ch(mk_request_t *request, void *data) +{ + struct mk_http_header *hdr; + (void) data; + + /* "Metadata-Flavor" is a custom (non-standard) header; use MK_HEADER_OTHER + * to search the extra-headers array populated by the parser. + * The parser lowercases all custom header keys, so search with lowercase. */ + hdr = mk_http_header_get(MK_HEADER_OTHER, request, "metadata-flavor", 15); + if (hdr != NULL && hdr->val.data != NULL && + hdr->val.len >= 6 && + strncmp(hdr->val.data, "Google", 6) == 0) { + pthread_mutex_lock(&g_ch_lock); + g_ch_header_seen = 1; + pthread_mutex_unlock(&g_ch_lock); + } + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", 12, "application/json", 16); + mk_http_send(request, (char *) MOCK_TOKEN_RESPONSE, + strlen(MOCK_TOKEN_RESPONSE), NULL); + mk_http_done(request); +} + +static mk_ctx_t *mock_meta_start_ch(int port) +{ + char addr[32]; + mk_ctx_t *mk; + int vid; + + mk = mk_create(); + if (!mk) { + return NULL; + } + + snprintf(addr, sizeof(addr), "127.0.0.1:%d", port); + mk_config_set(mk, "Listen", addr, NULL); + + vid = mk_vhost_create(mk, NULL); + mk_vhost_set(mk, vid, "Name", "mock-metadata-ch", NULL); + mk_vhost_handler(mk, vid, "/", cb_mock_metadata_ch, NULL); + + mk_start(mk); + return mk; +} + +void flb_test_metadata_token_fetch_on_first_flush(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + mk_ctx_t *mk; + int calls; + + meta_state_reset(); + + mk = mock_meta_start(MOCK_METADATA_PORT); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + + pthread_mutex_lock(&g_meta_lock); + calls = g_meta_calls; + pthread_mutex_unlock(&g_meta_lock); + TEST_CHECK(calls >= 1); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx->oauth2_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx->access_token != NULL); + if (otel_ctx->oauth2_ctx->access_token) { + TEST_CHECK(strcmp(otel_ctx->oauth2_ctx->access_token, + "test-token-123") == 0); + } + } + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_refresh_on_expiry(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char metadata_url[128]; + mk_ctx_t *mk; + int calls_after_first; + int calls_after_second; + + meta_state_reset(); + + pthread_mutex_lock(&g_meta_lock); + g_short_expiry = 1; + pthread_mutex_unlock(&g_meta_lock); + + mk = mock_meta_start(MOCK_METADATA_PORT); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First flush: fetch the short-lived token. */ + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"first\"}]", 21); + sleep(2); + + pthread_mutex_lock(&g_meta_lock); + calls_after_first = g_meta_calls; + pthread_mutex_unlock(&g_meta_lock); + TEST_CHECK(calls_after_first >= 1); + + /* Wait for the 1-second token to expire, then trigger another flush. */ + sleep(2); + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"second\"}]", 22); + sleep(2); + + pthread_mutex_lock(&g_meta_lock); + calls_after_second = g_meta_calls; + pthread_mutex_unlock(&g_meta_lock); + + TEST_CHECK(calls_after_second > calls_after_first); + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_custom_header(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + mk_ctx_t *mk; + int header_seen; + + ch_state_reset(); + + mk = mock_meta_start_ch(MOCK_METADATA_PORT_CH); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_CH); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + "metadata_token_header", "Metadata-Flavor: Google", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + + TEST_CHECK(otel_ctx->metadata_token_header != NULL); + if (otel_ctx->metadata_token_header) { + TEST_CHECK(strcmp(otel_ctx->metadata_token_header, + "Metadata-Flavor: Google") == 0); + } + + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + pthread_mutex_lock(&g_ch_lock); + header_seen = g_ch_header_seen; + pthread_mutex_unlock(&g_ch_lock); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx->oauth2_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx->access_token != NULL); + } + + /* The mock must have received the Metadata-Flavor: Google header. */ + TEST_CHECK(header_seen == 1); + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_fetch_failure(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", "http://127.0.0.1:19997/metadata/token", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + TEST_CHECK(otel_ctx->oauth2_ctx->access_token == NULL); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Separate port for Task 7 mock server to avoid conflicts with Task 4 tests. */ +#define MOCK_METADATA_PORT_T7 18902 + +/* Response type selectors for the Task 7 mock. */ +#define MOCK_T7_RESP_NORMAL 0 /* expires_in: 3600 */ +#define MOCK_T7_RESP_60S 1 /* expires_in: 60 */ +#define MOCK_T7_RESP_NO_EXPIRY 2 /* no expires_in field */ +#define MOCK_T7_RESP_120S 3 /* expires_in: 120 */ + +#define MOCK_TOKEN_60S_EXPIRY \ + "{\"access_token\":\"test-token-123\"," \ + "\"token_type\":\"Bearer\"," \ + "\"expires_in\":60}" + +#define MOCK_TOKEN_120S_EXPIRY \ + "{\"access_token\":\"test-token-123\"," \ + "\"token_type\":\"Bearer\"," \ + "\"expires_in\":120}" + +#define MOCK_TOKEN_NO_EXPIRY \ + "{\"access_token\":\"test-token-123\"," \ + "\"token_type\":\"Bearer\"}" + +static pthread_mutex_t g_t7_lock = PTHREAD_MUTEX_INITIALIZER; +static int g_t7_calls = 0; +static int g_t7_resp_type = MOCK_T7_RESP_NORMAL; + +static void t7_state_reset(void) +{ + pthread_mutex_lock(&g_t7_lock); + g_t7_calls = 0; + g_t7_resp_type = MOCK_T7_RESP_NORMAL; + pthread_mutex_unlock(&g_t7_lock); +} + +static void cb_mock_metadata_t7(mk_request_t *request, void *data) +{ + const char *resp; + (void) data; + (void) request; + + pthread_mutex_lock(&g_t7_lock); + g_t7_calls++; + switch (g_t7_resp_type) { + case MOCK_T7_RESP_60S: + resp = MOCK_TOKEN_60S_EXPIRY; + break; + case MOCK_T7_RESP_NO_EXPIRY: + resp = MOCK_TOKEN_NO_EXPIRY; + break; + case MOCK_T7_RESP_120S: + resp = MOCK_TOKEN_120S_EXPIRY; + break; + default: + resp = MOCK_TOKEN_RESPONSE; + break; + } + pthread_mutex_unlock(&g_t7_lock); + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", 12, "application/json", 16); + mk_http_send(request, (char *) resp, strlen(resp), NULL); + mk_http_done(request); +} + +static mk_ctx_t *mock_meta_start_t7(int port) +{ + char addr[32]; + mk_ctx_t *mk; + int vid; + + mk = mk_create(); + if (!mk) { + return NULL; + } + + snprintf(addr, sizeof(addr), "127.0.0.1:%d", port); + mk_config_set(mk, "Listen", addr, NULL); + + vid = mk_vhost_create(mk, NULL); + mk_vhost_set(mk, vid, "Name", "mock-metadata-t7", NULL); + mk_vhost_handler(mk, vid, "/", cb_mock_metadata_t7, NULL); + + mk_start(mk); + return mk; +} + +/* + * Mock OTel HTTP/1.1 destination server used by flb_test_metadata_token_legacy_post + * and flb_test_metadata_token_401_recovery. + * + * The callback records the Authorization header value from each POST request + * and can be configured to serve a configurable number of 401 responses before + * switching to 200, allowing tests to exercise the 401 recovery code path in + * opentelemetry.c. + */ +#define MOCK_OTEL_PORT 18903 + +static pthread_mutex_t g_otel_lock = PTHREAD_MUTEX_INITIALIZER; +static int g_otel_calls = 0; +static int g_otel_401_remaining = 0; +static char g_otel_auth_header[256]; + +static void otel_state_reset(void) +{ + pthread_mutex_lock(&g_otel_lock); + g_otel_calls = 0; + g_otel_401_remaining = 0; + memset(g_otel_auth_header, 0, sizeof(g_otel_auth_header)); + pthread_mutex_unlock(&g_otel_lock); +} + +static void cb_mock_otel(mk_request_t *request, void *data) +{ + struct mk_http_header *auth; + char auth_val[256]; + int serve_401; + (void) data; + + /* Read the Authorization header before acquiring the global lock. */ + auth_val[0] = '\0'; + auth = mk_http_header_get(MK_HEADER_AUTHORIZATION, request, NULL, 0); + if (auth != NULL && auth->val.data != NULL && auth->val.len > 0) { + snprintf(auth_val, sizeof(auth_val), + "%.*s", (int) auth->val.len, auth->val.data); + } + + pthread_mutex_lock(&g_otel_lock); + g_otel_calls++; + serve_401 = (g_otel_401_remaining > 0); + if (serve_401) { + g_otel_401_remaining--; + } + if (auth_val[0] != '\0') { + strncpy(g_otel_auth_header, auth_val, sizeof(g_otel_auth_header) - 1); + g_otel_auth_header[sizeof(g_otel_auth_header) - 1] = '\0'; + } + pthread_mutex_unlock(&g_otel_lock); + + if (serve_401) { + mk_http_status(request, 401); + mk_http_done(request); + return; + } + + mk_http_status(request, 200); + mk_http_done(request); +} + +static mk_ctx_t *mock_otel_start(int port) +{ + char addr[32]; + mk_ctx_t *mk; + int vid; + + mk = mk_create(); + if (!mk) { + return NULL; + } + + snprintf(addr, sizeof(addr), "127.0.0.1:%d", port); + mk_config_set(mk, "Listen", addr, NULL); + + vid = mk_vhost_create(mk, NULL); + mk_vhost_set(mk, vid, "Name", "mock-otel", NULL); + mk_vhost_handler(mk, vid, "/", cb_mock_otel, NULL); + + mk_start(mk); + return mk; +} + +static void mock_otel_stop(mk_ctx_t *mk) +{ + if (mk) { + mk_stop(mk); + mk_destroy(mk); + } +} + +void flb_test_metadata_token_legacy_post(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + char otel_port_str[16]; + mk_ctx_t *mk_meta; + mk_ctx_t *mk_otel; + int calls; + char auth_header[256]; + + t7_state_reset(); + otel_state_reset(); + + mk_meta = mock_meta_start_t7(MOCK_METADATA_PORT_T7); + TEST_CHECK(mk_meta != NULL); + + mk_otel = mock_otel_start(MOCK_OTEL_PORT); + TEST_CHECK(mk_otel != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_T7); + snprintf(otel_port_str, sizeof(otel_port_str), "%d", MOCK_OTEL_PORT); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", otel_port_str, + "http2", "off", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + + TEST_CHECK(otel_ctx->enable_http2_flag == FLB_FALSE); + + pthread_mutex_lock(&g_t7_lock); + calls = g_t7_calls; + pthread_mutex_unlock(&g_t7_lock); + TEST_CHECK(calls >= 1); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx->oauth2_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx->access_token != NULL); + if (otel_ctx->oauth2_ctx->access_token) { + TEST_CHECK(strcmp(otel_ctx->oauth2_ctx->access_token, + "test-token-123") == 0); + } + } + + /* The mock OTel server must have received the Bearer token in the + * Authorization header, proving the legacy POST actually carries the token + * (not just that the token was stored in oauth2_ctx). */ + pthread_mutex_lock(&g_otel_lock); + strncpy(auth_header, g_otel_auth_header, sizeof(auth_header) - 1); + auth_header[sizeof(auth_header) - 1] = '\0'; + pthread_mutex_unlock(&g_otel_lock); + TEST_CHECK(strcmp(auth_header, "Bearer test-token-123") == 0); + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk_meta); + mock_otel_stop(mk_otel); +} + +void flb_test_metadata_token_401_recovery(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char metadata_url[128]; + char otel_port_str[16]; + mk_ctx_t *mk_meta; + mk_ctx_t *mk_otel; + int calls_after_first; + int calls_after_second; + int otel_calls; + + t7_state_reset(); + otel_state_reset(); + + pthread_mutex_lock(&g_otel_lock); + g_otel_401_remaining = 1; + pthread_mutex_unlock(&g_otel_lock); + + mk_meta = mock_meta_start_t7(MOCK_METADATA_PORT_T7); + TEST_CHECK(mk_meta != NULL); + + mk_otel = mock_otel_start(MOCK_OTEL_PORT); + TEST_CHECK(mk_otel != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_T7); + snprintf(otel_port_str, sizeof(otel_port_str), "%d", MOCK_OTEL_PORT); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", otel_port_str, + "http2", "off", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* + * First flush: token is fetched from metadata (g_t7_calls becomes 1), + * POST is sent to OTel mock which returns 401. + * opentelemetry_legacy_post() calls flb_oauth2_invalidate_token() + * (sets expires_at = 0) and returns FLB_RETRY directly. + * No second metadata fetch happens yet. + */ + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"first\"}]", 21); + sleep(2); + + pthread_mutex_lock(&g_t7_lock); + calls_after_first = g_t7_calls; + pthread_mutex_unlock(&g_t7_lock); + TEST_CHECK(calls_after_first >= 1); + + /* + * Second flush: flb_otel_metadata_token_refresh() sees expires_at = 0 + * (set by flb_oauth2_invalidate_token()), fetches a new token from the + * metadata endpoint, then sends the POST to OTel which returns 200. + */ + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"second\"}]", 22); + sleep(2); + + pthread_mutex_lock(&g_t7_lock); + calls_after_second = g_t7_calls; + pthread_mutex_unlock(&g_t7_lock); + + /* The second flush must have triggered a new metadata fetch. */ + TEST_CHECK(calls_after_second > calls_after_first); + + pthread_mutex_lock(&g_otel_lock); + otel_calls = g_otel_calls; + pthread_mutex_unlock(&g_otel_lock); + TEST_CHECK(otel_calls >= 2); + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk_meta); + mock_otel_stop(mk_otel); +} + +void flb_test_metadata_token_refresh_interval_override(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + mk_ctx_t *mk; + time_t before_fetch; + time_t expires_at; + + t7_state_reset(); + + pthread_mutex_lock(&g_t7_lock); + g_t7_resp_type = MOCK_T7_RESP_120S; + pthread_mutex_unlock(&g_t7_lock); + + mk = mock_meta_start_t7(MOCK_METADATA_PORT_T7); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_T7); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + "metadata_token_refresh", "90", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + before_fetch = time(NULL); + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx && otel_ctx->oauth2_ctx) { + expires_at = otel_ctx->oauth2_ctx->expires_at; + /* TTL must be ~90s, not 120s (metadata_token_refresh caps expires_in). */ + TEST_CHECK(expires_at <= before_fetch + 92); + TEST_CHECK(expires_at >= before_fetch + 88); + } + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_missing_expires_in(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + mk_ctx_t *mk; + time_t before_fetch; + time_t expires_at; + + t7_state_reset(); + + pthread_mutex_lock(&g_t7_lock); + g_t7_resp_type = MOCK_T7_RESP_NO_EXPIRY; + pthread_mutex_unlock(&g_t7_lock); + + mk = mock_meta_start_t7(MOCK_METADATA_PORT_T7); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_T7); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + before_fetch = time(NULL); + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx && otel_ctx->oauth2_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx->access_token != NULL); + expires_at = otel_ctx->oauth2_ctx->expires_at; + /* + * No expires_in in the mock response: metadata_parse_token_json() + * returns raw_expires_in=0, causing the caller to fall back to + * FLB_OAUTH2_DEFAULT_EXPIRES. metadata_token_refresh defaults to + * 3600 which is larger, so FLB_OAUTH2_DEFAULT_EXPIRES wins. + */ + TEST_CHECK(expires_at >= before_fetch + FLB_OAUTH2_DEFAULT_EXPIRES - 10); + TEST_CHECK(expires_at <= before_fetch + FLB_OAUTH2_DEFAULT_EXPIRES + 10); + } + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_short_expires_in(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + char metadata_url[128]; + mk_ctx_t *mk; + time_t before_fetch; + time_t expires_at; + + t7_state_reset(); + + pthread_mutex_lock(&g_t7_lock); + g_t7_resp_type = MOCK_T7_RESP_60S; + pthread_mutex_unlock(&g_t7_lock); + + mk = mock_meta_start_t7(MOCK_METADATA_PORT_T7); + TEST_CHECK(mk != NULL); + + snprintf(metadata_url, sizeof(metadata_url), + "http://127.0.0.1:%d/metadata/token", MOCK_METADATA_PORT_T7); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "0.5", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "19998", + "metadata_token_url", metadata_url, + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + before_fetch = time(NULL); + flb_lib_push(ctx, in_ffd, "[0, {\"msg\": \"hello\"}]", 21); + sleep(2); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + + TEST_CHECK(otel_ctx->oauth2_ctx != NULL); + if (otel_ctx && otel_ctx->oauth2_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx->access_token != NULL); + expires_at = otel_ctx->oauth2_ctx->expires_at; + /* + * Server returned expires_in:60 which equals FLB_OAUTH2_DEFAULT_SKEW_SECS. + * The refresh path must clamp it to SKEW+1 so the token is not + * immediately considered expired by flb_oauth2_get_access_token(). + */ + TEST_CHECK(expires_at >= before_fetch + FLB_OAUTH2_DEFAULT_SKEW_SECS + 1); + TEST_CHECK(expires_at <= before_fetch + FLB_OAUTH2_DEFAULT_SKEW_SECS + 3); + } + + flb_stop(ctx); + flb_destroy(ctx); + mock_meta_stop(mk); +} + +void flb_test_metadata_token_scope_query_param(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_scope", "https://www.googleapis.com/auth/cloud-platform", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "?scopes=https://www.googleapis.com/auth/cloud-platform") + != NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_audience_query_param(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_audience", "my-service-account@project.iam.gserviceaccount.com", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "?audience=my-service-account@project.iam.gserviceaccount.com") + != NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_both_query_params(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_scope", "cloud-platform", + "metadata_token_audience", "my-audience", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "?scopes=cloud-platform&audience=my-audience") + != NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_scope_without_url_ignored(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_scope", "https://www.googleapis.com/auth/cloud-platform", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->oauth2_ctx == NULL); + TEST_CHECK(otel_ctx->metadata_token_path == NULL); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_scope_url_with_existing_query(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token?format=json", + "metadata_token_scope", "cloud-platform", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + /* URL already has '?', so scope must use '&' not '?' */ + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "&scopes=cloud-platform") != NULL); + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "?scopes=") == NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_audience_url_with_existing_query(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token?format=json", + "metadata_token_audience", "my-audience", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + /* URL already has '?', so audience must use '&' not '?' */ + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "&audience=my-audience") != NULL); + TEST_CHECK(strstr(otel_ctx->metadata_token_path, + "?audience=") == NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_empty_scope_ignored(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_scope", "", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + /* empty scope must not append scopes= query parameter */ + TEST_CHECK(strstr(otel_ctx->metadata_token_path, "scopes=") == NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_metadata_token_empty_audience_ignored(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct opentelemetry_context *otel_ctx; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + flb_service_set(ctx, + "Flush", "10", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "opentelemetry", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "host", "127.0.0.1", + "port", "14317", + "metadata_token_url", "http://169.254.169.254/metadata/token", + "metadata_token_audience", "", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + otel_ctx = get_otel_ctx(ctx, out_ffd); + TEST_CHECK(otel_ctx != NULL); + if (otel_ctx) { + TEST_CHECK(otel_ctx->metadata_token_path != NULL); + if (otel_ctx->metadata_token_path) { + /* empty audience must not append audience= query parameter */ + TEST_CHECK(strstr(otel_ctx->metadata_token_path, "audience=") == NULL); + } + } + + flb_stop(ctx); + flb_destroy(ctx); +}