DHT11を使って温度と湿度を測定して、その値をLCD1602(液晶ディスプレイ)に表示させるプログラムです。
main.py
from machine import Pin
import utime as time
from dht import DHT11, InvalidPulseCount
from lcd1602 import LCD
pin = Pin(15, Pin.IN, Pin.PULL_UP)
sensor = DHT11(pin)
lcd = LCD()
time.sleep(5) # initial delay
while True:
try:
sensor.measure()
lcd.clear()
string = "Temperature:{}\nHumidity: {}".format(sensor.temperature, sensor.humidity)
lcd.message(string)
time.sleep(4)
except InvalidPulseCount as e:
print('Bad pulse count - retrying ...')
dht.py
# Reference: https://how2electronics.com/interfacing-dht11-temperature-humidity-sensor-with-raspberry-pi-pico/
import array
import micropython
import utime
from machine import Pin
from micropython import const
class InvalidChecksum(Exception):
pass
class InvalidPulseCount(Exception):
pass
MAX_UNCHANGED = const(100)
MIN_INTERVAL_US = const(200000)
HIGH_LEVEL = const(50)
EXPECTED_PULSES = const(84)
class DHT11:
_temperature: float
_humidity: float
def __init__(self, pin):
self._pin = pin
self._last_measure = utime.ticks_us()
self._temperature = -1
self._humidity = -1
def _measure(self):
current_ticks = utime.ticks_us()
if utime.ticks_diff(current_ticks, self._last_measure) < MIN_INTERVAL_US and (
self._temperature > -1 or self._humidity > -1
):
# Less than a second since last read, which is too soon according
# to the datasheet
return False
self._send_init_signal()
pulses = self._capture_pulses()
buffer = self._convert_pulses_to_buffer(pulses)
self._verify_checksum(buffer)
self._humidity = buffer[0] + buffer[1] / 10
self._temperature = buffer[2] + buffer[3] / 10
self._last_measure = utime.ticks_us()
def measure(self, times=5):
errors = ""
for i in range(times):
try:
self._measure()
break
except Exception as e:
errors += "[Try %s] "%i + str(e) + "\n"
else:
raise InvalidPulseCount("Tried 5 times, but all failed\n%s" % errors)
@property
def humidity(self):
return self._humidity
@property
def temperature(self):
return self._temperature
def _send_init_signal(self):
self._pin.init(Pin.OUT, Pin.PULL_DOWN)
self._pin.value(1)
utime.sleep_ms(50)
self._pin.value(0)
utime.sleep_ms(18)
@micropython.native
def _capture_pulses(self):
pin = self._pin
pin.init(Pin.IN, Pin.PULL_UP)
val = 1
idx = 0
transitions = bytearray(EXPECTED_PULSES)
unchanged = 0
timestamp = utime.ticks_us()
while unchanged < MAX_UNCHANGED:
if val != pin.value():
if idx >= EXPECTED_PULSES:
raise InvalidPulseCount(
"Got more than {} pulses".format(EXPECTED_PULSES)
)
now = utime.ticks_us()
transitions[idx] = now - timestamp
timestamp = now
idx += 1
val = 1 - val
unchanged = 0
else:
unchanged += 1
pin.init(Pin.OUT, Pin.PULL_DOWN)
if idx != EXPECTED_PULSES:
raise InvalidPulseCount(
"Expected {} but got {} pulses".format(EXPECTED_PULSES, idx)
)
return transitions[4:]
def _convert_pulses_to_buffer(self, pulses):
"""Convert a list of 80 pulses into a 5 byte buffer
The resulting 5 bytes in the buffer will be:
0: Integral relative humidity data
1: Decimal relative humidity data
2: Integral temperature data
3: Decimal temperature data
4: Checksum
"""
# Convert the pulses to 40 bits
binary = 0
for idx in range(0, len(pulses), 2):
binary = binary << 1 | int(pulses[idx] > HIGH_LEVEL)
# Split into 5 bytes
buffer = array.array("B")
for shift in range(4, -1, -1):
buffer.append(binary >> shift * 8 & 0xFF)
return buffer
def _verify_checksum(self, buffer):
# Calculate checksum
checksum = 0
for buf in buffer[0:4]:
checksum += buf
if checksum & 0xFF != buffer[4]:
raise InvalidChecksum()
lcd1602.py
import machine
import time
class LCD():
def __init__(self, addr=None, blen=1):
sda = machine.Pin(6)
scl = machine.Pin(7)
self.bus = machine.I2C(1,sda=sda, scl=scl, freq=400000)
#print(self.bus.scan())
self.addr = self.scanAddress(addr)
self.blen = blen
self.send_command(0x33) # Must initialize to 8-line mode at first
time.sleep(0.005)
self.send_command(0x32) # Then initialize to 4-line mode
time.sleep(0.005)
self.send_command(0x28) # 2 Lines & 5*7 dots
time.sleep(0.005)
self.send_command(0x0C) # Enable display without cursor
time.sleep(0.005)
self.send_command(0x01) # Clear Screen
self.bus.writeto(self.addr, bytearray([0x08]))
def scanAddress(self, addr):
devices = self.bus.scan()
if len(devices) == 0:
raise Exception("No LCD found")
if addr is not None:
if addr in devices:
return addr
else:
raise Exception(f"LCD at 0x{addr:2X} not found")
elif 0x27 in devices:
return 0x27
elif 0x3F in devices:
return 0x3F
else:
raise Exception("No LCD found")
def write_word(self, data):
temp = data
if self.blen == 1:
temp |= 0x08
else:
temp &= 0xF7
self.bus.writeto(self.addr, bytearray([temp]))
def send_command(self, cmd):
# Send bit7-4 firstly
buf = cmd & 0xF0
buf |= 0x04 # RS = 0, RW = 0, EN = 1
self.write_word(buf)
time.sleep(0.002)
buf &= 0xFB # Make EN = 0
self.write_word(buf)
# Send bit3-0 secondly
buf = (cmd & 0x0F) << 4
buf |= 0x04 # RS = 0, RW = 0, EN = 1
self.write_word(buf)
time.sleep(0.002)
buf &= 0xFB # Make EN = 0
self.write_word(buf)
def send_data(self, data):
# Send bit7-4 firstly
buf = data & 0xF0
buf |= 0x05 # RS = 1, RW = 0, EN = 1
self.write_word(buf)
time.sleep(0.002)
buf &= 0xFB # Make EN = 0
self.write_word(buf)
# Send bit3-0 secondly
buf = (data & 0x0F) << 4
buf |= 0x05 # RS = 1, RW = 0, EN = 1
self.write_word(buf)
time.sleep(0.002)
buf &= 0xFB # Make EN = 0
self.write_word(buf)
def clear(self):
self.send_command(0x01) # Clear Screen
def openlight(self): # Enable the backlight
self.bus.writeto(self.addr,bytearray([0x08]))
# self.bus.close()
def write(self, x, y, str):
if x < 0:
x = 0
if x > 15:
x = 15
if y < 0:
y = 0
if y > 1:
y = 1
# Move cursor
addr = 0x80 + 0x40 * y + x
self.send_command(addr)
for chr in str:
self.send_data(ord(chr))
def message(self, text):
#print("message: %s"%text)
for char in text:
if char == '\n':
self.send_command(0xC0) # next line
else:
self.send_data(ord(char))