Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.BatchElements;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -82,6 +80,8 @@ public abstract static class Invoke<InputT extends BaseInput, OutputT extends Ba

/** Returns the configured model parameters, or {@code null} if none have been set. */
abstract @Nullable BaseModelParameters parameters();

/** Returns the configured batching configuration, or {@code null} if none has been set. */
abstract BatchElements.@Nullable BatchConfig batchConfig();

/**
 * Returns a {@code Builder} for this transform.
 *
 * <p>NOTE(review): presumably pre-populated with this instance's current values (the
 * {@code with*} methods rely on it to produce modified copies) -- confirm against the
 * generated AutoValue implementation.
 */
abstract Builder<InputT, OutputT> builder();

@AutoValue.Builder
Expand All @@ -92,6 +92,8 @@ abstract Builder<InputT, OutputT> setHandler(

/** Sets the model parameters property (read back through {@code parameters()}). */
abstract Builder<InputT, OutputT> setParameters(BaseModelParameters modelParameters);

/** Sets the batching configuration property (read back through {@code batchConfig()}). */
abstract Builder<InputT, OutputT> setBatchConfig(BatchElements.BatchConfig batchConfig);

/** Builds the {@code Invoke} transform from the values set on this builder. */
abstract Invoke<InputT, OutputT> build();
}

Expand All @@ -106,21 +108,26 @@ public Invoke<InputT, OutputT> withParameters(BaseModelParameters modelParameter
return builder().setParameters(modelParameters).build();
}

/**
 * Returns a transform built from this one's builder with the given batching configuration set.
 *
 * @param batchConfig the batching configuration handed to the builder's {@code setBatchConfig}
 * @return the newly built {@code Invoke} transform
 */
public Invoke<InputT, OutputT> withBatchConfig(BatchElements.BatchConfig batchConfig) {
  Builder<InputT, OutputT> updated = builder().setBatchConfig(batchConfig);
  return updated.build();
}

/**
 * Checks that a handler and parameters are configured, groups the input elements into
 * batches, and applies {@code RemoteInferenceFn} to those batches.
 *
 * @param input the elements to run inference on
 * @return the output of the "RemoteInference" step
 * @throws IllegalArgumentException if {@code handler()} or {@code parameters()} is null
 */
@Override
public PCollection<Iterable<PredictionResult<InputT, OutputT>>> expand(
PCollection<InputT> input) {
checkArgument(handler() != null, "handler() is required");
checkArgument(parameters() != null, "withParameters() is required");
// NOTE(review): the statement below wraps every element in a singleton list, but it is
// never terminated and is immediately followed by the BatchElements logic. In this diff
// view it looks like the removed pre-batching implementation shown next to its
// replacement -- confirm against the actual file before relying on it.
return input
.apply(
"WrapInputInList",
MapElements.via(
new SimpleFunction<InputT, List<InputT>>() {
@Override
public List<InputT> apply(InputT element) {
return Collections.singletonList(element);
}
}))

// Use the configured batch settings when present; otherwise fall back to the
// BatchElements defaults.
BatchElements.BatchConfig config = batchConfig();
PCollection<List<InputT>> batchedInput;
if (config != null) {
batchedInput = input.apply("BatchElements", BatchElements.withConfig(config));
} else {
batchedInput = input.apply("BatchElements", BatchElements.withDefaults());
}

return batchedInput
// Pass the list to the inference function
.apply("RemoteInference", ParDo.of(new RemoteInferenceFn<InputT, OutputT>(this)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,36 +518,54 @@ public void testPredictionResultMapping() {
pipeline.run().waitUntilFinish();
}

// Temporary behaviour until we introduce java BatchElements transform
// to batch elements in RemoteInference
@Test
public void testMultipleInputsProduceSeparateBatches() {
List<TestInput> inputs = Arrays.asList(new TestInput("input1"), new TestInput("input2"));
/**
 * Test helper that, for each incoming integer, outputs two {@code TestInput} elements:
 * "input1" followed by "input2". The integer value itself is not read.
 */
private static class GenerateInputsFn
    extends org.apache.beam.sdk.transforms.DoFn<Integer, TestInput> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    for (String label : new String[] {"input1", "input2"}) {
      context.output(new TestInput(label));
    }
  }
}

@Test
public void testBatchingProducesCombinedBatches() {
TestParameters params = TestParameters.builder().setConfig("test-config").build();

// Use a single element to trigger generation of inputs within the same bundle,
// ensuring DirectRunner doesn't split them before BatchElements processes them.
PCollection<TestInput> inputCollection =
pipeline.apply(
"CreateInputs", Create.of(inputs).withCoder(SerializableCoder.of(TestInput.class)));
pipeline
.apply("CreateTrigger", Create.of(1))
.apply(
"GenerateInputs", org.apache.beam.sdk.transforms.ParDo.of(new GenerateInputsFn()))
.setCoder(SerializableCoder.of(TestInput.class));

// Configure BatchElements to force a batch of exactly 2
org.apache.beam.sdk.transforms.BatchElements.BatchConfig batchConfig =
org.apache.beam.sdk.transforms.BatchElements.BatchConfig.builder()
.withMinBatchSize(2)
.withMaxBatchSize(2)
.build();

PCollection<Iterable<PredictionResult<TestInput, TestOutput>>> results =
inputCollection.apply(
"RemoteInference",
RemoteInference.<TestInput, TestOutput>invoke()
.handler(MockSuccessHandler.class)
.withBatchConfig(batchConfig)
.withParameters(params));

PAssert.that(results)
.satisfies(
batches -> {
int batchCount = 0;
int totalElements = 0;
for (Iterable<PredictionResult<TestInput, TestOutput>> batch : batches) {
batchCount++;
int elementCount = (int) StreamSupport.stream(batch.spliterator(), false).count();
// Each batch should contain exactly 1 element
assertEquals("Each batch should contain 1 element", 1, elementCount);
totalElements += (int) StreamSupport.stream(batch.spliterator(), false).count();
}
assertEquals("Expected 2 batches", 2, batchCount);
assertEquals("Expected 1 batch", 1, batchCount);
assertEquals("Total output elements should be 2", 2, totalElements);
Comment thread
jrmccluskey marked this conversation as resolved.
return null;
});

Expand Down
Loading