はじめに
先日、Raspberry Pi に FeliCa リーダー・ライター RC-S620/S を接続する という記事において、業務用のFeliCaリーダー・ライターを紹介し、Arduino向けRC-S620/S制御ライブラリをRaspberryPiに移植した話を書きました。
しかしそれはC++用なので、LL全盛の現代においては非常に扱いにくいです。
そこで、RC-S620/S を Pythonから利用できるようにしてみました。
Arduino向けRC-S620/S制御ライブラリをPythonに移植したというお話です。
前提条件
- Python 2.7
- pySerial 3.1
- RC-S620/S
OSは RaspberryPiでもWindowsでもMACでも大丈夫だと思います。(最初、RaspberryPi上で開発してたのですが、途中からWindows上で作業しました)
なお、C++の「Arduino向けRC-S620/S制御ライブラリ」は不要です。
モジュール
2つあります。
まずメインとなる1個め
# !/usr/bin/env python
# -*- coding: utf-8 -*-
"""RC-S620/S sample library for Python"""
from __future__ import print_function
import time
import serial
import stringbin
class Rcs620s:
__RCS620S_MAX_CARD_RESPONSE_LEN = 254
__RCS620S_MAX_RW_RESPONSE_LEN = 265
__RCS620S_DEFAULT_TIMEOUT = 1000
__ser = None # シリアルオブジェクト
idm = None # IDm:polling で設定される。8バイト文字列。
pmm = None # PMm:polling で設定される。8バイト文字列。
def __init__(self):
self.__timeout = self.__RCS620S_DEFAULT_TIMEOUT
def gettimeout(self):
return self.__timeout
def settimeout(self, value):
self.__timeout = value
self.__ser.timeout(value/1000.0)
def initDevice(self, portName):
try:
self.__ser = serial.Serial(port=portName, baudrate=115200, timeout=self.__timeout/1000.0)
except serial.serialutil.SerialException:
return "can't open serial port"
response = self.__rwCommand("\xd4\x32\x02\x00\x00\x00")
if (response != "\xd5\x33" ) :
return "can't open RC-S620/S"
# RFConfiguration (max retries)
response = self.__rwCommand("\xd4\x32\x05\x00\x00\x00")
if (response != "\xd5\x33" ) :
return "can't initialize RC-S620/S"
# RFConfiguration (additional wait time = 24ms)
response = self.__rwCommand("\xd4\x32\x81\xb7")
if (response != "\xd5\x33" ) :
return "can't initialize RC-S620/S"
return ""
def polling(self, systemCode) :
# InListPassiveTarget
buf = "\xd4\x4a\x01\x01\x00" + systemCode + "\x00\x0f"
response = self.__rwCommand(buf)
if (response is None):return False
if (len(response) != 22) :return False
if (not response.startswith("\xd5\x4b\x01\x01\x12\x01")) :return False
self.idm = response[6:6+8]
self.pmm = response[14:14+8]
return True
def cardCommand(self, command) :
if (self.__timeout >= (0x10000 / 2)) :
commandTimeout = 0xffff;
else :
commandTimeout = (self.__timeout * 2);
# CommunicateThruEX
buf = "\xd4\xa0"
buf += stringbin.int2strbinLE(commandTimeout,2)
buf += chr(len(command) + 1)
buf += command
buf = self.__rwCommand(buf)
if (buf is None) :return None
bufLen = len(buf)
if (bufLen < 4) :return None
if (not buf.startswith("\xd5\xa1\x00")) :return None
if (bufLen != (3 + ord(buf[3]))) :return None
return buf[4:]
def rfOff(self) :
# RFConfiguration (RF field)
response = self.__rwCommand("\xd4\x32\x01\x00")
if (response != "\xd5\x33" ) :
return False
return True
def push(self, data) :
# 未テストです
dataLen = len(data)
if (dataLen > 224) :
return False
# Push
buf = "\xb0"+self.idm+chr(dataLen) + data
buf = self.cardCommand(buf)
if ( buf != "\xb1"+self.idm+chr(dataLen) ) :
return False
buf = "\xa4"+self.idm+"\x00"
buf = self.cardCommand(buf)
if ( buf != "\xa5"+self.idm+"\x00") :
return False
time.sleep(1000)
return True
def requestService(self, serviceCode):
u""" サービスの存在確認 """
buf = "\x02" + self.idm + "\x01" + serviceCode
buf = self.cardCommand(buf)
if (buf is None) : return False
if (len(buf) != 12) : return False
if (not buf.startswith("\x03" + self.idm)) : return False
if (buf[10:] == "\xff\xff") : return False
return True
def readWithoutEncryption(self, serviceCode, blockNumber):
u""" 暗号化なしで読む """
buf = "\x06" + self.idm
buf += "\x01" # サービス数
buf += serviceCode
buf += "\x01" # ブロック数(なぜか複数指定するとエラーが返る)
buf += "\x80"
buf += chr(blockNumber)
buf = self.cardCommand(buf)
if (buf is None) :
return None
if (len(buf) != 28) :
return None
if (not buf.startswith("\x07"+self.idm)) :
return None
return buf[12:]
def readWithoutEncryption2(self, serviceCode, blockNumber, length):
u""" 複数ブロックを暗号化なしで読む """
res = ""
for i in range(0, length):
buf = self.readWithoutEncryption(serviceCode, blockNumber+i)
if ( buf != None ) :
res += buf
return res
def readBlock(self, serviceCode, blockNumber, length):
u""" 存在確認してから読む """
if(self.requestService(serviceCode)) :
return self.readWithoutEncryption2(serviceCode, 0, length)
else:
return None
# ------------------------
# private
# ------------------------
def __rwCommand(self, command):
self.__flushSerial();
commandLen = len(command)
dcs = self.__calcDCS(command)
# transmit the command
req = "\x00\x00\xff"
if (commandLen <= 255) :
# normal frame
req += chr(commandLen) + chr((-commandLen)&0xff)
self.__writeSerial(req)
else :
# extended frame
# 未テストです
req += "\xff\xff"
req += chr((commandLen >> 8) & 0xff)
req += chr((commandLen >> 0) & 0xff)
req += chr(self.calfDCS(req[2:]))
self.__writeSerial(req);
self.__writeSerial(command);
req = chr(dcs)+"\x00"
self.__writeSerial(req);
# receive an ACK
res = self.__readSerial(6);
if (res != "\x00\x00\xff\x00\xff\x00") :
self.__cancel()
return None
# receive a response
res = self.__readSerial(5);
if (res == None) :
self.__cancel()
return None
elif ( not res.startswith("\x00\x00\xff") ) :
return None
if ((res[3] == "\xff") and (res[4] == "\xff")) :
# 未テストです
res = self.__readSerial(3)
if (res == None or self.__calcDCS(res) != 0) :
return None
responseLen = (ord(res[5]) << 8) | (ord(res[6]) << 0)
else :
if (self.__calcDCS(res[3:]) != 0) :
return None
responseLen = ord(res[3])
if (responseLen > self.__RCS620S_MAX_RW_RESPONSE_LEN) :
return None
response = self.__readSerial(responseLen)
if (response == None) :
self.__cancel()
return None
dcs = chr(self.__calcDCS(response))
res = self.__readSerial(2)
if (res == None or res[0] != dcs or res[1] != "\x00") :
self.__cancel()
return None
return response
def __cancel(self):
# transmit an ACK
self.__writeSerial("\x00\x00\xff\x00\xff\x00")
time.sleep(0.001);
self.__flushSerial();
# DCS(チェックサム)を計算する
def __calcDCS(self, data):
checkSum = 0;
for c in data:
checkSum += ord(c)
return -checkSum & 0xff
def __writeSerial(self, data) :
self.__ser.write(data);
def __readSerial(self, length) :
data = self.__ser.read(length)
if ( len(data) == length ) :
return data
else :
return None
def __flushSerial(self) :
self.__ser.flush()
2個め。どっかにありそうと思いつつも、探し疲れて作っちゃいました。車輪の再発明かもしれません。
# !/usr/bin/env python
# -*- coding: utf-8 -*-
"""バイナリな文字列を扱います"""
def strbinBE2int(strbin):
summation = 0;
for c in strbin:
summation <<= 8
summation += ord(c)
return summation
def strbinLE2int(strbin):
return strbinBE2int(strbin[::-1])
def int2strbinLE(num,length):
strbin = ""
for _ in range(0, length):
strbin += chr(num & 0xff)
num >>= 8
return strbin
def int2strbinBE(num,length):
return int2strbinLE(num,length)[::-1]
使用例
かざされた Suica,nanaco,WAON,Edy のIDmと残高を表示するプログラムを作ってみました。
実行時にカードがかざされていなければ、何も吐かずに終了します。
SERIAL_PORT_NAME は環境に合わせて書き換えてください。
処理の簡略化のため、システムコード、サービスコードはエンディアンを反転させて指定する必要があります。
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import stringbin
import rcs620s
COMMAND_TIMEOUT = 250
# serial port
SERIAL_PORT_NAME = "/dev/ttyAMA0" #raspberrypiの場合
SERIAL_PORT_NAME = "COM3" #windowsの場合
def printBalance(card_name, balance):
u""" 残高を表示する """
print("%s %uyen" % (card_name, balance))
def hexdmp(strhex,delimiter) :
u""" 文字列(中身はバイナリ)をHEXダンプする """
result = ""
for c in strhex :
result += c.encode('hex')
result += delimiter
if ( 0<len(delimiter) ) :
# 最後に付けてしまっているdelimiterを取る
result = result[:-len(delimiter)]
return result
if __name__ == '__main__':
rcs620sObj = rcs620s.Rcs620s()
ret = rcs620sObj.initDevice(SERIAL_PORT_NAME)
if (ret!="") :
# 初期化失敗→エラーを吐いて終了
print(ret)
sys.exit(1)
rcs620s.timeout = COMMAND_TIMEOUT
# Suica領域
if(rcs620sObj.polling("\x00\x03")):
print(hexdmp(rcs620sObj.idm,":"))
# Suica PASMO etc
# http://jennychan.web.fc2.com/format/suica.html
buf = rcs620sObj.readBlock("\x8B\x00", 0, 1)
if(buf is not None) :
balance = stringbin.strbinLE2int(buf[11:13])
printBalance("SUICA", balance)
# 共通領域
if(rcs620sObj.polling("\xFE\x00")):
print(hexdmp(rcs620sObj.idm,":"))
# nanaco
buf = rcs620sObj.readBlock("\x97\x55", 0, 1)
if(buf is not None) :
balance = stringbin.strbinLE2int(buf[0:4])
printBalance("nanaco", balance)
# waon
buf = rcs620sObj.readBlock("\x17\x68", 0, 1)
if(buf is not None) :
balance = stringbin.strbinLE2int(buf[0:2])
printBalance("WAON", balance)
# Edy
buf = rcs620sObj.readBlock("\x17\x13", 0, 1)
if(buf is not None) :
balance = stringbin.strbinLE2int(buf[0:4])
printBalance("Edy", balance)
rcs620sObj.rfOff()