Implement and extend PostHog Data warehouse import sources. Use when adding a new source under posthog/temporal/data_imports/sources, adding datasets/endpoints to an existing source, or adding incremental sync support, pagination, credentials validation, and source tests.
Use this skill when building or updating Data warehouse sources in posthog/temporal/data_imports/sources/.
Before coding, read:
- `posthog/temporal/data_imports/sources/source.template`
- `posthog/temporal/data_imports/sources/README.md`
- `settings.py` + transport logic of existing sources (e.g. klaviyo, github).

For dependent-resource fan-out (parent→child with `type: "resolve"`), also read `posthog/temporal/data_imports/sources/common/rest_source/__init__.py` and `config_setup.py` (e.g. `process_parent_data_item`, `make_parent_key_name`).

For API-backed sources, use this split:
- `source.py`: source registration, source form fields, schema list, credential validation, and pipeline handoff.
- `settings.py`: endpoint catalog, incremental fields, primary key and partition defaults.
- `{source}.py`: API client/auth, paginator, request params, row normalization, and `SourceResponse`.

This keeps endpoint behavior declarative and easy to extend.
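
A minimal sketch of what a declarative `settings.py` catalog could look like. `EndpointConfig`, `ENDPOINTS`, `incremental_endpoints`, and the example endpoint names and paths are hypothetical and not taken from an existing source; only the setting names (`path`, `primary_key`, `incremental_fields`, `partition_key`, `sort_mode`) come from this guide.

```python
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass(frozen=True)
class EndpointConfig:
    """Hypothetical per-endpoint settings; not PostHog's actual class."""

    path: str
    primary_key: str = "id"
    incremental_fields: tuple[str, ...] = ()
    partition_key: Optional[str] = None
    sort_mode: Literal["asc", "desc"] = "asc"


# Illustrative catalog: adding an endpoint means adding one entry here,
# without touching the transport code in {source}.py.
ENDPOINTS: dict[str, EndpointConfig] = {
    "projects": EndpointConfig(path="/projects", partition_key="created_at"),
    "events": EndpointConfig(
        path="/events",
        primary_key="event_id",
        incremental_fields=("created_at",),
        partition_key="created_at",
    ),
}


def incremental_endpoints() -> list[str]:
    """Names of endpoints that declare at least one incremental field."""
    return [name for name, cfg in ENDPOINTS.items() if cfg.incremental_fields]
```

With this layout, `source.py` can build its schema list from `ENDPOINTS` and `{source}.py` only needs to look up the config for the endpoint it was asked to sync.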
For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in settings.py and route in {source}.py with this priority:
Copy this and track progress:
Source implementation:
- [ ] Define source fields in `get_source_config`
- [ ] Implement credential validation
- [ ] Define schemas in `get_schemas`
- [ ] Add/confirm endpoint settings (`settings.py`)
- [ ] Implement transport and paginator (`{source}.py`)
- [ ] Return correct `SourceResponse` (keys, partitioning, sort mode)
- [ ] Add non-retryable auth/permission errors
- [ ] Add source tests
- [ ] Add transport tests
- [ ] Add icon in `frontend/public/services/`
- [ ] Run `pnpm run generate:source-configs`
- [ ] Run `pnpm run schema:build`
- [ ] For Beta: set `betaSource=True` in `SourceConfig`; omit `unreleasedSource` (or set `False`) when releasing.
- Register with `@SourceRegistry.register`.
- Use `SimpleSource[GeneratedConfig]` unless resumable/webhook behavior is required.
- Set `table_format="delta"` in endpoint resources.
- Define `primary_keys` for incremental merge safety; they are endpoint-specific (declare in `settings.py`, not always `id`).
- Use `partition_mode="datetime"` with a stable datetime field when available.
- Implement `get_non_retryable_errors()` for known permanent failures (401/403/invalid credentials).
- `db_incremental_field_last_value`.
- Use `sort_mode="desc"` only if the endpoint truly returns descending order and cannot return ascending.
- `db_incremental_field_earliest_value` is considered.
- Prefer stable fields (`created_at`, `dateCreated`, `firstSeen`) over mutable fields (`updated_at`, `lastSeen`) when both exist.

Before finalizing endpoint logic, verify these from docs (or reliable API examples):

- Wrapped payload shape (e.g. `{"data": [...]}`).

If behavior is not documented, keep parsing/merge logic conservative and add a code comment documenting the uncertainty.

Record what you verified (e.g. in `posthog/temporal/data_imports/sources/<source>/api_inventory.md`) so future endpoint additions stay consistent.

Top-level endpoints are list/read endpoints that do not require parent-row expansion.
- Keep endpoint metadata in `settings.py` (`path`, `primary_key`, `incremental_fields`, `partition_key`, `sort_mode`).
- Build resources through one shared helper (a `get_resource(...)` style helper) and keep transport branches minimal.
- Default request params (`limit`, required filters).
- `Link` headers: check both `rel="next"` and any `results` flag the API may use.
- `update_request` to avoid duplicate query params.
- Retryable failures (`429`, transient `5xx`).
- `429`; fall back to exponential backoff when unavailable.
- Bound retries (`stop_after_attempt`), and preserve clear terminal behavior:
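
As an illustration of the retry guidance above, here is a minimal sketch of choosing a wait time: use a server-provided delay when one is present, otherwise exponential backoff, and stop after a fixed number of attempts. It assumes the server hint arrives as a `Retry-After` header in seconds (an assumption; confirm it against the API docs). `should_retry`, `retry_delay`, `MAX_ATTEMPTS`, and `RETRYABLE_STATUSES` are hypothetical names, not part of the shared infra.

```python
from typing import Mapping, Optional

MAX_ATTEMPTS = 5  # hypothetical cap, analogous to a stop_after_attempt limit
RETRYABLE_STATUSES = {429, 500, 502, 503, 504}


def should_retry(status_code: int, attempt: int) -> bool:
    """Retry only transient statuses, and only while attempts remain."""
    return status_code in RETRYABLE_STATUSES and attempt < MAX_ATTEMPTS


def retry_delay(
    headers: Mapping[str, str],
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    # Assumption: a plain mapping keyed exactly "Retry-After".
    raw: Optional[str] = headers.get("Retry-After")
    if raw is not None:
        try:
            # Handles the delta-seconds form; HTTP-date values fall through.
            return min(max(float(raw), 0.0), cap)
        except ValueError:
            pass
    # Exponential backoff: base, 2*base, 4*base, ... capped at `cap`.
    return min(base * (2 ** (attempt - 1)), cap)
```

Auth and permission failures such as 401 and 403 are deliberately absent from `RETRYABLE_STATUSES`, so they fail on the first attempt instead of being retried.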
Fan-out means iterating a parent resource (for example projects) and then querying child endpoints per parent (for example project issues).
Prefer dependent resources when you have a single parent→child relationship. Use `rest_api_resources` with a parent resource and a child that declares `type: "resolve"` for the parent field (e.g. parent slug or id). The shared infra (`rest_source/__init__.py`, `config_setup.process_parent_data_item`) paginates the parent and calls the child once per parent row. Add `include_from_parent` so child rows get parent fields; they are injected as `_<parent>_<field>` via `make_parent_key_name`.
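
A sketch of a parent and child resource in this style. The dict shape follows the dlt-style REST config as far as this guide describes it, so treat the exact keys as an assumption to check against `rest_source`. The resource names, paths, and the `parent_key_name` and `child_row_with_parent` helpers are hypothetical; they only mirror the `_<parent>_<field>` convention and do not call the real `make_parent_key_name`.

```python
from typing import Any

# Hypothetical parent resource: a plain list endpoint.
projects_resource: dict[str, Any] = {
    "name": "projects",
    "endpoint": {"path": "/projects"},
}

# Hypothetical child resource: {project_id} is resolved from each parent row.
issues_resource: dict[str, Any] = {
    "name": "issues",
    "endpoint": {
        "path": "/projects/{project_id}/issues",
        "params": {
            "project_id": {
                "type": "resolve",
                "resource": "projects",  # parent resource name
                "field": "id",  # parent field to read
            },
        },
    },
    # Parent fields copied onto every child row.
    "include_from_parent": ["id"],
}


def parent_key_name(parent: str, field: str) -> str:
    """Illustrates the `_<parent>_<field>` naming of injected parent fields."""
    return f"_{parent}_{field}"


def child_row_with_parent(child: dict[str, Any], parent_row: dict[str, Any]) -> dict[str, Any]:
    """Mimics parent-field injection for one child row."""
    injected = {
        parent_key_name(projects_resource["name"], f): parent_row[f]
        for f in issues_resource["include_from_parent"]
    }
    return {**child, **injected}
```

Here a child row `{"id": 9}` fetched for parent `{"id": 1}` would carry an extra `_projects_id` key, which is the shape transport tests should feed into downstream normalization.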
Make fan-out declarative in endpoint config. Add a fan-out config object in settings.py (for example DependentEndpointConfig) with:
- `parent_name`
- `resolve_param`
- `resolve_field`
- `include_from_parent`
- an optional parent field rename map (e.g. `id -> project_id`)

Then route all single-hop fan-out endpoints through a shared helper (for example `common/rest_source/fanout.py:build_dependent_resource`) so callers do not reimplement parent/child config assembly.
Parent field rename mapping belongs in the helper. If a helper supports declarative renames, apply the map there. Callers should not branch on whether renames exist.
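
One possible shape for such a config object and rename step, sketched under assumptions. The field names `parent_name`, `resolve_param`, `resolve_field`, and `include_from_parent` come from this guide; `parent_field_renames`, `rename_parent_fields`, and `issues_cfg` are hypothetical and do not describe the real `build_dependent_resource` signature.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class DependentEndpointConfig:
    parent_name: str
    resolve_param: str
    resolve_field: str
    include_from_parent: tuple[str, ...] = ()
    # Hypothetical: parent field name -> column name to expose on child rows.
    parent_field_renames: dict[str, str] = field(default_factory=dict)


def rename_parent_fields(row: dict[str, Any], cfg: DependentEndpointConfig) -> dict[str, Any]:
    """Rename injected `_<parent>_<field>` keys; a no-op when no renames are declared."""
    out = dict(row)
    for parent_field, new_name in cfg.parent_field_renames.items():
        injected_key = f"_{cfg.parent_name}_{parent_field}"
        if injected_key in out:
            out[new_name] = out.pop(injected_key)
    return out


issues_cfg = DependentEndpointConfig(
    parent_name="projects",
    resolve_param="project_id",
    resolve_field="id",
    include_from_parent=("id",),
    parent_field_renames={"id": "project_id"},
)
```

Because the rename step always runs and simply does nothing for an empty map, callers never need to check whether renames are configured.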
Use per-endpoint pagination/selectors through fan-out helper overrides. build_dependent_resource supports optional endpoint overrides so you can keep single-hop fan-out declarative even when parent and child have different response shapes/pagination contracts:
- `parent_endpoint_extra` and `child_endpoint_extra`: pass endpoint-level `paginator` and `data_selector` (for wrapped payloads like `{"items": [...]}`).
- `page_size_param`: override the default page-size query param (`limit`) for APIs that use a different name (for example `page_size`).

This means you can often avoid custom iterators for single-hop fan-out even when parent and child paginate differently (e.g. Typeform forms use page numbers while responses use a cursor token).
Path pre-formatting: Child paths often have multiple placeholders (e.g. org and resource slug). process_parent_data_item only does str.format() with the resolved param. Pre-format any static placeholders with .replace() on the child path before passing to the resource config, so only the resolved placeholder remains and DLT does not raise KeyError.
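
The failure mode and the fix can be reproduced with plain string operations. In this sketch the path, the slug values, and the `preformat_child_path` helper are hypothetical; only the use of `.replace()` before `str.format()` comes from the guidance above.

```python
CHILD_PATH = "/organizations/{organization_slug}/projects/{project_slug}/issues"


def preformat_child_path(path: str, static_values: dict[str, str]) -> str:
    """Fill static placeholders with .replace(), leaving the resolved one intact."""
    for name, value in static_values.items():
        path = path.replace("{" + name + "}", value)
    return path


# Formatting with only the resolved param fails while a static placeholder remains.
try:
    CHILD_PATH.format(project_slug="web")
    raised = False
except KeyError:
    raised = True

# After pre-formatting, only {project_slug} is left for the per-parent format call.
ready_path = preformat_child_path(CHILD_PATH, {"organization_slug": "acme"})
final_path = ready_path.format(project_slug="web")
```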
When to keep a custom iterator: If fan-out requires two or more levels (e.g. parent → mid-level list → detail per mid-level), where an intermediate API call discovers values that become part of the URL, it cannot be expressed as a single parent→child resource in `rest_api_resources`. Implement a custom HTTP iterator for that endpoint only, and reuse the same pagination/retry helpers as elsewhere.
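
A rough sketch of such a two-level iterator. The `fetch_pages` callable stands in for whatever shared paginated-GET helper the source already uses; it, the paths, and `iter_run_details` are all hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Iterable, Iterator

# Stand-in for a shared paginated GET helper: takes a path, yields rows.
FetchPages = Callable[[str], Iterable[dict[str, Any]]]


def iter_run_details(fetch_pages: FetchPages) -> Iterator[dict[str, Any]]:
    """Two-level fan-out: projects -> runs per project -> detail per run."""
    for project in fetch_pages("/projects"):
        runs_path = f"/projects/{project['id']}/runs"
        for run in fetch_pages(runs_path):
            # The run id is only known after the intermediate call.
            detail_path = f"{runs_path}/{run['id']}"
            for detail in fetch_pages(detail_path):
                yield {**detail, "project_id": project["id"], "run_id": run["id"]}
```

Injecting the fetch helper keeps pagination and retry behavior in one place and lets transport tests drive the iterator with an in-memory fake instead of HTTP mocks.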
Add at least two test modules:
`tests/test_<source>_source.py`:

- `source_type`
- `get_source_config` fields and labels
- `get_schemas` outputs
- `validate_credentials` success/failure
- `source_for_pipeline` argument plumbing

`tests/test_<source>.py`:

- With `rest_api_resources`, pass rows with `_<parent>_<field>` keys to exercise parent-field injection and rename behavior
- `settings.py`

Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.
Use parameterized tests for status codes and edge cases.
After changing source fields, run the generation commands from the checklist and targeted tests for the new source.
- Run `generate:source-configs` after updating fields.
- `get_non_retryable_errors`.
- `KeyError`: pre-format static path placeholders (see Fan-out).
- Use typed config classes (`Endpoint`, `ClientConfig`, `IncrementalConfig`) to keep static checks precise.