8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Raspberry Piで水質測定(DO、pH、ORPなど)コード編

Last updated at Posted at 2018-07-29
  • 溶存酸素(DO)を測定。pHとかORPとかもコード内のセンサーのアドレスを書き換えれば基本的には同じだと思われます。
  • 測定したデータをThingspeakというIoT可視化サービスで可視化。

回路編:https://qiita.com/fftcy-sttkm/items/eb4c4c64c5aba454cf55

下記参考(センサーメーカーのサンプルコード)。※↓のサンプルコードにIoT可視化サービスによる可視化のコードを加えた。
https://github.com/AtlasScientific/Raspberry-Pi-sample-code/blob/master/i2c.py
https://myhydropi.com/connecting-a-ph-sensor-to-a-raspberry-pi
↓は買ったセンサー。個人輸入しました。pHとかORP計とかもあります。
https://www.atlas-scientific.com/product_pages/kits/do_kit.html

↓のコードでは、センサーアドレスとThingspeakのAPIキーを自分用に書き換えて使う。「CAL」をコマンドに入力してキャリブレーション、「R」で一回計測、「Poll,●●」で●●秒おきに連続して計測、Thingspeakに送信。Ctrl+Cでストップ。
※2点校正がオプションであるが、めんどいので1点校正。

import io  # used to create file streams
import fcntl  # used to access I2C parameters like addresses
import time  # used for sleep delay and timestamps
import string  # helps parse strings
import requests

class atlas_i2c:
    long_timeout = 5  # the timeout needed to query readings and calibrations
    short_timeout = 5  # timeout for regular commands
    default_bus = 1  # the default bus for I2C on the newer Raspberry Pis,
                     #certain older boards use bus 0
    default_address = 97  # 各種センサーに応じたアドレスを記載。データシートに書いてある。
    url = "https://api.thingspeak.com/update"
    key = "●●●●●自分のAPIキー"

    def __init__(self, address=default_address, bus=default_bus):
        # open two file streams, one for reading and one for writing
        # the specific I2C channel is selected with bus
        # it is usually 1, except for older revisions where its 0
        # wb and rb indicate binary read and write
        self.file_read = io.open("/dev/i2c-" + str(bus), "rb", buffering=0)
        self.file_write = io.open("/dev/i2c-" + str(bus), "wb", buffering=0)

        # initializes I2C to either a user specified or default address
        self.set_i2c_address(address)

    def set_i2c_address(self, addr):
        # set the I2C communications to the slave specified by the address
        # The commands for I2C dev using the ioctl functions are specified in
        # the i2c-dev.h file from i2c-tools
        I2C_SLAVE = 0x703
        fcntl.ioctl(self.file_read, I2C_SLAVE, addr)
        fcntl.ioctl(self.file_write, I2C_SLAVE, addr)

    def write(self, string):
        # appends the null character and sends the string over I2C
        string += "\00"
        self.file_write.write(string)

    def read(self, num_of_bytes=31):
        # reads a specified number of bytes from I2C,
        #then parses and displays the result
        res = self.file_read.read(num_of_bytes)  # read from the board
        response = filter(lambda x: x != '\x00', res)
        # remove the null characters to get the response
        if(ord(response[0]) == 1):  # if the response isnt an error
            char_list = map(lambda x: chr(ord(x) & ~0x80), list(response[1:]))
            # change MSB to 0 for all received characters except the first
            #and get a list of characters
            # NOTE: having to change the MSB to 0 is a glitch in the raspberry
            #pi, and you shouldn't have to do this!
            return "Command succeeded " + ''.join(char_list)
            # convert the char list to a string and returns it
        else:
            return "Error " + str(ord(response[0]))

    def query(self, string):
        # write a command to the board, wait the correct timeout,
        #and read the response
        self.write(string)

        # the read and calibration commands require a longer timeout
        if((string.upper().startswith("R")) or
           (string.upper().startswith("CAL"))):
            time.sleep(self.long_timeout)
        elif((string.upper().startswith("SLEEP"))):
            return "sleep mode"
        else:
            time.sleep(self.short_timeout)

        return self.read()

    def close(self):
        self.file_read.close()
        self.file_write.close()

    def send_data(self,value):
        params = {'api_key':atlas_i2c.key,'field1':value}
        requests.get(atlas_i2c.url,params=params)


def main():
    device = atlas_i2c()  # creates the I2C port object, specify the address
                          # or bus if necessary
    print ">> Atlas Scientific sample code"
    print ">> Any commands entered are passed to the board via I2C except:"
    print (">> Address,xx changes the I2C address the Raspberry Pi "
    "communicates with.")
    print (">> Poll,xx.x command continuously polls the board every "
    "xx.x seconds")
    print (" where xx.x is longer than the %0.2f second "
    "timeout." % atlas_i2c.long_timeout)
    print " Pressing ctrl-c will stop the polling"

    # main loop
    while True:
        myinput = raw_input("Enter command: ")

        # address command lets you change which address
        # the Raspberry Pi will poll
        if(myinput.upper().startswith("ADDRESS")):
            addr = int(string.split(myinput, ',')[1])
            device.set_i2c_address(addr)
            print ("I2C address set to " + str(addr))

        # contiuous polling command automatically polls the board
        elif(myinput.upper().startswith("POLL")):
            delaytime = float(string.split(myinput, ',')[1])

            # check for polling time being too short,
            # change it to the minimum timeout if too short
            if(delaytime < atlas_i2c.long_timeout):
                print ("Polling time is shorter than timeout, setting "
                "polling time to %0.2f" % atlas_i2c.long_timeout)
                delaytime = atlas_i2c.long_timeout

            # get the information of the board you're polling
            info = string.split(device.query("I"), ",")[1]
            print ("Polling %s sensor every %0.2f seconds, press ctrl-c "
            "to stop polling" % (info, delaytime))

            try:
                while True:
                    result = device.query("R")
                    value = float(result[18:])
                    print(result)
                    device.send_data(value)
                    time.sleep(delaytime - atlas_i2c.long_timeout)
            except KeyboardInterrupt:  # catches the ctrl-c command,
                                       # which breaks the loop above
                print "Continuous polling stopped"

        # if not a special keyword, pass commands straight to board
        else:
            try:
                print device.query(myinput)
            except IOError:
                print "Query failed"


if __name__ == '__main__':
    main()

↓可視化の様子。電気つけたとたん、光合成して溶存酸素が急上昇。
https://thingspeak.com/channels/344619

8
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?