Content deduplication for distribution. Use when: preventing duplicate posts to the same channel, detecting cross-post duplicates, or ensuring idempotent job execution.
- ShareJob records for the same post + channel
- ShareJob per (post, channel, account) tuple
- external_post_id — if set, post already exists on platform
- ShareJob.status = "completed" + matching post/channel → skip
- create_auto_plan() before job creation

# apps/distribution/services.py
from apps.distribution.models import ShareJob
def create_job_if_not_exists(
    *, post, account, channel: str, plan=None, payload: dict | None = None,
) -> ShareJob | None:
    """Create a ShareJob unless an active or completed one already exists.

    A job counts as existing when a ShareJob with the same ``post``,
    ``account`` and ``channel`` has status "pending", "in_progress",
    "retrying" or "completed".

    Args:
        post: Post to distribute.
        account: Account the post is shared from.
        channel: Target channel identifier.
        plan: Optional plan stored on the new job.
        payload: Optional payload; stored as ``{}`` when omitted or empty.

    Returns:
        The newly created ShareJob, or None when creation was skipped.
    """
    # "in_progress" is the status execute_job_idempotent() writes when it
    # claims a job; without it here, a job that is currently executing
    # would not block creation of a duplicate.
    blocking_statuses = ["pending", "in_progress", "retrying", "completed"]

    # NOTE(review): this check-then-create is not atomic; two concurrent
    # callers can both pass the check. A database uniqueness constraint
    # would be needed to close that window - confirm against the model.
    already_exists = ShareJob.objects.filter(
        post=post,
        account=account,
        channel=channel,
        status__in=blocking_statuses,
    ).exists()
    if already_exists:
        return None  # Already distributed or in progress

    return ShareJob.objects.create(
        post=post,
        account=account,
        channel=channel,
        plan=plan,
        payload=payload or {},
    )
def execute_job_idempotent(*, job: ShareJob) -> dict:
    """Execute a job at most once; safe to call repeatedly.

    The job is claimed with a conditional update that moves it from
    "pending" to "in_progress". Only the caller whose update changes a row
    goes on to dispatch the job.

    Args:
        job: The ShareJob to execute. Its in-memory state may be stale.

    Returns:
        ``{"status": "already_completed", "id": <external_post_id>}`` when
        the job is completed and has an external post id,
        ``{"status": "already_processing"}`` when the job could not be
        claimed, otherwise whatever ``dispatch_to_connector`` returns.
    """
    if job.status == "completed" and job.external_post_id:
        return {"status": "already_completed", "id": job.external_post_id}

    # Claim the job: the update only matches while the row is still pending.
    # NOTE(review): jobs in status "retrying" are never claimed here -
    # confirm that the retry path resets them to "pending" first.
    claimed = ShareJob.objects.filter(
        pk=job.pk, status="pending",
    ).update(status="in_progress")

    if claimed == 0:
        # The in-memory job may be stale. Re-read the row so that a job
        # which has already finished is reported as completed (with its
        # external id) rather than as still processing.
        current = (
            ShareJob.objects.filter(pk=job.pk)
            .values_list("status", "external_post_id")
            .first()
        )
        if current is not None:
            current_status, external_id = current
            if current_status == "completed" and external_id:
                return {"status": "already_completed", "id": external_id}
        # The job was not pending, so this call does not execute it.
        return {"status": "already_processing"}

    job.refresh_from_db()
    return dispatch_to_connector(job=job)
import hashlib
def content_hash(text: str) -> str:
    """Return a short fingerprint of ``text`` for dedup comparison.

    The text is lowercased and every run of whitespace is collapsed to a
    single space (leading and trailing whitespace is dropped) before
    hashing, so inputs differing only in case or spacing hash the same.

    Returns:
        The first 16 hex characters of the SHA-256 digest.
    """
    words = text.lower().split()
    digest = hashlib.sha256(" ".join(words).encode())
    return digest.hexdigest()[:16]
def is_near_duplicate(*, post, channel: str) -> bool:
    """Check whether matching content was posted to ``channel`` recently.

    Compares the hash of ``post.title + post.meta_description`` with the
    hash of the ``"text"`` payload entry of every completed ShareJob
    created on this channel in the last 24 hours. Because ``content_hash``
    normalizes case and whitespace, a match means the texts are identical
    apart from case and spacing.

    Args:
        post: Object exposing ``title`` and ``meta_description``.
        channel: Channel identifier to search.

    Returns:
        True if any recent completed job carries matching text.
    """
    # Imported locally so the function does not depend on module-level
    # imports that are not visible next to this definition.
    from datetime import timedelta

    from django.utils import timezone

    cutoff = timezone.now() - timedelta(hours=24)
    recent_jobs = ShareJob.objects.filter(
        channel=channel, status="completed",
        created_at__gte=cutoff,
    )

    # Treat missing (None) fields as empty text instead of raising.
    new_hash = content_hash((post.title or "") + (post.meta_description or ""))

    # any() stops at the first match, like the early return it replaces.
    return any(
        content_hash((job.payload or {}).get("text") or "") == new_hash
        for job in recent_jobs
    )
external_post_id check — can't verify if post exists on platform

& .\.venv\Scripts\python.exe -m ruff check . --fix
& .\.venv\Scripts\python.exe -m ruff format .
& .\.venv\Scripts\python.exe manage.py check --settings=app.settings_dev