Manage Unstract connectors - add, remove, or modify database, filesystem, and queue connectors. This skill handles backend code, JSON schemas, tests, logo fetching, and dependency management. Use when the user wants to create a new connector, delete an existing one, or modify connector behavior.
Manage Unstract connectors with full lifecycle support: add, remove, and modify operations for database, filesystem, and queue connector types.
Connectors live in /unstract/connectors/src/unstract/connectors/ with three categories:
| Type | Base Class | Directory | Mode |
|---|---|---|---|
| Database | UnstractDB | databases/ | ConnectorMode.DATABASE |
| Filesystem | UnstractFileSystem | filesystems/ | ConnectorMode.FILE_SYSTEM |
| Queue | UnstractQueue | queues/ | ConnectorMode.MANUAL_REVIEW |
Each connector follows this structure:
connector_name/
├── __init__.py # Metadata dict with is_active flag
├── connector_name.py # Main connector class
├── constants.py # Optional constants
└── static/
├── json_schema.json # Configuration UI schema
└── settings.yaml # Optional settings
| File | Purpose |
|---|---|
| base.py | Root UnstractConnector abstract class |
| connectorkit.py | Singleton registry for all connectors |
| databases/unstract_db.py | Database connector base class |
| filesystems/unstract_file_system.py | Filesystem connector base class |
| queues/unstract_queue.py | Queue connector base class |
| databases/register.py | Auto-discovery for database connectors |
| filesystems/register.py | Auto-discovery for filesystem connectors |
Ask the user for:
Use web search to discover:
- For filesystem connectors, whether an fsspec provider already exists (e.g., s3fs for S3, adlfs for Azure, gcsfs for GCS, boxfs for Box). Always prefer wrapping an existing provider over building a custom one.

Document findings before proceeding. If no fsspec provider exists, note that a custom AbstractFileSystem subclass will be needed (see Step 5b).
Create a unique connector ID using this pattern:
f"{short_name}|{uuid4()}"
# Example: "redis|a1b2c3d4-e5f6-7890-abcd-ef1234567890"
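As a minimal sketch of the pattern above, the helper below builds an ID from a short name and a random UUID. The function name `make_connector_id` is hypothetical and not part of the Unstract codebase. The assumption is that you run it once and hard-code the resulting string in the connector class, since the ID must not change afterwards.

```python
from uuid import UUID, uuid4


def make_connector_id(short_name: str) -> str:
    """Return an ID of the form "<short_name>|<uuid4>" (hypothetical helper)."""
    if "|" in short_name:
        raise ValueError("short_name must not contain '|'")
    return f"{short_name}|{uuid4()}"


connector_id = make_connector_id("redis")
name, _, raw_uuid = connector_id.partition("|")
# UUID() raises ValueError if the suffix is not a well-formed UUID.
parsed = UUID(raw_uuid)
print(connector_id)
```

The printed value differs on every run, which is why it should be generated once and then stored as a constant.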
# For database connector
mkdir -p unstract/connectors/src/unstract/connectors/databases/{connector_name}/static
# For filesystem connector
mkdir -p unstract/connectors/src/unstract/connectors/filesystems/{connector_name}/static
# For queue connector
mkdir -p unstract/connectors/src/unstract/connectors/queues/{connector_name}/static
Create `__init__.py`:
from .{connector_name} import {ClassName}

metadata = {
    "name": {ClassName}.__name__,
    "version": "1.0.0",
    "connector": {ClassName},
    "description": "{Description of the connector}",
    "is_active": True,
}
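The role of the is_active flag can be illustrated with a small sketch. This is not the actual logic in register.py, which this document does not show; `FakeConnector`, `discover_active`, and the sample metadata are hypothetical stand-ins.

```python
class FakeConnector:
    """Stand-in for a real connector class (hypothetical)."""


# Each entry mimics the `metadata` dict exported by a connector's __init__.py.
all_metadata = [
    {
        "name": "FakeConnector",
        "version": "1.0.0",
        "connector": FakeConnector,
        "description": "Enabled connector",
        "is_active": True,
    },
    {
        "name": "OldConnector",
        "version": "0.9.0",
        "connector": FakeConnector,
        "description": "Disabled connector",
        "is_active": False,
    },
]


def discover_active(metadata_list):
    """Return {name: connector class} for entries flagged is_active."""
    return {
        m["name"]: m["connector"]
        for m in metadata_list
        if m.get("is_active", False)
    }


active = discover_active(all_metadata)
print(sorted(active))
```

Under this assumption, setting is_active to False hides a connector without deleting its code.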
Use the appropriate template from assets/templates/:
- database.py.template for database connectors
- filesystem.py.template for filesystem connectors
- queue.py.template for queue connectors

Read the template and adapt it for the specific service. Key methods to implement:
For Database Connectors:
- get_engine() → Return database connection
- sql_to_db_mapping() → Map Python types to DB types
- execute() → Execute queries (inherited, may override)

For Filesystem Connectors:
There are two approaches depending on whether an fsspec provider exists:
Approach 1: Wrapping an existing fsspec provider (PREFERRED)
When a library such as s3fs, adlfs, gcsfs, or boxfs exists, the connector only extends UnstractFileSystem:
- get_fsspec_fs() → Return the provider's filesystem instance (e.g., S3FileSystem(...))
- test_credentials() → Verify connection works
- extract_metadata_file_hash() → Extract file hash from metadata
- is_dir_by_metadata() → Check if path is directory
- extract_modified_date() → Extract last modified date
- Do not implement ls, walk, info, etc.; the provider handles all file operations.
- See minio.py for reference.

Approach 2: Custom AbstractFileSystem (ONLY when no provider exists)
Create a custom AbstractFileSystem subclass with ONLY these core methods:
- ls(path, detail=True) → List directory contents (REQUIRED)
- info(path) → Get single item metadata (optimization, optional)
- _open(path, mode) → Open a file for reading
- cat_file(path) → Read file contents
- pipe_file(path, value) → Write bytes (NOT write_bytes, which is an alias)
- rm(path) → Delete a file
- mkdir(path) → Create directory

CRITICAL: NEVER override listdir(), stat(), exists(), isdir(), isfile(), delete(), read_bytes(), or write_bytes(). These are provided by AbstractFileSystem and delegate to ls()/info()/pipe_file().
walk() exception: Override walk() ONLY if the filesystem has path normalization that differs from _strip_protocol (e.g., SharePoint maps "" to "root"), or if the service raises exceptions that aren't FileNotFoundError/OSError (the only types the base catches). If you override walk(), you MUST support the detail kwarg (return dicts when True, lists when False) and the on_error callback.
See sharepoint.py for reference.

For Queue Connectors:
- get_engine() → Return queue connection
- enqueue() → Add message to queue
- dequeue() → Get message from queue
- peek() → View next message without removing

Generate static/json_schema.json based on researched auth modes:
{
"title": "{Connector Display Name}",
"type": "object",
"allOf": [
{
"required": ["connectorName"],
"properties": {
"connectorName": {
"type": "string",
"title": "Name of the connector"
}
}
},
{
"oneOf": [
// Add each auth mode as a separate option
]
}
]
}
Use "format": "password" for sensitive fields.
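To make the oneOf placeholder concrete, here is a sketch that fills it with two auth modes and writes valid JSON. The display name, the two modes, and all field names (`host`, `password`, `connection_url`) are illustrative assumptions; real options should come from the auth modes you researched.

```python
import json

schema = {
    "title": "Example Service",  # hypothetical display name
    "type": "object",
    "allOf": [
        {
            "required": ["connectorName"],
            "properties": {
                "connectorName": {"type": "string", "title": "Name of the connector"}
            },
        },
        {
            "oneOf": [
                {
                    "title": "Host and password",  # hypothetical auth mode
                    "required": ["host", "password"],
                    "properties": {
                        "host": {"type": "string", "title": "Host"},
                        "password": {
                            "type": "string",
                            "title": "Password",
                            "format": "password",  # sensitive field
                        },
                    },
                },
                {
                    "title": "Connection URL",  # hypothetical auth mode
                    "required": ["connection_url"],
                    "properties": {
                        "connection_url": {
                            "type": "string",
                            "title": "Connection URL",
                            "format": "password",  # may embed credentials
                        },
                    },
                },
            ]
        },
    ],
}

# Serialize and parse again to confirm the result is valid JSON.
text = json.dumps(schema, indent=2)
round_tripped = json.loads(text)
auth_modes = [option["title"] for option in round_tripped["allOf"][1]["oneOf"]]
print(auth_modes)
```

JSON has no comment syntax, so the `//` placeholder in the template must be replaced with real options before the file is saved.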
Execute the logo fetch script:
python .claude/skills/connector-ops/scripts/fetch_logo.py "{service_name}" "{output_path}"
The script tries these sources in order:
Place logo at: /frontend/public/icons/connector-icons/{ConnectorName}.png
Research required Python packages and add to pyproject.toml:
# Read current dependencies
cat unstract/connectors/pyproject.toml
# Add new dependency in the [project.dependencies] section
# Follow existing version pinning patterns (e.g., ~=, ==, >=)
Generate both mock-based and integration tests in unstract/connectors/tests/:
# Mock-based tests: tests/{type}/test_{name}.py
import unittest
from unittest.mock import patch, Mock


class Test{ClassName}(unittest.TestCase):
    def setUp(self):
        self.config = {
            # Test configuration
        }

    @patch("{module_path}.{connection_method}")
    def test_connection_params(self, mock_connect):
        mock_connect.return_value = Mock()
        connector = {ClassName}(self.config)
        # Assertions
# Integration tests: tests/{type}/test_{name}_integration.py
import os
import unittest


class Test{ClassName}Integration(unittest.TestCase):
    @unittest.skipUnless(
        os.getenv("{CONNECTOR}_HOST"),
        "Integration test requires {CONNECTOR}_* environment variables"
    )
    def test_real_connection(self):
        config = {
            "host": os.getenv("{CONNECTOR}_HOST"),
            # ... other env vars
        }
        connector = {ClassName}(config)
        self.assertTrue(connector.test_credentials())
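The two templates can be exercised end to end with a stand-in connector. In this sketch `FakeConnector`, its `_connect` method, and the `FAKE_CONNECTOR_HOST` variable are hypothetical; they only show how patching keeps the mock test offline while `skipUnless` gates the integration test.

```python
import os
import unittest
from unittest.mock import Mock, patch


class FakeConnector:
    """Hypothetical connector used only for this illustration."""

    def __init__(self, config):
        self.host = config["host"]

    def _connect(self):
        # A real connector would open a network connection here.
        raise ConnectionError("no real service in this sketch")

    def test_credentials(self):
        self._connect()
        return True


class TestFakeConnector(unittest.TestCase):
    @patch.object(FakeConnector, "_connect")
    def test_credentials_with_mock(self, mock_connect):
        mock_connect.return_value = Mock()
        connector = FakeConnector({"host": "localhost"})
        self.assertTrue(connector.test_credentials())
        mock_connect.assert_called_once_with()


class TestFakeConnectorIntegration(unittest.TestCase):
    @unittest.skipUnless(
        os.getenv("FAKE_CONNECTOR_HOST"),
        "Integration test requires FAKE_CONNECTOR_HOST",
    )
    def test_real_connection(self):
        connector = FakeConnector({"host": os.getenv("FAKE_CONNECTOR_HOST")})
        self.assertTrue(connector.test_credentials())


# Run both test cases programmatically; the integration test is skipped
# unless FAKE_CONNECTOR_HOST is set in the environment.
loader = unittest.defaultTestLoader
suite = unittest.TestSuite()
suite.addTests(loader.loadTestsFromTestCase(TestFakeConnector))
suite.addTests(loader.loadTestsFromTestCase(TestFakeConnectorIntegration))
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Patching at the connection boundary means the mock test never touches the network, so it can run in CI without credentials.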
Run verification in order:
# 1. Syntax check (py_compile does not check types)
cd unstract/connectors && python -m py_compile src/unstract/connectors/{type}/{name}/{name}.py
# 2. Run mock tests
cd unstract/connectors && python -m pytest tests/{type}/test_{name}.py -v
# 3. Run full connector test suite
cd unstract/connectors && python -m pytest tests/ -v --ignore=tests/{type}/test_{name}_integration.py
Provide summary:
## Connector Added: {Name}
**Files created:**
- `src/unstract/connectors/{type}/{name}/__init__.py`
- `src/unstract/connectors/{type}/{name}/{name}.py`
- `src/unstract/connectors/{type}/{name}/static/json_schema.json`
- `tests/{type}/test_{name}.py`
- `tests/{type}/test_{name}_integration.py`
- `frontend/public/icons/connector-icons/{Name}.png`
**Dependencies added:**
- `{package}~={version}`
**Verification:**
- Syntax check: PASSED
- Mock tests: PASSED (X tests)
- Test suite: PASSED
**To run integration tests:**
```bash
export {CONNECTOR}_HOST=your_host
export {CONNECTOR}_USER=your_user
export {CONNECTOR}_PASSWORD=your_password
cd unstract/connectors && python -m pytest tests/{type}/test_{name}_integration.py -v
```
---
## Operation: REMOVE Connector
### Step 1: Identify Connector
Locate the connector by name or ID:
```bash
# Search for connector
grep -r "class {Name}" unstract/connectors/src/
```
Search for usages across the codebase:
grep -r "{connector_id}" --include="*.py" .
grep -r "from.*{connector_name}" --include="*.py" .
Warn user if connector is referenced elsewhere.
# Remove connector directory
rm -rf unstract/connectors/src/unstract/connectors/{type}/{name}/
# Remove tests
rm -f unstract/connectors/tests/{type}/test_{name}*.py
# Remove icon
rm -f frontend/public/icons/connector-icons/{Name}.png
If the removed connector was the only user of a dependency, offer to remove it from pyproject.toml.
# Ensure no import errors
cd unstract/connectors && python -c "from unstract.connectors.connectorkit import Connectorkit; Connectorkit()"
# Run test suite
cd unstract/connectors && python -m pytest tests/ -v
## Connector Removed: {Name}
**Files deleted:**
- `src/unstract/connectors/{type}/{name}/` (directory)
- `tests/{type}/test_{name}.py`
- `tests/{type}/test_{name}_integration.py`
- `frontend/public/icons/connector-icons/{Name}.png`
**Verification:**
- Import check: PASSED
- Test suite: PASSED
Ask user what modification is needed:
# Find connector files
find unstract/connectors -name "*{connector_name}*" -type f
Based on modification type:
Adding configuration field:
- Update static/json_schema.json with new field
- Update `__init__` to read new field

Adding authentication mode:
- Add a new oneOf option in JSON schema

Note: For connectors that support both OAuth and Client Credentials (like SharePoint/OneDrive), use the Dual Authentication pattern documented in references/connector_patterns.md. This pattern uses an auth_type field with JSON Schema dependencies to conditionally show or hide credential fields based on the selected authentication method.
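The exact Dual Authentication schema lives in references/connector_patterns.md and is not reproduced here. As a rough sketch of the idea, the schema below uses an auth_type enum plus a JSON Schema dependencies block, and a small helper lists which fields apply to a given choice. The enum values, the field names (`client_id`, `client_secret`, `tenant_id`), and the `fields_for` helper are all assumptions for illustration.

```python
schema = {
    "type": "object",
    "required": ["auth_type"],
    "properties": {
        "auth_type": {
            "type": "string",
            "title": "Authentication type",
            "enum": ["oauth", "client_credentials"],  # hypothetical values
            "default": "oauth",
        }
    },
    "dependencies": {
        "auth_type": {
            "oneOf": [
                {
                    # OAuth: no extra fields are collected in the form.
                    "properties": {"auth_type": {"enum": ["oauth"]}},
                },
                {
                    # Client credentials: show the credential fields.
                    "properties": {
                        "auth_type": {"enum": ["client_credentials"]},
                        "client_id": {"type": "string"},
                        "client_secret": {"type": "string", "format": "password"},
                        "tenant_id": {"type": "string"},
                    },
                    "required": ["client_id", "client_secret", "tenant_id"],
                },
            ]
        }
    },
}


def fields_for(schema: dict, auth_type: str) -> list[str]:
    """List the extra fields that apply when `auth_type` is selected."""
    for branch in schema["dependencies"]["auth_type"]["oneOf"]:
        props = branch["properties"]
        if auth_type in props["auth_type"]["enum"]:
            return sorted(name for name in props if name != "auth_type")
    raise ValueError(f"unknown auth_type: {auth_type}")


print(fields_for(schema, "oauth"))
print(fields_for(schema, "client_credentials"))
```

Each oneOf branch pins auth_type to one value, so a form renderer that supports dependencies can show only the fields of the matching branch.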
Bug fix:
Dependency update:
- Update pyproject.toml

Ensure tests cover the modification:
# Syntax-check modified files
cd unstract/connectors && python -m py_compile src/unstract/connectors/{type}/{name}/{name}.py
# Run connector tests
cd unstract/connectors && python -m pytest tests/{type}/test_{name}*.py -v
# Run full suite
cd unstract/connectors && python -m pytest tests/ -v
## Connector Modified: {Name}
**Changes:**
- {Description of each change}
**Files modified:**
- {List of modified files}
**Tests:**
- Added: {count} new test(s)
- Modified: {count} existing test(s)
**Verification:**
- Syntax check: PASSED
- Connector tests: PASSED
- Full suite: PASSED
Consult these files for detailed patterns:
- references/connector_patterns.md - Common patterns and anti-patterns
- references/json_schema_examples.md - JSON schema examples for all auth types
- references/test_patterns.md - Test patterns and fixtures

Use these templates as starting points:
- assets/templates/database.py.template
- assets/templates/filesystem.py.template
- assets/templates/queue.py.template
- assets/templates/init.py.template
- assets/templates/json_schema_template.json
- assets/templates/test_mock.py.template
- assets/templates/test_integration.py.template

Fork Safety: For connectors using Google APIs or gRPC, implement lazy loading to prevent SIGSEGV in Celery workers. See Google Drive connector for pattern.
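The Google Drive connector's code is not shown in this document, so the following is only a generic sketch of lazy loading: the library is imported and the client created on first use inside the worker process, not at module import time in the parent process. `LazyClientConnector` is hypothetical, and the stdlib `sqlite3` module stands in for a gRPC-based client library.

```python
import importlib
import os


class LazyClientConnector:
    """Hypothetical connector that defers client creation until first use."""

    def __init__(self, settings: dict):
        self._settings = settings
        self._client = None       # nothing heavy is created here
        self._client_pid = None   # process that created the client

    def _get_client(self):
        # Recreate the client if it is missing or was created in another
        # process (for example, inherited from the parent across a fork).
        if self._client is None or self._client_pid != os.getpid():
            # Stand-in for importing a heavy client library on demand.
            module = importlib.import_module("sqlite3")
            self._client = module.connect(self._settings["database"])
            self._client_pid = os.getpid()
        return self._client

    def test_credentials(self) -> bool:
        self._get_client().execute("SELECT 1")
        return True


connector = LazyClientConnector({"database": ":memory:"})
created_eagerly = connector._client is not None
ok = connector.test_credentials()
same_client = connector._get_client() is connector._get_client()
```

The process-ID check is a design choice of this sketch: it ensures a forked worker builds its own client and does not reuse one created by its parent.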
UUID Consistency: Once a connector ID is assigned, never change it. Existing installations may reference it.
Schema Backwards Compatibility: When modifying schemas, ensure existing configurations remain valid.
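One way to check this is to compare the required fields of the old and new schema: any newly required field makes stored configurations invalid. The helper below is a simplified sketch that only looks at top-level `required` lists, and the field names are made up for the example.

```python
def newly_required(old_schema: dict, new_schema: dict) -> set:
    """Fields required by new_schema that old_schema did not require."""
    old_required = set(old_schema.get("required", []))
    new_required = set(new_schema.get("required", []))
    return new_required - old_required


old = {"required": ["host"], "properties": {"host": {"type": "string"}}}

# Compatible change: the new field is optional and has a default.
compatible = {
    "required": ["host"],
    "properties": {
        "host": {"type": "string"},
        "timeout": {"type": "integer", "default": 30},
    },
}

# Breaking change: existing configurations have no "region" value.
breaking = {
    "required": ["host", "region"],
    "properties": {"host": {"type": "string"}, "region": {"type": "string"}},
}

print(newly_required(old, compatible))
print(newly_required(old, breaking))
```

An empty result suggests existing configurations still satisfy the required list; a real check would also need to cover nested allOf/oneOf branches and changed field types.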
Icon Naming: Use PascalCase with spaces URL-encoded (e.g., Google%20Drive.png).
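A small sketch of deriving such a file name, assuming the file name itself carries the percent-encoded form as the example suggests. The `icon_filename` helper is hypothetical.

```python
from urllib.parse import quote


def icon_filename(display_name: str) -> str:
    """Percent-encode a display name and append .png (hypothetical helper)."""
    return quote(display_name) + ".png"


print(icon_filename("Google Drive"))  # Google%20Drive.png
```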
Test Isolation: Mock tests should never require external services. Use @unittest.skipUnless for integration tests.
fsspec Delegation: NEVER override listdir(), stat(), exists(), isdir(), isfile(), delete(), read_bytes(), or write_bytes() in custom AbstractFileSystem subclasses, and override walk() only in the narrow cases covered by the walk() exception under Approach 2. These methods are provided by fsspec's base class and delegate to ls()/info()/pipe_file(). Overriding them creates maintenance burden and introduces subtle bugs. See references/connector_patterns.md for the "Anti-Pattern: Reimplementing fsspec Base Methods" section.
pipe_file vs write_bytes: When implementing file writes in a custom AbstractFileSystem, override pipe_file() (not write_bytes()). The base write_bytes() is an alias that delegates to pipe_file().
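The delegation idea can be shown without installing fsspec. In the sketch below, `MiniBaseFS` is a hypothetical stand-in that mimics the alias relationships described above; it is not fsspec's AbstractFileSystem, and `DictFS` is an illustrative in-memory filesystem.

```python
class MiniBaseFS:
    """Hypothetical stand-in that mirrors the base-class delegation."""

    # Core methods: a subclass must implement these.
    def ls(self, path, detail=True):
        raise NotImplementedError

    def pipe_file(self, path, value):
        raise NotImplementedError

    def cat_file(self, path):
        raise NotImplementedError

    def info(self, path):
        parent = path.rsplit("/", 1)[0]
        for entry in self.ls(parent, detail=True):
            if entry["name"] == path:
                return entry
        raise FileNotFoundError(path)

    # Aliases: subclasses inherit these and should not override them.
    def listdir(self, path, detail=True):
        return self.ls(path, detail=detail)

    def exists(self, path):
        try:
            self.info(path)
            return True
        except FileNotFoundError:
            return False

    def write_bytes(self, path, value):
        self.pipe_file(path, value)

    def read_bytes(self, path):
        return self.cat_file(path)


class DictFS(MiniBaseFS):
    """In-memory filesystem that implements only the core methods."""

    def __init__(self):
        self._files = {}

    def ls(self, path, detail=True):
        # Flat prefix match; enough for this single-level example.
        prefix = path.rstrip("/") + "/"
        entries = [
            {"name": name, "size": len(data), "type": "file"}
            for name, data in sorted(self._files.items())
            if name.startswith(prefix)
        ]
        return entries if detail else [e["name"] for e in entries]

    def pipe_file(self, path, value):
        self._files[path] = value

    def cat_file(self, path):
        return self._files[path]


fs = DictFS()
fs.write_bytes("data/a.txt", b"hello")  # reaches DictFS.pipe_file via the alias
print(fs.listdir("data", detail=False), fs.exists("data/a.txt"))
```

`DictFS` defines only ls(), pipe_file(), and cat_file(), yet write_bytes(), read_bytes(), listdir(), and exists() all work, which is the reason the aliases should be left alone.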