Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/out_opentelemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set(src
opentelemetry_logs.c
opentelemetry_utils.c
opentelemetry_conf.c
opentelemetry_metadata.c
)

FLB_PLUGIN(out_opentelemetry "${src}" "")
107 changes: 104 additions & 3 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text);

#include "opentelemetry.h"
#include "opentelemetry_conf.h"
#include "opentelemetry_metadata.h"
#include "opentelemetry_utils.h"

static int is_http_status_code_retrayable(int http_code)
Expand Down Expand Up @@ -208,6 +209,8 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
struct flb_config_map_val *mv;
struct flb_http_client *c;
flb_sds_t signature = NULL;
flb_sds_t meta_token = NULL;
flb_sds_t meta_token_view = NULL;

compressed = FLB_FALSE;

Expand Down Expand Up @@ -340,7 +343,41 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
/* Map debug callbacks */
flb_http_client_debug(c, ctx->ins->callback);

ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx);
/* Metadata token: inject Bearer directly, handle 401 via invalidate+retry
* (the metadata endpoint is GET-only, so oauth2 POST retry would fail) */
if (ctx->metadata_token_url && ctx->oauth2_ctx != NULL) {
/* Hold mutex across get+copy to prevent concurrent refresh free */
pthread_mutex_lock(&ctx->metadata_mutex);
if (flb_oauth2_get_access_token(ctx->oauth2_ctx,
&meta_token_view, FLB_FALSE) != 0
|| meta_token_view == NULL) {
pthread_mutex_unlock(&ctx->metadata_mutex);
flb_plg_error(ctx->ins, "metadata: failed to get access token");
out_ret = FLB_RETRY;
goto cleanup;
}
meta_token = flb_sds_create(meta_token_view);
pthread_mutex_unlock(&ctx->metadata_mutex);
if (!meta_token) {
flb_plg_error(ctx->ins, "metadata: out of memory for access token");
out_ret = FLB_RETRY;
goto cleanup;
}
if (flb_http_bearer_auth(c, meta_token) != 0) {
flb_plg_error(ctx->ins, "metadata: failed to set bearer auth");
out_ret = FLB_RETRY;
goto cleanup;
}
ret = flb_http_do(c, &b_sent);
if (ret == 0 && c->resp.status == 401) {
flb_oauth2_invalidate_token(ctx->oauth2_ctx);
out_ret = FLB_RETRY;
goto cleanup;
}
}
else {
ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx);
}

