
out_pgsql: refactor plugin logic to follow new config maps and threads#11700

Open
sxd wants to merge 2 commits into fluent:master from sxd:out_pgsql_to_new_model

Conversation

@sxd
Member

@sxd sxd commented Apr 11, 2026

The plugin was too old and required an update, as follows:

  • Use of the config map for the config options
  • Decoder for the log events
  • Use of the internal instances to avoid having a pool of connections

Valgrind output: test_local_out_pgsql.valgrind.log

Also, some unit tests were added to make the plugin easier to maintain.


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • [N/A] Example configuration file for the change
  • [N/A] Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Prepared INSERTs with explicit transactions for more reliable, efficient writes.
  • Bug Fixes

    • Centralized error reporting and automatic rollback on failures.
    • More robust event decoding with consistent timestamp formatting and safer JSON body handling.
  • Refactor

    • Simplified connection lifecycle to a single active connection and config-driven settings.
  • Tests

    • Added extensive unit tests covering SQL generation, timestamps, JSON bodies, connections, and error/retry scenarios.

@coderabbitai

coderabbitai bot commented Apr 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Configuration now uses a flb_config_map; plugin builds and prepares a single parameterized INSERT at init; flush decodes msgpack with flb_log_event_decoder, formats timestamps and JSON bodies, executes per-event PQexecPrepared inside an explicit transaction on a single PGconn; connection pool removed; helpers and unit tests added.

Changes

Cohort / File(s) Summary
Plugin Core
plugins/out_pgsql/pgsql.c
Use config_map for config; construct insert_query (pgsql_build_insert_query) and prepare once (PQprepare); flush uses flb_log_event_decoder, formats timestamp/body JSON, executes PQexecPrepared per event inside BEGIN/COMMIT/ROLLBACK; centralized libpq/result error logging and decoder-to-fluent-bit result mapping; metadata updated (event_type, config_map, flags, workers).
Public API / Types
plugins/out_pgsql/pgsql.h
Added packing/SDS/Msgpack includes and new helper declarations; removed pool-related structs/macros; struct flb_pgsql_config now holds PGconn *conn_current, db_table_escaped, insert_query, and insert_statement_prepared; new helpers for timestamp formatting, msgpack→JSON, result/conn logging, and status mapping.
Connection Management
plugins/out_pgsql/pgsql_connections.c
Replaced pooled-connections model with a single PGconn *: pgsql_create_connection returns raw PGconn *; pgsql_start_connections creates/assigns one connection; pgsql_destroy_connections drains remaining results and calls PQfinish; removed queue, pool sizing, nonblocking and connection-rotation logic.
Tests & Build
tests/internal/CMakeLists.txt, tests/internal/pgsql.c
Added unit-test wiring for pgsql plugin; new tests/internal/pgsql.c provides a libpq mock and tests for INSERT generation (Postgres vs CockroachDB), timestamp formatting, msgpack→JSON formatting/cleanup, decoder result translation, conn-status mapping, connection edge cases, and transactional flush behaviors including prepare/exec failure and recovery.

Sequence Diagram(s)

sequenceDiagram
    participant Plugin as Plugin
    participant Decoder as flb_log_event_decoder
    participant LibPQ as libpq (PGconn)
    participant DB as PostgreSQL
    Plugin->>Decoder: decode msgpack batch → events
    loop per event
        Decoder-->>Plugin: event (timestamp, body)
        Plugin->>Plugin: format timestamp & body JSON
        Plugin->>LibPQ: PQexecPrepared(insert_stmt, params)
        LibPQ->>DB: execute prepared statement
        DB-->>LibPQ: result (OK / ERROR)
        LibPQ-->>Plugin: result status
    end
    alt all events succeeded
        Plugin->>LibPQ: execute "COMMIT"
    else any error
        Plugin->>LibPQ: execute "ROLLBACK"
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I built an insert, prepared and neat,
One connection hopping on steady feet.
Events decoded into JSON bright,
Transactions rolled with careful might.
Tests nibble bugs until it's right.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 0.00%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check — ✅ Passed (check skipped; CodeRabbit's high-level summary is enabled).
  • Title Check — ✅ Passed: the title clearly summarizes the main change, refactoring the out_pgsql plugin to use config maps and the new threading model.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Comment @coderabbitai help to get the list of available commands and usage tips.


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1b39a245f8

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread plugins/out_pgsql/pgsql.c Outdated

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/out_pgsql/pgsql.c`:
- Around line 430-431: The branch is only checking for == 1 but
pgsql_next_connection() can return -1 on failure, so change the condition to
check for non-zero: replace the if (pgsql_next_connection(ctx) == 1) {
FLB_OUTPUT_RETURN(FLB_RETRY); } with a non-zero check (if
(pgsql_next_connection(ctx) != 0) { FLB_OUTPUT_RETURN(FLB_RETRY); }) to ensure
failures (e.g., -1) also return FLB_RETRY before calling PQstatus() on
ctx->conn_current.
- Around line 468-500: Wrap the entire chunk processing loop in an explicit
PostgreSQL transaction: before entering the flb_log_event_decoder_next loop call
PQexec(ctx->pg_conn, "BEGIN") (or use the existing helper if any), on successful
completion after the loop call COMMIT, and on any failure path (any non-zero
return from pgsql_insert_record, json == NULL, or decoder_result !=
FLB_EVENT_DECODER_SUCCESS) call ROLLBACK before returning FLB_RETRY or
translating the decoder result; update the error paths that call flb_plg_error,
pgsql_free_body_json, flb_log_event_decoder_destroy, and FLB_OUTPUT_RETURN so
they perform ROLLBACK using the same pg connection, ensuring
pgsql_insert_record, flb_log_event_decoder_get_last_result and
pgsql_translate_decoder_result remain intact.

In `@tests/internal/pgsql.c`:
- Around line 45-210: Add two unit tests exercising cb_pgsql_flush failure
paths: (1) a mid-batch insert failure test that builds a minimal struct
flb_pgsql_config and a fake/initialized struct flb_pgsql_conn where the
connection is made to fail (simulate a failing PG exec state) then call
cb_pgsql_flush and assert it returns FLB_RETRY and that reconnect/reprepare
behavior is triggered (e.g., connection marked for reconnect or ctx.conn_current
moved); (2) a decoder terminal error test that drives the event decoder to
return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE during iteration and assert
cb_pgsql_flush returns FLB_ERROR (or the expected terminal code) and that
resources are cleaned up. Use existing test helpers/assertions (TEST_CHECK) and
reference cb_pgsql_flush, struct flb_pgsql_config, struct flb_pgsql_conn, and
FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE/FLB_RETRY to locate where to hook the
simulated failures.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 514328fe-44be-4e01-aa2a-07f1194ddc64

📥 Commits

Reviewing files that changed from the base of the PR and between d86a053 and 1b39a24.

📒 Files selected for processing (5)
  • plugins/out_pgsql/pgsql.c
  • plugins/out_pgsql/pgsql.h
  • plugins/out_pgsql/pgsql_connections.c
  • tests/internal/CMakeLists.txt
  • tests/internal/pgsql.c

Comment thread plugins/out_pgsql/pgsql.c Outdated
Comment thread plugins/out_pgsql/pgsql.c Outdated
Comment thread tests/internal/pgsql.c

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/out_pgsql/pgsql_connections.c`:
- Around line 53-63: PQstatus() is being called on conn without checking for
NULL after PQsetdbLogin; update the connection handling in the block that calls
PQsetdbLogin(ctx->db_hostname, ctx->db_port, ctx->conn_options, NULL,
ctx->db_name, ctx->db_user, ctx->db_passwd) to first test if conn == NULL and
handle that error path (log via pgsql_log_conn_error or a suitable logger and
avoid calling PQstatus/PQfinish on a NULL pointer), then only call
PQstatus(conn) and PQfinish(conn) when conn is non-NULL to prevent null pointer
dereference.

In `@plugins/out_pgsql/pgsql.c`:
- Around line 485-493: The decoder init block currently treats any
flb_log_event_decoder_init() failure as FLB_RETRY; change it to call
pgsql_translate_decoder_result(decoder_result) and use its returned FLB_* code
so terminal decoder errors map to FLB_ERROR instead of always retrying.
Concretely, after calling flb_log_event_decoder_init(&log_decoder, (char *)
event_chunk->data, event_chunk->size) and getting decoder_result !=
FLB_EVENT_DECODER_SUCCESS, replace the unconditional FLB_RETRY return with a
translated return value from pgsql_translate_decoder_result(decoder_result)
while keeping the flb_plg_error(ctx->ins, ...) log call intact; this ensures
decoder_result is mapped consistently with the post-loop handling.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 83d5dff7-164e-4fd2-b641-7172f1610175

📥 Commits

Reviewing files that changed from the base of the PR and between 1b39a24 and be8d652.

📒 Files selected for processing (5)
  • plugins/out_pgsql/pgsql.c
  • plugins/out_pgsql/pgsql.h
  • plugins/out_pgsql/pgsql_connections.c
  • tests/internal/CMakeLists.txt
  • tests/internal/pgsql.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/internal/CMakeLists.txt

