Guides development and usage of I/O connectors in Apache Beam. Use when working with I/O connectors, creating new connectors, or debugging data source/sink issues.
I/O connectors enable Beam pipelines to read from and write to external data stores and messaging systems. Beam provides 51+ Java I/O connectors and several Python connectors.
Java connectors live under `sdks/java/io/`:
| Category | Connectors |
|---|---|
| Cloud Platforms | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
| File Formats | parquet, csv, json, xml, thrift, iceberg |
| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |
Unit tests:

```bash
./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test
```

Integration tests (require external resources):

```bash
# GCP connectors: needs a GCP project and a temp bucket
./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
  -PgcpProject=<project> \
  -PgcpTempRoot=gs://<bucket>/path

# JDBC: pipeline options are passed as a system property
./gradlew :sdks:java:io:jdbc:integrationTest \
  -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
```
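
To run a single test class, Gradle's standard `--tests` filter can be used; a sketch, where the class-name pattern is illustrative:

```bash
# Run only tests whose class name matches the filter
./gradlew :sdks:java:io:kafka:test --tests "*KafkaIOTest"
```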
Integration test infrastructure lives in the `it/` directory:

- `it/common/` - Common test utilities
- `it/google-cloud-platform/` - GCP-specific test infrastructure
- `it/jdbc/` - JDBC test infrastructure
- `it/kafka/` - Kafka test infrastructure
- `it/testcontainers/` - Testcontainers support

A typical integration test writes with one pipeline, then reads back and verifies with another:

```java
@RunWith(JUnit4.class)
public class MyIOIT {
  // Separate pipelines: the write must finish before the read verifies it
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data
    writePipeline
        .apply(Create.of(testData))
        .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read it back and verify
    PCollection<String> results =
        readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}
```
TestPipeline:

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

TestPipeline picks up pipeline options from the `beamTestPipelineOptions` system property (as in the JDBC integration-test command above).

BigQueryIO:

```java
// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```
PubsubIO:

```java
// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));
```
TextIO:

```java
// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
```
KafkaIO:

```java
// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write (input elements must be KV<String, String>)
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));
```
JdbcIO:

```java
// Read (readRows() infers a Beam schema from the query's result set)
pipeline.apply(JdbcIO.readRows()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM table"));

// Write (the statement setter binds each element to the placeholders)
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO table VALUES (?, ?)")
    .withPreparedStatementSetter((row, statement) -> {
      statement.setString(1, row.getString(0));
      statement.setString(2, row.getString(1));
    }));
```
Python connectors live under `sdks/python/apache_beam/io/`:

- `textio` - Text files
- `fileio` - General file operations
- `avroio` - Avro files
- `parquetio` - Parquet files
- `gcp/` - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)
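
For example, a minimal sketch of a text-file round trip from Python (the bucket paths are placeholders):

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # Read matching text files, one element per line (placeholder path)
    lines = pipeline | beam.io.ReadFromText("gs://bucket/path/*.txt")
    # Write shards with a .txt suffix (placeholder output prefix)
    lines | beam.io.WriteToText("gs://bucket/output", file_name_suffix=".txt")
```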
Cross-language transforms: Beam supports using I/O connectors from one SDK in another via the expansion service.

```bash
# Start the Java expansion service
./gradlew :sdks:java:io:expansion-service:runExpansionService
```
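
A sketch of consuming the Java Kafka connector from Python through a running expansion service; the service address, broker, and topic here are assumptions:

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    records = pipeline | ReadFromKafka(
        consumer_config={"bootstrap.servers": "localhost:9092"},  # assumed broker
        topics=["topic"],  # assumed topic
        expansion_service="localhost:8097",  # assumed address of the service started above
    )
    records | beam.Map(print)
```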
Key components: