Optimize Kafka IO: Remove Guava Stopwatch overhead from consumer.poll() loop#37705
Optimize Kafka IO: Remove Guava Stopwatch overhead from consumer.poll() loop#37705junaiddshaukat wants to merge 1 commit intoapache:masterfrom
Conversation
Summary of ChangesHello @junaiddshaukat, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the performance of Kafka IO readers by addressing and removing two sources of unnecessary CPU overhead. By replacing an expensive timer mechanism with a more efficient alternative and eliminating redundant custom RPC latency metrics, the Kafka consumer's Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Abacn
left a comment
There was a problem hiding this comment.
It's deleting functioning metrics and codes. Please check generated code changes before pushing PRs
Replace Stopwatch (backed by System.nanoTime()) with lightweight System.currentTimeMillis() for timeout tracking in the hot consumer.poll() loop. The Stopwatch was adding more latency than it measured when Kafka had prefetched records ready to return immediately. This change keeps the updateSuccessfulRpcMetrics() calls and all shared metric infrastructure (KafkaMetrics, KafkaSinkMetrics) intact.
3201cbd to
67997a3
Compare
Thanks for the feedback, you were right that I accidentally removed existing metrics and related code in my initial change. I’ve updated the branch to:
I’ve re-run ./gradlew :sdks:java:io:kafka:test locally and it passes. Please take another look and let me know if you’d like me to narrow or adjust the change further. Thanks |
| final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics(); | ||
| try { | ||
| while (Duration.ZERO.compareTo(remainingTimeout) < 0) { | ||
| // TODO: Remove this timer and use the existing fetch-latency-avg metric. |
There was a problem hiding this comment.
The TODO is still valid. It's referring to Kafka metrics: https://kafka.apache.org/20/generated/consumer_metrics.html
Stopwatch and System.currentTimeMillis doesn't make a difference here
There was a problem hiding this comment.
Thanks for the clarification, and I think there was some misunderstanding on my side about the intent of the TODO.
I originally read
“Remove this timer and use the existing fetch‑latency‑avg metric.”
as “the custom Stopwatch‑based timer in the hot consumer.poll() loop is too expensive and should be removed / replaced,” so I focused this PR on changing how we measure the latency (from Stopwatch to System.currentTimeMillis) while keeping the existing Beam metric behavior.
From your comments and the link to the Kafka consumer metrics docs, I now understand that your intent is more about relying on Kafka’s own fetch-latency-avg JMX metric instead of having a separate Beam RpcLatency metric here, rather than just which Java clock is used. In that sense, you’re right: this PR doesn’t actually implement the TODO, it just refactors the timer and doesn’t add much value.
Given that, I’m happy to either:
- Close this PR as a misaligned optimization attempt, or
- If you think it’s worthwhile, follow up with a separate change that truly removes the custom latency timer / RpcLatency metric in favor of Kafka’s metrics (with tests and a clearer design discussion up front).
Please let me know which direction you’d prefer; if you’d rather keep the current behavior and leave the TODO for later, I’ll go ahead and close this PR.
Fixes #37704
What happened?
This PR optimizes the Kafka IO readers (
ReadFromKafkaDoFnandKafkaUnboundedReader) by removing the GuavaStopwatchand the custom RPC latency metric tracking from the hotconsumer.poll()loop.The Problem:
Guava's
Stopwatchrelies onSystem.nanoTime(), which is a relatively expensive system call. In the Kafka IO module, this timer was being started and stopped repeatedly inside theconsumer.poll()loop. When Kafka prefetches records, this loop executes extremely rapidly, causing theStopwatchto introduce significant and unnecessary CPU overhead.The Solution:
Stopwatch: Replaced theStopwatchtimeout tracking with a lightweightSystem.currentTimeMillis()calculation to track theremainingTimeout.updateSuccessfulRpcMetricscalls and associated histograms (KafkaMetrics,KafkaSinkMetrics).fetch-latency-avg), so we can safely drop the custom implementation to regain performance.Changes made:
ReadFromKafkaDoFn.javaandKafkaUnboundedReader.javato useSystem.currentTimeMillis()for timeout tracking instead ofStopwatch.KafkaMetrics.javaandKafkaSinkMetrics.javaby removingRPC_LATENCYhistograms andupdateSuccessfulRpcMetrics.KafkaMetricsTest.javaandKafkaSinkMetricsTest.javato reflect the removed metrics.Testing Done
./gradlew :sdks:java:io:kafka:test).java.time.DurationandSystem.currentTimeMillis().Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.