Python から FeliCa リーダー・ライター RC-S620/S を使う

  • 19
    Like
  • 0
    Comment
More than 1 year has passed since last update.

はじめに

先日、Raspberry Pi に FeliCa リーダー・ライター RC-S620/S を接続する という記事において、業務用のFeliCaリーダー・ライターを紹介し、Arduino向けRC-S620/S制御ライブラリをRaspberryPiに移植した話を書きました。

RC-S620S.jpg

しかしそれはC++用なので、LL全盛の現代においては非常に扱いにくいです。

そこで、RC-S620/S を Pythonから利用できるようにしてみました。

Arduino向けRC-S620/S制御ライブラリをPythonに移植したというお話です。

前提条件

  • Python 2.7
  • pySerial 3.1
  • RC-S620/S

OSは RaspberryPiでもWindowsでもMACでも大丈夫だと思います。(最初、RaspberryPi上で開発してたのですが、途中からWindows上で作業しました)
なお、C++の「Arduino向けRC-S620/S制御ライブラリ」は不要です。

モジュール

2つあります。
まずメインとなる1個め

rcs620s.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""RC-S620/S sample library for Python"""

from __future__ import print_function

import time
import serial
import stringbin

class Rcs620s:

    __RCS620S_MAX_CARD_RESPONSE_LEN = 254
    __RCS620S_MAX_RW_RESPONSE_LEN = 265

    __RCS620S_DEFAULT_TIMEOUT = 1000

    __ser = None    # シリアルオブジェクト

    idm = None  # IDm:polling で設定される。8バイト文字列。
    pmm = None  # PMm:polling で設定される。8バイト文字列。

    def __init__(self):
        self.__timeout = self.__RCS620S_DEFAULT_TIMEOUT


    def gettimeout(self):
        return self.__timeout

    def settimeout(self, value):
        self.__timeout = value
        self.__ser.timeout(value/1000.0)

    def initDevice(self, portName):

        try:
            self.__ser = serial.Serial(port=portName, baudrate=115200, timeout=self.__timeout/1000.0)
        except serial.serialutil.SerialException:
            return "can't open serial port"

        response = self.__rwCommand("\xd4\x32\x02\x00\x00\x00")
        if (response != "\xd5\x33" ) :
            return "can't open RC-S620/S"

        # RFConfiguration (max retries)
        response = self.__rwCommand("\xd4\x32\x05\x00\x00\x00")
        if (response != "\xd5\x33" ) :
            return "can't initialize RC-S620/S"

        # RFConfiguration (additional wait time = 24ms)
        response = self.__rwCommand("\xd4\x32\x81\xb7")
        if (response != "\xd5\x33" ) :
            return "can't initialize RC-S620/S"

        return ""

    def polling(self, systemCode) :
        # InListPassiveTarget
        buf = "\xd4\x4a\x01\x01\x00" + systemCode + "\x00\x0f"

        response = self.__rwCommand(buf)
        if (response is None):return False
        if (len(response) != 22) :return False
        if (not response.startswith("\xd5\x4b\x01\x01\x12\x01")) :return False

        self.idm = response[6:6+8]
        self.pmm = response[14:14+8]

        return True

    def cardCommand(self, command) :
        if (self.__timeout >= (0x10000 / 2)) :
            commandTimeout = 0xffff;
        else :
            commandTimeout = (self.__timeout * 2);
        # CommunicateThruEX
        buf = "\xd4\xa0"
        buf += stringbin.int2strbinLE(commandTimeout,2)
        buf += chr(len(command) + 1)
        buf += command

        buf = self.__rwCommand(buf)
        if (buf is None) :return None
        bufLen = len(buf)
        if (bufLen < 4) :return None
        if (not buf.startswith("\xd5\xa1\x00")) :return None
        if (bufLen != (3 + ord(buf[3]))) :return None

        return buf[4:]

    def rfOff(self) :
        # RFConfiguration (RF field)
        response = self.__rwCommand("\xd4\x32\x01\x00")
        if (response != "\xd5\x33" ) :
            return False

        return True

    def push(self, data) :
        # 未テストです

        dataLen = len(data)
        if (dataLen > 224) :
            return False

        # Push
        buf = "\xb0"+self.idm+chr(dataLen) + data

        buf = self.cardCommand(buf)
        if ( buf != "\xb1"+self.idm+chr(dataLen) ) :
            return False

        buf = "\xa4"+self.idm+"\x00"
        buf = self.cardCommand(buf)
        if ( buf != "\xa5"+self.idm+"\x00") :
            return False

        time.sleep(1000)

        return True

    def requestService(self, serviceCode):
        u""" サービスの存在確認 """
        buf = "\x02" + self.idm + "\x01" + serviceCode

        buf = self.cardCommand(buf)

        if (buf is None) : return False
        if (len(buf) != 12) : return False
        if (not buf.startswith("\x03" + self.idm)) : return False
        if (buf[10:] == "\xff\xff") : return False

        return True


    def readWithoutEncryption(self, serviceCode, blockNumber):
        u""" 暗号化なしで読む """
        buf = "\x06" + self.idm
        buf += "\x01" # サービス数
        buf += serviceCode
        buf += "\x01" # ブロック数(なぜか複数指定するとエラーが返る)
        buf += "\x80"
        buf += chr(blockNumber)

        buf = self.cardCommand(buf)

        if (buf is None) :
            return None
        if (len(buf) != 28) :
            return None
        if (not buf.startswith("\x07"+self.idm)) :
            return None

        return buf[12:]

    def readWithoutEncryption2(self, serviceCode, blockNumber, length):
        u""" 複数ブロックを暗号化なしで読む """
        res = ""
        for i in range(0, length):
            buf = self.readWithoutEncryption(serviceCode, blockNumber+i)
            if ( buf != None ) :
                res += buf
        return res


    def readBlock(self, serviceCode, blockNumber, length):
        u""" 存在確認してから読む """
        if(self.requestService(serviceCode)) :
            return self.readWithoutEncryption2(serviceCode, 0, length)
        else:
            return None

# ------------------------
# private
# ------------------------

    def __rwCommand(self, command):
        self.__flushSerial();

        commandLen = len(command)

        dcs = self.__calcDCS(command)

        # transmit the command
        req = "\x00\x00\xff"
        if (commandLen <= 255) :
            # normal frame
            req += chr(commandLen) + chr((-commandLen)&0xff)
            self.__writeSerial(req)
        else :
            # extended frame
            # 未テストです
            req += "\xff\xff"
            req += chr((commandLen >> 8) & 0xff)
            req += chr((commandLen >> 0) & 0xff)
            req += chr(self.calfDCS(req[2:]))
            self.__writeSerial(req);

        self.__writeSerial(command);
        req = chr(dcs)+"\x00"
        self.__writeSerial(req);

        # receive an ACK
        res = self.__readSerial(6);
        if (res != "\x00\x00\xff\x00\xff\x00") :
            self.__cancel()
            return None

        # receive a response
        res = self.__readSerial(5);
        if (res == None) :
            self.__cancel()
            return None
        elif ( not res.startswith("\x00\x00\xff") ) :
            return None

        if ((res[3] == "\xff") and (res[4] == "\xff")) :
            # 未テストです
            res = self.__readSerial(3)
            if (res == None or self.__calcDCS(res) != 0) :
                return None
            responseLen = (ord(res[5]) << 8) | (ord(res[6]) << 0)
        else :
            if (self.__calcDCS(res[3:]) != 0) :
                return None
            responseLen = ord(res[3])
        if (responseLen > self.__RCS620S_MAX_RW_RESPONSE_LEN) :
            return None

        response = self.__readSerial(responseLen)
        if (response == None) :
            self.__cancel()
            return None

        dcs = chr(self.__calcDCS(response))

        res = self.__readSerial(2)
        if (res == None or res[0] != dcs or res[1] != "\x00") :
            self.__cancel()
            return None

        return response

    def __cancel(self):
        # transmit an ACK
        self.__writeSerial("\x00\x00\xff\x00\xff\x00")
        time.sleep(0.001);
        self.__flushSerial();

    # DCS(チェックサム)を計算する
    def __calcDCS(self, data):

        checkSum = 0;
        for c in data:
            checkSum += ord(c)
        return -checkSum & 0xff

    def __writeSerial(self, data) :
        self.__ser.write(data);

    def __readSerial(self, length) :

        data = self.__ser.read(length)
        if ( len(data) == length ) :
            return data
        else :
            return None

    def __flushSerial(self) :
        self.__ser.flush()

2個め。どっかにありそうと思いつつも、探し疲れて作っちゃいました。車輪の再発明かもしれません。

stringbin.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""バイナリな文字列を扱います"""

