Guides understanding and working with Apache Beam runners (Direct, Dataflow, Flink, Spark, etc.). Use when configuring pipelines for different execution environments or debugging runner-specific issues.
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
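A practical consequence of the table above is that the same pipeline code can target any runner purely through options. As a stdlib-only sketch (no Beam dependency; the helper name `runner_args` is ours, not a Beam API), assembling the flag list a runner expects might look like:

```python
def runner_args(runner, **opts):
    """Build a Beam-style argv list: a --runner flag plus one
    --key=value flag per extra option, in sorted key order."""
    args = [f"--runner={runner}"]
    for key, value in sorted(opts.items()):
        args.append(f"--{key}={value}")
    return args

# Same pipeline code, different execution backend:
print(runner_args("DirectRunner"))
print(runner_args("FlinkRunner", flink_master="host:port"))
```

Only the flag list changes between backends; the pipeline graph itself stays identical.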
## Direct Runner

For local development and testing.

Java:

```java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
```

Python:

```python
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
```

Or via the command line:

```
--runner=DirectRunner
```
## Dataflow Runner

Java:

```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
```

Python:

```python
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp'
])
```

Useful flags:

```
--experiments=use_runner_v2
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
```
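Dataflow submissions fail late if a required flag is missing, so it can be worth checking the argv list up front. A stdlib-only sketch (the helper and the flag set are ours, not a Beam API; the required set here assumes a typical Dataflow job):

```python
# Flags a Dataflow job typically must carry (assumption, not an official list).
REQUIRED_DATAFLOW_FLAGS = {"--runner", "--project", "--region", "--temp_location"}

def missing_dataflow_flags(argv):
    """Return required Dataflow flags absent from an argv list.
    Flags are compared by name only, ignoring the =value part."""
    present = {arg.split("=", 1)[0] for arg in argv}
    return sorted(REQUIRED_DATAFLOW_FLAGS - present)

argv = ["--runner=DataflowRunner", "--project=my-project"]
print(missing_dataflow_flags(argv))  # → ['--region', '--temp_location']
```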
## Flink Runner

Java:

```java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");      // embedded local cluster
// options.setFlinkMaster("host:port"); // remote cluster
```

Python:

```python
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK'  # or DOCKER, EXTERNAL
])
```
## Spark Runner

Java:

```java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark://host:port
```

Python:

```python
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]'
])
```
## ValidatesRunner Tests

Tests that validate runner correctness:

```shell
# Direct Runner
./gradlew :runners:direct-java:validatesRunner

# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner

# Spark Runner
./gradlew :runners:spark:3:validatesRunner

# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
```

In Java tests, use `TestPipeline` and set the runner via a system property:

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

```
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
```
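The value of `beamTestPipelineOptions` is a JSON array of flag strings, so when generating it from a script it is safer to build it with `json.dumps` than to hand-quote the brackets (the surrounding variable names here are ours):

```python
import json

# Flags the test pipeline should run with.
flags = ["--runner=TestDataflowRunner", "--project=my-project"]

# json.dumps guarantees valid JSON quoting for the system property value.
prop = "-DbeamTestPipelineOptions=" + json.dumps(flags)
print(prop)
# -DbeamTestPipelineOptions=["--runner=TestDataflowRunner", "--project=my-project"]
```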
## SDK Harness Environment Types

- `DOCKER` - SDK harness in a Docker container
- `LOOPBACK` - SDK harness in the same process (testing)
- `EXTERNAL` - SDK harness at a specified address
- `PROCESS` - SDK harness in a subprocess

Start the Flink job server:

```shell
./gradlew :runners:flink:1.18:job-server:runShadow
```

Start the Spark job server:

```shell
./gradlew :runners:spark:3:job-server:runShadow
```
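Once a job server is running, a pipeline submits to it through the PortableRunner flags. A stdlib-only sketch of the flag list (the helper name is ours; `localhost:8099` assumes the job server's default port):

```python
def portable_runner_args(job_endpoint="localhost:8099",
                         environment_type="LOOPBACK"):
    """Flags for submitting a pipeline through a Beam job server.
    Defaults assume a locally started job server and in-process SDK harness."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]

print(portable_runner_args())
```

`LOOPBACK` is convenient while iterating locally; switch `environment_type` to `DOCKER` or `EXTERNAL` for a real cluster.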
## Dataflow Options

| Option | Description |
|---|---|
| `--project` | GCP project |
| `--region` | GCP region |
| `--tempLocation` | GCS temp location |
| `--stagingLocation` | GCS staging location |
| `--numWorkers` | Initial number of workers |
| `--maxNumWorkers` | Maximum number of workers |
| `--workerMachineType` | Worker VM machine type |
## Flink Options

| Option | Description |
|---|---|
| `--flinkMaster` | Flink master address |
| `--parallelism` | Default parallelism |
| `--checkpointingInterval` | Checkpoint interval |
## Spark Options

| Option | Description |
|---|---|
| `--sparkMaster` | Spark master URL |
| `--sparkConf` | Additional Spark configuration |
## Building Runner Artifacts

```shell
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
./gradlew :runners:flink:1.18:job-server:shadowJar
./gradlew :runners:spark:3:job-server:shadowJar
```
## Debugging Tips

- `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug` - verbose logging
- `--targetParallelism=1` - deterministic execution
- `--experiments=upload_graph` - graph debugging