郵便番号データ(CSV形式)
郵便局のウェブサイトでは郵便番号CSVが公開されており、ダウンロードして利用する事で郵便番号から住所や事業者を取得する事が出来ます。
これらのデータは「著作権を主張せず再配布も自由」と明記してありますので、テストデータに活用してみるのに良さそうです。
郵便番号CSVはその名の通りCSV形式ではありますが、そのままでは郵便番号データベースとして利用する事が出来ません。
具体的には、以下の様な問題(ご利用上のご注意)があります。
- 文字数が多い住所は複数の情報に跨って記述されている。
- 一つの郵便番号に複数の住所が割り当てられているものがある。
- 住所データとして表示をさせた際に、残念な表記 (「以下に掲載がない場合」という表記) が含まれる。
テストデータとして使うのであれば、そのまま読み込んでしまっても全く問題ないのですけど、ちょっとスクリプトで読み込ませて多少はマシな形式に変換してみます。
ついでに郵便番号CSV(住所・カナ表記)と郵便番号CSV(住所・ローマ表記)を結合させます。
conv_zipcode_jp.py
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------ import(s)
import sys
import re
import csv
import json
import collections
import unicodedata
# ------------------------------------------------------------------- class(s)
class CZipcodeRecord(object):
class CAddress(object):
prefecture = ""
municipality = ""
area = ""
def __str__(self):
return "|".join([self.prefecture, self.municipality, self.area])
def get_list(self):
if len(self.area) > 0:
return [self.prefecture, self.municipality, self.area]
elif len(self.municipality) > 0:
return [self.prefecture, self.municipality]
else:
return [self.prefecture]
def is_splitted_be(self):
if re.search("(.*?", self.area) is not None:
if re.search("(.*?)", self.area) is None:
return True
return False
def is_splitted_en(self):
if re.search(".*?)", self.area) is not None:
if re.search("(.*?)", self.area) is None:
return True
return False
def __init__(self, src_record, dict_zipcode_roma):
r = [v.decode("cp932").encode("utf-8") for v in src_record]
self.jiscode = r[0]
self.zipcode_5 = r[1]
self.zipcode_7 = r[2]
self.address = self.CAddress()
self.address.prefecture = r[6]
self.address.municipality = r[7]
self.address.area = r[8]
self.address_kana = self.CAddress()
self.address_kana.prefecture = norm_process(r[3])
self.address_kana.municipality = norm_process(r[4])
self.address_kana.area = norm_process(r[5])
self.address_rome = self.CAddress()
self.flag_attrib = "".join(r[9:13])
self.flag_update = "".join(r[12:15])
try:
rome = dict_zipcode_roma[self.zipcode_7]
if self.address.prefecture == rome[1]:
self.address_rome.prefecture = rome[4]
if self.address.municipality == rome[2]:
self.address_rome.municipality = rome[5]
if self.address.area == rome[3]:
self.address_rome.area = rome[6]
except KeyError:
pass
if self.address.area.find("以下に掲載がない場合") == 0:
self.address.area = ""
self.address_kana.area = ""
self.address_rome.area = ""
def key(self):
return (self.zipcode_7)
# ============================================================================
def norm_process(src_text):
utext = unicodedata.normalize("NFKC", src_text.decode("utf-8"))
for src_v, dst_v in zip(u"0123456789()", u"0123456789()"):
utext = utext.replace(src_v, dst_v)
return utext.encode("utf-8")
# ============================================================================
def build_rome_dic(rome_pathname):
pattern_rome = re.compile("[A-Z0-9\\(\\)]")
dict_result = collections.OrderedDict()
with open(rome_pathname, "r") as h_rfile:
for r in csv.reader(h_rfile):
list_record = [v.decode("cp932").encode("utf-8").replace(" " , "") for v in r]
if len([v for v in list_record[4:] if pattern_rome.search(v) is not None]):
dict_result[list_record[0]] = list_record
return dict_result
# ============================================================================
def main():
dict_zip_roma = build_rome_dic(sys.argv[2])
dict_zip = collections.OrderedDict()
nRecordCount = 0
with open(sys.argv[1], "r") as h_rfile:
for r in csv.reader(h_rfile):
o = CZipcodeRecord(r, dict_zip_roma)
if o.key() not in dict_zip:
dict_zip[o.key()] = [o]
nRecordCount += 1
if o.address.is_splitted_be() is True:
n_rpart = 1
else:
n_rpart = 0
else:
if n_rpart == 0:
dict_zip[o.key()].append(o)
nRecordCount += 1
if o.address.is_splitted_en() is True:
print "addres parser error"
sys.exit()
elif n_rpart == 1:
dict_zip[o.key()][0].address.area += o.address.area
dict_zip[o.key()][0].address_kana.area += o.address_kana.area
dict_zip[o.key()][0].address_rome.area += o.address_rome.area
if o.address.is_splitted_be() is True:
print "addres parser error"
sys.exit()
elif o.address.is_splitted_en() is True:
n_rpart -= 1
else:
print "addres parser error"
sys.exit()
# json で出力する場合の例
"""
{
"0000000": {
"country": "JP",
"address_list": [
{
"ja": [],
"ja_kana": [],
"ja_rome": []
}
]
}
}
"""
nProgress = 0
dictExport = {}
for k, list_o in dict_zip.items():
list_address = []
for o in list_o:
list_address.append(
{
"ja": o.address.get_list(),
"ja_kana": o.address_kana.get_list(),
"ja_rome": o.address_rome.get_list()
}
)
nProgress += 1
dictExport[k] = {
"country": "JP",
"address_list": list_address
}
print json.dumps(dictExport, indent=True)
# -----------------------------------------------------------------------[EOF]
この実装の問題点について
前述した問題のうち、 文字数が多い住所は複数の情報に跨って記述されている。 という点の解決を正しく実装していません。
今回の実装では、
- 文字列に「(」が含まれているが「)」は含まれていないものを検出する。
- 見つかった場合は、「(」が含まれていないが「)」が含まれている住所が検出されるまで文字連結をする。
といった単純処理をしています。
もしかすると正しくない文字連結をしてしまっている場所があるかもしれません。
利用方法
一つ目の引数に 「住所の郵便番号」 、二つ目の引数に 「住所の郵便番号(ローマ字)」 を指定します。
実行結果は標準出力にJSON形式で書き出されます。
# こんな感じでの使用を想定しています。
$ python conv_zipcode_jp.py KEN_ALL.CSV KEN_ALL_ROME.CSV