Use when creating a new Python DataTransformer, defining input/output schemas, writing transformation logic, and adding tests.
Guided walkthrough for adding transformation logic to this project.
Ask the user:
Edit defineInputDatasets() in transformer.py:
def defineInputDatasets() -> List[Datastore]:
    """Declare the input datastores (and their datasets) this transformer reads.

    Returns:
        A list of ``Datastore`` objects. The template declares a single
        store, ``"YourSourceStore"``, holding one dataset,
        ``"your_dataset"``, whose only column is a non-nullable integer
        primary key named ``id``. Rename these placeholders and extend the
        column list to mirror your real source data.
    """
    return [
        Datastore(
            "YourSourceStore",
            datasets=[
                Dataset(
                    "your_dataset",
                    schema=DDLTable(columns=[
                        DDLColumn("id", Integer(), NullableStatus.NOT_NULLABLE, PrimaryKeyStatus.PK),
                        # Add columns matching your source data
                    ])
                )
            ]
        )
    ]
Edit defineOutputDatastore() in transformer.py:
def defineOutputDatastore() -> Datastore:
    """Declare the datastore (and its datasets) this transformer writes.

    Returns:
        A single ``Datastore`` named ``"YourOutputStore"`` holding one
        dataset, ``"your_output"``, whose only column is a non-nullable
        integer primary key named ``id``. Rename these placeholders and
        extend the column list to match the schema your transformation
        produces.
    """
    return Datastore(
        "YourOutputStore",
        datasets=[
            Dataset(
                "your_output",
                schema=DDLTable(columns=[
                    DDLColumn("id", Integer(), NullableStatus.NOT_NULLABLE, PrimaryKeyStatus.PK),
                    # Add columns matching your output schema
                ])
            )
        ]
    )
Edit executeTransformer() in transformer.py. Key APIs:
def executeTransformer(connection: Connection, dataset_mapping: DataTransformerContext) -> Optional[Any]:
    """Run the transformation, writing results into the output table.

    This is a template that demonstrates the key context APIs; the lookups
    you do not need (input mappings, config, credential) can be deleted.

    Args:
        connection: Database connection used to execute the transformation
            statements.
        dataset_mapping: Transformer context that resolves table names,
            config, and extra credentials for this run.

    Returns:
        ``None`` (implicitly) in this template; the signature allows
        returning any value.
    """
    # Get input/output table names
    input_mappings = dataset_mapping.getInputTableMappings()
    output_table = dataset_mapping.getOutputTableNameForDataset("your_output")

    # Get config (if using StaticConfigMap or EnvRefConfigMap)
    config = dataset_mapping.getConfig()

    # Get extra credentials (if defined in model)
    api_key = dataset_mapping.getExtraCredential("your_credential")

    # Your SQL/Python transformation
    # NOTE(review): `text` is presumably SQLAlchemy's text() construct and must
    # be imported in transformer.py - confirm. The "..." is a placeholder to be
    # replaced with the real column list / SELECT.
    connection.execute(text(f'INSERT INTO "{output_table}" ...'))
Edit tests/test_transformer.py:
def test_your_transformation(self) -> None:
    """Inject sample input, run the transformer job, and check the output.

    Template test method: place it inside the test class in
    tests/test_transformer.py (indented to method level) and replace the
    sample row and the final assertions with ones that match your schema.
    """
    # Seed the input dataset with the rows the transformation should consume.
    self.inject_data("your_dataset", [
        {"id": 1, "column": "value"},
    ])

    # Run the job and require that it finished successfully.
    status = self.run_dt_job()
    self.assertEqual(status, JobStatus.DONE)

    # Read back what the transformer wrote.
    output = self.get_output_data("your_output")
    # Assert your transformation logic produced correct results
pytest tests/ -v
Checkpoint: All tests pass. If not, debug using IDE breakpoints on executeTransformer().
git add -A
git commit -m "feat: add <your transformer name>"
For more complex scenarios, see:
- StaticConfigMap for config values; access them via `dataset_mapping.getConfig()`.
- The `__iud_type__` column.

See Advanced DT Patterns.