前回 Bubble Babble エンコーダを例に、 Haskell でループ処理を行う例を挙げた。せっかくエンコーダを書いたので、今度はデコーダを書いてみることにする。
Bubble Babble は元々バイナリデータのエンコード手法なので、エンコーダの方は理論的には1失敗する要素がなかった。ところがデコーダの場合は必ずしもデコードが可能とは限らず、途中で不正な入力により処理を中断する可能性がある点が異なる。今回はエラーに関しては簡易的に String
でメッセージを伝えることとし、 decode :: B.ByteString -> Either String B.ByteString
として実装してみよう。
事前準備
アルゴリズムの確認
仕様書を再掲する。
デコードの仕様は以下の通りだ。
Decoding is obviously the process of encoding reversed.
デコードは明らかにエンコード処理を逆転させたものである。
……。
これ、アレかな? 数学書とかで「自明である」って書いてあるけど全然自明に思えないヤツかな??
とはいえあくまで例題として取り上げているだけなことに変わりはないので、深くは立ち入らないことにしよう。実装例は掲載するので詳細が気になる人はコードを確認して欲しい。
チェックサムの検証
デコードに関してはチェックサムの検証がある。せっかくアルゴリズムとして組み込まれているので、チェックサムを確認しつつ、誤りが検出されたらエラーとして Left
を返却するようにしたい。
仕様書を確認すると、
To check the checksums, when a tuple <a, b, c, d, e> or partial
tuple <a, b, c> has been recovered from the encoded string, an
implementation should check that ((a - C[i]) mod 6) < 4 and that
((c - C[i]) mod 6) < 4. Otherwise the encoded string is not a valid
encoding of any data and should be rejected.
簡易的に訳すと
タプル
<a, b, c, d, e>
または部分タプル<a, b, c>
がエンコード文字列から復元されたとき、実装は((a - C[i]) mod 6) < 4
かつ((c - C[i]) mod 6) < 4
であることを確認するべきである。そうでないならば、その入力は無効であり拒否するべきである。
…とあるのだが、実はこの説明は間違っている。アルゴリズムの詳細には立ち入らないつもりであったにも関わらず、如何せんこの仕様の解読に苦労したので、顛末を記しておく。あくまで表題の Haskell でループする話を読みたい方は飛ばしてほしい。
まず、仕様書の例である 0 バイト列 xexax
で確認してみる。最初と最後の x
は固定のマーカーなので、実体は exa
の部分タプルのみである。ここで 'e'
は 1
、 'a'
は 0
であるから、部分タプルとしては <1, 16, 0>
が復元され、 a = 1
c = 0
が分かる。チェックサム C[1]
は 1
だ。さて検証すると
-
((a - C[i]) mod 6) = ((1 - 1) mod 6) = (0 mod 6) = 0 < 4
(OK) -
((c - C[i]) mod 6) = ((0 - 1) mod 6) = (-1 mod 6) = 5 < 4
(NG)
となりあっさり NG となってしまう。明らかにおかしい。
もしかして法(modulo)ではなく剰余(remainder)なのではないかとも疑った。であれば c
のほうも -1
になるので 4 未満は満たす。しかし他の例でこれも通らないことがすぐに確認できる。それにしても、この 4 というのは何だろう?
闇雲にやっていても仕方がないので、元々 a
c
には何が入っていたのか確認してみると、以下のいずれかである(入力 D
の添え字は省略した)。
a = (((D >> 6) & 3) + C[i] ) mod 6
c = (((D ) & 3) + _|C[i] / 6|_) mod 6
a = C[i] mod 6
c = _|C[i] / 6|_
これは結局、 a
と c
には入力の一部を 2 ビットずつに分けて格納しており、そこにチェックサム C
を 6 で割った商と余りに分けて、それぞれに加えていることを意味する。入力長が偶数で「余りの 1 文字」が存在しない場合は、チェックサムそのものが分けて格納されているだけだ。元々の 4 とはつまり、入力からは 2 ビットしか入ってこないのだから、加算されたチェックサム C
を取り除いたら必然的に 2 ビット=4 未満となるはずだ、ということが言いたいのだ。
従って、確認すべきは恐らく以下の通りである。
(a - (C[i] mod 6 )) mod 6 < 4
(c - (_|C[i] / 6|_)) mod 6 < 4
入力長が偶数のときの最後の部分タプルについては厳密に 0 であることを確認することも可能だ。
これが正しいのかどうかはわからない。ただ、いくつか正常なパターン、エラーがあるパターンで確認してみる限り、正常なパターンを異常と判断することはなかったし、エラーがあるパターンも多くの場合でエラーを検出できていることが確認できているので、完全に間違っているというわけでもなさそうだ。ということで今回はこれを用いる。
実装
前準備
まずデコードで利用するライブラリと各関数群を定義しておく。これから実装する例ではいずれもこれらを利用可能とする2。
import Control.Monad
import Data.Bits ((.|.), (.&.), shiftL, shiftR)
import Data.Functor
import Data.List (mapAccumL)
import Data.Monoid ((<>))
import Data.Word
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as B
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as BL
voels, consonants :: B.ByteString
voels = C.pack "aeiouy"
consonants = C.pack "bcdfghklmnprstvzx"
safeIdx :: B.ByteString -> Char -> Either String Word8
safeIdx lst x = case C.elemIndex x lst of
Just i -> Right $ fromIntegral i
Nothing -> Left $ "Invalid input: " ++ [x]
safeCc :: Char -> Either String Word8
safeCc = safeIdx consonants
safeVv :: Char -> Either String Word8
safeVv = safeIdx voels
safeFirstByte :: Word8 -> Char -> Char -> Char -> Either String Word8
safeFirstByte s a b c = do
ai <- safeVv a
bi <- safeCc b
ci <- safeVv c
let s1 = mod (ai + (6 - mod s 6)) 6
let s2 = mod (ci + (6 - div s 6)) 6
return $ shiftL s1 6 .|. shiftL bi 2 .|. s2
safePairByte :: Word8 -> Char -> Char -> Char -> Char -> Char -> Either String (Word8, [Word8])
safePairByte s a b c d e = do
t <- safeFirstByte s a b c
di <- safeCc d
ei <- safeCc e
let u = shiftL di 4 .|. ei
let [si, ti, ui] = map fromIntegral [s, t, u] :: [Int]
let ns = fromIntegral $ mod (si*5 + ti*7 + ui) 36
return (ns, [t, u])
validator :: (Int -> Int -> Int -> Bool) -> Word8 -> Char -> Char -> Either String ()
validator checker ws wa wc = do
let s = fromIntegral ws
a <- fromIntegral <$> safeVv wa
c <- fromIntegral <$> safeVv wc
unless (checker s a c) $ Left "Invalid checksum"
validate, strictValidate :: Word8 -> Char -> Char -> Either String ()
validate = validator $ \s a c -> mod (a - mod s 6) 6 < 4 && mod (c - div s 6) 6 < 4
strictValidate = validator $ \s a c -> a == mod s 6 && c == div s 6
共通関数はいずれも Either String a
を利用して異常があった場合に Left
を返すようにしてある。
再帰
デコーダももちろん再帰で書ける。高階関数が使えるに越したことはないと思うが、無理に使おうとして苦労しそうなら原始の再帰で力押ししてしまうのも手である。実際に書いてみるとこんな感じだろうか。
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> BL.toStrict . B.toLazyByteString
<$> loop 1 (C.unpack str)
_ -> Left "Invalid input: Input should begin with 'x'"
where
loop :: Word8 -> String -> Either String B.Builder
loop s [a,'x',c,'x'] = do
strictValidate s a c
return mempty
loop s [a,b,c,'x'] = do
validate s a c
r <- safeFirstByte s a b c
return $ B.word8 r
loop s (a:b:c:d:'-':e:rs) = do
validate s a c
(ns, r) <- safePairByte s a b c d e
(mconcat (map B.word8 r) <>) <$> loop ns rs
loop _ _ = Left "Invalid input"
Bubble Babble のデコーダは最初と最後の x
を除いて 6 文字ずつ、最後だけ 3 文字を処理する形になる。ぱっと見とは実は区切りが違うのだ。仕様書に乗っている例だとこんな区切りになる。 -
に騙されてはいけない。
x | igak-n | yryk-h | umil-b | osek-s | ona | x
このパターンマッチを適用するために B.ByteString
で受け取った入力をいったんリストに分解している。こうすることでパターンマッチで書くことができるので楽になる。 loop
の最初の二つは終了条件のパターンマッチで、三つ目が通常のループである。それらにマッチしなければ何かがおかしいので Left
を返す。
Either e
はそのモナド能力として計算中断能力を持っているが、この例では再帰自身もループの終了に関与しているので少し分かりづらい。まずループの「終了」は再帰自身が管理していて、正しく最後までデコードできた場合は再帰呼び出しをしないことでループを終える。
一方、例えば validate
や safeFirstByte
が Left
を返した場合においては Either e
の計算中断能力も関係してくる。例えば以下の部分で validate s a c
が Left
を返したとすると、 Either e
の >>=
の定義によりそれ以降の safePairByte
等の呼び出しは行われない。
do
validate s a c
(ns, r) <- safePairByte s a b c d e
(mconcat (map B.word8 r) <>) <$> loop ns rs
その結果、再帰呼び出し loop ns rs
も呼び出されないためループ自体が中断されることになる。
foldl
再帰を使えば何でもできるが、やはり高階関数が使えるのであれば積極的に使っていきたい。 ByteString
には foldl
が用意されているので利用を検討してみよう。
今回 6 文字ずつ処理を行いたいわけだが、 fold
では 1 文字ずつ呼び出しが行われてしまう。仕方がないのでエンコーダでもやったように、状態を保存しつつ状況が揃った場合にそれぞれの処理を行うように書いてみよう。
data DecodeState
= DecodeState
{ _dChecksum :: Word8
, _dBuffer :: String
, _dResult :: B.Builder
}
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> fmap (BL.toStrict . B.toLazyByteString . _dResult)
. finalize . C.foldl f (return $ DecodeState 1 [] mempty) $ str
_ -> Left "Invalid input: Input should begin with 'x'"
where
f :: Either String DecodeState -> Char -> Either String DecodeState
f ds@(Left _) _ = ds
f (Right (DecodeState s [c,'x',a] res)) 'x' =
strictValidate s a c $> DecodeState s [] res
f (Right (DecodeState s [c,b,a] res)) 'x' = do
validate s a c
r <- safeFirstByte s a b c
return . DecodeState s [] $ res <> B.word8 r
f (Right (DecodeState s ['-',d,c,b,a] res)) e = do
validate s a c
(ns, r) <- safePairByte s a b c d e
return . DecodeState ns [] $ res <> mconcat (map B.word8 r)
f (Right (DecodeState _ [_,_,_,_,_] _)) _ = Left "Invalid input"
f (Right s@(DecodeState _ buf _)) a = return $ s { _dBuffer = a:buf }
finalize :: Either String DecodeState -> Either String DecodeState
finalize ds@(Left _) = ds
finalize ds@(Right (DecodeState _ [] _)) = ds
finalize _ = Left "Invalid input"
一文字ずつ読み取り、 6 文字のセットができるまでは _dBuffer
に入力を溜めていく。溜めているのは f (Right s@(DecodeState _ buf _)) a = return $ s { _dBuffer = a:buf }
の部分だ。 _dBuffer
に 5 文字溜まり、 6 文字目が来たところでパターンマッチをかけて適切な処理を行うという方針である。 a:buf
として溜めているのでパターンマッチ時には順序が逆転していることに注意する。
途中で不正な入力がきたりチェックサムの検証に失敗したりすると Left
が発生するが、残念なことに foldl
ではループの中断が行えない。途中で Left
が発生した場合は f ds@(Left _) _ = ds
のパターンマッチに引っかかり、そのときの Left
値を持ったまま最後まで読み飛ばすことになる。
foldr
foldl
ではループの中断は行えないが、実は foldr
なら可能である。どうするのかというと、継続渡しをする。 foldr
の型は
(a -> b -> b) -> b -> [a] -> b
だが3 、ここで b
に c -> d
を渡すと次のようになる:
(a -> (c -> d) -> c -> d) -> (c -> d) -> [a] -> c -> d
任意の型は当然引数を取ったって良いのだが、慣れないとなかなか奇妙に見える。
さてへんてこな型になったわけだが、基本的なところから順番に確認していこう。ふつうに使う場合のおさらいをすると、 foldr f 0 [1, 2, 3]
は f 1 (f 2 (f 3 0))
と展開される。これを踏まえたうえで b
に c -> d
を渡す場合は、例えば
foldr f finalize [1, 2, 3] 0
と書けば、以下のように展開される。
f 1 (f 2 (f 3 finalize)) 0
つまり f item next state =
と書いたとき、最初の呼び出しは item
にはリストの最初の要素 1
が、 next
には f 2 (f 3 finalize)
が、 state
には初期状態 0
が渡されてくることになる。ループを続けるためには state
と item
から新しい状態を計算し、 next
に渡せばよい。ここで next
は f 2 (f 3 finalize)
なので、ループは第二要素に続くことが分かる。ここまでは普通のことだが、もし f
が next
を呼び出さなかったらどうなるかを考えると、これがまさにループの中断だとわかるはずだ。
ではこれを使って実践してみよう。 DecodeState
は foldl
で使ったものと同じものを使う。
data DecodeState
= DecodeState
{ _dChecksum :: Word8
, _dBuffer :: String
, _dResult :: B.Builder
}
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> BL.toStrict . B.toLazyByteString . _dResult
<$> C.foldr f finalize str (DecodeState 1 [] mempty)
_ -> Left "Invalid input: Input should begin with 'x'"
where
f :: Char -> (DecodeState -> Either String DecodeState) -> DecodeState -> Either String DecodeState
f 'x' next (DecodeState s [c,'x',a] res) = do
strictValidate s a c
next (DecodeState s [] res)
f 'x' next (DecodeState s [c,b,a] res) = do
validate s a c
r <- safeFirstByte s a b c
next . DecodeState s [] $ res <> B.word8 r
f e next (DecodeState s ['-',d,c,b,a] res) = do
validate s a c
(ns, r) <- safePairByte s a b c d e
next . DecodeState ns [] $ res <> mconcat (map B.word8 r)
f _ _ (DecodeState _ [_,_,_,_,_] _) = Left "Invalid input"
f a next s@(DecodeState _ buf _) = next $ s { _dBuffer = a:buf }
finalize :: DecodeState -> Either String DecodeState
finalize ds@(DecodeState _ [] _) = return ds
finalize _ = Left "Invalid input"
foldl
のときとやっていることは同じだ。違うのは引数の順序と、入力が Either String DecodeState
ではなく DecodeState
になっていることだ。このため各パターンマッチでいちいち Right
を書く必要がなくなっているし、 Left
のマッチ節も不要となった。
ループの中断は先に見た通り next
を呼び出すか否かで制御されている。 Either e
の do
構文内で Left
が発生すれば、その時点で次の行に進まなくなるので next
が呼び出されず中断されることになる。このあたりの仕組みは再帰のときと同じだ。
foldM
せっかく中断能力を持つモナド Either e
を使っているのに、中断制御を再帰や継続渡しでやっているのは少しもったいない気もする。実は fold
シリーズには foldM
というモナド(Monadic)版もある。型はこうだ 4。
Monad m => (b -> a -> m b) -> b -> [a] -> m b
foldM f 0 [1, 2, 3]
は次のように展開される。
f 0 1 >>= \x -> f x 2 >>= \y -> f y 3
あるいは同じことだが、どちらかというと do
構文で説明されることが多い。
do
x <- f 0 1
y <- f x 2
z <- f y 3
return z
見ての通りループごとに bind 演算 (>>=)
が行われるため、モナド副作用(monadic side-effects)が発生する。 Either e
のモナド副作用は「計算の中断」である。
ByteString
に foldM
があれば良かったのだが、どうやらないようだ。仕方ないのでいったんリストに変換して foldM
を適用しよう。 DecodeState
はこれまでと全く同じものを利用する。
data DecodeState
= DecodeState
{ _dChecksum :: Word8
, _dBuffer :: String
, _dResult :: B.Builder
}
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> fmap (BL.toStrict . B.toLazyByteString . _dResult)
. finalize . foldM f (DecodeState 1 [] mempty) $ C.unpack str
_ -> Left "Invalid input: Input should begin with 'x'"
where
f :: DecodeState -> Char -> Either String DecodeState
f (DecodeState s [c,'x',a] res) 'x' =
strictValidate s a c $> DecodeState s [] res
f (DecodeState s [c,b,a] res) 'x' = do
validate s a c
r <- safeFirstByte s a b c
return . DecodeState s [] $ res <> B.word8 r
f (DecodeState s ['-',d,c,b,a] res) e = do
validate s a c
(ns, r) <- safePairByte s a b c d e
return . DecodeState ns [] $ res <> mconcat (map B.word8 r)
f (DecodeState _ [_,_,_,_,_] _) _ = Left "Invalid input"
f s@(DecodeState _ buf _) a = return $ s { _dBuffer = a:buf }
finalize :: Either String DecodeState -> Either String DecodeState
finalize ds@(Left _) = ds
finalize ds@(Right (DecodeState _ [] _)) = ds
finalize _ = Left "Invalid input"
やっていることはこれまでとほぼ同じである。今回は高階関数が bind 演算 (>>=)
を呼び出してくれているおかげで、どこかで Left
が発生した時点でモナドの能力によりループが中断する。
mapAccumL
複数文字ずつ処理するためにいったん状態に溜め込み、溜め込んだ内容に対してパターンマッチを行う方針を見てきたが、そもそも今回は固定長 6 文字ずつの処理で良いので、最初から 6 文字ずつに分解してしまったほうが話は早い。Ruby なら each_slice(6)
とかしたい場面だ。Haskell だし当然類似品があるに違いない。と思って Data.List
を探してみるが、どうもなさそうに見える。仕方ない、自分で定義しよう5。
chunksOf :: Int -> [a] -> [[a]]
chunksOf _ [] = []
chunksOf n xs = hs : chunksOf n ts
where (hs, ts) = splitAt n xs
これも再帰によるループだ。具体的に使ってみるとこんな感じになる。
>>> chunksOf 3 [1..9]
[[1,2,3],[4,5,6],[7,8,9]]
>>> chunksOf 3 [1..10]
[[1,2,3],[4,5,6],[7,8,9],[10]]
これができれば map
系関数が使えるようになる。とはいえ今回に関してはチェックサムという状態を伝搬させていく必要があり、ただの map
ではそれはできない。そんなときは状態も一緒に扱える mapAccumL :: (a -> b -> (a, c)) -> a -> [b] -> (a, [c])
6 の出番だ。
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> fmap (BL.toStrict . B.toLazyByteString . mconcat)
. sequence . snd
. mapAccumL f 1
. chunksOf 6
$ C.unpack str
_ -> Left "Invalid input: Input should begin with 'x'"
where
f :: Word8 -> String -> (Word8, Either String B.Builder)
f s [a,'x',c,'x'] =
(s, strictValidate s a c $> mempty)
f s [a,b,c,'x'] = (,) s $ do
validate s a c
r <- safeFirstByte s a b c
return $ B.word8 r
f s [a,b,c,d,'-',e] =
case validate s a c >> safePairByte s a b c d e of
Right (ns, r) -> (ns, Right . mconcat . map B.word8 $ r)
Left msg -> (s, Left msg)
f s _ = (s, Left "Invalid input")
各処理のパターンマッチが素直に書けてとても素晴らしい。しかしモナドは扱ってくれないのでいちいち毎回 Either
のパターンマッチをしなければならないのが残念な点ではある。
ループの中断がどこで行われるのかというと、これは Either e
のモナド能力による。mapAccumL
自体はモナドのことには関係しておらず、ではどこかというと sequence
がそれを担当することになる。正格評価が標準の言語に慣れていると、
- まず
chunksOf
が全入力を分解して -
mapAccumL
が各要素を全て処理し -
sequence
が結果の[Either String B.Builder]
を処理する
と順番に考えがちだが、Haskell では必要に応じて計算する形をとるため、
-
sequence
が[Either String B.Builder]
の先頭の要素を要求し -
mapAccumL
が次の入力を要求し -
chunksOf
が入力のうち必要な部分を切り分けて返し -
mapAccumL
が切り分けられたデータを処理し -
sequence
がその結果を受け取る
という形の処理が繰り返されることになる(この説明は主要素だけを切り出しているので正確性には欠けるが)。つまり 5. で Left
を受け取ったら、Either e
のモナド副作用により sequence
はリストの次の要素を要求しない。これがつまりループの中断となるわけだ。
mapAccumM
foldl
にモナド版 foldM
があったように、 mapAccumL
にもモナド版の mapAccumM
が、
……ないな? なんでないの? 便利なのに。
実際検索するといくつかライブラリに特化したものなどが出てくるのだが、そのあたりを参考にして定義してみよう。
mapAccumM :: Monad m => (acc -> x -> m (acc, y)) -> acc -> [x] -> m (acc, [y])
mapAccumM _ a [] = return (a, [])
mapAccumM f a (x:xs) = do
(b, y) <- f a x
(c, ys) <- mapAccumM f b xs
return (c, y:ys)
これを使えば Either e
のモナド副作用も mapAccumM
に任せられる。
decode :: B.ByteString -> Either String B.ByteString
decode input = case C.uncons input of
Just ('x', str) -> fmap (BL.toStrict . B.toLazyByteString . mconcat . snd)
. mapAccumM f 1
. chunksOf 6
$ C.unpack str
_ -> Left "Invalid input: Input should begin with 'x'"
where
f :: Word8 -> String -> Either String (Word8, B.Builder)
f s [a,'x',c,'x'] =
strictValidate s a c $> (s, mempty)
f s [a,b,c,'x'] = do
validate s a c
r <- safeFirstByte s a b c
return (s, B.word8 r)
f s [a,b,c,d,'-',e] = do
validate s a c
(ns, r) <- safePairByte s a b c d e
return (ns, mconcat . map B.word8 $ r)
f _ _ = Left "Invalid input"
これでいちいちループ内で Either
のパターンマッチをすることもなくなった。
おまけ: Bubble Babble 実例
ところでこれまで見てきた Bubble Babble、実は SSH 鍵の指紋表示に使われていたりする。
% ssh-keygen -B -f sample
2048 xuzam-luzyn-pamov-dohog-tatuz-lylyf-zahek-firor-fynym-sodot-maxyx sample (RSA)
といっても普段は -l
の方を見ていると思うので知らない人も多そうだ。
% ssh-keygen -l -E sha1 -f sample
2048 SHA1:/Ie8mqLi1k31958/12Pus2WMCdg sample (RSA)
せっかくデコーダを作ったので実際にデコードして一致することを確認してみるのも良いだろう。
こうして並べてみると Base64 のほうが大分短いが、Bubble Babble の良いところは子音母音の組み合わせと区切り文字のおかげで、人間が見たときに何となく読みやすい形になっている点にある。印刷を行うなど、目視で一致を確認せざるを得ない場合などは Bubble Babble のほうを使ってみるのもありかもしれない。クーポンコードのようなものを印刷する場合にも良さそうに思えるがどうだろうか。と思ったけど今はそういうのは QR コードか。
-
実際には、メモリが不足する、現実的な時間で計算が終わらない、といったリソース不足による障害は考え得る。 ↩
-
どうやら最近の GHC では
(<>)
はGHC.Base
に入ったようなので、いちいちimport
しなくても良くなったようだ。 ↩ -
厳密にはリストに限定されず
Foldable t => (a -> b -> b) -> b -> t a -> b
である。また、ByteString
のfoldr
は(Word8 -> b -> b) -> b -> ByteString -> b
である。 ↩ -
こちらも厳密にはリストに限定されず
(Foldable t, Monad m) => (b -> a -> m b) -> b -> t a -> m b
である。 ↩ -
外のライブラリを探せば
split
のData.List.Split
に同じものが定義されている。 ↩ -
実際はリストに限定されず
Traversable
を受け入れ、Traversable t => (a -> b -> (a, c)) -> a -> t b -> (a, t c)
である。 ↩