def strbinBE2int(strbin):

    summation = 0;
    for c in strbin:
        summation <<= 8
        summation += ord(c)
    return summation

def strbinLE2int(strbin):
    return strbinBE2int(strbin[::-1])

def int2strbinLE(num,length):
    strbin = ""
    for _ in range(0, length):
        strbin += chr(num & 0xff)
        num >>= 8

    return strbin

def int2strbinBE(num,length):
    return int2strbinLE(num,length)[::-1]

使用例

かざされた Suica,nanaco,WAON,Edy のIDmと残高を表示するプログラムを作ってみました。
実行時にカードがかざされていなければ、何も吐かずに終了します。

SERIAL_PORT_NAME は環境に合わせて書き換えてください。

処理の簡略化のため、システムコード、サービスコードはエンディアンを反転させて指定する必要があります。

getidm.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import sys
import stringbin

import rcs620s

COMMAND_TIMEOUT = 250

# serial port
SERIAL_PORT_NAME = "/dev/ttyAMA0" #raspberrypiの場合
SERIAL_PORT_NAME = "COM3" #windowsの場合

def printBalance(card_name, balance):
    u""" 残高を表示する """
    print("%s %uyen" % (card_name, balance))

def hexdmp(strhex,delimiter) :
    u""" 文字列(中身はバイナリ)をHEXダンプする """
    result = ""
    for c in strhex :
        result += c.encode('hex')
        result += delimiter

    if ( 0<len(delimiter) ) :
        # 最後に付けてしまっているdelimiterを取る
        result = result[:-len(delimiter)]

    return result

