#!/usr/bin/python
# Copyright 2011 David Nesting.
#
# This work by David Nesting is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 3.0 Unported License.
#
# http://creativecommons.org/licenses/by-nc-sa/3.0/
"""Reads XBee sample frames from a serial port and prints sensor readings.

Each ADC sample is converted to a voltage, corrected for the resistive
divider in front of the XBee pin, and then passed to a sensor-specific
conversion function (see HANDLERS).
"""

import logging
import math

# The hardware libraries (xbee, serial) are imported inside main() so that the
# conversion helpers in this module can be imported without them installed.

# Change these as needed to reflect the actual serial device you need.
SERIAL_PORT = '/dev/ttyUSB9'
SERIAL_BAUD = 9600


def BytesToHexAddr(data, pad=0):
    """Formats raw address bytes as a lowercase hex string.

    Args:
        data: Raw address, either a byte string or a text string whose
            characters are converted with ord().
        pad: Minimum number of bytes to render; shorter input is padded on
            the left with zero bytes.

    Returns:
        The hex representation, two digits per byte.
    """
    if isinstance(data, (bytes, bytearray)):
        values = list(bytearray(data))
    else:
        values = [ord(ch) for ch in data]
    # A non-positive repeat count yields an empty list, so no padding is
    # added when the input is already long enough.
    values = [0] * (pad - len(values)) + values
    return ''.join('%02x' % value for value in values)


class ResistiveDivider(object):
    """Describes a resistive voltage divider, and provides solver methods."""

    def __init__(self, vo=None, rg=None, rs=None, vin=None):
        """Creates a new resistive divider.

        Args:
            vin: Supply voltage
            vo: Output voltage
            rg: Resistance between vo and ground
            rs: Resistance between vo and vin

        Raises:
            ValueError if neither rg nor rs is provided
        """
        if rg is None and rs is None:
            raise ValueError('rg or rs must be provided')
        self.vo = vo
        self.rg = rg
        self.rs = rs
        self.vin = vin

    def _params(self, vo, rg, rs, vin):
        """Layers supplied arguments over the values stored on the object.

        Any argument that is None falls back to the value given at
        construction time. Every value that is known is converted to float.

        Returns:
            A (vo, rg, rs, vin) tuple of floats or None.
        """
        if vo is None:
            vo = self.vo
        if vin is None:
            vin = self.vin
        if rg is None:
            rg = self.rg
        if rs is None:
            rs = self.rs

        if vo is not None:
            vo = float(vo)
        if rg is not None:
            rg = float(rg)
        if rs is not None:
            rs = float(rs)
        if vin is not None:
            vin = float(vin)
        return vo, rg, rs, vin

    def Vin(self, vo=None, rg=None, rs=None):
        """Computes input voltage.

        Args:
            vo: Measured output voltage (if not already known)
            rg: Measured resistance to ground (if not already known)
            rs: Measured resistance to vin (if not already known)

        Returns:
            Voltage at vin

        Raises:
            ValueError if vo, rg or rs is not known
        """
        vo, rg, rs, _ = self._params(vo, rg, rs, None)
        if vo is None:
            raise ValueError('vo must be provided')
        if rg is None:
            raise ValueError('rg must be provided')
        if rs is None:
            raise ValueError('rs must be provided')
        return (rg + rs) * vo / rg

    def Vo(self, rg=None, rs=None, vin=None):
        """Computes output voltage.

        Args:
            rg: Measured resistance to ground (if not already known)
            rs: Measured resistance to vin (if not already known)
            vin: Measured input voltage (if not already known)

        Returns:
            Output voltage

        Raises:
            ValueError if vin, rg or rs is not known
        """
        _, rg, rs, vin = self._params(None, rg, rs, vin)
        if vin is None:
            raise ValueError('vin must be provided')
        if rg is None:
            raise ValueError('rg must be provided')
        if rs is None:
            raise ValueError('rs must be provided')
        return rg / (rg + rs) * vin

    def R(self, vo=None, rg=None, rs=None, vin=None):
        """Computes missing resistance value.

        Args:
            vo: Measured output voltage (if not already known)
            rg: Measured resistance to ground (if not already known)
            rs: Measured resistance to vin (if not already known)
            vin: Measured input voltage (if not already known)

        Returns:
            Either rg or rs, whichever is missing, or None when the formula
            would divide by zero (vo == vin when solving for rg, vo <= 0
            when solving for rs).

        Raises:
            ValueError if vin or vo is not known, or if rg and rs are
            either both unknown or both known
        """
        vo, rg, rs, vin = self._params(vo, rg, rs, vin)
        if vin is None:
            raise ValueError('vin must be provided')
        if vo is None:
            raise ValueError('vo must be provided')
        if rg is None and rs is None:
            raise ValueError('rg or rs must be provided')

        if rg is None:
            if vo == vin:
                # No voltage drop across rs: rg cannot be computed.
                return None
            return -rs * vo / (vo - vin)
        if rs is None:
            if vo > 0:
                return -(rg * vo - rg * vin) / vo
            return None
        raise ValueError('Asked to solve for R, but rg and rs are known')


def Sample(adc):
    """Converts a 10-bit 1.2v ADC sample to a voltage."""
    return adc * 1.2 / 1024


def Vh400(v):
    """Converts a VH400 voltage value to a volumetric water content value.

    This uses the formula described at
    http://www.vegetronix.com/Products/VH400/VH400-Piecewise-Curve.phtml.

    Args:
        v: Sensor output voltage

    Returns:
        Volumetric water content
    """
    if v < 1.1:
        vwc = 10 * v - 1
    elif v < 1.3:
        vwc = 25 * v - 17.5
    elif v < 1.82:
        vwc = 48.08 * v - 47.5
    else:
        vwc = 26.32 * v - 7.89
    logging.debug('vh400: VWC v=%.2f = %.2f', v, vwc)
    return vwc


def Solar(v):
    """Returns the solar panel voltage unchanged (no conversion needed)."""
    return v


def Thermistor(tnom, rnom, r, bcoeff=3950):
    """Converts a thermistor resistance value to degrees.

    This uses the approach described at
    http://www.ladyada.net/learn/sensors/thermistor.html and
    http://en.wikipedia.org/wiki/Steinhart%E2%80%93Hart_equation

    Args:
        tnom: Nominal temperature (providing rnom)
        rnom: Nominal resistance (at temperature tnom)
        r: Measured resistance
        bcoeff: beta coefficient for the thermistor

    Returns:
        Degrees Celsius, or None if r is None
    """
    if r is None:
        # None cannot be rendered with %.2f, so log this case separately.
        logging.debug('thermistor degrees for tnom=%r rnom=%r bcoeff=%r '
                      'r=None = None', tnom, rnom, bcoeff)
        return None

    # float() keeps the ratio from truncating under Python 2 when both
    # resistances are integers.
    ratio = float(r) / rnom
    degrees = 1.0 / (math.log(ratio) / bcoeff + 1.0 / (tnom + 273.15)) - 273.15
    logging.debug('thermistor degrees for tnom=%r rnom=%r bcoeff=%r '
                  'r=%.2f = %.2f', tnom, rnom, bcoeff, r, degrees)
    return degrees


# Set up my resistive voltage dividers.  Measure these values yourself.
div_solar = ResistiveDivider(rg=3330, rs=13760)
div_moisture = ResistiveDivider(rg=6860, rs=11330)
div_temp = ResistiveDivider(rg=3328, vin=3.214)

# The sensors are described in this data structure so that it can be easily
# expanded.  In practice I have a script similar to this recording data for
# multiple XBee sensor nodes like this.
HANDLERS = {
    # The dict key here is the 64-bit source address of the XBee node.  Each
    # entry maps an XBee sample port to a (label, conversion function) pair;
    # the function receives the voltage seen at the XBee pin.
    '0011223344556677': {
        'adc-0': ('gardensolar',
                  lambda v: Solar(div_solar.Vin(vo=v))),
        'adc-1': ('gardentemp',
                  lambda v: Thermistor(25, 10000, div_temp.R(vo=v))),
        'adc-2': ('gardenmoisture',
                  lambda v: Vh400(div_moisture.Vin(vo=v))),
    },
}


def _HandleFrame(frame):
    """Converts and prints every configured sample found in one frame."""
    if 'source_addr_long' not in frame or 'samples' not in frame:
        return

    # Extract a readable 64-bit source address
    addr = BytesToHexAddr(frame['source_addr_long'], pad=8)
    dev = HANDLERS.get(addr)
    if dev is None:
        return

    for sample in frame['samples']:
        for port in sample:
            if port not in dev:
                continue
            label, func = dev[port]

            # Convert the XBee sample to a voltage (as viewed by the XBee)
            sampled = Sample(sample[port])
            logging.debug('--- %s (%s): %r (%.2f volts)',
                          port, label, sample[port], sampled)

            # Do something with the result.  A single parenthesised
            # argument prints identically on Python 2 and Python 3.
            print('%s: %r' % (label, func(sampled)))

            # Note that if you want to update a round-robin database (RRD)
            # here, you'll want to accumulate all of the measurements and
            # invoke 'update' only once with all of them.


def main(_):
    """Reads frames from the XBee until interrupted, printing each reading.

    Args:
        _: Command-line arguments (unused)
    """
    logging.basicConfig(level=logging.DEBUG)

    import serial
    import xbee

    port = serial.Serial(SERIAL_PORT, SERIAL_BAUD)
    try:
        x = xbee.ZigBee(port, escaped=True)
        while True:
            try:
                frame = x.wait_read_frame()
                if not frame:
                    continue
                _HandleFrame(frame)
            except KeyboardInterrupt:
                break
    finally:
        # Release the serial device however the loop ends.
        port.close()


if __name__ == '__main__':
    import sys
    main(sys.argv[1:])