if (ret == 0) {
/*
Expand Down Expand Up @@ -406,6 +443,10 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
flb_free(final_body);
}

if (meta_token) {
flb_sds_destroy(meta_token);
}

/* Destroy HTTP client context */
flb_http_client_destroy(c);

Expand All @@ -422,6 +463,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
const char *grpc_uri)
{
flb_sds_t oauth2_token;
flb_sds_t oauth2_token_view;
const char *compression_algorithm;
uint32_t wire_message_length;
size_t grpc_body_length;
Expand All @@ -433,6 +475,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
int result;

oauth2_token = NULL;
oauth2_token_view = NULL;

if (!ctx->enable_http2_flag) {
return opentelemetry_legacy_post(ctx,
Expand Down Expand Up @@ -556,18 +599,37 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
}

if (ctx->oauth2_ctx != NULL && ctx->oauth2_config.enabled == FLB_TRUE) {
if (ctx->metadata_token_url) {
pthread_mutex_lock(&ctx->metadata_mutex);
}
result = flb_oauth2_get_access_token(ctx->oauth2_ctx,
&oauth2_token,
&oauth2_token_view,
FLB_FALSE);
if (result != 0 || oauth2_token == NULL) {
if (result != 0 || oauth2_token_view == NULL) {
if (ctx->metadata_token_url) {
pthread_mutex_unlock(&ctx->metadata_mutex);
}
flb_plg_error(ctx->ins, "failed to obtain oauth2 access token");
flb_http_client_request_destroy(request, FLB_TRUE);

return FLB_RETRY;
}

oauth2_token = flb_sds_create(oauth2_token_view);
if (ctx->metadata_token_url) {
pthread_mutex_unlock(&ctx->metadata_mutex);
}
if (!oauth2_token) {
flb_plg_error(ctx->ins, "failed to copy oauth2 access token");
flb_http_client_request_destroy(request, FLB_TRUE);

return FLB_RETRY;
}

result = flb_http_request_set_parameters(request,
FLB_HTTP_CLIENT_ARGUMENT_BEARER_TOKEN(oauth2_token));
flb_sds_destroy(oauth2_token);
oauth2_token = NULL;

if (result != 0) {
flb_plg_error(ctx->ins, "error setting oauth2 authorization data");
Expand Down Expand Up @@ -1041,6 +1103,14 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
struct flb_config *config)
{
int result = FLB_RETRY;
struct opentelemetry_context *ctx = out_context;

/* Refresh the metadata token before dispatching; no-op when not configured */
if (ctx->metadata_token_url) {
if (flb_otel_metadata_token_refresh(ctx) != 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}

if (event_chunk->type == FLB_INPUT_METRICS){
result = process_metrics(event_chunk, out_flush, ins, out_context, config);
Expand Down Expand Up @@ -1279,6 +1349,37 @@ static struct flb_config_map config_map[] = {
},


/*
* Metadata token auth
* -------------------
*/
{
FLB_CONFIG_MAP_STR, "metadata_token_url", NULL,
0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_url),
"URL to fetch the IAM token from a cloud metadata endpoint via HTTP GET"
},
{
FLB_CONFIG_MAP_STR, "metadata_token_header", NULL,
0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_header),
"Optional HTTP header to include in the metadata token request "
"(e.g. \"Metadata-Flavor: Google\")"
},
{
FLB_CONFIG_MAP_INT, "metadata_token_refresh", "3600",
0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_refresh),
"Maximum token refresh interval in seconds (default: 3600, must be > 60)"
},
{
FLB_CONFIG_MAP_STR, "metadata_token_scope", NULL,
0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_scope),
"Scope value appended as ?scopes=<value> to metadata token GET request"
},
{
FLB_CONFIG_MAP_STR, "metadata_token_audience", NULL,
0, FLB_TRUE, offsetof(struct opentelemetry_context, metadata_token_audience),
"Audience value appended as ?audience=<value> to metadata token GET request"
},

/* EOF */
{0}
};
Expand Down
14 changes: 14 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#ifndef FLB_OUT_OPENTELEMETRY_H
#define FLB_OUT_OPENTELEMETRY_H

#include <pthread.h>

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_record_accessor.h>
Expand Down Expand Up @@ -64,6 +66,17 @@ struct opentelemetry_context {
struct flb_oauth2 *oauth2_ctx;
const char *oauth2_auth_method;

/* Metadata token Auth */
const char *metadata_token_url;
const char *metadata_token_header;
int metadata_token_refresh;
const char *metadata_token_scope; /* appended as ?scopes=<value> */
const char *metadata_token_audience; /* appended as ?audience=<value> */
flb_sds_t metadata_token_path;
struct flb_upstream *metadata_u;
pthread_mutex_t metadata_mutex;
int metadata_mutex_initialized;

/* AWS Auth */
#ifdef FLB_HAVE_SIGNV4
#ifdef FLB_HAVE_AWS
Expand Down Expand Up @@ -97,6 +110,7 @@ struct opentelemetry_context {

/* HTTP client */
struct flb_http_client_ng http_client;
int http_client_initialized;

/* record metadata parsing */
flb_sds_t logs_metadata_key;
Expand Down
58 changes: 56 additions & 2 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "opentelemetry.h"
#include "opentelemetry_conf.h"
#include "opentelemetry_metadata.h"

/* create a single entry of log_body_key */
static int log_body_key_create(struct opentelemetry_context *ctx, char *ra_pattern)
Expand Down Expand Up @@ -303,6 +304,14 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}
}

/* metadata_token_url and standard OAuth2 are mutually exclusive */
if (ctx->metadata_token_url && ctx->oauth2_config.enabled == FLB_TRUE) {
flb_plg_error(ins,
"metadata_token_url and oauth2 cannot be used at the same time");
flb_opentelemetry_context_destroy(ctx);
return NULL;
}

if (ctx->max_resources < 0) {
flb_plg_error(ins, "max_resources must be greater than or equal to zero");
flb_opentelemetry_context_destroy(ctx);
Expand Down Expand Up @@ -463,6 +472,45 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}
}

/* metadata token mode: create a minimal oauth2 context for Bearer injection */
if (ctx->metadata_token_url) {
/*
* metadata_token_refresh must exceed FLB_OAUTH2_DEFAULT_SKEW_SECS.
* Lower values cause the oauth2 layer to treat every freshly-fetched
* token as expired, triggering a POST refresh that always fails -
* no data is ever delivered.
*/
if (ctx->metadata_token_refresh <= FLB_OAUTH2_DEFAULT_SKEW_SECS) {
flb_plg_error(ctx->ins,
"metadata_token_refresh must be > %d seconds",
FLB_OAUTH2_DEFAULT_SKEW_SECS);
flb_opentelemetry_context_destroy(ctx);
return NULL;
}

ctx->oauth2_ctx = flb_oauth2_create(config,
ctx->metadata_token_url,
ctx->metadata_token_refresh);
if (!ctx->oauth2_ctx) {
flb_plg_error(ctx->ins,
"failed to create oauth2 context for metadata token auth");
flb_opentelemetry_context_destroy(ctx);
return NULL;
}
ctx->oauth2_config.enabled = FLB_TRUE;
/*
* oauth2_apply_defaults() hard-sets cfg.enabled = FLB_FALSE; override it
* so that flb_oauth2_get_access_token() returns the pre-fetched token
* instead of bailing out immediately with -1.
*/
ctx->oauth2_ctx->cfg.enabled = FLB_TRUE;

if (flb_otel_metadata_token_create(ctx, config) != 0) {
flb_opentelemetry_context_destroy(ctx);
return NULL;
}
}

ctx->logs_uri_sanitized = sanitize_uri(ctx->logs_uri);
ctx->traces_uri_sanitized = sanitize_uri(ctx->traces_uri);
ctx->metrics_uri_sanitized = sanitize_uri(ctx->metrics_uri);
Expand Down Expand Up @@ -758,6 +806,9 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output

ctx = NULL;
}
else {
ctx->http_client_initialized = FLB_TRUE;
}

return ctx;
}
Expand All @@ -768,7 +819,9 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
return;
}

flb_http_client_ng_destroy(&ctx->http_client);
if (ctx->http_client_initialized) {
flb_http_client_ng_destroy(&ctx->http_client);
}

flb_kv_release(&ctx->kv_labels);

Expand Down Expand Up @@ -903,10 +956,11 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
#endif
#endif

flb_otel_metadata_token_destroy(ctx);

if (ctx->oauth2_ctx) {
flb_oauth2_destroy(ctx->oauth2_ctx);
}
flb_oauth2_config_destroy(&ctx->oauth2_config);

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down
Loading
Loading