Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
if TYPE_CHECKING:
from apache_beam.runners.pipeline_context import PipelineContext

_LOGGER = logging.getLogger(__name__)

__all__ = [
'BatchElements',
'CoGroupByKey',
Expand Down Expand Up @@ -499,7 +501,7 @@ def get_secret_bytes(self) -> bytes:
request={"name": secret_version_path})
return response.payload.data
except api_exceptions.NotFound:
logging.info(
_LOGGER.info(
"Secret version %s not found. "
"Creating new secret and version.",
secret_version_path)
Expand Down Expand Up @@ -704,7 +706,7 @@ def expand(self, pcoll):
try:
coder = coder.as_deterministic_coder(self.label)
except ValueError:
logging.warning(
_LOGGER.warning(
'GroupByEncryptedKey %s: '
'The key coder is not deterministic. This may result in incorrect '
'pipeline output. This can be fixed by adding a type hint to the '
Expand Down Expand Up @@ -1025,7 +1027,7 @@ def finish_bundle(self):
self._batch = None
self._running_batch_size = 0
self._target_batch_size = self._batch_size_estimator.next_batch_size()
logging.info(
_LOGGER.info(
"BatchElements statistics: " + self._batch_size_estimator.stats())


Expand Down Expand Up @@ -1957,15 +1959,15 @@ def process(
log_line += ', pane_info=' + repr(pane_info)

if self.level == logging.DEBUG:
logging.debug(log_line)
_LOGGER.debug(log_line)
elif self.level == logging.INFO:
logging.info(log_line)
_LOGGER.info(log_line)
elif self.level == logging.WARNING:
logging.warning(log_line)
_LOGGER.warning(log_line)
elif self.level == logging.ERROR:
logging.error(log_line)
_LOGGER.error(log_line)
elif self.level == logging.CRITICAL:
logging.critical(log_line)
_LOGGER.critical(log_line)
else:
print(log_line)

Expand Down
23 changes: 23 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,29 @@ def to_row(element):
def expand_composite_transform(spec, scope):
spec = normalize_inputs_outputs(normalize_source_sink(spec))

original_transforms = spec['transforms']
has_explicit_io = any(
io in t for t in original_transforms for io in ('input', 'output'))

if not has_explicit_io:
new_transforms = []
for ix, transform in enumerate(original_transforms):
transform = dict(transform)
if ix == 0:
composite_input = spec.get('input', {})
if is_explicitly_empty(composite_input):
transform['input'] = composite_input
elif not is_empty(composite_input):
transform['input'] = {key: key for key in composite_input.keys()}
else:
transform['input'] = new_transforms[-1]['__uuid__']
new_transforms.append(transform)

if new_transforms:
spec = dict(spec, transforms=new_transforms)
if 'output' not in spec:
spec['output'] = {'__implicit_outputs__': new_transforms[-1]['__uuid__']}

inner_scope = Scope(
scope.root,
{
Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,26 @@ def test_composite(self):
providers=TEST_PROVIDERS)
assert_that(result, equal_to([1, 4, 9, 1, 8, 27]))

def test_composite_implicit_input_chaining(self):
    """Transforms in a composite with no explicit input/output are chained in order."""
    options = beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')
    with beam.Pipeline(options=options) as pipeline:
      source = pipeline | beam.Create([1, 2, 3])
      # Neither PyMap declares an 'input': Square should implicitly consume the
      # composite's input, and Increment should implicitly consume Square.
      chained = source | YamlTransform(
          '''
          type: composite
          transforms:
            - type: PyMap
              name: Square
              config:
                fn: "lambda x: x * x"
            - type: PyMap
              name: Increment
              config:
                fn: "lambda x: x + 1"
          ''',
          providers=TEST_PROVIDERS)
      # 1,2,3 -> squared: 1,4,9 -> incremented: 2,5,10.
      assert_that(chained, equal_to([2, 5, 10]))

def test_chain_with_input(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
Expand Down
Loading