Measuring electrical current with Python requires more thought than plugging in a sensor and calling a library. The hardware path you choose determines your resolution, your noise floor, your ability to detect ground faults, and how much accuracy you actually get under real conditions. This article covers building a complete, production-worthy current reading program in Python — from hardware fundamentals through calibration math, gain and resolution tradeoffs, high-side versus low-side sensing decisions, noise mitigation, asyncio-based concurrent logging, and running your monitor as a persistent systemd service.
Two common hardware paths are covered in depth: the INA219 current sensor over I2C, and a shunt resistor read through an MCP3008 ADC over SPI. Both are practical and well-supported in Python, but they serve different use cases, and the difference is worth understanding before you commit to one.
How Python Reads Electrical Current
Python does not read current directly. A microcontroller or single-board computer like a Raspberry Pi has no built-in current sensing capability, so the hardware chain typically looks like this:
Current sensor or shunt resistor → ADC → I2C or SPI bus → Python
A current sensor like the Texas Instruments INA219 handles the amplification and analog-to-digital conversion internally, delivering calibrated current, voltage, and power values over I2C. Python reads those values with a single library call. A shunt resistor approach is more hands-on: you place a known low-resistance resistor in series with your load, measure the tiny voltage across it with an external ADC such as the MCP3008, and calculate current from Ohm's law (I = V / R). Both approaches are valid, but they have meaningfully different noise characteristics, placement constraints, and accuracy profiles.
High-Side vs. Low-Side: A Decision That Matters
Before wiring anything, you need to decide where in the circuit the shunt resistor (or INA219) sits. This choice affects more than just wiring — it determines what faults you can detect and whether your ground reference is stable.
In low-side sensing, the shunt sits between the load and ground. The advantage is a simple common-mode reference: the voltage across the shunt is just slightly above ground, so a basic op-amp or ADC can measure it directly without needing high common-mode range. The disadvantage is that the load's ground rail is now lifted above system ground by the shunt voltage drop — typically a few millivolts, but this can still cause ground loop noise in sensitive circuits. More critically, low-side sensing cannot detect a short between the load and ground, because fault current would bypass the shunt entirely.
In high-side sensing, the shunt sits between the positive supply and the load. The load remains directly referenced to ground, which eliminates ground loop concerns and allows the monitor to detect downstream short-circuit faults. The INA219 is specifically designed for high-side operation — its inputs tolerate up to 26V common-mode voltage, according to the Texas Instruments datasheet. This makes it the correct choice for most battery-powered and safety-aware monitoring applications.
For the INA219 examples in this article, Vin+ connects to the positive supply rail and Vin- connects to the load. Current flows in, through the shunt on the breakout board, and out to the load. This is high-side placement. The MCP3008 shunt example under Option 2 uses low-side placement (shunt between load and ground) because the ADC inputs must stay between 0V and VDD and so cannot tolerate high common-mode voltage. Keep that constraint in mind when choosing your path.
Option 1: INA219 over I2C
The INA219 is a dedicated DC current and power monitor from Texas Instruments. It senses across shunts on buses up to 26V, operates from a 3V to 5.5V supply drawing at most 1mA, and communicates over I2C with up to 16 programmable addresses. It ships in two grades: the standard INA219A and the higher-precision INA219B. The Adafruit INA219 breakout board uses a 0.1Ω shunt and is the easiest starting point for Raspberry Pi projects.
According to TI's datasheet (Rev. G), the INA219A achieves a maximum shunt amplifier offset voltage of ±100μV at PGA=/1, while the higher-precision INA219B grade tightens that to ±50μV at the same gain setting. These limits are specified at 25°C. Note that as of September 2022, Adafruit's INA219 breakout may ship with either grade depending on availability; check the chip marking (I219A or I219B) if the distinction matters for your application. Temperature matters in real deployments: a Raspberry Pi in an outdoor enclosure or near heat-generating components will not be at 25°C, so factor thermal drift into any accuracy claims you make about your logged data.
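To put those offset figures in perspective, the sketch below converts an offset voltage into the current error it produces across a given shunt. The function name offset_error_ma is made up for this illustration, and the calculation covers only the offset term; gain error and thermal drift are ignored.

```python
def offset_error_ma(offset_uv: float, shunt_ohms: float) -> float:
    """Current error (mA) caused by a shunt-amplifier offset voltage (uV)."""
    offset_v = offset_uv * 1e-6
    return offset_v / shunt_ohms * 1000.0  # Ohm's law, converted to mA


# Offset limits quoted above, applied to the 0.1 ohm breakout shunt
for grade, offset_uv in (("INA219A", 100.0), ("INA219B", 50.0)):
    print(f"{grade}: up to +/-{offset_error_ma(offset_uv, 0.1):.2f} mA of offset error")
```

With a 0.1Ω shunt the offset alone accounts for roughly a milliamp on the A grade, which is significant if you are trying to resolve loads of only a few milliamps.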
Wiring
Connect the INA219 breakout to your Raspberry Pi as follows. The INA219 sits in series with the power path — current flows into Vin+ and out of Vin- to the load.
| INA219 Pin | Raspberry Pi Pin |
|---|---|
| VCC | 3.3V (Pin 1) |
| GND | Ground (Pin 6) |
| SDA | GPIO 2 / SDA (Pin 3) |
| SCL | GPIO 3 / SCL (Pin 5) |
| Vin+ | Positive supply rail (before load) |
| Vin- | Load positive terminal (after shunt) |
Enable I2C on Raspberry Pi
I2C must be enabled before the sensor appears on the bus. On Raspberry Pi OS, use raspi-config:
sudo raspi-config
# Navigate to: Interface Options > I2C > Enable
sudo reboot
After rebooting, confirm the sensor is visible. You should see 0x40 in the output grid (the INA219's default address when both A0 and A1 pins are tied to ground):
sudo i2cdetect -y 1
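If 0x40 does not appear but another address in the 0x40 to 0x4F range does, the A0/A1 pins are probably strapped differently. The sketch below shows how the 16 addresses are laid out, assuming the ordering in TI's address table (GND, VS, SDA, SCL for each pin); the helper name ina219_address is hypothetical.

```python
# Connection options for each address pin, in the order the address table counts them.
PIN_OPTIONS = ("GND", "VS", "SDA", "SCL")


def ina219_address(a1: str = "GND", a0: str = "GND") -> int:
    """Return the 7-bit I2C address for a given A1/A0 strapping."""
    return 0x40 + 4 * PIN_OPTIONS.index(a1) + PIN_OPTIONS.index(a0)


print(hex(ina219_address()))             # both pins grounded (default)
print(hex(ina219_address("GND", "VS")))  # A0 tied to the supply
print(hex(ina219_address("VS", "VS")))   # both pins tied to the supply
```

On breakouts that expose A0 and A1 as solder jumpers, bridging a jumper typically ties that pin to the supply, which is why such boards are usually limited to four of the sixteen addresses.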
Library choice
Two Python libraries are available for the INA219. The pi-ina219 package (installed as shown below) is the one used in this article's examples because its API is simple and its calibration is automatic based on the shunt and MAX_AMPS values you supply. Note that as of early 2026 this library is unmaintained — no new versions have been released to PyPI in over 12 months and the repository shows no active development. It remains functional on current Raspberry Pi OS releases, but if you need active support or CircuitPython compatibility, Adafruit's adafruit-circuitpython-ina219 is the actively maintained alternative and supports the same hardware.
pip install pi-ina219
Raspberry Pi OS Bookworm (released late 2023) enforces PEP 668, blocking bare pip install on the system Python to prevent conflicts with OS-managed packages. If you see an "externally managed environment" error, create a virtual environment first: python3 -m venv venv && source venv/bin/activate, then run pip inside it. Alternatively, pass --break-system-packages for a system-wide install if you understand the implications.
Basic reading script
from ina219 import INA219, DeviceRangeError

SHUNT_OHMS = 0.1  # Value of the shunt resistor on the Adafruit breakout
MAX_AMPS = 2.0    # Maximum expected current in your circuit

def read_sensor():
    ina = INA219(SHUNT_OHMS, MAX_AMPS)
    ina.configure(ina.RANGE_16V)  # Use RANGE_32V if your bus exceeds 16V
    print(f"Bus Voltage: {ina.voltage():.3f} V")
    print(f"Shunt Voltage: {ina.shunt_voltage():.3f} mV")
    try:
        print(f"Current: {ina.current():.3f} mA")
        print(f"Power: {ina.power():.3f} mW")
    except DeviceRangeError as e:
        print(f"Overflow -- reduce MAX_AMPS or check wiring: {e}")

if __name__ == "__main__":
    read_sensor()
If the actual current exceeds MAX_AMPS, the INA219's gain setting becomes insufficient and readings become meaningless. The library raises DeviceRangeError rather than silently returning wrong data, so always handle this exception explicitly. The practical ceiling with the 0.1Ω Adafruit shunt is approximately 3.2A, which is the 320mV maximum shunt range divided by 0.1Ω.
The Calibration Math Behind the Library
Many tutorials treat the INA219 as a black box: pass in your shunt resistance and maximum current, read a number. That is convenient but it leaves a gap that will cost you when things are not working right, or when you need to squeeze more precision from the sensor. Understanding the calibration mechanism takes five minutes and pays dividends.
The INA219 contains a 12-bit ADC that measures only voltage — specifically, the differential voltage across the shunt resistor (Vin+ minus Vin-). It has no inherent knowledge of the shunt resistance value. You communicate the shunt value and your desired measurement resolution to the chip by writing to the Calibration Register. The chip then uses that value to compute current and power in hardware, making the results available in the Current Register and Power Register over I2C.
The calibration register value is derived from this relationship (taken from the TI INA219 datasheet):
Calibration Register = trunc( 0.04096 / (Current_LSB × R_shunt) )
Current_LSB is the resolution of each bit in the Current Register, expressed in amps per bit. You choose this value. The smaller you make it, the finer your resolution, but there is a floor. The minimum achievable Current_LSB is your maximum expected current divided by 32,767 (2¹⁵ − 1). The maximum is your maximum expected current divided by 4,096 (2¹²).
A worked example with the default Adafruit setup (0.1Ω shunt, 2A MAX_AMPS):
- Minimum LSB = 2.0A / 32,767 ≈ 61μA per bit
- Maximum LSB = 2.0A / 4,096 ≈ 488μA per bit
- Chosen LSB = 100μA per bit (a round number near the minimum)
- Calibration Register = trunc(0.04096 / (0.0001 × 0.1)) = 4,096
The pi-ina219 library performs this same calculation automatically from the SHUNT_OHMS and MAX_AMPS values you pass in, although it may pick a Current_LSB closer to the minimum than the round 100μA used here, in which case the register value it writes will differ from 4,096. Knowing the formula, you can verify that the library is behaving as expected and work out what precision you are actually getting at your specific configuration. The power LSB is always 20 times the current LSB, because the Power Register is computed internally as the product of current and bus voltage (which has a fixed 4mV LSB), per the INA219 datasheet.
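The worked example can be reproduced in a few lines. This is a minimal sketch of the datasheet formula, not the library's internal code; the function names are invented for illustration, and exact fractions are used so that floating-point rounding cannot push the truncation one count low.

```python
import math
from fractions import Fraction


def lsb_bounds(max_amps: float) -> tuple:
    """Return (minimum, maximum) Current_LSB in amps per bit."""
    return max_amps / 32767, max_amps / 4096


def calibration_register(current_lsb_a: float, shunt_ohms: float) -> int:
    """Compute trunc(0.04096 / (Current_LSB x R_shunt)) with exact arithmetic."""
    lsb = Fraction(str(current_lsb_a))
    shunt = Fraction(str(shunt_ohms))
    return math.trunc(Fraction("0.04096") / (lsb * shunt))


lo, hi = lsb_bounds(2.0)
print(f"Current_LSB range: {lo * 1e6:.0f} to {hi * 1e6:.0f} uA per bit")
print("Calibration register:", calibration_register(0.0001, 0.1))
print("Power LSB (W per bit):", 20 * 0.0001)
```

Changing the chosen LSB or the shunt value and rerunning is a quick way to see how far a configuration sits from the limits before touching the hardware.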
One practical consequence: if you want the highest possible resolution for a low-current application (say, monitoring a sensor drawing less than 50mA), use a larger shunt resistor and a lower MAX_AMPS. A 1Ω shunt with MAX_AMPS of 0.1A gives you a minimum LSB of around 3μA, about 20 times finer than the 61μA minimum of the default setup and roughly 33 times finer than the 100μA LSB chosen above. The tradeoff is that a larger shunt dissipates more power at a given current and introduces a larger voltage drop in your circuit.
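The cost side of that tradeoff is plain Ohm's law, sketched below with a hypothetical helper named shunt_cost. The three cases are illustrative values, not measurements.

```python
def shunt_cost(current_a: float, shunt_ohms: float) -> tuple:
    """Return (voltage drop in V, power dissipated in W) for a shunt."""
    drop_v = current_a * shunt_ohms
    power_w = current_a ** 2 * shunt_ohms
    return drop_v, power_w


cases = (
    ("0.1 ohm shunt at 2 A", 2.0, 0.1),
    ("1 ohm shunt at 0.1 A", 0.1, 1.0),
    ("1 ohm shunt at 2 A", 2.0, 1.0),
)
for label, amps, ohms in cases:
    drop_v, power_w = shunt_cost(amps, ohms)
    print(f"{label}: drop {drop_v * 1000:.0f} mV, dissipation {power_w * 1000:.0f} mW")
```

The last case shows why a large shunt only makes sense alongside a low MAX_AMPS: at 2A a 1Ω shunt would drop 2V, far beyond the 320mV the INA219 can measure.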
Gain, Resolution, and the Tradeoff Nobody Explains
The INA219 has a Programmable Gain Amplifier (PGA) with four settings that control the maximum shunt voltage the chip can measure: 40mV, 80mV, 160mV, and 320mV full-scale range. This gain setting is independent of the Calibration Register and interacts with it in ways that many tutorials skip.
Here is the core tradeoff: the narrowest range (40mV full scale, PGA = /1) gives you finer voltage resolution across the shunt, which improves current resolution. But it also limits the maximum current you can measure before the shunt voltage exceeds 40mV. With a 0.1Ω shunt, 40mV corresponds to only 400mA. Exceeding that saturates the ADC and triggers DeviceRangeError. The widest range (320mV full scale, PGA = /8, the default setting) accommodates up to 3.2A but with coarser resolution on low-current readings. In TI's notation /1 is the highest amplifier gain and /8 the lowest, so a wider range means less gain.
The pi-ina219 library handles this automatically when you specify MAX_AMPS — it selects the appropriate gain for your range. But the default gain of /8 (320mV range) is optimized for the widest current range, not the finest precision. If your circuit draws a predictable maximum of 500mA, explicitly setting ina.GAIN_2_80MV in your configure call will give you meaningfully better resolution on small current changes.
from ina219 import INA219, DeviceRangeError

SHUNT_OHMS = 0.1
MAX_AMPS = 0.5  # Circuit draws at most 500mA

ina = INA219(SHUNT_OHMS, MAX_AMPS)
# GAIN_2_80MV: 80mV full-scale range, better resolution for low-current circuits
ina.configure(ina.RANGE_16V, ina.GAIN_2_80MV)
try:
    print(f"Current: {ina.current():.3f} mA")
except DeviceRangeError:
    print("Current exceeded 80mV shunt range -- switch to GAIN_4_160MV or GAIN_8_320MV")
RANGE_16V and RANGE_32V control the bus voltage measurement range, not the current range. For a 12V system, RANGE_16V is the correct choice, though it buys no extra precision (bus voltage LSB is always 4mV regardless of range). The gain setting (GAIN_1_40MV through GAIN_8_320MV) is what controls current measurement precision. These are separate knobs, and conflating them is a common source of confusion.
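Choosing a gain by hand comes down to finding the narrowest shunt range that still covers your peak current. The sketch below illustrates that selection rule; smallest_range_mv is a hypothetical helper, not part of pi-ina219, and it returns the range in millivolts rather than a library constant.

```python
PGA_RANGES_MV = (40, 80, 160, 320)  # Full-scale shunt ranges for PGA /1, /2, /4, /8


def smallest_range_mv(max_amps: float, shunt_ohms: float) -> int:
    """Return the narrowest PGA range (mV) whose full scale covers max_amps."""
    shunt_mv = max_amps * shunt_ohms * 1000.0
    for full_scale in PGA_RANGES_MV:
        if shunt_mv <= full_scale:
            return full_scale
    raise ValueError(f"{shunt_mv:.0f} mV exceeds the 320 mV maximum range")


for amps in (0.3, 0.5, 2.0):
    print(f"{amps} A through 0.1 ohm -> {smallest_range_mv(amps, 0.1)} mV range")
```

A 500mA maximum through 0.1Ω produces 50mV, which is why the 80mV range is the right fit in the example above and the 40mV range is not.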
Option 2: Shunt Resistor + MCP3008 ADC over SPI
If you need more than one simultaneous current channel, want lower-level control over the measurement process, or are working with a microcontroller that lacks I2C support, the shunt-plus-ADC path gives you flexibility the INA219 cannot. The MCP3008 is an 8-channel, 10-bit ADC that communicates over SPI. Each of its eight channels can independently read an analog voltage, making it practical to monitor multiple shunt resistors simultaneously without adding hardware complexity.
The key limitation of the MCP3008 in this context is common-mode voltage. Its inputs must remain within 0V to VDD (3.3V or 5V depending on your supply). This makes it a low-side sensing solution: your shunt resistor must sit between the load and ground so that the voltage across it stays near 0V. If you need high-side sensing with multiple channels, look at the INA3221 (a three-channel equivalent of the INA219) or use isolated ADC modules.
Install the library
pip install spidev
As with pi-ina219, Raspberry Pi OS Bookworm blocks bare pip install by default. If you see an "externally managed environment" error, install inside a virtual environment: python3 -m venv venv && source venv/bin/activate, then run pip install spidev inside it. If you activated a venv for the INA219 examples, install spidev into the same environment.
Enable SPI on Raspberry Pi
sudo raspi-config
# Navigate to: Interface Options > SPI > Enable
sudo reboot
SPI ADC current reading
One constant worth noting before the code: ADC_BITS is set to 1023, the maximum raw output of a 10-bit ADC (2¹⁰ − 1). Despite its name, it holds the maximum code rather than a bit count. Using 1023 as the divisor means a full-scale raw reading maps to exactly VREF. Some code uses 1024 as the divisor instead, which maps full-scale to VREF × (1023/1024), a difference of about 0.1%. Both conventions are common; this article uses 1023 consistently throughout.
import spidev
import time

SHUNT_OHMS = 0.1   # Resistance of your shunt resistor in ohms
VREF = 3.3         # Reference voltage of your ADC (3.3V or 5V)
ADC_BITS = 1023    # Max raw value of a 10-bit ADC (2^10 - 1)
CHANNEL = 0        # MCP3008 channel connected to the shunt

spi = spidev.SpiDev()
spi.open(0, 0)  # Bus 0, chip select 0
spi.max_speed_hz = 1_000_000

def read_adc(channel):
    """Read raw 10-bit value from MCP3008 channel (single-ended mode)."""
    if channel < 0 or channel > 7:
        raise ValueError("Channel must be 0-7")
    response = spi.xfer2([1, (8 + channel) << 4, 0])
    raw = ((response[1] & 3) << 8) + response[2]
    return raw

def raw_to_current(raw_value):
    """Convert ADC raw value to current in milliamps."""
    voltage_v = raw_value * (VREF / ADC_BITS)       # Voltage across shunt
    current_ma = (voltage_v / SHUNT_OHMS) * 1000    # Ohm's law, converted to mA
    return current_ma

def read_adc_averaged(channel, samples=8):
    """Average multiple readings to reduce noise."""
    return sum(read_adc(channel) for _ in range(samples)) / samples

try:
    while True:
        raw = read_adc_averaged(CHANNEL)
        voltage = raw * (VREF / ADC_BITS) * 1000  # mV across shunt
        current = raw_to_current(raw)
        print(f"Raw: {raw:>6.1f} | Shunt: {voltage:.2f} mV | Current: {current:.2f} mA")
        time.sleep(0.5)
except KeyboardInterrupt:
    print("\nStopped.")
finally:
    spi.close()  # Release the SPI device on any exit path
With a 3.3V reference and a 0.1Ω shunt, each ADC step represents approximately 3.23mV, which corresponds to about 32.3mA of current. This is the floor of your measurement resolution. If your circuit draws less than 100mA, the MCP3008 will give you at most four distinct readings across that range (roughly 0, 32, 65, and 97mA). For fine-grained low-current monitoring, the INA219 is the better choice.
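The resolution floor is easy to recompute for other reference voltages, shunts, or converters. The sketch below is a hardware-free illustration using the same full-scale-equals-VREF convention as the script; both helper names are invented here, and the 12-bit case is a generic comparison rather than a specific part.

```python
def ma_per_step(vref: float, shunt_ohms: float, bits: int = 10) -> float:
    """Current represented by one ADC code, in mA (full scale = 2**bits - 1)."""
    volts_per_step = vref / (2 ** bits - 1)
    return volts_per_step / shunt_ohms * 1000.0


def distinct_codes_below(limit_ma: float, vref: float, shunt_ohms: float, bits: int = 10) -> int:
    """Count the ADC codes (including zero) that map to a current at or below limit_ma."""
    return int(limit_ma // ma_per_step(vref, shunt_ohms, bits)) + 1


print(f"10-bit, 0.1 ohm: {ma_per_step(3.3, 0.1):.1f} mA per step")
print(f"10-bit, 1 ohm:   {ma_per_step(3.3, 1.0):.2f} mA per step")
print(f"12-bit, 0.1 ohm: {ma_per_step(3.3, 0.1, bits=12):.2f} mA per step")
print("Codes available below 100 mA:", distinct_codes_below(100, 3.3, 0.1))
```

A larger shunt or a lower reference voltage improves the step size in direct proportion, at the cost of a larger drop in the ground return or a smaller measurable maximum.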
Noise, Ground Loops, and Getting Clean Readings
Raw current readings are noisy. The SPI clock, GPIO switching, and switching regulators on the Raspberry Pi all inject noise onto the power rails and into adjacent signal lines. A few specific strategies make a measurable difference.
Decouple the ADC supply
The MCP3008 is sensitive to power supply noise. Its datasheet recommends a ceramic bypass capacitor between VDD and AGND (1μF is the value given in its layout guidance; 0.1μF is also widely used), placed as physically close to the chip as possible. Without this, switching noise from the Pi's digital logic couples into the analog reference and corrupts readings. This single component is responsible for a large fraction of the noise in poorly built MCP3008 circuits.
Additionally, using the Pi's 3.3V rail directly as VREF is suboptimal. The 3.3V rail is generated by a switching regulator and carries noise. For precision measurements, a dedicated voltage reference IC such as the MCP1525 (2.5V, under $1) provides a far more stable reference without additional filtering complexity.
Separate AGND from DGND
The MCP3008 has both an AGND and a DGND pin. Connecting these to the same point at the ADC and running a separate trace back to the system ground star point prevents digital switching currents from flowing through the analog ground path. On a breadboard this is difficult to implement perfectly, but on a PCB it is a standard practice that dramatically reduces measured noise.
Use differential mode for better noise rejection
The MCP3008 supports differential input mode, where it measures the voltage difference between two channels rather than the voltage at one channel relative to AGND. For current sensing across a shunt, you can connect one MCP3008 input to each side of the shunt and use differential mode, which rejects common-mode noise that appears equally on both inputs. The SPI protocol change is minimal:
def read_adc_differential(channel_pos, channel_neg):
    """Read differential voltage between two MCP3008 channels.

    Valid pairs: (0,1), (2,3), (4,5), (6,7) -- positive then negative.
    The SPI command encodes the pair by channel_pos alone (the chip treats
    adjacent pairs as fixed differential inputs), so channel_neg is used
    only to check that the caller asked for a valid pair.
    """
    if channel_pos not in (0, 2, 4, 6) or channel_neg != channel_pos + 1:
        raise ValueError("Valid pairs are (0,1), (2,3), (4,5), (6,7)")
    # Differential mode: SGL/DIFF bit = 0, channel select encodes the pair
    response = spi.xfer2([1, channel_pos << 4, 0])
    raw = ((response[1] & 3) << 8) + response[2]
    return raw
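The byte framing used by both read functions can be checked without an MCP3008 attached. The sketch below separates the command construction and the response decoding into pure functions; build_command and decode_response are hypothetical names, and the byte values mirror the ones passed to spi.xfer2() in this article.

```python
def build_command(channel: int, single_ended: bool = True) -> list:
    """Build the 3-byte request: start bit, mode plus channel bits, padding byte."""
    if not 0 <= channel <= 7:
        raise ValueError("Channel must be 0-7")
    mode_bit = 8 if single_ended else 0  # SGL/DIFF sits just above the 3 channel bits
    return [1, (mode_bit + channel) << 4, 0]


def decode_response(response: list) -> int:
    """Extract the 10-bit result: low 2 bits of byte 1, then all of byte 2."""
    return ((response[1] & 3) << 8) + response[2]


print(build_command(0))                      # single-ended, channel 0
print(build_command(2, single_ended=False))  # differential, CH2 positive / CH3 negative
print(decode_response([0, 0b11, 0xFF]))      # full-scale reply
```

Keeping the framing in pure functions like these also makes it possible to unit-test the bit manipulation separately from the SPI bus.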
Software averaging
Averaging multiple consecutive readings in software reduces random, uncorrelated noise by a factor of the square root of the number of samples. Eight samples cut noise to roughly a third (1/√8 ≈ 0.35) at the cost of eight times the sampling time; four samples are enough to halve it. This is already shown in the read_adc_averaged() function above. For the INA219, averaging is configurable directly on the chip through the ADC resolution and oversampling settings, which keeps averaging off the Pi's CPU entirely.
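The square-root relationship can be checked with synthetic data. The sketch below assumes purely Gaussian, uncorrelated noise, which real circuits only approximate; the function name, the 20mA noise level, and the fixed seed are arbitrary choices for the illustration.

```python
import random
import statistics


def averaged_noise_ratio(samples=8, trials=4000, sigma=20.0, seed=1):
    """Return std dev of single readings divided by std dev of N-sample averages."""
    rng = random.Random(seed)
    singles = [rng.gauss(0.0, sigma) for _ in range(trials)]
    averages = [
        statistics.fmean(rng.gauss(0.0, sigma) for _ in range(samples))
        for _ in range(trials)
    ]
    return statistics.stdev(singles) / statistics.stdev(averages)


for n in (4, 8, 16):
    print(f"{n:2d} samples: noise reduced by a factor of about {averaged_noise_ratio(n):.2f}")
```

The printed factors should land near 2, 2.8, and 4. Interference that is correlated between samples, such as mains hum or regulator ripple, will not average away this cleanly.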
Ground loops with the INA219
The INA219 is a high-side sensor, so its Vin- pin is at load potential (slightly below supply voltage) rather than near ground. This means the INA219's measurement is inherently isolated from the ground plane variations that plague low-side designs. However, if the Pi and the INA219 are far apart on the same bench supply with long wire runs, inductive coupling can still add noise. Keep I2C wires short and away from switching power lines.
Full Program: Continuous Logging with Alerts
The following program builds on the INA219 path to add continuous polling, CSV logging, and an overcurrent alert. The design is intentionally structured so that each responsibility lives in its own function — making it easy to swap the INA219 for a simulated reading or a different sensor without touching the logging or alerting logic.
import csv
import time
import os
from datetime import datetime
from ina219 import INA219, DeviceRangeError

# -- Configuration -----------------------------------------------------------
SHUNT_OHMS = 0.1            # Shunt resistor value in ohms
MAX_AMPS = 2.0              # Maximum expected current
POLL_INTERVAL = 1.0         # Seconds between readings
ALERT_THRESHOLD = 1500.0    # Alert if current exceeds this value in mA
LOG_FILE = "current_log.csv"
MAX_LOG_BYTES = 10_000_000  # Rotate log at 10 MB

# -- Setup -------------------------------------------------------------------
def setup_sensor():
    ina = INA219(SHUNT_OHMS, MAX_AMPS)
    ina.configure(ina.RANGE_16V)
    return ina

def rotate_log_if_needed(filepath):
    """Rename existing log to a timestamped archive if it exceeds MAX_LOG_BYTES."""
    if os.path.exists(filepath) and os.path.getsize(filepath) > MAX_LOG_BYTES:
        archive = filepath.replace(".csv", f"_{int(time.time())}.csv")
        os.rename(filepath, archive)
        print(f"Log rotated to {archive}")

def setup_log(filepath):
    """Open CSV log in append mode; write header if the file is new."""
    rotate_log_if_needed(filepath)
    file_exists = os.path.exists(filepath)
    log_file = open(filepath, "a", newline="", encoding="utf-8")
    writer = csv.writer(log_file)
    if not file_exists:
        writer.writerow([
            "timestamp", "voltage_v", "current_ma",
            "power_mw", "shunt_mv", "alert"
        ])
    return log_file, writer
# -- Reading -----------------------------------------------------------------
def take_reading(ina):
    """Return a dict of measurements, or None on overflow."""
    try:
        return {
            "voltage_v": ina.voltage(),
            "current_ma": ina.current(),
            "power_mw": ina.power(),
            "shunt_mv": ina.shunt_voltage(),
        }
    except DeviceRangeError:
        return None

def check_alert(reading, threshold_ma):
    return reading["current_ma"] > threshold_ma

# -- Display -----------------------------------------------------------------
def display_reading(timestamp, reading, alert):
    alert_tag = " *** OVERCURRENT ***" if alert else ""
    print(
        f"[{timestamp}] "
        f"Voltage: {reading['voltage_v']:6.3f} V | "
        f"Current: {reading['current_ma']:8.2f} mA | "
        f"Power: {reading['power_mw']:8.2f} mW"
        f"{alert_tag}"
    )
# -- Main loop ---------------------------------------------------------------
def main():
    print("Current Monitor -- INA219")
    print(f"Logging to: {LOG_FILE}")
    print(f"Alert threshold: {ALERT_THRESHOLD} mA")
    print(f"Poll interval: {POLL_INTERVAL}s")
    print("Press Ctrl+C to stop.\n")
    ina = setup_sensor()
    log_file, writer = setup_log(LOG_FILE)
    alert_count = 0
    try:
        while True:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            reading = take_reading(ina)
            if reading is None:
                print(f"[{timestamp}] Overflow -- check MAX_AMPS setting.")
                time.sleep(POLL_INTERVAL)
                continue
            alert = check_alert(reading, ALERT_THRESHOLD)
            if alert:
                alert_count += 1
            display_reading(timestamp, reading, alert)
            writer.writerow([
                timestamp,
                f"{reading['voltage_v']:.4f}",
                f"{reading['current_ma']:.4f}",
                f"{reading['power_mw']:.4f}",
                f"{reading['shunt_mv']:.4f}",
                "YES" if alert else "NO",
            ])
            log_file.flush()  # Push each row out immediately -- critical on embedded hardware
            time.sleep(POLL_INTERVAL)
    except KeyboardInterrupt:
        print(f"\nStopped. {alert_count} alert(s) triggered this session.")
    finally:
        log_file.close()

if __name__ == "__main__":
    main()
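log_file.flush() hands each row from Python's buffer to the operating system immediately instead of letting it accumulate in memory. Python's default file buffering can hold several kilobytes of data before writing, meaning the last several seconds of readings are silently lost if the process dies or power drops unexpectedly. On a Raspberry Pi running from a wall adapter, this happens more often than expected. Flush after every row. Note that flush() alone does not guarantee the data has reached the SD card, because the kernel may still hold it in its own cache for a short time; os.fsync() closes that gap at the cost of extra writes.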
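If losing even the last row to a power cut is unacceptable, one option is to follow each flush with os.fsync(). The sketch below shows the pattern against a temporary file; write_row_durably is a hypothetical helper, and the row contents are placeholder values.

```python
import csv
import os
import tempfile


def write_row_durably(log_file, writer, row):
    """Write one CSV row, then push it through Python's and the OS's buffers."""
    writer.writerow(row)
    log_file.flush()             # Python's buffer -> operating system
    os.fsync(log_file.fileno())  # operating system cache -> storage device


# Demonstration against a throwaway file rather than the real log
path = os.path.join(tempfile.mkdtemp(), "demo_log.csv")
with open(path, "a", newline="", encoding="utf-8") as f:
    write_row_durably(f, csv.writer(f), ["2024-01-01 00:00:00", "812.5000"])

with open(path, encoding="utf-8") as f:
    print(f.read().strip())
```

Syncing on every row increases SD card write traffic, so at fast poll intervals it may be more reasonable to sync every few seconds instead.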
Concurrent Logging with asyncio
The blocking time.sleep() loop in the previous program is simple and reliable for a single task. But real monitoring deployments often need to do more than one thing at once: polling the sensor, writing to a log, checking thresholds, and perhaps sending a network alert. Python's asyncio library provides cooperative multitasking that handles these concurrent responsibilities without threads, keeping the code easy to reason about.
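The following version runs two coroutines concurrently: one polls the sensor on a fixed interval, the other waits on a shared queue and writes log entries. They yield control to each other at every await (asyncio.sleep() and queue.put() in the poller, queue.get() in the logger), so neither blocks the other for long. The sensor reads and file writes themselves are still ordinary blocking calls; they are simply short enough not to matter at a one-second poll interval.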
import asyncio
import csv
import os
from datetime import datetime
from ina219 import INA219, DeviceRangeError

SHUNT_OHMS = 0.1
MAX_AMPS = 2.0
POLL_INTERVAL = 1.0
ALERT_THRESHOLD = 1500.0
LOG_FILE = "current_log_async.csv"

async def poll_sensor(ina, queue):
    """Read sensor on a fixed interval and push readings to the shared queue."""
    while True:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            reading = {
                "timestamp": timestamp,
                "voltage_v": ina.voltage(),
                "current_ma": ina.current(),
                "power_mw": ina.power(),
                "shunt_mv": ina.shunt_voltage(),
            }
            await queue.put(reading)
        except DeviceRangeError:
            print(f"[{timestamp}] Overflow -- check MAX_AMPS.")
        await asyncio.sleep(POLL_INTERVAL)

async def log_readings(queue, log_file, writer):
    """Consume readings from the queue and write them to the CSV log."""
    while True:
        reading = await queue.get()
        alert = reading["current_ma"] > ALERT_THRESHOLD
        tag = " *** OVERCURRENT ***" if alert else ""
        print(
            f"[{reading['timestamp']}] "
            f"Current: {reading['current_ma']:8.2f} mA"
            f"{tag}"
        )
        writer.writerow([
            reading["timestamp"],
            f"{reading['voltage_v']:.4f}",
            f"{reading['current_ma']:.4f}",
            f"{reading['power_mw']:.4f}",
            f"{reading['shunt_mv']:.4f}",
            "YES" if alert else "NO",
        ])
        log_file.flush()
        queue.task_done()
async def main():
    ina = INA219(SHUNT_OHMS, MAX_AMPS)
    ina.configure(ina.RANGE_16V)
    file_exists = os.path.exists(LOG_FILE)
    log_file = open(LOG_FILE, "a", newline="", encoding="utf-8")
    writer = csv.writer(log_file)
    if not file_exists:
        writer.writerow(["timestamp", "voltage_v", "current_ma", "power_mw", "shunt_mv", "alert"])
    queue = asyncio.Queue(maxsize=100)
    try:
        await asyncio.gather(
            poll_sensor(ina, queue),
            log_readings(queue, log_file, writer),
        )
    except asyncio.CancelledError:
        pass
    finally:
        log_file.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped.")
When Ctrl+C is pressed, Python delivers KeyboardInterrupt while asyncio.run() is executing. Python 3.11 and later respond by cancelling the main task, which raises CancelledError inside main() at the await on asyncio.gather(). On Python 3.10 and earlier, asyncio.run() cancels any still-running tasks as part of its own cleanup, with the same effect. In both cases the except asyncio.CancelledError: pass clause in main() absorbs the cancellation, and the finally block runs regardless of whether the exception is caught, so the CSV file is always closed properly.
The queue's maxsize=100 acts as a buffer and also as a backpressure mechanism. If the log writer falls behind the sensor poller for any reason — a slow disk write, for instance — the queue fills up and await queue.put() blocks until space is available, preventing unbounded memory growth. This is a pattern worth understanding because it applies any time you decouple a producer from a consumer in Python.
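The backpressure behavior is easy to observe in isolation. The sketch below is a hardware-free illustration with a deliberately tiny queue and an artificially slow consumer; all names and numbers in it are invented for the demonstration.

```python
import asyncio


async def producer(queue, count, high_water):
    """Push items as fast as possible, recording the deepest the queue gets."""
    for i in range(count):
        await queue.put(i)  # suspends here whenever the queue is full
        high_water[0] = max(high_water[0], queue.qsize())


async def slow_consumer(queue, count, seen):
    """Drain the queue slowly, standing in for a sluggish disk write."""
    for _ in range(count):
        item = await queue.get()
        await asyncio.sleep(0.001)
        seen.append(item)
        queue.task_done()


async def demo(count=20, maxsize=3):
    queue = asyncio.Queue(maxsize=maxsize)
    high_water, seen = [0], []
    await asyncio.gather(
        producer(queue, count, high_water),
        slow_consumer(queue, count, seen),
    )
    return high_water[0], seen


peak, seen = asyncio.run(demo())
print(f"Peak queue depth: {peak}, items logged: {len(seen)}")
```

The producer never gets more than maxsize items ahead, and every item still arrives in order. In the monitor, the same mechanism means a stalled log writer slows the poller down instead of exhausting memory.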
Analyzing the Log with Python
Once you have a current_log.csv, Python gives you analysis options ranging from the standard library to full time-series tooling.
Basic statistics with the standard library
import csv
import statistics

readings = []
with open("current_log.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        readings.append(float(row["current_ma"]))

if readings:
    print(f"Samples: {len(readings)}")
    print(f"Min: {min(readings):.2f} mA")
    print(f"Max: {max(readings):.2f} mA")
    print(f"Mean: {statistics.mean(readings):.2f} mA")
    if len(readings) > 1:  # stdev needs at least two samples
        print(f"Std Dev: {statistics.stdev(readings):.2f} mA")
    alerts = sum(1 for r in readings if r > 1500)
    print(f"Alerts: {alerts} readings above 1500 mA")
Plotting with matplotlib
matplotlib is not part of the standard library. Install it once with pip install matplotlib before running the script below. On Raspberry Pi OS Bookworm, use your virtual environment: pip install matplotlib with the venv active, or pass --break-system-packages for a system-wide install.
import csv
import matplotlib.pyplot as plt
from datetime import datetime

timestamps = []
currents = []
with open("current_log.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        timestamps.append(
            datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
        )
        currents.append(float(row["current_ma"]))

plt.figure(figsize=(12, 4))
plt.plot(timestamps, currents, linewidth=0.8)
plt.axhline(y=1500, color="red", linestyle="--", label="Alert threshold")
plt.xlabel("Time")
plt.ylabel("Current (mA)")
plt.title("Current over Time")
plt.legend()
plt.tight_layout()
plt.savefig("current_plot.png", dpi=150)
plt.show()
Detecting anomalies with a rolling window
A static threshold alert is a blunt instrument. A rolling window comparison catches subtler problems: a current that is gradually rising over minutes, or short spikes that individually fall under the threshold but cluster in ways that indicate component stress. The following approach computes a rolling mean and flags readings that deviate by more than two standard deviations from the local window:
import csv
import statistics

WINDOW = 30  # Compare each reading against the prior 30 samples

readings = []
anomalies = []
with open("current_log.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for i, row in enumerate(reader):
        val = float(row["current_ma"])
        readings.append((row["timestamp"], val))
        if i >= WINDOW:
            window_vals = [r[1] for r in readings[i - WINDOW:i]]
            mean = statistics.mean(window_vals)
            stdev = statistics.stdev(window_vals)
            if stdev > 0 and abs(val - mean) > 2 * stdev:
                anomalies.append((row["timestamp"], val, mean, stdev))

print(f"Anomalies detected: {len(anomalies)}")
for ts, val, mean, stdev in anomalies[:10]:
    print(f"  {ts}  {val:.1f} mA (mean {mean:.1f}, stdev {stdev:.1f})")
Simulating Without Hardware
Hardware debugging and software debugging are easier to do separately. Build the full program against a simulated signal first, then swap in the real sensor. This also makes it possible to write unit tests for your alerting and logging logic without needing physical hardware in your CI pipeline.
The function below generates a plausible test signal: a steady baseline with a slow sinusoidal drift and Gaussian noise, loosely resembling the draw of a battery-powered motor controller:
import random
import math
import time

def simulated_reading(t):
    """Synthetic current reading with noise and a slow sine wave."""
    base_current = 800.0                        # 800 mA baseline
    sine_current = 400.0 * math.sin(t / 10.0)   # Slow oscillation
    noise = random.gauss(0, 20.0)               # Gaussian noise ~20 mA std dev
    current_ma = max(0, base_current + sine_current + noise)
    return {
        "voltage_v": 12.0 + random.gauss(0, 0.05),
        "current_ma": current_ma,
        "power_mw": 12.0 * current_ma,
        "shunt_mv": current_ma * 0.1,
    }

start = time.time()
for _ in range(20):
    t = time.time() - start
    reading = simulated_reading(t)
    print(
        f"t={t:5.1f}s "
        f"Current: {reading['current_ma']:8.2f} mA "
        f"Power: {reading['power_mw']:9.2f} mW"
    )
    time.sleep(0.5)
To use the simulation in the full program, replace take_reading(ina) in the main loop with a wrapper around simulated_reading(t). Everything else — the CSV writer, the alert logic, the display — runs without modification. When hardware is connected, swap the function call back. The architecture supports this because sensor reading, alerting, and logging are separated into distinct functions.
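One way to build that wrapper is sketched below. It repeats a compact version of simulated_reading() so the block runs on its own; make_simulated_reader is a hypothetical name, and the returned function accepts and ignores a sensor argument so it can stand in for take_reading(ina) unchanged.

```python
import math
import random
import time


def simulated_reading(t):
    """Same synthetic signal as above: baseline plus slow sine plus Gaussian noise."""
    current_ma = max(0.0, 800.0 + 400.0 * math.sin(t / 10.0) + random.gauss(0, 20.0))
    return {
        "voltage_v": 12.0 + random.gauss(0, 0.05),
        "current_ma": current_ma,
        "power_mw": 12.0 * current_ma,
        "shunt_mv": current_ma * 0.1,
    }


def make_simulated_reader(clock=time.monotonic):
    """Return a take_reading-style callable that ignores its sensor argument."""
    start = clock()

    def take_reading(_ina=None):
        return simulated_reading(clock() - start)

    return take_reading


take_reading = make_simulated_reader()
reading = take_reading(None)  # same call shape as take_reading(ina)
print(f"Simulated current: {reading['current_ma']:.1f} mA")
```

Passing a fake clock into make_simulated_reader() makes the time axis deterministic, which is convenient in unit tests; the noise terms would still need a seeded random generator to be fully repeatable.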
Running as a systemd Service
A script that runs in a terminal is fragile. It stops when the session ends, it does not restart after a reboot, and there is no standard mechanism to check its status. For any deployment that runs longer than a single session, the right approach is to register the monitor as a systemd service. This gives you automatic startup on boot, automatic restart on failure, and log access through journalctl.
First, create a service unit file. Replace /home/pi/current_monitor.py with the actual path to your script, and replace pi with your username:
sudo nano /etc/systemd/system/current-monitor.service
[Unit]
Description=Python Current Monitor (INA219)
[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi
ExecStart=/usr/bin/python3 /home/pi/current_monitor.py
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
The After=network.target dependency has been omitted here because a hardware current monitor with no network dependency does not need to wait for networking before starting. If your deployment sends network alerts — MQTT, HTTP POST, email — add After=network-online.target and Wants=network-online.target instead, which waits for an actual network connection rather than just the networking stack initializing.
If you installed pi-ina219 inside a virtual environment rather than system-wide, update ExecStart to point to the Python binary inside the venv: ExecStart=/home/pi/venv/bin/python /home/pi/current_monitor.py.
Enable and start the service:
sudo systemctl daemon-reload
sudo systemctl enable current-monitor.service
sudo systemctl start current-monitor.service
Check status and live output:
sudo systemctl status current-monitor.service
sudo journalctl -u current-monitor.service -f
The Restart=on-failure directive means the service restarts automatically if the Python process crashes. RestartSec=5 adds a five-second delay before each restart attempt, preventing a rapid crash loop from consuming system resources. This combination is appropriate for most embedded monitoring deployments.
Verifying Your Readings Are Actually Accurate
Many tutorials stop at "plug it in and read a number." That is not enough for any application where the current readings drive decisions — charging cutoffs, overcurrent protection, energy accounting, or anomaly detection. Before trusting logged data, verify it.
Compare against a known reference
The simplest verification is to place a known load in your circuit and compare the INA219 reading against a calibrated multimeter in series. A resistive load with a known value is ideal: a 12V supply through a 12Ω resistor draws a nominal 1A (the resistor dissipates 12W, so use a suitably rated power resistor), and the INA219 should read within a few milliamps of the multimeter. If it reads 980mA or 1020mA consistently, the offset is more likely a hardware tolerance issue, such as the shunt resistor, than a software problem.
Account for shunt resistor tolerance
The 0.1Ω shunt resistor on the Adafruit INA219 breakout is rated at 1% tolerance. That means the actual resistance could be anywhere from 0.099Ω to 0.101Ω. At 2A, this translates to a potential current error of ±20mA from shunt tolerance alone, before accounting for any INA219 ADC error. The INA219 datasheet's specification of 100μV maximum offset refers to the amplifier input — the shunt resistor tolerance is a separate error source that the chip cannot compensate for automatically.
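The arithmetic above can be written out as a small error-budget helper. This is a rough sketch, not a full datasheet analysis: it adds the two error sources linearly as a worst case, ignores gain error and temperature drift, and the function name current_error_budget_ma is made up for illustration.

```python
def current_error_budget_ma(current_a, shunt_ohms=0.1, shunt_tol=0.01, offset_uv=100.0):
    """Worst-case current error in mA from shunt tolerance and input offset."""
    # A shunt that is off by shunt_tol scales every reading by the same fraction.
    tolerance_ma = current_a * shunt_tol * 1000.0
    # An input offset voltage looks like a fixed current of V_offset / R_shunt.
    offset_ma = (offset_uv * 1e-6 / shunt_ohms) * 1000.0
    return {
        "tolerance_ma": tolerance_ma,
        "offset_ma": offset_ma,
        "total_ma": tolerance_ma + offset_ma,
    }


budget = current_error_budget_ma(2.0)
print(
    f"tolerance: {budget['tolerance_ma']:.1f} mA, "
    f"offset: {budget['offset_ma']:.1f} mA, "
    f"total: {budget['total_ma']:.1f} mA"
)
# prints: tolerance: 20.0 mA, offset: 1.0 mA, total: 21.0 mA
```

At 2A the shunt tolerance dominates: the 100μV offset across 0.1Ω contributes only about 1mA, which is why a better-characterized shunt value matters more than the amplifier offset at higher currents.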
If you need tighter accuracy, you can measure the actual shunt resistance with a four-wire (Kelvin) measurement and pass that exact value into SHUNT_OHMS. The TI INA219 datasheet also notes that the Calibration Register can be adjusted post-measurement using a reference ammeter to cancel total system error — the pi-ina219 library does not expose this directly, but you can manually adjust SHUNT_OHMS in your code to achieve the same compensation effect.
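One way to derive the adjusted SHUNT_OHMS value from a reference ammeter is sketched below. It assumes the whole discrepancy is a scale error in the shunt value, so it is a single-point gain correction that does not remove any fixed offset. The function name corrected_shunt_ohms is hypothetical. The reasoning: the shunt voltage is I_ref × R_actual, while the library reports that voltage divided by R_nominal, so R_actual = R_nominal × I_ina / I_ref.

```python
def corrected_shunt_ohms(nominal_ohms, ina_current_ma, reference_current_ma):
    """Estimate the effective shunt resistance from one reference measurement."""
    if reference_current_ma <= 0:
        raise ValueError("reference current must be positive")
    # The INA219 reads high when the real shunt is larger than the nominal value.
    return nominal_ohms * ina_current_ma / reference_current_ma


# Example values only: the INA219 reports 1010 mA while the reference meter shows 1000 mA.
SHUNT_OHMS = corrected_shunt_ohms(0.1, 1010.0, 1000.0)
print(f"Adjusted SHUNT_OHMS: {SHUNT_OHMS:.4f}")
# prints: Adjusted SHUNT_OHMS: 0.1010
```

Taking the reference measurement near the current you care most about is a reasonable choice, since a single-point correction is most accurate close to the point where it was measured.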
Validate the timestamp cadence
A 1-second poll interval does not guarantee readings are exactly 1 second apart. Python's time.sleep() is not real-time: OS scheduling, I2C transaction time, and CSV write latency all add jitter. For energy calculations that depend on integrating current over time, log the actual elapsed time between readings rather than assuming the interval, or use monotonic time to compute it:
import time

POLL_INTERVAL = 1.0  # Seconds between polls (assumed value for this snippet)

prev_time = time.monotonic()
while True:
    now = time.monotonic()
    elapsed = now - prev_time  # Actual elapsed time since last reading
    prev_time = now
    # reading = take_reading(ina)
    # charge_mAh += reading["current_ma"] * (elapsed / 3600.0)
    time.sleep(POLL_INTERVAL)
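Measuring elapsed time corrects the integration, but a fixed sleep still lets the schedule drift, because each cycle lasts the sleep plus the work. A possible refinement is to sleep until a fixed deadline instead. The sketch below is illustrative only: paced_ticks is a hypothetical helper, and the constant 800 mA value stands in for a real take_reading(ina) call.

```python
import time


def paced_ticks(interval_s, count, clock=time.monotonic, sleep=time.sleep):
    """Yield the measured time since the previous tick, `count` times.

    Deadlines are fixed multiples of interval_s from the start, so time spent
    on I2C reads or CSV writes shortens the next sleep instead of adding drift.
    """
    start = clock()
    prev = start
    for i in range(1, count + 1):
        remaining = (start + i * interval_s) - clock()
        if remaining > 0:
            sleep(remaining)
        now = clock()
        yield now - prev  # Measured interval, still used for integration
        prev = now


charge_mah = 0.0
for elapsed in paced_ticks(interval_s=0.01, count=5):
    current_ma = 800.0  # Placeholder; a real loop would call take_reading(ina)
    charge_mah += current_ma * (elapsed / 3600.0)
print(f"Accumulated charge: {charge_mah:.6f} mAh")
```

The measured interval is still what feeds the charge calculation. The deadline only keeps the long-run sample rate close to the nominal one.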
Common Issues and Fixes
Zero or negative current readings — Verify the load is actually drawing current and that the INA219 is wired in series with the load, not in parallel. If Vin+ and Vin- are swapped, readings will be negative. The INA219 is bidirectional by design, so reversed polarity gives you a valid (negative) reading rather than an error.
Persistent DeviceRangeError — The MAX_AMPS value passed to INA219() is lower than the actual current being drawn. Increase it to match your circuit's realistic peak draw. With the standard 0.1Ω Adafruit shunt, the practical ceiling is around 3.2A.
I2C address not found — Run i2cdetect -y 1 to confirm the device appears on the bus. A missing address usually means a wiring problem or I2C is not enabled. If you have multiple INA219 sensors on the same bus, the Adafruit breakout exposes four selectable I2C addresses — 0x40, 0x41, 0x44, and 0x45 — configurable via solderable jumpers on A0 and A1. The INA219 IC itself supports 16 programmable addresses in total, but the Adafruit breakout only exposes four of them.
MCP3008 readings stuck at 0 or 1023 — Floating analog inputs default to indeterminate values. Ground any unused MCP3008 input channels. A truly floating input will report garbage and can vary between runs. Per the Microchip MCP3008 datasheet, unused channels should be tied to AGND or VREF rather than left open.
Noisy SPI ADC readings — Add a 100nF ceramic decoupling capacitor between VDD and AGND, placed as close to the MCP3008 as physically possible. Consider switching to a precision voltage reference instead of the Pi's 3.3V rail. Averaging multiple readings in software also helps significantly.
CSV growing without bound — The log rotation code in the full program handles this with a file size check at startup. For more sophisticated rotation, Python's logging.handlers module includes RotatingFileHandler, which rotates automatically on file size, and TimedRotatingFileHandler, which rotates on a time interval.
systemd service not restarting — Check that Restart=on-failure is set and that RestartSec gives the hardware time to reinitialize. Note that on-failure only triggers on an unclean exit (a non-zero exit code, a signal, or a timeout), so a script that catches its exceptions and exits with status 0 will not be restarted. If the INA219 loses power briefly during a restart, it may need a moment before responding to I2C. Adding a short time.sleep(1) at the start of your main() function handles this gracefully.
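For the CSV growth issue above, size-based rotation with the standard library might look like the following sketch. The helper name make_csv_logger and the logger name are made up for illustration, and the tiny max_bytes value and temporary directory exist only to force a rotation in the demo. A real deployment would use a much larger limit and a fixed log path.

```python
import logging
import os
import tempfile
import time
from logging.handlers import RotatingFileHandler


def make_csv_logger(path, max_bytes=1_000_000, backup_count=3, name="current_csv"):
    """Return a logger that writes raw CSV rows and rotates the file by size."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # Keep CSV rows out of the root logger's output
    if not logger.handlers:  # Avoid duplicate rows if called more than once
        handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)
    return logger


log_dir = tempfile.mkdtemp()
csv_log = make_csv_logger(os.path.join(log_dir, "current_log.csv"), max_bytes=200, backup_count=2)
for i in range(50):
    # One row per reading: timestamp, bus voltage, current in mA (sample values)
    csv_log.info("%.3f,%.2f,%.2f", time.time(), 12.0, 800.0 + i)
print(sorted(os.listdir(log_dir)))
```

Rotated files are named with numeric suffixes (.1, .2) and the oldest is deleted once backup_count is reached. One tradeoff of this approach is that a CSV header row is not rewritten after each rotation, so either omit the header or have the reader supply column names.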
Key Takeaways
- Choose your sensing topology deliberately. High-side sensing (INA219) protects your ground reference and can detect downstream faults. Low-side sensing (shunt + MCP3008) is simpler but introduces ground disturbance and cannot detect ground faults. Know which one you are using and why.
- Understand the calibration math. The INA219 library handles calibration automatically, but the current LSB, gain setting, and shunt tolerance all affect your actual measurement accuracy. Compute your expected resolution before deploying and verify it against a reference load.
- Always flush logs to disk. Buffered writes lose data on power loss. Call log_file.flush() after every row in any embedded monitoring application.
- Use asyncio when you have concurrent tasks. A blocking poll loop is fine for a single task, but the moment you need concurrent responsibilities (polling, alerting, network reporting), asyncio with a shared queue is cleaner and easier to test than threads.
- Run as a systemd service for any persistent deployment. Terminal-based scripts stop at session end and do not survive reboots. A service unit file takes five minutes to write and makes your monitor a proper part of the system.
- Simulate first, verify second. Build against a simulated signal to catch software bugs, then verify the real hardware against a known reference load before trusting any logged data.