Comment thread plugins/out_pgsql/pgsql_connections.c
Comment thread plugins/out_pgsql/pgsql.c

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/internal/pgsql.c (1)

575-587: Make this a real precision regression test.

This case uses .500000000, which is exactly the kind of timestamp that hides the floating-point conversion path above. Using a value such as .123456789 would turn this into a regression test for precision loss in pgsql_format_timestamp().

Suggested test tweak
-    flb_time_set(&timestamp, 1700000000, 500000000);
+    flb_time_set(&timestamp, 1700000000, 123456789);

     result = pgsql_format_timestamp(buffer, sizeof(buffer), &timestamp);

     TEST_CHECK(result > 0);
-    TEST_CHECK(strcmp(buffer, "1700000000.500000000") == 0);
+    TEST_CHECK(strcmp(buffer, "1700000000.123456789") == 0);
As per coding guidelines, "Add or update tests for behavior changes, especially protocol parsing and encoder/decoder paths".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/pgsql.c` around lines 575 - 587, The test
test_pgsql_format_timestamp currently uses a fractional part of .500000000 which
masks floating-point conversion issues; update the test to use a non-trivial
fractional value (e.g. .123456789) so pgsql_format_timestamp is exercised for
precision loss, set flb_time_set(&timestamp, 1700000000, 123456789), call
pgsql_format_timestamp(buffer, sizeof(buffer), &timestamp) and assert the result
is >0 and strcmp(buffer, "1700000000.123456789") == 0 to catch any regression in
precision handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/out_pgsql/pgsql.c`:
- Around line 48-75: The insert path currently converts event times via
flb_time_to_double in pgsql_format_timestamp and binds the lossy double as $2;
change the query and binding to preserve exact timestamps instead—either (A)
update pgsql_build_insert_query to accept a text timestamp parameter (bind the
formatted ISO8601 timestamp string as $2 and use the DB's parse function, e.g.,
VALUES ($1, $2::timestamptz, $3::jsonb)) and adjust pgsql_format_timestamp to
emit the full-text timestamp, or (B) modify pgsql_build_insert_query to accept
two integer parameters (sec and nsec) and construct the timestamp in SQL (e.g.,
DATE '1970-01-01' + ($2::bigint * INTERVAL '1 second' + $3::bigint * INTERVAL '1
nanosecond')), updating pgsql_format_timestamp to bind sec and nsec instead of
flb_time_to_double; pick one approach and update the prepared statement,
parameter positions, and bindings accordingly.

---

Nitpick comments:
In `@tests/internal/pgsql.c`:
- Around line 575-587: The test test_pgsql_format_timestamp currently uses a
fractional part of .500000000 which masks floating-point conversion issues;
update the test to use a non-trivial fractional value (e.g. .123456789) so
pgsql_format_timestamp is exercised for precision loss, set
flb_time_set(&timestamp, 1700000000, 123456789), call
pgsql_format_timestamp(buffer, sizeof(buffer), &timestamp) and assert the result
is >0 and strcmp(buffer, "1700000000.123456789") == 0 to catch any regression in
precision handling.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: da7795e7-2974-4f76-9f5c-84bc04e764df

📥 Commits

Reviewing files that changed from the base of the PR and between fbd5809 and 75e4133.

📒 Files selected for processing (5)
  • plugins/out_pgsql/pgsql.c
  • plugins/out_pgsql/pgsql.h
  • plugins/out_pgsql/pgsql_connections.c
  • tests/internal/CMakeLists.txt
  • tests/internal/pgsql.c
✅ Files skipped from review due to trivial changes (1)
  • tests/internal/CMakeLists.txt

Comment thread plugins/out_pgsql/pgsql.c
@cosmo0920
Contributor

Hi, thanks for your contribution.

We need to address this type of linter error:

Run python .github/scripts/commit_prefix_check.py

❌ Commit 75e41337f0 failed:
Subject prefix 'out_pgsql:' does not match files changed.
Expected one of: CMakeLists:, build:, out_pgsql:, pgsql:, tests:


Commit prefix validation failed.

Usually, we split this type of commit into at least two commits.

Current

out_pgsql: CMakeLists:, build:, tests: refactor plugin for the modern design

Ideal

out_pgsql:  refactor plugin for the modern design
tests: out_pgsql: Add test cases

Could you address this linter failure?

sxd added 2 commits April 16, 2026 18:22
The plugin was too old and required an update as follows:
* Use of the config map for the config options
* Decoder for the log events
* Use of the internal instances to avoid having a pool of connections

Signed-off-by: Jonathan Gonzalez V. <jonathan.abdiel@gmail.com>
Signed-off-by: Jonathan Gonzalez V. <jonathan.abdiel@gmail.com>
