Build sensor data pipelines with I2C, SPI, UART, and GPIO protocols. Use when integrating sensors with Raspberry Pi, Jetson, or other SBCs for data collection, calibration, and anomaly detection.
"In embedded systems, the sensor is the source of truth. If you don't trust your sensor, you don't trust your system." -- Jack Ganssle, The Art of Designing Embedded Systems
This skill coordinates the full lifecycle of sensor integration: from physical wiring and protocol configuration through calibration, validation, and data publishing. Sensors produce raw analog or digital signals that must be treated with skepticism until proven reliable.
Non-Negotiable Constraints:
| Principle | Description | Priority |
|---|---|---|
| Data Integrity | Every reading must include timestamp, sensor ID, and validity flag | Critical |
| Calibration First | No sensor enters production without a documented calibration procedure | Critical |
| Protocol Selection | Choose the simplest protocol that meets bandwidth and latency requirements | High |
| Sample Rate Management | Match sample rate to the physical phenomenon; oversampling wastes resources, undersampling loses data | High |
| Fault Tolerance | Every sensor read must handle timeout, CRC error, and bus conflict | Critical |
| Wiring Verification | Confirm physical connections before software debugging | Critical |
| Power Budget | Account for sensor current draw, especially on battery-powered systems | High |
| Noise Reduction | Apply hardware filtering (decoupling caps) before software filtering | Medium |
| Reproducibility | Same hardware + same code = same readings within tolerance | High |
| Documentation | Every sensor integration must include wiring diagram, calibration data, and protocol configuration | Medium |
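The Data Integrity and Fault Tolerance rows can be combined in one small wrapper. The following is a minimal sketch, not a prescribed implementation: `Reading` and `read_with_retry` are hypothetical names, and it assumes the driver raises `OSError` for bus errors and timeouts and `ValueError` for a CRC mismatch. Adjust the exception types to whatever your driver actually raises.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Reading:
    """One sensor reading with the fields the Data Integrity row requires."""
    sensor_id: str
    timestamp: float
    value: Optional[float]
    valid: bool
    error: Optional[str] = None


def read_with_retry(sensor_id: str, read_fn: Callable[[], float],
                    retries: int = 3, delay_s: float = 0.05) -> Reading:
    """Call read_fn up to `retries` times and never raise to the caller."""
    last_error = None
    for attempt in range(retries):
        try:
            value = read_fn()
            return Reading(sensor_id, time.time(), value, valid=True)
        except (OSError, ValueError) as exc:
            # Assumption: OSError covers bus errors and timeouts,
            # ValueError is what the driver raises on a CRC mismatch.
            last_error = f"{type(exc).__name__}: {exc}"
            if attempt < retries - 1:
                time.sleep(delay_s)
    # All attempts failed: return an explicitly invalid reading
    return Reading(sensor_id, time.time(), None, valid=False, error=last_error)
```

Returning an invalid `Reading` instead of raising keeps the acquisition loop alive and leaves the decision about what to do with bad samples to the consumer.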
Use search_knowledge (grounded-code-mcp) to ground decisions in authoritative references.
| Query | When to Call |
|---|---|
| `search_knowledge("I2C SPI UART GPIO protocol selection")` | During IDENTIFY: choosing the right bus protocol for a new sensor |
| `search_knowledge("sensor calibration offset gain correction")` | During CALIBRATE: implementing linear or multi-point calibration |
| `search_knowledge("Raspberry Pi sensor I2C smbus2")` | During CONNECT: setting up I2C on Raspberry Pi or Jetson |
| `search_knowledge("Python sensor data pipeline async")` | During PUBLISH: designing async data acquisition loops |
| `search_knowledge("anomaly detection outlier sensor validation")` | During VALIDATE: building range checks and anomaly flagging |
| `search_code_examples("I2C sensor read Python")` | Before writing driver code: find canonical patterns |
| `search_code_examples("MQTT publish sensor data Python")` | Before writing the publisher: find MQTT client patterns |
Protocol: Search automation and robotics collections for hardware-specific guidance. Always cite the source path from KB results. If KB returns nothing useful for a sensor-specific question, fall back to the datasheet.
┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  ┌──────────┐    ┌─────────┐    ┌───────────┐    ┌───────────┐         │
│  │ IDENTIFY │───>│ CONNECT │───>│ CONFIGURE │───>│ CALIBRATE │──┐      │
│  └──────────┘    └─────────┘    └───────────┘    └───────────┘  │      │
│                                                                 │      │
│       ┌─────────────────────────────────────────────────────────┘      │
│       │                                                                │
│       v                                                                │
│  ┌──────────┐    ┌─────────┐                                           │
│  │ VALIDATE │───>│ PUBLISH │────────────────────────────────────┐      │
│  └──────────┘    └─────────┘                                    │      │
│       ^                                                         │      │
│       └─────────────────────────────────────────────────────────┘      │
│                      (continuous monitoring loop)                      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
Before writing any sensor code, verify:
┌─────────────────────────────────────────────────────────┐
│                  Pre-Flight Checklist                   │
├─────────────────────────────────────────────────────────┤
│ □ Sensor datasheet downloaded and reviewed              │
│ □ Operating voltage confirmed (3.3V vs 5V)              │
│ □ Protocol identified (I2C / SPI / UART / GPIO)         │
│ □ Pin assignments documented                            │
│ □ Pull-up / pull-down resistors installed if needed     │
│ □ Decoupling capacitor placed near sensor VCC           │
│ □ I2C address confirmed (no conflicts on bus)           │
│ □ Python libraries installed (smbus2 / spidev / etc.)   │
│ □ User has permission to access /dev/ devices           │
│ □ Test harness ready (known reference values available) │
└─────────────────────────────────────────────────────────┘
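The device-permission item on the checklist can be verified from Python before any driver code runs. This is a small sketch under assumptions: `check_device_access` is a hypothetical helper, and the device paths in the usage section are typical Raspberry Pi names that may differ on your board.

```python
import os


def check_device_access(paths: list[str]) -> dict[str, str]:
    """Return 'ok', 'missing', or 'no permission' for each device path."""
    status = {}
    for path in paths:
        if not os.path.exists(path):
            # Bus not enabled, or the node has a different name on this board
            status[path] = "missing"
        elif os.access(path, os.R_OK | os.W_OK):
            status[path] = "ok"
        else:
            status[path] = "no permission"
    return status


if __name__ == "__main__":
    # Assumed device nodes; adjust to the buses enabled on your board.
    candidates = ["/dev/i2c-1", "/dev/spidev0.0", "/dev/serial0"]
    for path, state in check_device_access(candidates).items():
        print(f"{path}: {state}")
```

A "no permission" result usually means the user is not in the group that owns the device node (on Raspberry Pi OS these are typically `i2c`, `spi`, and `dialout`); a "missing" result usually means the interface has not been enabled.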
                      ┌─────────────────────┐
                      │ How many sensors on │
                      │ the same bus?       │
                      └──────────┬──────────┘
                                 │
                  ┌──────────────┴──────────────┐
                  │                             │
              1 sensor                     2+ sensors
                  │                             │
                  v                             v
         ┌─────────────────┐           ┌─────────────────┐
         │ Need high speed │           │ Each sensor has │
         │ (>1 MHz)?       │           │ unique address? │
         └────────┬────────┘           └────────┬────────┘
                  │                             │
           ┌──────┴──────┐               ┌──────┴──────┐
           │             │               │             │
          Yes           No              Yes           No
           │             │               │             │
           v             v               v             v
        ┌─────┐    ┌───────────┐      ┌─────┐       ┌─────┐
        │ SPI │    │ Simple    │      │ I2C │       │ SPI │
        └─────┘    │ digital?  │      └─────┘       │(CS) │
                   └─────┬─────┘                    └─────┘
                     ┌───┴───┐
                     │       │
                    Yes     No
                     │       │
                     v       v
                 ┌──────┐ ┌──────┐
                 │ GPIO │ │ UART │
                 └──────┘ └──────┘
Quick Protocol Summary:
| Protocol | Speed | Wires | Multi-Device | Best For |
|---|---|---|---|---|
| I2C | 100 kHz (standard mode), 400 kHz (fast mode) | 2 (SDA, SCL) | Yes (addressing) | Low-speed sensors, config registers |
| SPI | 1-50 MHz | 4+ (MOSI, MISO, SCLK, CS) | Yes (chip select) | High-speed, ADCs, displays |
| UART | 9600-115200 baud | 2 (TX, RX) | No (point-to-point) | GPS, serial sensors |
| GPIO | N/A | 1 per signal | No | Digital on/off, triggers, PWM |
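The decision tree can be written as a small function, which is handy when a project inventories many sensors. This is a sketch that mirrors the tree literally; `select_protocol` and its parameter names are hypothetical.

```python
def select_protocol(num_sensors: int, needs_high_speed: bool = False,
                    simple_digital: bool = False,
                    unique_addresses: bool = True) -> str:
    """Return the protocol the decision tree suggests for one bus."""
    if num_sensors < 1:
        raise ValueError("num_sensors must be at least 1")
    if num_sensors >= 2:
        # Shared bus: I2C if addresses do not collide, else SPI with chip selects
        return "I2C" if unique_addresses else "SPI"
    if needs_high_speed:  # single sensor needing more than 1 MHz
        return "SPI"
    return "GPIO" if simple_digital else "UART"


print(select_protocol(3))                           # several addressable sensors
print(select_protocol(1, needs_high_speed=True))    # one fast ADC
```

Treat the result as a first-pass suggestion only. The sensor's datasheet decides which interfaces the part actually supports; a single I2C-only sensor still uses I2C even though the single-sensor branch of the tree never names it.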
Determine sensor type, part number, protocol, operating voltage, and required libraries.
# Document sensor identity
sensor_manifest = {
    "name": "BME280",
    "type": "Environmental",
    "measures": ["temperature", "humidity", "pressure"],
    "protocol": "i2c",
    "address": 0x76,  # or 0x77 with SDO high
    "voltage": 3.3,
    "library": "adafruit-circuitpython-bme280",
    "datasheet": "https://www.bosch-sensortec.com/media/boschsensortec/downloads/datasheets/bst-bme280-ds002.pdf",
}
Wire the sensor. Verify physical connections.
# Verify I2C device is detected
import smbus2


def scan_i2c_bus(bus_number: int = 1) -> list[int]:
    """Scan I2C bus and return list of detected addresses."""
    devices = []
    # The context manager closes the bus even if a probe fails unexpectedly
    with smbus2.SMBus(bus_number) as bus:
        for addr in range(0x03, 0x78):  # probe 7-bit addresses 0x03-0x77
            try:
                bus.read_byte(addr)
                devices.append(addr)
            except OSError:
                pass  # nothing acknowledged at this address
    return devices


detected = scan_i2c_bus()
assert 0x76 in detected, f"BME280 not found! Detected: {[hex(a) for a in detected]}"
Set sample rate, resolution, filtering, and operating mode.
import adafruit_bme280.advanced as adafruit_bme280
import board
import busio
i2c = busio.I2C(board.SCL, board.SDA)
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c, address=0x76)
# Configure oversampling and filter
bme280.overscan_temperature = adafruit_bme280.OVERSCAN_X16
bme280.overscan_humidity = adafruit_bme280.OVERSCAN_X16
bme280.overscan_pressure = adafruit_bme280.OVERSCAN_X16
bme280.iir_filter = adafruit_bme280.IIR_FILTER_X16
bme280.mode = adafruit_bme280.MODE_NORMAL
bme280.standby_period = adafruit_bme280.STANDBY_TC_500
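Oversampling and standby time together set how often the sensor produces a fresh value, which matters for the Sample Rate Management principle. The sketch below estimates that rate using the typical measurement-time formula as given in the Bosch BME280 datasheet; the function names are hypothetical, and the constants should be checked against your copy of the datasheet before relying on them.

```python
def bme280_measure_time_ms(osrs_t: int, osrs_p: int, osrs_h: int) -> float:
    """Typical measurement time in ms; an oversampling of 0 means 'skipped'."""
    t_ms = 1.0
    if osrs_t:
        t_ms += 2.0 * osrs_t
    if osrs_p:
        t_ms += 2.0 * osrs_p + 0.5
    if osrs_h:
        t_ms += 2.0 * osrs_h + 0.5
    return t_ms


def output_data_rate_hz(measure_ms: float, standby_ms: float) -> float:
    """Readings per second in normal mode (measure, then stand by, repeat)."""
    return 1000.0 / (measure_ms + standby_ms)


# Settings from the configuration above: x16 oversampling, 500 ms standby
measure_ms = bme280_measure_time_ms(16, 16, 16)
rate_hz = output_data_rate_hz(measure_ms, 500.0)
print(f"measurement ~{measure_ms:.1f} ms, new value every {1000.0 / rate_hz:.0f} ms")
```

Under these assumptions the sensor refreshes less than twice per second, so polling it faster than that simply re-reads the same value. Lower the oversampling or the standby period if the physical phenomenon changes faster.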
Compare sensor readings against known reference values and compute correction factors.
import numpy as np


def calibrate_temperature(sensor_readings: list[float],
                          reference_readings: list[float]) -> tuple[float, float]:
    """Linear calibration from two or more reference points. Returns (offset, gain)."""
    sensor_arr = np.array(sensor_readings)
    ref_arr = np.array(reference_readings)
    # polyfit returns coefficients highest degree first: (slope, intercept)
    gain, offset = np.polyfit(sensor_arr, ref_arr, 1)
    return float(offset), float(gain)


# Example: ice water (0 °C) and boiling water (100 °C at sea-level pressure)
raw_readings = [1.2, 99.5]
reference = [0.0, 100.0]
offset, gain = calibrate_temperature(raw_readings, reference)
print(f"Calibration: corrected = {gain:.4f} * raw + {offset:.4f}")
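A two-point fit always passes exactly through both reference points, so it cannot reveal non-linearity. Adding at least one intermediate reference point and checking the residuals gives a first indication of whether a linear correction is adequate. The sketch below does this with the standard library only; `fit_linear`, `apply_calibration`, and `max_residual` are hypothetical names, and the mid-range data point is made up for illustration.

```python
def fit_linear(raw: list[float], ref: list[float]) -> tuple[float, float]:
    """Ordinary least-squares line through (raw, ref). Returns (offset, gain)."""
    n = len(raw)
    if n < 2 or n != len(ref):
        raise ValueError("need at least two matching raw/reference pairs")
    mean_x = sum(raw) / n
    mean_y = sum(ref) / n
    sxx = sum((x - mean_x) ** 2 for x in raw)
    if sxx == 0:
        raise ValueError("raw readings must not all be identical")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(raw, ref))
    gain = sxy / sxx
    offset = mean_y - gain * mean_x
    return offset, gain


def apply_calibration(raw: float, offset: float, gain: float) -> float:
    """Convert one raw reading to a corrected value."""
    return gain * raw + offset


def max_residual(raw: list[float], ref: list[float],
                 offset: float, gain: float) -> float:
    """Largest absolute error of the fit at the reference points."""
    return max(abs(apply_calibration(x, offset, gain) - y)
               for x, y in zip(raw, ref))


# Illustrative data: the 50.4 / 50.0 mid-point is invented for this example
raw_points = [1.2, 50.4, 99.5]
ref_points = [0.0, 50.0, 100.0]
offset, gain = fit_linear(raw_points, ref_points)
print(f"max residual: {max_residual(raw_points, ref_points, offset, gain):.3f}")
```

If the maximum residual exceeds the accuracy you need, a linear correction is probably not enough and a piecewise or polynomial calibration is worth considering.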
Run the sensor under controlled conditions and verify readings fall within expected tolerance.
import time

import numpy as np


def validate_sensor(sensor, calibration: dict, tolerance: float = 0.5,
                    num_samples: int = 100) -> dict:
    """Collect samples and validate statistical properties."""
    readings = []
    for _ in range(num_samples):
        raw = sensor.temperature
        corrected = calibration["gain"] * raw + calibration["offset"]
        readings.append(corrected)
        time.sleep(0.1)
    arr = np.array(readings)
    return {
        "mean": float(np.mean(arr)),
        "std": float(np.std(arr)),
        "min": float(np.min(arr)),
        "max": float(np.max(arr)),
        # Passes when the spread (standard deviation) is below the tolerance
        "range_ok": bool(np.std(arr) < tolerance),
        "num_samples": num_samples,
    }
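Summary statistics describe a batch, but anomaly detection needs a verdict per reading. One common approach, sketched below, combines a hard range check with a robust outlier test based on the median absolute deviation (the modified z-score with the conventional 3.5 cutoff). `flag_anomalies` is a hypothetical helper, and the range limits and sample values are assumptions for illustration; take real limits from the sensor's datasheet.

```python
import statistics


def flag_anomalies(values: list[float], valid_min: float, valid_max: float,
                   threshold: float = 3.5) -> list[str]:
    """Label each value 'ok', 'out_of_range', or 'outlier'."""
    # Compute robust statistics only from physically plausible readings
    in_range = [v for v in values if valid_min <= v <= valid_max]
    if in_range:
        median = statistics.median(in_range)
        mad = statistics.median([abs(v - median) for v in in_range])
    else:
        median, mad = 0.0, 0.0

    flags = []
    for v in values:
        if not (valid_min <= v <= valid_max):
            flags.append("out_of_range")
        elif mad > 0 and 0.6745 * abs(v - median) / mad > threshold:
            # Modified z-score: robust against the outliers it is detecting
            flags.append("outlier")
        else:
            flags.append("ok")
    return flags


# Made-up temperature samples in degrees Celsius; limits are assumed
samples = [22.1, 22.3, 22.2, 22.4, 22.2, 35.0, -60.0]
for value, flag in zip(samples, flag_anomalies(samples, -40.0, 85.0)):
    print(f"{value:7.1f}  {flag}")
```

The median-based test is used here instead of a mean and standard deviation because a single large spike inflates the standard deviation and can hide itself.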
Push validated data to downstream consumers (MQTT, database, file, etc.).
import json
import time


def publish_reading(sensor_id: str, value: float, unit: str,
                    calibrated: bool = True) -> dict:
    """Format a sensor reading for downstream consumption."""
    return {
        "sensor_id": sensor_id,
        "timestamp": time.time(),
        "value": round(value, 4),
        "unit": unit,
        "calibrated": calibrated,
        "quality": "valid",
    }


reading = publish_reading("bme280-01", 22.35, "celsius")
print(json.dumps(reading, indent=2))
Maintain state across conversation turns using this block:
<sensor-state>