Deploy and operate CAPEv2 sandbox for automated malware analysis with behavioral monitoring, payload extraction, configuration parsing, and anti-evasion capabilities.
CAPE (Config And Payload Extraction) is an open-source malware sandbox derived from Cuckoo that automates behavioral analysis, payload dumping, and configuration extraction. CAPEv2 features API hooking for behavioral instrumentation, captures files created/modified/deleted during execution, records network traffic in PCAP format, and includes 70+ custom configuration extractors (cape-parsers) for families like Emotet, TrickBot, Cobalt Strike, AsyncRAT, and Rhadamanthys. The signature system includes 1000+ behavioral signatures detecting evasion techniques, persistence, credential theft, and ransomware behavior. CAPE's debugger enables dynamic anti-evasion bypasses combining debugger actions within YARA signatures. Recommended deployment: Ubuntu LTS host with Windows 10 21H2 guest VM.
#!/usr/bin/env python3
"""CAPE sandbox API client for automated malware submission and analysis."""
import requests
import json
import time
import sys
from pathlib import Path
class CAPEClient:
    """Minimal client for the CAPEv2 REST API (apiv2 endpoints).

    Wraps file submission, status polling, and report/IOC retrieval.
    Authentication uses CAPE's token scheme (``Authorization: Token <t>``)
    when an ``api_token`` is supplied.
    """

    def __init__(self, base_url: str = "http://localhost:8000", api_token: str | None = None):
        # Normalize the base URL so endpoint joins never produce "//".
        self.base_url = base_url.rstrip("/")
        self.headers: dict[str, str] = {}
        if api_token:
            self.headers["Authorization"] = f"Token {api_token}"

    def submit_file(self, filepath, options: dict | None = None):
        """Submit a file for analysis.

        Returns the first task ID from the server response, or ``None``
        if the server created no tasks (e.g. duplicate submission).
        Raises ``requests.HTTPError`` on a non-2xx response.
        """
        url = f"{self.base_url}/apiv2/tasks/create/file/"
        data = options or {}
        data.setdefault("timeout", 120)
        data.setdefault("enforce_timeout", False)
        # Context manager closes the sample file handle even on error;
        # the original leaked the descriptor (and locked the file on Windows).
        with open(filepath, "rb") as fh:
            resp = requests.post(
                url, files={"file": fh}, data=data, headers=self.headers
            )
        resp.raise_for_status()
        result = resp.json()
        # CAPE may return multiple task IDs (e.g. archive submissions);
        # this client tracks only the first.
        task_id = result.get("data", {}).get("task_ids", [None])[0]
        print(f"[+] Submitted {filepath} -> Task ID: {task_id}")
        return task_id

    def get_status(self, task_id) -> str:
        """Return the task's analysis status string (e.g. "reported")."""
        url = f"{self.base_url}/apiv2/tasks/status/{task_id}/"
        resp = requests.get(url, headers=self.headers)
        # Fail loudly on HTTP errors instead of surfacing a JSON decode
        # error later — consistent with submit_file's error handling.
        resp.raise_for_status()
        return resp.json().get("data", "unknown")

    def wait_for_completion(self, task_id, poll_interval: int = 15, max_wait: int = 600) -> bool:
        """Poll until the task reaches "reported" or ``max_wait`` seconds pass.

        Returns True when the report is available, False on timeout.
        """
        elapsed = 0
        while elapsed < max_wait:
            status = self.get_status(task_id)
            if status == "reported":
                print(f"[+] Task {task_id} completed")
                return True
            time.sleep(poll_interval)
            elapsed += poll_interval
            print(f"    Waiting... ({elapsed}s, status: {status})")
        return False

    def get_report(self, task_id) -> dict:
        """Retrieve the full JSON analysis report for a task."""
        url = f"{self.base_url}/apiv2/tasks/get/report/{task_id}/"
        resp = requests.get(url, headers=self.headers)
        resp.raise_for_status()
        return resp.json()

    def get_config(self, task_id) -> list:
        """Return extracted malware configurations (CAPE config parsers)."""
        report = self.get_report(task_id)
        return report.get("CAPE", {}).get("configs", [])

    def get_dropped_files(self, task_id) -> list:
        """Return the list of files dropped during analysis."""
        report = self.get_report(task_id)
        return report.get("dropped", [])

    def get_network_iocs(self, task_id) -> dict:
        """Extract DNS queries, HTTP URIs, and TCP endpoints from the report."""
        network = self.get_report(task_id).get("network", {})
        return {
            "dns": [d.get("request") for d in network.get("dns", [])],
            "http": [h.get("uri") for h in network.get("http", [])],
            "tcp": [
                f"{h.get('dst')}:{h.get('dport')}"
                for h in network.get("tcp", [])
            ],
        }

    def analyze_sample(self, filepath) -> dict | None:
        """Full pipeline: submit, wait, and summarize config/IOCs/drops.

        Returns a summary dict, or ``None`` if submission failed or the
        analysis did not complete within the polling window.
        """
        task_id = self.submit_file(filepath)
        if not task_id:
            return None
        if self.wait_for_completion(task_id):
            return {
                "task_id": task_id,
                "config": self.get_config(task_id),
                "network_iocs": self.get_network_iocs(task_id),
                "dropped_files": len(self.get_dropped_files(task_id)),
            }
        return None
if __name__ == "__main__":
    # CLI: <malware_sample> is required, CAPE server URL is optional.
    args = sys.argv
    if len(args) < 2:
        print(f"Usage: {args[0]} <malware_sample> [cape_url]")
        sys.exit(1)
    cape_url = args[2] if len(args) > 2 else "http://localhost:8000"
    analysis = CAPEClient(cape_url).analyze_sample(args[1])
    # Pretty-print the summary only when the pipeline produced one.
    if analysis:
        print(json.dumps(analysis, indent=2))