TL;DR
txid一致→時間窓×金額×アドレス一致の優先度で突合し、**二重仕訳+監査ログ(ハッシュ鎖)**まで自動生成します。
依存は標準ライブラリ+SQLiteのみ。1ファイル・数十行で動くため、会計×データ基盤の**“再現可能性”**を担保できます。
未突合はサスペンス勘定に自動送客し、オンチェーン照合率をKPIとして可視化します(会計・監査・CSの共通言語)。
背景と狙い
企業で暗号資産(例:ETH/USDC)を保有・送受金する際、オンチェーン(ウォレット)と取引所(交換業者)で二重管理が発生します。
**どちらが正?という議論ではなく、「同じ事象を互いに説明できるか」**が監査観点では重要です。この記事では、最小構成で多ソース突合→仕訳→監査ログまでを一気通貫で再現します。
全体像(1枚図)
flowchart LR
A[オンチェーンTx] -->|staging| S[正規化]
B[取引所レジャー] -->|staging| S
C[価格フィード(JPY)] -->|日次結合| V[評価]
S --> R[突合エンジン
①txid ②時間窓+金額+アドレス]
R --> J[仕訳生成
101-1/101-2/費用/サスペンス]
J --> H[監査ログ
改ざん耐性(前ハッシュ鎖)]
R --> U[未突合リスト
差異是正/エスカレーション]
V --> J
仕様(最小)
データソース
onchain_tx(txid, ts, from_addr, to_addr, asset, amount)
exchange_ledger(id, ts, side[deposit|withdraw], asset, amount, txid?, from_addr?, to_addr?, fee_asset?, fee_amount?)
price_feed(d, asset, jpy) … 評価用のシンプルな日足
address_map(raw_addr, label, kind[company|exchange|counterparty])
突合ロジック(優先度)
強一致:exchange_ledger.txid = onchain_tx.txid
時間窓一致:金額一致±ε、方向(入出庫)×アドレス整合、±10分以内
会計ロジック(例)
自己保管⇄取引所:101-1 自己保管 と 101-2 取引所保管 の振替
交換業者手数料:601-1 手数料 / 101-2
外部送金・受領(未分類):199-9 サスペンス(出) / 499-9 サスペンス(入)
監査ログ:仕訳行を時系列に連鎖ハッシュ(sha256(行内容||prev_hash))
実装(コピペで実行)
依存:Python3(標準ライブラリのみ)
実行:python3 reconcile_demo.py
reconcile_demo.py
import sqlite3, hashlib
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
PRAGMA foreign_keys=ON;
-- 1) スキーマ
CREATE TABLE onchain_tx (
txid TEXT PRIMARY KEY, ts TEXT, chain TEXT,
from_addr TEXT, to_addr TEXT, asset TEXT, amount REAL
);
CREATE TABLE exchange_ledger (
id TEXT PRIMARY KEY, ts TEXT, exch TEXT, side TEXT,
asset TEXT, amount REAL, txid TEXT, from_addr TEXT, to_addr TEXT,
fee_asset TEXT, fee_amount REAL
);
CREATE TABLE price_feed (d TEXT, asset TEXT, jpy REAL, PRIMARY KEY(d, asset));
CREATE TABLE address_map (raw_addr TEXT PRIMARY KEY, label TEXT, kind TEXT);
-- 2) サンプルデータ(最小)
""")
onchain = [
("0xaaa","2025-09-01 10:00:00","ethereum","0xSelf1","0xExDeposit1","USDC",1000.0),
("0xbbb","2025-09-01 11:01:00","ethereum","0xExHot1","0xSelf1","USDC",500.0),
("0xccc","2025-09-01 12:00:00","ethereum","0xSelf1","0xVendorA","ETH",1.2),
("0xddd","2025-09-01 13:05:00","ethereum","0xCustomerA","0xSelf2","USDC",2000.0),
]
ex_ledger = [
("E1","2025-09-01 10:05:00","Binance","deposit","USDC",1000.0,"0xaaa","0xSelf1","0xExDeposit1",None,0.0),
("E2","2025-09-01 11:00:00","Binance","withdraw","USDC",500.0,"0xbbb",None,"0xSelf1","USDC",1.0),
]
price_feed = [("2025-09-01","USDC",150.0), ("2025-09-01","ETH",450000.0)]
addr_map = [
("0xSelf1","Treasury Wallet 1","company"),
("0xSelf2","Treasury Wallet 2","company"),
("0xExDeposit1","Exchange Deposit","exchange"),
("0xExHot1","Exchange Hot","exchange"),
("0xVendorA","Vendor A","counterparty"),
("0xCustomerA","Customer A","counterparty"),
]
cur.executemany("INSERT INTO onchain_tx VALUES (?,?,?,?,?,?,?)", onchain)
cur.executemany("INSERT INTO exchange_ledger VALUES (?,?,?,?,?,?,?,?,?,?,?)", ex_ledger)
cur.executemany("INSERT INTO price_feed VALUES (?,?,?)", price_feed)
cur.executemany("INSERT INTO address_map VALUES (?,?,?)", addr_map)
3) 突合ビュー(txid一致→時間窓一致)
cur.executescript("""
DROP VIEW IF EXISTS match_by_txid;
CREATE VIEW match_by_txid AS
SELECT e.id AS ex_id, e.side, e.asset, e.amount AS ex_amt, e.ts AS e_ts,
o.ts AS o_ts, o.txid, o.from_addr, o.to_addr, o.amount AS on_amt
FROM exchange_ledger e
JOIN onchain_tx o ON e.txid IS NOT NULL AND e.txid = o.txid;
DROP VIEW IF EXISTS candidates_by_window;
CREATE VIEW candidates_by_window AS
SELECT e.id AS ex_id, e.side, e.asset, e.amount AS ex_amt, e.ts AS e_ts,
o.ts AS o_ts, o.txid, o.from_addr, o.to_addr, o.amount AS on_amt,
ABS(strftime('%s', e.ts) - strftime('%s', o.ts)) AS dt_sec,
CASE WHEN e.side='deposit' AND lower(o.to_addr)=lower(e.to_addr) THEN 1
WHEN e.side='withdraw' AND lower(o.from_addr)=lower(e.to_addr) THEN 1
ELSE 0 END AS addr_match
FROM exchange_ledger e
JOIN onchain_tx o
ON e.txid IS NULL AND e.asset = o.asset
AND ABS(o.amount - e.amount) <= 0.000001;
DROP VIEW IF EXISTS match_by_window;
CREATE VIEW match_by_window AS
SELECT * FROM candidates_by_window WHERE addr_match=1 AND dt_sec <= 600;
DROP TABLE IF EXISTS matches;
CREATE TABLE matches AS
SELECT ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, 'txid' AS method
FROM match_by_txid
UNION ALL
SELECT ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, 'window' AS method
FROM match_by_window;
DROP VIEW IF EXISTS unmatched_exchange;
CREATE VIEW unmatched_exchange AS
SELECT e.* FROM exchange_ledger e LEFT JOIN matches m ON m.ex_id = e.id WHERE m.ex_id IS NULL;
DROP VIEW IF EXISTS company_wallets;
CREATE VIEW company_wallets AS SELECT raw_addr FROM address_map WHERE kind='company';
DROP VIEW IF EXISTS unmatched_onchain;
CREATE VIEW unmatched_onchain AS
SELECT o.* FROM onchain_tx o LEFT JOIN matches m ON m.txid = o.txid
WHERE m.txid IS NULL AND (
lower(o.from_addr) IN (SELECT lower(raw_addr) FROM company_wallets) OR
lower(o.to_addr) IN (SELECT lower(raw_addr) FROM company_wallets)
);
-- 4) 評価ビュー
DROP VIEW IF EXISTS v_matches_valued;
CREATE VIEW v_matches_valued AS
SELECT m.*, DATE(m.o_ts) AS d, p.jpy AS px_jpy, m.on_amt * p.jpy AS jpy_amount
FROM matches m LEFT JOIN price_feed p ON p.d = DATE(m.o_ts) AND p.asset = m.asset;
-- 5) 仕訳テーブル
DROP TABLE IF EXISTS journal;
CREATE TABLE journal (
j_id TEXT PRIMARY KEY, ts TEXT, debit_acct TEXT, credit_acct TEXT,
asset TEXT, qty REAL, jpy_amount REAL, memo TEXT, link_txid TEXT, link_ex_id TEXT
);
""")
def acct(code, name):
return f"{code} {name}"
5-1) 突合済(自己⇄取引所)の仕訳 + 取引所手数料
for (ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, method, d, px, jpy) in
cur.execute("SELECT * FROM v_matches_valued"):
if side == 'deposit':
debit, credit = acct("101-2","Digital Asset - Exchange"), acct("101-1","Digital Asset - Self-custody")
else: # withdraw
debit, credit = acct("101-1","Digital Asset - Self-custody"), acct("101-2","Digital Asset - Exchange")
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{ex_id}", o_ts, debit, credit, asset, on_amt, jpy,
f"{asset} {side} matched by {method} (ex_id={ex_id}, txid={txid})", txid, ex_id))
# fee(あれば)
fee_asset, fee_amt = cur.execute("SELECT fee_asset, fee_amount FROM exchange_ledger WHERE id=?", (ex_id,)).fetchone()
if fee_amt:
fee_px = cur.execute("SELECT jpy FROM price_feed WHERE d=? AND asset=?", (d, fee_asset)).fetchone()[0]
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{ex_id}-FEE", e_ts,
acct("601-1","Transaction Fee Expense"), acct("101-2","Digital Asset - Exchange"),
fee_asset, fee_amt, fee_amt*fee_px, f"{fee_asset} fee on exchange {ex_id}", txid, ex_id))
5-2) 未突合オンチェーン(外部⇄自社)をサスペンスへ
def is_company(addr):
return cur.execute("SELECT 1 FROM address_map WHERE lower(raw_addr)=lower(?) AND kind='company'", (addr,)).fetchone() is not None
for (txid, ts, chain, from_addr, to_addr, asset, amount) in cur.execute(
"SELECT txid, ts, chain, from_addr, to_addr, asset, amount FROM unmatched_onchain ORDER BY ts"):
d = ts.split(" ")[0]
px = cur.execute("SELECT COALESCE(jpy,0) FROM price_feed WHERE d=? AND asset=?", (d, asset)).fetchone()[0]
jpy = amount * px
if is_company(from_addr) and not is_company(to_addr): # 外部へ出金
debit, credit = acct("199-9","Suspense (Unclassified Outflow)"), acct("101-1","Digital Asset - Self-custody")
memo = f"{asset} onchain outflow to external (txid={txid})"
elif is_company(to_addr) and not is_company(from_addr): # 外部から入金
debit, credit = acct("101-1","Digital Asset - Self-custody"), acct("499-9","Suspense (Unclassified Inflow)")
memo = f"{asset} onchain inflow from external (txid={txid})"
else: # 内部/不明
debit, credit = acct("199-9","Suspense (Unclassified Outflow)"), acct("499-9","Suspense (Unclassified Inflow)")
memo = f"{asset} ambiguous movement (txid={txid})"
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{txid}", ts, debit, credit, asset, amount, jpy, memo, txid, None))
6) 監査ログ(ハッシュ鎖)
cur.executescript("DROP TABLE IF EXISTS audit_log; CREATE TABLE audit_log (seq INTEGER PRIMARY KEY AUTOINCREMENT, j_id TEXT, ts TEXT, hash TEXT, prev_hash TEXT);")
prev = ""
for row in cur.execute("SELECT j_id, ts, debit_acct, credit_acct, asset, qty, jpy_amount, IFNULL(link_txid,''), IFNULL(link_ex_id,'') FROM journal ORDER BY ts, j_id"):
s = "|".join(map(str,row)) + "|" + prev
h = hashlib.sha256(s.encode()).hexdigest()
cur.execute("INSERT INTO audit_log(j_id,ts,hash,prev_hash) VALUES (?,?,?,?)", (row[0],row[1],h,prev))
prev = h
7) KPI(突合率)
m_on_cnt, m_on_jpy = cur.execute("""
SELECT COUNT(), COALESCE(SUM(on_amt * p.jpy),0)
FROM matches m LEFT JOIN price_feed p ON p.d=DATE(m.o_ts) AND p.asset=m.asset
""").fetchone()
t_on_cnt, t_on_jpy = cur.execute("""
SELECT COUNT(), COALESCE(SUM(o.amount * p.jpy),0)
FROM onchain_tx o LEFT JOIN price_feed p ON p.d=DATE(o.ts) AND p.asset=o.asset
""").fetchone()
print(f"[coverage] onchain: count {m_on_cnt}/{t_on_cnt}, JPY {int(m_on_jpy)}/{int(t_on_jpy)}")
8) ダンプ(学習用)
print("\n[journal]")
for r in cur.execute("SELECT ts, debit_acct, credit_acct, asset, qty, jpy_amount, memo FROM journal ORDER BY ts"): print(r)
print("\n[audit head]")
for r in cur.execute("SELECT seq, j_id, substr(hash,1,16) FROM audit_log LIMIT 5"): print(r)
9) 最低限の検証
assert m_on_cnt == 2, "突合件数が想定と異なります"
print("\nOK")
実行結果(例)
[coverage] onchain: count 2/4, JPY 225000/1065000
[journal]
('2025-09-01 10:00:00', '101-2 Digital Asset - Exchange', '101-1 Digital Asset - Self-custody', 'USDC', 1000.0, 150000.0, 'USDC deposit to exchange matched by txid (ex_id=E1, txid=0xaaa)')
('2025-09-01 11:00:00', '601-1 Transaction Fee Expense', '101-2 Digital Asset - Exchange', 'USDC', 1.0, 150.0, 'USDC fee on exchange E2')
('2025-09-01 11:01:00', '101-1 Digital Asset - Self-custody', '101-2 Digital Asset - Exchange', 'USDC', 500.0, 75000.0, 'USDC withdraw from exchange matched by txid (ex_id=E2, txid=0xbbb)')
('2025-09-01 12:00:00', '199-9 Suspense (Unclassified Outflow)', '101-1 Digital Asset - Self-custody', 'ETH', 1.2, 540000.0, 'ETH onchain outflow to external (txid=0xccc)')
('2025-09-01 13:05:00', '101-1 Digital Asset - Self-custody', '499-9 Suspense (Unclassified Inflow)', 'USDC', 2000.0, 300000.0, 'USDC onchain inflow from external (txid=0xddd)')
[audit head]
(1, 'J-E1', '488a59ec53567cbd')
(2, 'J-E2-FEE', '517085b95452e87f')
(3, 'J-E2', 'f53d42bd2851333a')
(4, 'J-0xccc', '96a44e152c6ec913')
(5, 'J-0xddd', '8f04df4040c3b927')
OK
設計ポイント(なぜこの形にしたか)
再現性:SQLite・標準ライブラリで完結。監査説明で「このコードで同じ結果が出る」を担保。
優先度付き突合:まずtxid、次に時間窓×金額×方向×アドレス。誤検出率と捕捉率のバランスを運用で調整可能。
監査ログ:連鎖ハッシュは「改ざんがあれば必ず痕跡が残る」を実演する最低限の方法。
運用Tips(スケール/監査/コスト)
スケール:本番はBigQuery/DuckDBを推奨。日付パーティション×assetクラスタでスキャン費用を抑制。
一意性:event_uid = source + primary_key を論理IDに、冪等(idempotent)なアップサートを徹底。
ルール外だが重要な実務
グロス/ネット表記の統一(手数料控除の扱い)。
評価レートの出所・時点(EOD/加重平均など)を会計方針で固定。
内部移転自動識別(address_map.kind='company'が両端にある場合は“社内振替”)。
KPI例:オンチェーン照合率(件数/金額)、サスペンス滞留日数、監査指摘件数。
よくある質問(短問短答)
Q. 取引所側のtxidが欠ける場合は?
A. 本稿の時間窓一致が効きます。窓幅と金額誤差εは運用で調整し、重複候補は最短時差優先で確定します。
Q. ガス代(ETH)と手数料(USDC等)が混在しますが?
A. 本稿では取引所手数料のみ実装。オンチェーン手数料は支払資産の減少として601-1/101-1の仕訳を追加してください。
Q. 会計基準(JGAAP/IFRS)差分は?
A. 科目や評価の方針は会社の会計方針に従って下さい。ここでは技術実装の再現性に絞っています。
Q. 制裁・リスクの検知は?
A. address_mapに制裁フラグを持たせ、未突合のうち高リスク宛先を優先エスカレーションするのが実務的です。
付録(dbt/SQLへの展開ヒント)
models/staging/:onchain_tx, exchange_ledger, price_feed, address_map
models/mart/matches.sql:本稿のmatchesをdbt化
models/mart/journal.sql:会計テンプレート(科目マッピング)をYAMLで外出し
tests/:突合件数/金額の期待値をdbt testsで自動検証
免責
本記事は一般的な技術情報の提供を目的とし、法務・会計・税務の助言ではありません。実際の手続・設定・会計処理は、社内規程・顧問専門家および最新の一次資料をご確認ください。
