Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ dist/
.kiro/

/examples/build/*
/examples/*.zip
/examples/*.zip

.env
24 changes: 23 additions & 1 deletion examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,28 @@
"ApplicationLogLevel": "DEBUG",
"LogFormat": "JSON"
}
}
},
{
"name": "Map with Item Namer",
"description": "Map operation with custom item_namer for iteration naming",
"handler": "map_with_item_namer.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_item_namer.py"
},
{
"name": "Parallel with Named Branches",
"description": "Parallel operation with named branches using ParallelBranch",
"handler": "parallel_with_named_branches.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_named_branches.py"
}
]
}
30 changes: 30 additions & 0 deletions examples/src/map/map_with_item_namer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Example demonstrating map operations with custom iteration naming."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Process orders using context.map() with custom iteration names."""
orders = [
{"id": "order-101", "amount": 25},
{"id": "order-102", "amount": 50},
{"id": "order-103", "amount": 75},
]

return context.map(
inputs=orders,
func=lambda ctx, order, index, _: ctx.step(
lambda _: f"processed-{order['id']}-${order['amount']}",
name=f"process_{order['id']}",
),
name="process_orders",
config=MapConfig(
max_concurrency=2,
item_namer=lambda order, index: f"order-{order['id']}",
),
).get_results()
51 changes: 51 additions & 0 deletions examples/src/parallel/parallel_with_named_branches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Example demonstrating all parallel branch patterns."""

from typing import Any

from aws_durable_execution_sdk_python.config import ParallelBranch, ParallelConfig
from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_parallel_branch,
)
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_parallel_branch(name="fetch-orders")
def fetch_orders(ctx: DurableContext) -> str:
return ctx.step(lambda _: "orders-loaded", name="load_orders")


@durable_parallel_branch()
def fetch_preferences(ctx: DurableContext) -> str:
return ctx.step(lambda _: "prefs-loaded", name="load_prefs")


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Execute parallel branches using all supported patterns."""

return context.parallel(
functions=[
# 1. Named parallel branch with ParallelBranch
ParallelBranch(
Comment thread
wangyb-A marked this conversation as resolved.
func=lambda ctx: ctx.step(
lambda _: "user-data-loaded", name="load_user"
),
name="fetch-user-data",
),
# 2. Named parallel branch with decorator
fetch_orders(),
# 3. Unnamed parallel branch with decorator
fetch_preferences(),
# 4. Unnamed parallel branch with ParallelBranch
ParallelBranch(
func=lambda ctx: ctx.step(
lambda _: "metrics-loaded", name="load_metrics"
),
),
# 5. No wrapper, just a raw callable
lambda ctx: ctx.step(lambda _: "config-loaded", name="load_config"),
],
name="load_all_data",
config=ParallelConfig(max_concurrency=3),
).get_results()
36 changes: 36 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,42 @@
"ExecutionTimeout": 300
}
}
},
"MapWithItemNamer": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "map_with_item_namer.handler",
"Description": "Map operation with custom item_namer for iteration naming",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
},
"ParallelWithNamedBranches": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "parallel_with_named_branches.handler",
"Description": "Parallel operation with named branches using ParallelBranch",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
}
}
}
39 changes: 39 additions & 0 deletions examples/test/map/test_map_with_item_namer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for map_with_item_namer example."""

import pytest
from src.map import map_with_item_namer
from test.conftest import deserialize_operation_payload

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
)


@pytest.mark.example
@pytest.mark.durable_execution(
handler=map_with_item_namer.handler,
lambda_function_name="map with item namer",
)
def test_map_with_item_namer(durable_runner):
"""Test map example with custom item_namer for iteration naming."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"processed-order-101-$25",
"processed-order-102-$50",
"processed-order-103-$75",
]

# Get the map operation
map_op = result.get_context("process_orders")
assert map_op is not None
assert map_op.status is OperationStatus.SUCCEEDED

# Verify custom iteration names from item_namer
assert len(map_op.child_operations) == 3
child_names = {op.name for op in map_op.child_operations}
expected_names = {"order-order-101", "order-order-102", "order-order-103"}
assert child_names == expected_names
57 changes: 57 additions & 0 deletions examples/test/parallel/test_parallel_with_named_branches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for parallel_with_named_branches example."""

import pytest
from src.parallel import parallel_with_named_branches
from test.conftest import deserialize_operation_payload

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
OperationType,
)


@pytest.mark.example
@pytest.mark.durable_execution(
handler=parallel_with_named_branches.handler,
lambda_function_name="parallel with named branches",
)
def test_parallel_with_named_branches(durable_runner):
"""Test parallel example with all branch patterns."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"user-data-loaded",
"orders-loaded",
"prefs-loaded",
"metrics-loaded",
"config-loaded",
]

# Get the parallel operation
parallel_op = result.get_context("load_all_data")
assert parallel_op is not None
assert parallel_op.status is OperationStatus.SUCCEEDED

# Verify branch names: named branches have custom names, unnamed use defaults
assert len(parallel_op.child_operations) == 5

child_names = [op.name for op in parallel_op.child_operations]

# 1. Named ParallelBranch
assert child_names[0] == "fetch-user-data"
# 2. Named decorator
assert child_names[1] == "fetch-orders"
# 3. Unnamed decorator (None name falls back to index-based default)
assert child_names[2] == "parallel-branch-2"
# 4. Unnamed ParallelBranch (None name falls back to index-based default)
assert child_names[3] == "parallel-branch-3"
# 5. Raw callable (no ParallelBranch wrapper, index-based default)
assert child_names[4] == "parallel-branch-4"

# Verify all children succeeded
for child in parallel_op.child_operations:
assert child.operation_type == OperationType.CONTEXT
assert child.status is OperationStatus.SUCCEEDED
5 changes: 5 additions & 0 deletions src/aws_durable_execution_sdk_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
# Helper decorators - commonly used for step functions
# Concurrency
from aws_durable_execution_sdk_python.concurrency.models import BatchResult
from aws_durable_execution_sdk_python.config import ParallelBranch
from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_parallel_branch,
durable_step,
durable_wait_for_callback,
durable_with_child_context,
Expand All @@ -27,15 +29,18 @@
# Essential context types - passed to user functions
from aws_durable_execution_sdk_python.types import StepContext


__all__ = [
"BatchResult",
"DurableContext",
"DurableExecutionsError",
"InvocationError",
"ParallelBranch",
"StepContext",
"ValidationError",
"__version__",
"durable_execution",
"durable_parallel_branch",
"durable_step",
"durable_wait_for_callback",
"durable_with_child_context",
Expand Down
10 changes: 9 additions & 1 deletion src/aws_durable_execution_sdk_python/concurrency/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ def execute_item(
"""Execute a single executable in a child context and return the result."""
raise NotImplementedError

def get_iteration_name(self, index: int) -> str:
"""Get the display name for an iteration/branch at the given index.

Subclasses can override this to provide custom naming (e.g., from item_namer
or branch names). The default returns "{name_prefix}{index}".
"""
return f"{self.name_prefix}{index}"

def execute(
self, execution_state: ExecutionState, executor_context: DurableContext
) -> BatchResult[ResultType]:
Expand Down Expand Up @@ -410,7 +418,7 @@ def _execute_item_in_child_context(
operation_id: str = executor_context._create_step_id_for_logical_step( # noqa: SLF001
executable.index
)
name: str = f"{self.name_prefix}{executable.index}"
name: str = self.get_iteration_name(executable.index)
is_virtual: bool = self.nesting_type is NestingType.FLAT

child_context: DurableContext = executor_context.create_child_context(
Expand Down
Loading