Skip to content

[Java] context.Output within try catch can catch errors from downstream transforms #39101

Description

@stankiewicz

Related to #20935

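Proposed fix: keep only the parse/fetch step inside the try block and emit after it, i.e. `try { i = parse(...); } catch (Exception e) { /* route to dead letter */ }`, then `if (i != null) { output(i); }`.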

  • HL7v2IO

/**
 * Fetches the HL7v2 message whose ID is the input element and emits it on the main output.
 *
 * <p>Only the fetch runs inside the {@code try} block. The main-output call is made after it, so
 * an exception raised while emitting (for example by a downstream transform) propagates
 * instead of being caught here and misreported as a fetch failure on the dead-letter output.
 *
 * @param context the process context; its element is the ID of the message to fetch
 */
@ProcessElement
public void processElement(ProcessContext context) {
  String msgId = context.element();
  // NOTE(review): the declared type assumes client.fetchMessage returns HL7v2Message — confirm.
  HL7v2Message message;
  try {
    message = client.fetchMessage(msgId);
  } catch (Exception e) {
    // The fetch itself failed: route the ID and the cause to the dead-letter output and stop.
    context.output(HL7v2IO.Read.DEAD_LETTER, HealthcareIOError.of(msgId, e));
    return;
  }
  // Emitted outside the try so failures from this call are never swallowed by the catch above.
  context.output(message);
}
}
}
}

@ProcessElement
public void processElement(ProcessContext context) {
String msgId = context.element().getHl7v2MessageId();
try {
HL7v2ReadResponse response =
HL7v2ReadResponse.of(context.element().getMetadata(), client.fetchMessage(msgId));
context.output(response);
} catch (Exception e) {
HealthcareIOError<HL7v2ReadParameter> error =
HealthcareIOError.of(context.element(), e);
context.output(HL7v2IO.HL7v2Read.DEAD_LETTER, error);
}

  • FhirIO

/**
 * Fetches the FHIR resource whose ID is the input element and emits it on the main output.
 *
 * <p>Only the fetch runs inside the {@code try} block. The main-output call is made after it, so
 * an exception raised while emitting (for example by a downstream transform) propagates
 * instead of being counted in {@code READ_RESOURCE_ERRORS}, logged as a fetch error, and routed
 * to the dead-letter output.
 *
 * @param context the process context; its element is the ID of the resource to fetch
 */
@ProcessElement
public void processElement(ProcessContext context) {
  String resourceId = context.element();
  // NOTE(review): the declared type assumes fetchResource returns a String — confirm.
  String resource;
  try {
    resource = fetchResource(this.client, resourceId);
  } catch (Exception e) {
    // The fetch itself failed: count it, log it, dead-letter the ID with the cause, and stop.
    READ_RESOURCE_ERRORS.inc();
    LOG.warn(
        "Error fetching Fhir resource with ID {} writing to Dead Letter Queue. ",
        resourceId,
        e);
    context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e));
    return;
  }
  // Emitted outside the try so failures from this call are never swallowed by the catch above.
  context.output(resource);
}

  • KafkaWriteSchemaTransformProvider

    /**
     * Converts one input row and emits the result, keyed by a one-byte array, on the success
     * output.
     *
     * <p>Only the conversion runs inside the {@code try} block; the success output is emitted
     * after it, so exceptions thrown while emitting are not handled as conversion errors. When
     * the conversion throws and {@code handleErrors} is set, the failure is counted, logged, and
     * an error record is emitted on {@code ERROR_TAG}; otherwise the exception is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param row the row to convert
     * @param receiver the receiver used to reach the success and error outputs
     */
    @ProcessElement
    public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
      final KV<byte[], T> converted;
      try {
        converted = KV.of(new byte[1], conversionFn.apply(row));
      } catch (Exception e) {
        if (handleErrors) {
          errorsInBundle++;
          LOG.warn("Error while processing the element", e);
          receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, row, e));
          // Nothing was converted, so there is nothing to send to the success output.
          return;
        }
        throw new RuntimeException(e);
      }
      receiver.get(successTag).output(converted);
    }

  • BigQueryIO

// Reads the first record: if reader.start() reports one, emit it on rowTag; otherwise there is
// nothing to read and the enclosing method returns.
// NOTE(review): the output(...) call sits inside the try, so a ParseException thrown while
// emitting would also be routed as a bad record by the catch below — confirm whether the emit
// should move outside the try.
try {
if (reader.start()) {
outputReceiver.get(rowTag).output(reader.getCurrent());
} else {
return;
}
} catch (ParseException e) {
// Route the record taken from errorHandlingParseFn, with the ParseException's cause, to the
// bad-record router instead of failing.
GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
badRecordRouter.route(
outputReceiver,
record,
AvroCoder.of(record.getSchema()),
(Exception) e.getCause(),
"Unable to parse record reading from BigQuery");
}
// Reads the remaining records one at a time; the loop ends (by returning from the enclosing
// method) once reader.advance() reports no more records. A ParseException does not end the loop:
// after routing, the next iteration calls advance() again.
while (true) {
// NOTE(review): same shape as the first read above — the emit is inside the try; confirm.
try {
if (reader.advance()) {
outputReceiver.get(rowTag).output(reader.getCurrent());
} else {
return;
}
} catch (ParseException e) {
// Same bad-record routing as for the first record.
GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
badRecordRouter.route(
outputReceiver,
record,
AvroCoder.of(record.getSchema()),
(Exception) e.getCause(),
"Unable to parse record reading from BigQuery");
}
}
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions