Skip to content

Instantly share code, notes, and snippets.

@yamamaya
Last active August 28, 2017 08:38
Show Gist options
  • Save yamamaya/93e94d83c5cdf5f528e6a631277270c7 to your computer and use it in GitHub Desktop.
Save yamamaya/93e94d83c5cdf5f528e6a631277270c7 to your computer and use it in GitHub Desktop.
NETDWARF obsidianからセンサーの情報をHTTPで定期的に送信する
import time
import NDSPI
from NDCore import ConnectLib, NDHTTPLib, PowerLib
def main():
try:
now = PowerLib.get_time()
timestamp = '{0:04d}{1:02d}{2:02d}{3:02d}{4:02d}{5:02d}'.format(
now['year']+2000,
now['month'],
now['day'],
now['hour'],
now['min'],
now['sec']
)
sensor = SCP1000(sck=2, mosi=3, miso=4, cs=5)
print 'Starting measurement...',
sensor.set_operation_mode(SCP1000.OPERATION_LOW_POWER_SINGLE)
print 'OK'
print 'Waiting for measurement to completed...',
sensor.wait()
print 'OK'
print 'Reading measurement results...',
pressure = sensor.read_pressure()
temperature = sensor.read_temperature()
print 'OK'
print 'Pressure:', pressure
print 'Temperature:', temperature
print 'Connecting to network...',
ConnectLib.connect()
print 'OK'
url = 'http://example.com/status'
param = {
'p':str(pressure),
't':str(temperature),
'ts':timestamp
}
print 'POST', url, param
print 'Sending data...',
data = NDHTTPLib.post(url, payload=param)
if data['status'] == '200':
print 'OK [{}]'.format(data)
else:
print 'FAILED, HTTP result {}'.format(data['status'])
except:
print 'ERROR: ' + str(sys.exc_info()[0]) + ' '+ str(sys.exc_info()[1])
PowerLib.set_alarm_interval(30) # 30分毎に起動
PowerLib.shutdown()
class SCP1000:
    """Small driver for an SCP1000 pressure/temperature sensor on an NDSPI bus.

    Typical use: select an operation mode with set_operation_mode(), call
    wait() until the result is ready, then call read_pressure() and
    read_temperature().

    Register access: the command byte is the register address shifted left by
    two bits; bit 0x02 is set for a write and clear for a read.
    """

    # Values written to register 0x03 by set_operation_mode().
    OPERATION_NO = 0x00
    OPERATION_HIGH_SPEED = 0x09
    OPERATION_HIGH_RESOLUTION = 0x0a
    OPERATION_ULTRA_LOW_POWER = 0x0b
    OPERATION_LOW_POWER_SINGLE = 0x0c

    # When True, every SPI request ('>>') and response ('<<') is printed as
    # space-separated hex bytes.
    debug = False

    def __init__(self, sck=2, mosi=3, miso=4, cs=5, **kwargs):
        """Open and configure the SPI bus on the given pins.

        sck, mosi, miso, cs -- pin numbers passed straight to NDSPI.NDSPI.
        **kwargs            -- accepted for backward compatibility and ignored;
                               the pin names are regular parameters, so they
                               can never arrive through kwargs.
        """
        self.spi = NDSPI.NDSPI(sck=sck, mosi=mosi, miso=miso, cs=cs)
        self.spi.config(cpol=0, cpha=0, cspol=0)

    def set_operation_mode(self, mode):
        """Write one of the OPERATION_* constants to register 0x03."""
        self.__write_reg(0x03, mode)

    def read_pressure(self):
        """Return the pressure reading as a float.

        The raw value is 19 bits wide: the low three bits of register 0x1f
        are the most significant bits, followed by the 16 bits of register
        0x20. It is scaled by 0.25 and divided by 100 -- presumably giving
        hPa from 0.25 Pa steps; confirm against the datasheet.
        """
        high = self.__read_reg(0x1f, 1)
        low = self.__read_reg(0x20, 2)
        raw = ((high[0] & 0x07) << 16) | (low[0] << 8) | low[1]
        return raw * 0.25 / 100

    def read_temperature(self):
        """Return the temperature reading as a float.

        Register 0x21 holds 13 magnitude bits (0.05 per step) plus a sign bit
        (0x20 in the high byte). Presumably degrees Celsius; confirm against
        the datasheet.
        """
        raw = self.__read_reg(0x21, 2)
        temperature = (((raw[0] & 0x1f) << 8) | raw[1]) * 0.05
        if (raw[0] & 0x20) != 0:
            # Sign bit set: subtract the weight of bit 13 (8192 * 0.05).
            temperature = temperature - 409.6
        return temperature

    def wait(self):
        """Block until bit 0x20 of register 0x07 is set, polling every 1 ms.

        There is no timeout: if the bit never becomes set this never returns.
        """
        while (self.__read_reg(0x07, 1)[0] & 0x20) == 0:
            time.sleep(1E-3)

    def __read_reg(self, addr, readlen):
        """Read readlen bytes from register addr and return them as a sequence.

        One extra byte is requested from the bus and the first returned byte
        is dropped, so only the register payload is returned.
        """
        request = [addr << 2]
        if SCP1000.debug:
            SCP1000.__trace('>>', request)
        response = self.spi.readwrite(request, readlength=readlen + 1)
        if SCP1000.debug:
            SCP1000.__trace('<<', response)
        return response[1:]

    def __write_reg(self, addr, d):
        """Write the single byte d to register addr."""
        request = [(addr << 2) | 0x02, d]
        if SCP1000.debug:
            SCP1000.__trace('>>', request)
        self.spi.readwrite(request, readlength=len(request))

    @staticmethod
    def __trace(prefix, data):
        """Print prefix followed by the bytes of data as two-digit hex."""
        # A single print of a joined string gives the same output as the
        # former sequence of comma-terminated prints.
        print(' '.join([prefix] + ['{0:02X}'.format(b) for b in data]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment