Handle some cases during infer schema from dataclass #37855
Changes from all commits
ed6e34b
49fa4dc
57214f5
283509a
5dbfa9d
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -176,8 +176,50 @@ def match_is_named_tuple(user_type): | |
| hasattr(user_type, '__annotations__') and hasattr(user_type, '_fields')) | ||
|
|
||
|
|
||
| def match_is_dataclass(user_type): | ||
| return dataclasses.is_dataclass(user_type) and isinstance(user_type, type) | ||
| def match_dataclass_for_row(user_type): | ||
| """Match whether the type is a dataclass handled by row coder. | ||
|
|
||
| For frozen dataclasses, only true when explicitly registered with row coder: | ||
|
|
||
| beam.coders.typecoders.registry.register_coder( | ||
| MyDataClass, beam.coders.RowCoder) | ||
|
|
||
| (for backward-compatibility reasons). | ||
|
|
||
| For non-frozen dataclasses, default to true unless explicitly registered | ||
| with a coder other than the row coder. | ||
| """ | ||
|
|
||
| if not dataclasses.is_dataclass(user_type): | ||
| return False | ||
|
|
||
| # pylint: disable=wrong-import-position | ||
| try: | ||
| from apache_beam.options.pipeline_options_context import get_pipeline_options # pylint: disable=line-too-long | ||
| except AttributeError: | ||
| pass | ||
| else: | ||
| opts = get_pipeline_options() | ||
| if opts and opts.is_compat_version_prior_to("2.73.0"): | ||
Abacn marked this conversation as resolved.
|
||
| return False | ||
|
|
||
| is_frozen = user_type.__dataclass_params__.frozen | ||
| # avoid circular import | ||
| try: | ||
| from apache_beam.coders.typecoders import registry as coders_registry | ||
| from apache_beam.coders import RowCoder | ||
| except AttributeError: | ||
| # coder registry not yet initialized so it must be absent | ||
| return not is_frozen | ||
|
|
||
| if is_frozen: | ||
| return ( | ||
| user_type in coders_registry._coders and | ||
| coders_registry._coders[user_type] == RowCoder) | ||
|
Collaborator
Is this still necessary if we are already doing the is_compat_version_prior_to check? I guess if we do not want to change the user's type from dataclass -> named tuple (unless explicitly using row coder), then this check makes sense. But not necessarily for upgrade compatibility.
Contributor
Author
Yes, and this affected some internal tests (a fraction of the targets listed in b/492300593#comment4 internally). |
||
| else: | ||
| return ( | ||
| user_type not in coders_registry._coders or | ||
| coders_registry._coders[user_type] == RowCoder) | ||
|
|
||
|
|
||
| def _match_is_optional(user_type): | ||
|
|
||
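The frozen / non-frozen branching in match_dataclass_for_row above can be summarized with a small standalone sketch. It does not import Beam: `ROW_CODER`, `OTHER_CODER`, the `matches_row` helper, and the plain `registry` dict are hypothetical stand-ins for `RowCoder` and `coders_registry._coders`, and the compat-version check is left out.

```python
import dataclasses

# Hypothetical stand-ins for illustration only; these are not Beam objects.
ROW_CODER = "RowCoder"
OTHER_CODER = "SomeOtherCoder"


def matches_row(user_type, registry):
    """Mirror the frozen / non-frozen branches shown in the diff."""
    if not dataclasses.is_dataclass(user_type):
        return False
    is_frozen = user_type.__dataclass_params__.frozen
    registered = registry.get(user_type)
    if is_frozen:
        # Frozen: opt-in only, must be explicitly registered with the row coder.
        return registered == ROW_CODER
    # Non-frozen: row coder by default, unless another coder was registered.
    return registered is None or registered == ROW_CODER


@dataclasses.dataclass(frozen=True)
class FrozenPoint:
    x: int


@dataclasses.dataclass
class MutablePoint:
    x: int


if __name__ == "__main__":
    for cls, reg in [
        (FrozenPoint, {}),
        (FrozenPoint, {FrozenPoint: ROW_CODER}),
        (MutablePoint, {}),
        (MutablePoint, {MutablePoint: OTHER_CODER}),
    ]:
        print(cls.__name__, reg, matches_row(cls, reg))
```

Under these assumptions, a frozen dataclass only matches when it is registered with the row coder, while a non-frozen one matches unless a different coder is registered for it.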
This part fixes #1: ensure backward compatibility of the default coder for frozen dataclasses.
It wasn't clear to me why default coders were affected at all.
So to be clear, coders are only changed when schema_from_element_type is called e.g. for union types (if it can be normalized
beam/sdks/python/apache_beam/typehints/typehints.py
Line 612 in 153875e
Yes, it affects UnionHint, but even for types incompatible with a union schema, e.g. (SomeDataclass | None) or OptionalHint, the default coder for the former still gets changed, because "named_fields_from_element_type" (which calls schema_from_element_type) gets evaluated before schema compatibility is checked. This made the blast radius fairly large. https://github.com/apache/beam/pull/37855/changes/BASE..ed6e34ba22f5fb69063b687688edd48f14ccdee1#diff-d31b9184f7423473c4e6deda80b237aa474228143a0bab2faedd9afe2e944982L661
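The ordering issue described above can be illustrated with a toy model. Everything in it is hypothetical (`infer_fields`, `is_union_compatible`, `normalize_eager`, `normalize_guarded`, and the `default_coder` dict are not Beam APIs); it only shows why a side-effecting call placed before a compatibility check widens the set of affected types.

```python
# Hypothetical toy model; none of these names exist in Beam.
default_coder = {}  # type name -> coder name


def infer_fields(type_name):
    # Side effect: inferring a schema switches the type's default coder.
    default_coder[type_name] = "row"
    return [("x", int)]


def is_union_compatible(members):
    # Toy rule: a union that includes None cannot form a union schema.
    return None not in members


def normalize_eager(members):
    # Fields are inferred before the check, so the side effect always happens.
    fields = {m: infer_fields(m) for m in members if m is not None}
    return fields if is_union_compatible(members) else None


def normalize_guarded(members):
    # Check first: incompatible unions leave default coders untouched.
    if not is_union_compatible(members):
        return None
    return {m: infer_fields(m) for m in members}
```

The guarded variant is shown only for contrast and is not claimed to be what the PR does; the diff above instead narrows which dataclasses match the row coder.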