Guide to adding and propagating new metadata fields in Apache Beam's WindowedValue, covering proto extensions, Windmill persistence, and runner interfaces, to avoid metadata loss.
This skill is a comprehensive guide to adding new metadata (e.g., CDC metadata, drain mode flags, OpenTelemetry trace context) to Apache Beam's WindowedValue and ensuring that it propagates correctly through the execution engine. If the metadata is not propagated in every necessary place, it will be lost during pipeline execution.
When adding new metadata that must cross worker boundaries or be serialized by the Fn API, the proto definitions must be updated.
File: model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
Action: Add the new field to the relevant message (e.g., ElementMetadata).

The WindowedValue is the core container for elements flowing through a Beam pipeline. It holds the value, timestamp, windows, pane info, and any additional metadata.
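Here is a minimal sketch of what "storing and returning" metadata means for such a container. It models a WindowedValue-like interface with one extra field. All names (SketchWindowedValue, SketchValueInGlobalWindow, isCausedByDrain, withValue) are hypothetical simplifications, not Beam's actual API. The point is that each implementation has three touch points: it must declare the field, return it from the accessor, and carry it over whenever a copy is made.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for Beam's WindowedValue (not the real API).
interface SketchWindowedValue<T> {
  T getValue();

  long getTimestampMillis();

  List<String> getWindows();

  // The "new" metadata field being added, e.g. a drain flag.
  boolean isCausedByDrain();

  // Returns a copy with a different value; every other field, including
  // metadata, is expected to be carried over.
  <U> SketchWindowedValue<U> withValue(U newValue);
}

// Hypothetical counterpart of ValueInGlobalWindow.
final class SketchValueInGlobalWindow<T> implements SketchWindowedValue<T> {
  private final T value;
  private final long timestampMillis;
  private final boolean causedByDrain; // 1. store the metadata

  SketchValueInGlobalWindow(T value, long timestampMillis, boolean causedByDrain) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.causedByDrain = causedByDrain;
  }

  @Override
  public T getValue() {
    return value;
  }

  @Override
  public long getTimestampMillis() {
    return timestampMillis;
  }

  @Override
  public List<String> getWindows() {
    return Collections.singletonList("GlobalWindow");
  }

  @Override
  public boolean isCausedByDrain() {
    return causedByDrain; // 2. return the metadata
  }

  @Override
  public <U> SketchWindowedValue<U> withValue(U newValue) {
    // 3. forward the metadata; passing 'false' here would silently drop it.
    return new SketchValueInGlobalWindow<>(newValue, timestampMillis, causedByDrain);
  }
}
```

The same three touch points apply to every other implementation and to the corresponding coders.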
File: sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
Action: Add accessor methods for the new metadata to the interface.

You must update all concrete implementations of WindowedValue to store and return the new metadata. If you miss one, the metadata is silently dropped. The implementations include:
ValueInGlobalWindow
ValueInSingleWindow
ValueInEmptyWindows (often used inside runners, like Dataflow's worker package)
Action: Update the constructors, factory methods (e.g., of()), and fields in these classes, as well as their coders.

Avoid adding a new positional parameter for each metadata field to methods such as context.outputWindowedValue(...) or WindowedValue.of(value, timestamp, windows, pane). This makes the code brittle and breaks the API for every new metadata field.
Instead, extend OutputBuilder (sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java) to accept the new metadata (e.g., .withDrainMode(...), .withTraceContext(...)). Use the builder pattern when constructing outputs so that offsets and record IDs propagate smoothly.

For the Dataflow streaming runner, metadata must survive serialization to and from the Windmill backend.
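The round trip that WindmillSink and the Windmill readers must preserve can be modeled as a write function and a read function. The two must agree field by field. In this minimal sketch, a map-backed builder stands in for the generated ElementMetadata protobuf builder. The names SketchElementMetadata, SketchElement, SketchWindmillCodec, and traceContext are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ElementMetadata proto; real code would use
// the generated protobuf builder rather than a string map.
final class SketchElementMetadata {
  final Map<String, String> fields;

  private SketchElementMetadata(Map<String, String> fields) {
    this.fields = fields;
  }

  static final class Builder {
    private final Map<String, String> fields = new HashMap<>();

    Builder set(String name, String value) {
      fields.put(name, value);
      return this;
    }

    SketchElementMetadata build() {
      return new SketchElementMetadata(new HashMap<>(fields));
    }
  }
}

// Hypothetical element: a value plus an existing and a newly added field.
final class SketchElement {
  final String value;
  final boolean causedByDrain; // existing metadata
  final String traceContext; // newly added metadata, may be null

  SketchElement(String value, boolean causedByDrain, String traceContext) {
    this.value = value;
    this.causedByDrain = causedByDrain;
    this.traceContext = traceContext;
  }
}

final class SketchWindmillCodec {
  // Sink side: copy every metadata field into the already-created builder.
  static SketchElementMetadata write(SketchElement e, SketchElementMetadata.Builder builder) {
    builder.set("causedByDrain", Boolean.toString(e.causedByDrain));
    if (e.traceContext != null) {
      builder.set("traceContext", e.traceContext); // the new field must be written...
    }
    return builder.build();
  }

  // Reader side: rebuild the element from the value and all metadata fields.
  static SketchElement read(String value, SketchElementMetadata metadata) {
    boolean drain = Boolean.parseBoolean(metadata.fields.getOrDefault("causedByDrain", "false"));
    String trace = metadata.fields.get("traceContext"); // ...and read back, or it is lost
    return new SketchElement(value, drain, trace);
  }
}
```

If either side is updated without the other, the field disappears at the worker boundary. This is why the sink and both readers appear together in the steps that follow.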
File: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
Action: Extract the new metadata from the WindowedValue and add it to the already-created ElementMetadata proto builder.

Files: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java and WindowingWindmillReader.java
Action: Read the metadata from the proto and construct the WindowedValue using the updated factory methods/builders that include the metadata. This is incremental work, since plenty of metadata is already extracted from the proto.

Metadata must be explicitly copied or forwarded whenever a WindowedValue is transformed, buffered, or processed.
You must ensure that when a DoFn processes an element and outputs a new element, the appropriate metadata from the input is propagated to the output (unless explicitly changed by the logic).
Files:
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Action: When these runners call outputWindowedValue(), they should extract the metadata from the input or current context and attach it using the OutputBuilder or the new WindowedValue interfaces.
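Here is a minimal, self-contained sketch of the builder-based propagation described above. The output builder is seeded from the input element, so every metadata field is inherited by default. The runner then overrides only what the user logic explicitly changed. SketchOutput, SketchOutputBuilder, fromInput, withTimestamp, and withCausedByDrain are hypothetical names that loosely mirror the OutputBuilder idea; they are not its real methods.

```java
// Hypothetical immutable output element carrying two metadata fields.
final class SketchOutput<T> {
  final T value;
  final long timestampMillis;
  final boolean causedByDrain;
  final String traceContext;

  SketchOutput(T value, long timestampMillis, boolean causedByDrain, String traceContext) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.causedByDrain = causedByDrain;
    this.traceContext = traceContext;
  }
}

// Hypothetical builder: because it is seeded from the input, a newly added
// field is forwarded everywhere once fromInput() copies it.
final class SketchOutputBuilder<T> {
  private final T value;
  private long timestampMillis;
  private boolean causedByDrain;
  private String traceContext;

  private SketchOutputBuilder(T value) {
    this.value = value;
  }

  // Inherit all metadata from the input; only the value is replaced.
  static <I, O> SketchOutputBuilder<O> fromInput(SketchOutput<I> input, O newValue) {
    SketchOutputBuilder<O> b = new SketchOutputBuilder<>(newValue);
    b.timestampMillis = input.timestampMillis;
    b.causedByDrain = input.causedByDrain;
    b.traceContext = input.traceContext;
    return b;
  }

  SketchOutputBuilder<T> withTimestamp(long timestampMillis) {
    this.timestampMillis = timestampMillis;
    return this;
  }

  // Explicit override, for logic that intentionally changes the metadata.
  SketchOutputBuilder<T> withCausedByDrain(boolean causedByDrain) {
    this.causedByDrain = causedByDrain;
    return this;
  }

  SketchOutput<T> build() {
    return new SketchOutput<>(value, timestampMillis, causedByDrain, traceContext);
  }
}
```

For example, `SketchOutputBuilder.fromInput(in, in.value.length()).build()` keeps the input's drain flag, trace context, and timestamp while changing the value. Call sites written this way need no changes when another field is added, unlike positional factory calls.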
Files:
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
Action: If the metadata applies to grouped output (e.g., CausedByDrain), ensure that it is correctly passed into the ReduceFnContextFactory and propagated when outputting the grouped results.

Files:
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
Action: Verify that the metadata is propagated through splittable DoFn processing.

If metadata needs to survive timer firings (e.g., knowing that an @OnTimer callback fired because of a system drain), it must be added to the timer data structures. This area is still largely uncharted. So far it has only been implemented for CausedByDrain metadata that comes from the backend, not for persisted metadata. Persisting all WindowedValue metadata across timers requires more work. Here are some pointers:
File: runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java and implementations (e.g., WindmillTimerInternals.java in Dataflow).
File: runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java (or generic TimerData).
Action: Add the new metadata field to TimerData, next to CausedByDrain. Propagate it when setting the timer and expose it when the timer fires so that it bubbles up.

If the user needs to access the metadata in their DoFn (e.g., @ProcessElement public void process(ProcessContext c, CausedByDrain drain) { ... }), you must update the reflection and bytecode generation logic.
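Conceptually, supporting a metadata parameter means resolving each argument of the annotated method by its declared type. The minimal sketch below does this with plain java.lang.reflect at call time. Beam's own path, per the files listed next, analyzes signatures in DoFnSignatures and generates invokers with ByteBuddy, so treat this only as an illustration of the idea. SketchProcessElement, SketchCausedByDrain, SketchUserFn, and SketchInvoker are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @ProcessElement.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchProcessElement {}

// Hypothetical parameter type standing in for CausedByDrain.
final class SketchCausedByDrain {
  final boolean value;

  SketchCausedByDrain(boolean value) {
    this.value = value;
  }
}

// Hypothetical user function that asks for the metadata as a parameter.
final class SketchUserFn {
  String lastOutput;

  @SketchProcessElement
  void process(String element, SketchCausedByDrain drain) {
    lastOutput = drain.value ? element + " (drain)" : element;
  }
}

final class SketchInvoker {
  // Finds the annotated method and supplies each argument based on its type.
  static void invoke(Object fn, String element, boolean causedByDrain) {
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (!m.isAnnotationPresent(SketchProcessElement.class)) {
        continue;
      }
      Class<?>[] types = m.getParameterTypes();
      Object[] args = new Object[types.length];
      for (int i = 0; i < types.length; i++) {
        if (types[i] == String.class) {
          args[i] = element;
        } else if (types[i] == SketchCausedByDrain.class) {
          // New metadata parameter: built from the element's metadata, not by user code.
          args[i] = new SketchCausedByDrain(causedByDrain);
        } else {
          throw new IllegalArgumentException("Unsupported parameter type: " + types[i]);
        }
      }
      try {
        m.invoke(fn, args);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
      return;
    }
    throw new IllegalArgumentException("No @SketchProcessElement method on " + fn.getClass());
  }
}
```

Unrecognized parameter types are rejected outright, which mirrors why a new metadata type must first be taught to the signature analysis before users can declare it.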
Files:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
Action: Make the new parameter type recognizable in DoFn method signatures. Have the invoker extract the metadata from the WindowedValue or context and pass it as an argument during method invocation.

Checklist:
1. Update beam_fn_api.proto (if applicable).
2. Add the metadata accessors to the WindowedValue interface.
3. Update ValueInGlobalWindow, ValueInSingleWindow, and ValueInEmptyWindows to store the metadata.
4. Extend OutputBuilder to accept the metadata.
5. Update WindmillSink to serialize the metadata to the backend.
6. Update UngroupedWindmillReader and WindowingWindmillReader to deserialize the metadata.
7. Update WindmillKeyedWorkItem.
8. Update SimpleDoFnRunner, StatefulDoFnRunner, and FnApiDoFnRunner to propagate the metadata from input to output.
9. Check ReduceFnRunner and OutputAndTimeBoundedSplittableProcessElementInvoker for complex transform propagation.
10. (If needed) Update TimerData and TimerInternals.
11. (If needed) Update DoFnSignatures and ByteBuddyDoFnInvokerFactory.
12. Ensure that other runners handle the new WindowedValue fields correctly in their specific operators/runners.