Skip to content

ref(kafka): Deduplicate kafka error logging#5811

Open
jjbayer wants to merge 5 commits intomasterfrom
ref/dedup-kafka-error
Open

ref(kafka): Deduplicate kafka error logging#5811
jjbayer wants to merge 5 commits intomasterfrom
ref/dedup-kafka-error

Conversation

@jjbayer
Copy link
Copy Markdown
Member

@jjbayer jjbayer commented Apr 10, 2026

Follow-up to #5607:

We are currently getting two sentry errors for producer failures:

  1. "failed to produce to Kafka" - This is specific to the store service and also catches configuration errors such as missing topics.
  2. "error sending kafka message" - This is generic in relay_kafka, but does not report config errors.

This PR removes the public send function on the KafkaClient so there is only one entry point send_kafka_message, and logs all errors there. For this, the outcome producer also needs to implement relay_kafka::Message.

return Err(ClientError::MissingTopic);
};

relay_log::configure_scope(|s| s.set_tag("topic", topic_name));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is now logged higher up in the call stack, so we set the topic tag here as soon as it is known.

relay_log::error!(
error = error as &dyn std::error::Error,
tags.variant = message.variant(),
tags.abstract_topic = topic.as_str(),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abstract_topic is not identical to topic, which is set above. But it might be useful for errors when the actual topic is not yet known.

@jjbayer jjbayer marked this pull request as ready for review April 10, 2026 13:14
@jjbayer jjbayer requested a review from a team as a code owner April 10, 2026 13:14
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Test may fail in debug builds due to schema validation ordering
    • I changed the test message serialization to MsgPack so debug-only JSON schema validation is skipped and the assertion reliably reaches InvalidTopicName.
  • ✅ Fixed: Outcome errors now produce duplicate Sentry error reports
    • I removed the redundant outcome-service error logs at the Kafka call sites so failures are logged only once by the new send_message wrapper.

Create PR

Or push these changes by commenting:

@cursor push 161f1048d0
Preview (161f1048d0)
diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs
--- a/relay-server/src/services/outcome.rs
+++ b/relay-server/src/services/outcome.rs
@@ -1224,9 +1224,7 @@
             Self::Kafka(kafka_producer) => {
                 send_outcome_metric(&message, "kafka");
                 let raw_message = TrackRawOutcome::from_outcome(message, config);
-                if let Err(error) = self.send_kafka_message(kafka_producer, raw_message) {
-                    relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
-                }
+                let _ = self.send_kafka_message(kafka_producer, raw_message);
             }
             Self::ClientReport(producer) => {
                 send_outcome_metric(&message, "client_report");
@@ -1245,9 +1243,7 @@
             #[cfg(feature = "processing")]
             Self::Kafka(kafka_producer) => {
                 send_outcome_metric(&message, "kafka");
-                if let Err(error) = self.send_kafka_message(kafka_producer, message) {
-                    relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
-                }
+                let _ = self.send_kafka_message(kafka_producer, message);
             }
             Self::Http(producer) => {
                 send_outcome_metric(&message, "http");

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -1889,7 +1889,7 @@
             }
 
             fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
-                Ok(SerializationOutput::Json(Cow::Borrowed(
+                Ok(SerializationOutput::MsgPack(Cow::Borrowed(
                     br#"{"timestamp": "foo", "outcome": 1}"#,
                 )))
             }

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 9b68c19. Configure here.

Ok(_) => Ok(()),
Err(kafka_error) => Err(OutcomeError::SendFailed(kafka_error)),
}
let _ = producer.client.send_message(topic, &message); // logs error internally
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't send_message then just not return an error?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants