Migrate to Google Cloud Dataflow Client#37639
jrmccluskey wants to merge 26 commits into apache:master
Summary of Changes (Gemini Code Assist)
This pull request initiates a significant internal refactoring of how Apache Beam's Python SDK interacts with the Google Cloud Dataflow API. The core change switches from an older, generated client library to a more modern, officially supported one.
|
Codecov Report
@@ Coverage Diff @@
## master #37639 +/- ##
=============================================
+ Coverage 40.08% 56.54% +16.46%
Complexity 3416 3416
=============================================
Files 1178 1174 -4
Lines 187433 185225 -2208
Branches 3589 3588 -1
=============================================
+ Hits 75130 104739 +29609
+ Misses 108912 77096 -31816
+ Partials 3391 3390 -1
|
|
Checks are failing. Will not request review until checks are succeeding. |
|
assign set of reviewers |
|
Assigning reviewers: R: @shunping for label python.
The PR bot will only process comments in the main thread (not review comments). |
|
Reminder, please take a look at this PR: @shunping |
|
Assigning new set of reviewers because the PR has gone too long without review. R: @damccorm for label python.
|
|
Is this ready for review? I see you assigned reviewers, but the description also calls it out as a WIP. Trying to figure out next steps. |
|
/gemini review |
|
Warning: Gemini encountered an error creating the review. |
damccorm left a comment:
Thanks - just had some smaller comments
| distribution_update.translate_to_histogram(dist_update_proto.histogram) | ||
| # if isinstance(distribution_update, DataflowDistributionCounter): | ||
| # dist_update_proto.histogram = dataflow.Histogram() | ||
| # distribution_update.translate_to_histogram(dist_update_proto.histogram) |
I'm a bit confused by this chunk and had 2 questions:
- Why don't we need the to_split_int call anymore?
- Why is this piece now commented out?
| cy_combiners.CountCombineFn: ( | ||
| dataflow.CounterMetadata.KindValueValuesEnum.SUM, | ||
| MetricUpdateTranslators.translate_scalar_counter_int), | ||
| 'Sum', MetricUpdateTranslators.translate_scalar_counter_int), |
Should we define our own enum for these types (Sum/Min/Distribution/etc...) to avoid hardcoded strings?
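One way to read the suggestion above is a small enum whose values are the strings the translator table expects. The sketch below is only an illustration: `CounterKind`, `KIND_AND_TRANSLATOR`, and the stand-in translator are hypothetical names, and the 'Min' and 'Distribution' spellings are assumed from the comment rather than taken from the client library.

```python
from enum import Enum


class CounterKind(str, Enum):
    """Hypothetical enum for the counter kinds named in the review comment."""
    SUM = 'Sum'
    MIN = 'Min'  # assumed spelling, not confirmed by the diff
    DISTRIBUTION = 'Distribution'  # assumed spelling, not confirmed by the diff


def translate_scalar_counter_int(value):
    """Stand-in for MetricUpdateTranslators.translate_scalar_counter_int."""
    return int(value)


# Hypothetical lookup table; the real table in the PR is keyed by combiner class.
KIND_AND_TRANSLATOR = {
    'CountCombineFn': (CounterKind.SUM, translate_scalar_counter_int),
}

kind, translator = KIND_AND_TRANSLATOR['CountCombineFn']
```

Because the enum mixes in `str`, a member compares equal to its raw string, so code that still expects 'Sum' would keep working while call sites stop spelling the literal themselves.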
| metric_update.distribution['count'], distribution_update.count) | ||
| # def test_translate_distribution_using_dataflow_distribution_counter(self): |
Related to above, is there a reason to keep this commented out?
| namespace = _get_match( | ||
| metric.name.context.additionalProperties, | ||
| lambda x: x.key == 'namespace').value | ||
| carried_namespace = metric.name.context['namespace'] |
Does this still need to be in a try except? If yes, can we just check for existence of the property first instead?
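The existence check suggested above could look roughly like this. It is a minimal sketch that assumes the context object behaves like a mapping (the diff indexes it as `metric.name.context['namespace']`); `get_namespace` is a hypothetical helper, not a function from the PR.

```python
def get_namespace(context, default=None):
    """Returns context['namespace'] when the key exists, else `default`.

    `context` is assumed to support `in` and item access like a dict.
    """
    if 'namespace' in context:
        return context['namespace']
    return default
```

With a plain dict, `get_namespace({'namespace': 'my_ns'})` returns `'my_ns'` and `get_namespace({})` returns `None`, with no exception handling needed for the missing-key case.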
| values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState. | ||
| RESOURCE_CLEANING_UP, | ||
| values_enum.JOB_STATE_PAUSING: PipelineState.PAUSING, | ||
| values_enum.JOB_STATE_PAUSED: PipelineState.PAUSED, |
Did you mean to remove these?
There was a problem hiding this comment.
The Python client doesn't have a concept of pausing/paused - https://github.com/googleapis/google-cloud-python/blob/cf50cea185f2ad4763a14f1f0c14ca50fc2fe418/packages/google-cloud-dataflow-client/google/cloud/dataflow_v1beta3/types/jobs.py#L104
Something we could probably comment out with a TODO to restore later
There was a problem hiding this comment.
I think we should not remove this functionality; removing it will keep this state from rendering correctly in the UI. We should be able to regenerate these clients before making this change.
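For illustration only, the shape of a mapping that keeps the pausing and paused entries might look like the sketch below. It keys on state names as strings, which assumes the name is available in that form; that may not hold for values the client library does not define. `PipelineState` here is a stand-in enum and `to_pipeline_state` is a hypothetical helper, neither taken from the PR.

```python
from enum import Enum


class PipelineState(Enum):
    """Stand-in for Beam's PipelineState with only the members used here."""
    UNKNOWN = 'UNKNOWN'
    RUNNING = 'RUNNING'
    RESOURCE_CLEANING_UP = 'RESOURCE_CLEANING_UP'
    PAUSING = 'PAUSING'
    PAUSED = 'PAUSED'


# Keyed by state name so the entries do not depend on client enum members.
STATE_BY_NAME = {
    'JOB_STATE_RUNNING': PipelineState.RUNNING,
    'JOB_STATE_RESOURCE_CLEANING_UP': PipelineState.RESOURCE_CLEANING_UP,
    'JOB_STATE_PAUSING': PipelineState.PAUSING,
    'JOB_STATE_PAUSED': PipelineState.PAUSED,
}


def to_pipeline_state(job_state_name):
    """Maps a job state name to a PipelineState, defaulting to UNKNOWN."""
    return STATE_BY_NAME.get(job_state_name, PipelineState.UNKNOWN)
```

The fallback to `UNKNOWN` is a design choice for the sketch; regenerating the client so that it defines these states, as proposed in the thread, would make the string keys unnecessary.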
| state_update_callback(current_state) | ||
| _LOGGER.info('Job %s is in state %s', job_id, current_state) | ||
| last_job_state = current_state | ||
| if str(current_state) not in ('JOB_STATE_RUNNING'): |
Same here - I think we need to keep the paused/pausing logic (this is being actively built out)
There was a problem hiding this comment.
same thing, this client doesn't have that concept yet
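A side note on the condition quoted in this hunk: in Python, `('JOB_STATE_RUNNING')` is a parenthesized string, not a one-element tuple, so `not in` performs a substring test. Whether that matters here depends on what `str(current_state)` can return, which the hunk does not show. The two helper functions below are hypothetical and exist only to contrast the behaviors.

```python
def matches_as_written(state):
    """Mirrors the quoted form: membership test against a plain string."""
    return str(state) in ('JOB_STATE_RUNNING')  # substring test


def matches_with_tuple(state):
    """Same test against a one-element tuple (note the trailing comma)."""
    return str(state) in ('JOB_STATE_RUNNING',)  # exact-match test


# Both agree on the exact name, but differ on fragments of it.
exact = (matches_as_written('JOB_STATE_RUNNING'),
         matches_with_tuple('JOB_STATE_RUNNING'))
fragment = (matches_as_written('RUNNING'), matches_with_tuple('RUNNING'))
```

Here `exact` is `(True, True)` while `fragment` is `(True, False)`, so the string form also accepts any substring of the state name.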
| duration_timedout_result = DataflowPipelineResult( | ||
| duration_timedout_runner.job, duration_timedout_runner, options) | ||
| result = duration_timedout_result.wait_until_finish(5000) | ||
| self.assertEqual(result, PipelineState.PAUSED) |
Same as above, I think we want paused/pausing tests (here and below)
|
/gemini review |
|
Warning: Gemini encountered an error creating the review. |
Migrate from the apitools generated Dataflow client to google-cloud-dataflow-client.
DO NOT MERGE - requires google3 changes before import