if __name__ == '__main__':

    rcs620sObj = rcs620s.Rcs620s()

    ret = rcs620sObj.initDevice(SERIAL_PORT_NAME)

    if (ret!="") :
        # 初期化失敗→エラーを吐いて終了
        print(ret)
        sys.exit(1)

    rcs620s.timeout = COMMAND_TIMEOUT

    # Suica領域
    if(rcs620sObj.polling("\x00\x03")):
        print(hexdmp(rcs620sObj.idm,":"))
        # Suica PASMO etc
        # http://jennychan.web.fc2.com/format/suica.html
        buf = rcs620sObj.readBlock("\x8B\x00", 0, 1)
        if(buf is not None) :
            balance = stringbin.strbinLE2int(buf[11:13])
            printBalance("SUICA", balance)

    # 共通領域
    if(rcs620sObj.polling("\xFE\x00")):
        print(hexdmp(rcs620sObj.idm,":"))
        # nanaco
        buf = rcs620sObj.readBlock("\x97\x55", 0, 1)
        if(buf is not None) :
            balance = stringbin.strbinLE2int(buf[0:4])
            printBalance("nanaco", balance)

        # waon
        buf = rcs620sObj.readBlock("\x17\x68", 0, 1)
        if(buf is not None) :
            balance = stringbin.strbinLE2int(buf[0:2])
            printBalance("WAON", balance)

        # Edy
        buf = rcs620sObj.readBlock("\x17\x13", 0, 1)
        if(buf is not None) :
            balance = stringbin.strbinLE2int(buf[0:4])
            printBalance("Edy", balance)

    rcs620sObj.rfOff()