Fog computing architecture and IoT integration
┌─────────────────────────────────────────┐
│ Cloud (Tier 3) │
│ Long-term storage, ML training │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Fog Layer (Tier 2) │
│ Regional data centers, analytics │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Edge Layer (Tier 1) │
│ Gateways, local processing │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ IoT Devices (Tier 0) │
│ Sensors, actuators, controllers │
└─────────────────────────────────────────┘
import json

import paho.mqtt.client as mqtt
# Fog node configuration
class FogNode:
    """Fog-layer (Tier 2) MQTT node: subscribes to sensor/alert topics,
    triages messages locally, and forwards important results cloud-ward.

    Requires ``json`` and ``paho.mqtt.client as mqtt`` at module level.
    """

    def __init__(self, broker, node_id):
        """broker: MQTT broker hostname/IP; node_id: unique MQTT client id."""
        self.client = mqtt.Client(client_id=node_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.broker = broker

    def connect(self, port=1883, keepalive=60):
        """Connect to the configured broker and start the background network
        loop. (Fixes ``self.broker`` being stored but never used.)"""
        self.client.connect(self.broker, port, keepalive)
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe on (re)connect so subscriptions survive reconnects.

        QoS 0: at most once (fire and forget)
        QoS 1: at least once (acknowledged delivery) -- sensor streams
        QoS 2: exactly once (assured delivery)       -- alerts
        """
        client.subscribe("sensors/#", qos=1)
        client.subscribe("alerts/#", qos=2)

    def on_message(self, client, userdata, msg):
        """Process an incoming message at the fog layer.

        A malformed payload must not raise inside the paho callback thread
        (that silently disables message handling), so bad JSON is dropped.
        """
        try:
            payload = json.loads(msg.payload)
        except (ValueError, TypeError):
            return  # drop undecodable payloads instead of killing the loop
        # Filter and aggregate locally; only escalate what matters.
        if self.should_process(payload):
            # NOTE(review): process_locally is not defined in this class —
            # presumably supplied by a subclass; result is expected to have
            # an `.important` flag and be json-serializable. Confirm.
            result = self.process_locally(payload)
            if result.important:
                client.publish("cloud/data", json.dumps(result))

    def should_process(self, payload):
        """Fog-local triage: only 'high' priority payloads are handled here."""
        return payload.get('priority') == 'high'
class DataAggregator:
    """Time-windowed aggregation of sensor readings at the fog layer,
    with downsampling of large windows before cloud upload."""

    def __init__(self, window_size=60):
        # Aggregation window length (seconds).
        self.window_size = window_size
        # NOTE(review): buffer is never read or written by this class —
        # presumably intended for incremental accumulation; confirm or remove.
        self.buffer = []

    def aggregate(self, data_points):
        """Summarize one window of readings.

        data_points: iterable of dicts each carrying a numeric 'value' key.
        Returns a summary dict. An empty window returns a zeroed summary
        instead of raising ZeroDivisionError (bug in the original), and
        'sum_sq' is included so downsample() can derive a std deviation.
        Windows with more than 1000 points are downsampled for the cloud.
        """
        # Single pass to extract values (was five generator passes).
        values = [d['value'] for d in data_points]
        if not values:
            return {
                'count': 0, 'avg': 0.0, 'min': None, 'max': None,
                'sum': 0, 'sum_sq': 0, 'window': self.window_size,
            }
        total = sum(values)
        aggregated = {
            'count': len(values),
            'avg': total / len(values),
            'min': min(values),
            'max': max(values),
            'sum': total,
            # Sum of squares makes the std computable from the summary alone.
            'sum_sq': sum(v * v for v in values),
            'window': self.window_size,
        }
        # Downsampling for cloud: large windows ship only a statistical summary.
        if aggregated['count'] > 1000:
            return self.downsample(aggregated)
        return aggregated

    def downsample(self, data):
        """Reduce a large aggregated window to a compact statistical summary."""
        return {
            'count': data['count'],
            'avg': data['avg'],
            'std': self.calculate_std(data),
            'min': data['min'],
            'max': data['max'],
        }

    def calculate_std(self, data):
        """Population standard deviation from count / avg / sum_sq.

        This was called by downsample() but never defined in the original.
        Uses Var = E[x^2] - E[x]^2; clamped at 0 to absorb float round-off.
        """
        n = data['count']
        if n == 0:
            return 0.0
        variance = data['sum_sq'] / n - data['avg'] ** 2
        return max(variance, 0.0) ** 0.5
# Kubernetes deployment for fog
apiVersion: apps/v1