OpenCV video processing patterns — keyframe extraction via scene change detection, frame normalization, batch processing, temporary file management, and ffmpeg integration.
import asyncio
import shutil

import cv2
import numpy as np
def _extract_keyframes_sync(
    video_path: str,
    threshold: float,
    max_frames: int,
    min_interval_seconds: float,
) -> list[np.ndarray]:
    """Blocking implementation of keyframe extraction; see extract_keyframes."""
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # Some containers report 0 FPS; fall back so interval spacing still works.
            fps = 30.0
        min_interval_frames = int(fps * min_interval_seconds)
        keyframes: list[np.ndarray] = []
        prev_hist = None
        frame_idx = 0
        # Allow the very first scene change to qualify immediately.
        last_keyframe_idx = -min_interval_frames
        while cap.isOpened() and len(keyframes) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            # Grayscale histogram is cheap and robust enough for cut detection.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            cv2.normalize(hist, hist)
            if prev_hist is not None:
                diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_CHISQR)
                # Require both a large histogram jump and minimum temporal spacing.
                if diff > threshold and (frame_idx - last_keyframe_idx) >= min_interval_frames:
                    keyframes.append(frame)
                    last_keyframe_idx = frame_idx
            prev_hist = hist
            frame_idx += 1
        return keyframes
    finally:
        # Always release the capture, even if OpenCV raises mid-loop.
        cap.release()


async def extract_keyframes(
    video_path: str,
    threshold: float = 30.0,
    max_frames: int = 20,
    min_interval_seconds: float = 1.0,
) -> list[np.ndarray]:
    """Extract keyframes by detecting scene changes via histogram comparison.

    Args:
        video_path: Path to a video file readable by OpenCV.
        threshold: Chi-square histogram distance above which consecutive
            frames are treated as a scene change.
        max_frames: Maximum number of keyframes to return.
        min_interval_seconds: Minimum spacing between returned keyframes.

    Returns:
        Up to ``max_frames`` BGR frame arrays; empty if the video cannot
        be opened or contains no detectable scene changes.
    """
    # OpenCV decoding is blocking; run it in a worker thread so the
    # event loop stays responsive.
    return await asyncio.to_thread(
        _extract_keyframes_sync, video_path, threshold, max_frames, min_interval_seconds
    )
def normalize_frame(frame: np.ndarray, target_size: tuple[int, int] = (640, 480)) -> bytes:
    """Resize *frame* to ``target_size`` and encode it as JPEG bytes.

    Args:
        frame: BGR image array as produced by OpenCV.
        target_size: Output (width, height) in pixels.

    Returns:
        JPEG-encoded bytes (quality 85) for API consumption.

    Raises:
        ValueError: If JPEG encoding fails.
    """
    # INTER_AREA is the recommended interpolation for downscaling.
    resized = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
    ok, buffer = cv2.imencode(".jpg", resized, [cv2.IMWRITE_JPEG_QUALITY, 85])
    if not ok:
        # Previously the success flag was discarded; fail loudly instead of
        # returning a garbage buffer.
        raise ValueError("JPEG encoding failed")
    return buffer.tobytes()
If scene detection finds too few keyframes, fall back to uniform time sampling:
def extract_uniform_frames(video_path: str, num_frames: int = 10) -> list[np.ndarray]:
    """Sample frames at uniform intervals across the whole video.

    Fallback for when scene detection yields too few keyframes.

    Args:
        video_path: Path to a video file readable by OpenCV.
        num_frames: Number of evenly spaced frames to sample.

    Returns:
        Up to ``num_frames`` BGR frame arrays; fewer if the video is
        shorter than requested, and empty if it cannot be opened or
        reports no frame count.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total <= 0:
            # Unopenable file, or a container that doesn't report a frame
            # count — previously this produced negative seek indices.
            return []
        # np.unique drops duplicate indices when num_frames > total, so we
        # never seek and decode the same frame twice.
        indices = np.unique(np.linspace(0, total - 1, num_frames, dtype=int))
        frames: list[np.ndarray] = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
        return frames
    finally:
        # Release even if a read/seek raises.
        cap.release()
import tempfile
from pathlib import Path
from contextlib import asynccontextmanager
@asynccontextmanager
async def temp_video_file(content: bytes, suffix: str = ".mp4"):
    """Async context manager yielding a path to a temporary video file.

    Writes *content* into a file in a fresh private directory and removes
    both on exit, even when the body raises.

    Args:
        content: Raw video bytes to persist.
        suffix: File extension for the temporary file (e.g. ".mp4").

    Yields:
        The temporary file path as a string.
    """
    tmp_dir = Path(tempfile.mkdtemp())
    tmp = tmp_dir / f"upload{suffix}"
    try:
        # Disk writes block; keep them off the event loop.
        await asyncio.to_thread(tmp.write_bytes, content)
        yield str(tmp)
    finally:
        # ignore_errors: cleanup must never mask an exception from the body,
        # and rmtree also handles any extra files the consumer created.
        shutil.rmtree(tmp_dir, ignore_errors=True)
def get_video_metadata(video_path: str) -> dict[str, float | int]:
    """Return basic properties (fps, frame count, size, duration) of a video.

    Args:
        video_path: Path to a video file readable by OpenCV.

    Returns:
        Dict with ``fps``, ``frame_count``, ``width``, ``height`` and
        ``duration_seconds``. Properties are 0 when the file cannot be
        opened (OpenCV reports 0 for unreadable captures).
    """
    cap = cv2.VideoCapture(video_path)
    try:
        # Hoist repeated property reads; also needed for the duration ratio.
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        return {
            "fps": fps,
            "frame_count": int(frame_count),
            "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            # max(fps, 1) guards against division by zero for files
            # reporting 0 FPS (preserves the original's convention).
            "duration_seconds": frame_count / max(fps, 1),
        }
    finally:
        # The original released the capture on a line after `return`, which
        # never executed; finally guarantees it runs.
        cap.release()
Review notes: every `cv2.VideoCapture` must be paired with `cap.release()` — use try/finally or a context manager so it runs even when an exception is raised. Run blocking OpenCV and disk I/O in a worker thread (`asyncio.to_thread`) to avoid stalling the event loop. Verify the capture opened successfully before reading (`cap.isOpened()`).