これはなに
昨今、DARPA CGC始めネイティブバイナリを対象とした脆弱性自動解析技術はますます進歩の一途を辿っているが、あまりにも急激に進歩するため実際のユースケースやドキュメントが追従できてない様に思う。そのためangrやS2E、Drillerといった基礎ツールを使いこなすまでの壁が高く、何だかんだしてるうちにまた次の研究が登場してしまうといった始末。まあアカデミアのstate-of-artsなんでそんな物だと思うけどもうちょっとマシなTipsは無いのか。
まあ無いなら書いてみるか。って事でコードスニペット集みたいな記事を作ってみた。7割ぐらい自分のメモ目的だけど、まあ研究やら趣味活動やらでオレオレスクリプトコードが溜まってるので汎用的なやつは放出してみる。
今回はangrのスクリプト集。何か新しく書いたら適時更新して行くと思う。あくまで基本を理解してる人向けのTips集。多分研究者とかCTFでrevやる人が想定読者?
(一応こんなのもある)
angr例文集
しばらく使っていなかったけど今年に入ってangr8がリリースされてついにpython3専用のフレームワークになってた。APIもちょくちょく便利な方向にアプデされてる。
シンボリックファイル
ファイルの内容をシンボリックにする。angrの公式チュートリアルとかによくある標準入力stdinから受け取るんじゃなくて、入力がファイルってパターン。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int auth(char *buf) {
if (strcmp(buf, "ABCD") == 0) {
return 1;
}
return 0;
}
int main() {
FILE *fp;
size_t rsize;
char buf[128] = {0};
if ((fp = fopen("input.dat", "r")) == NULL) {
exit(0);
}
while((rsize = fread(buf, sizeof(char), 128, fp)) > 0) {
int res = auth(buf);
if (res) {
printf("Auth success %s\n", buf);
break;
}
printf("Faile %s\n", buf);
}
fclose(fp);
}
実は入力がstdinであろうとファイルであろうと、angrはSimProcedureという仕組みを使って、プログラムがファイルをopenで開くとsymbolizeしておいてくれる。stdinはopen時じゃなくて初期からsymbolizeされたファイルになってるだけの違い。SimProcedureでインタプリタ実行時に標準関数をあらかたエミュレーションしてくれるんだけど、その合間にこのsymbolizeの処理をやってくれる。詳しくはangrのprocedure/ディレクトリとか見ると良い。
import angr
import sys
base_addr = 0x400000
authed_addr = base_addr + 0x83d
bin_path = 'test_file'
input_path = 'input.dat'
proj = angr.Project(bin_path)
state = proj.factory.entry_state()
with open(input_path) as fp:
content = fp.read()
print(content)
# Explore path
simgr = proj.factory.simulation_manager(state)
simgr.explore(find=authed_addr)
state = simgr.found[0]
print(state.posix.dump_file_by_path(input_path))
ちなみにある地点(SimState)で扱ってるファイルはstate.posix.fsでアクセスできて、勿論書き換えもできる。なのでstdinとか使わないしいらないよって時は以下の様に自分で"input.dat"をSimFileとして定義してそれだけ使うようにしてもよい。
import angr
import sys
base_addr = 0x400000
authed_addr = base_addr + 0x83d
bin_path = 'test_file'
input_path = 'input.dat'
proj = angr.Project(bin_path)
state = proj.factory.entry_state()
with open(input_path) as fp:
content = fp.read()
print(content)
# Memory mapped symbolic file
password_file = angr.storage.SimFile(input_path, content=content)
# Add new fs to current state
fs = {
input_path: password_file
}
state.posix.fs = fs
# Explore path
simgr = proj.factory.simulation_manager(state)
simgr.explore(find=authed_addr)
state = simgr.found[0]
print(state.posix.dump_file_by_path(input_path))
注意
上のコードを見てアレ??と思ったangrオタクもいるかもしれない。実際僕も始めは驚いたんだけど、
password_file = angr.storage.SimFile(input_path, content=content)
でcontentにfp.read()で読んできたconcreate valueが指定できてるんですよ。少し古いangrを使ってる人、具体的にはこのコミットより前だとSimFileは以下のような感じで作る必要がありましたよね。
password_len = 0x4
password_vec = state.se.BVS('password_vec', password_len * 8)
# Symbolic memory containing file content(BitVector)
password_mem = angr.state_plugins.SimSymbolicMemory(memory_id='file_{}'.format(input_path))
password_mem.set_state(state)
password_mem.store(0, password_vec)
# Memory mapped symbolic file
password_file = angr.storage.SimFile(input_path, content=password_mem, size=password_len)
入力の大きさ分のBitVectorを作ってSymbolicMemoryに書き込む。
それをSimFileのcontentに渡すことでつまりSymbolic Memory mapped fileみたいな物を作る必要があったんだけど、これはangr8から必要なくなった。contentはSymbolicMemoryじゃなくて普通に具体値を渡してやると中で勝手に同じ大きさのSymbolicMemoryが作られるようになってる。
シンボリックソケット
意外とサーバープログラムのソケット入力をシンボルにする例は少ないけど、これもSimProcedureでangrが勝手にソケットを裏でシンボルにしてるのですぐできる。実際procedure/posix/socket.pyとか見ると
import angr
######################################
# socket
######################################
class socket(angr.SimProcedure):
#pylint:disable=arguments-differ
def run(self, domain, typ, protocol):
conc_domain = self.state.solver.eval(domain)
conc_typ = self.state.solver.eval(typ)
conc_protocol = self.state.solver.eval(protocol)
if self.state.posix.uid != 0 and conc_typ == 3: # SOCK_RAW
return self.state.libc.ret_errno('EPERM')
nonce = self.state.globals.get('socket_counter', 0) + 1
self.state.globals['socket_counter'] = nonce
fd = self.state.posix.open_socket(('socket', conc_domain, conc_typ, conc_protocol, nonce))
return fd
posix.open_socketで内部的にシンボリックなソケットが作られてる。
なのでstdinやファイル同様、exploreしてたどり着いた先のSimStateのfsからソケットのfdをposix.dumpsなりでconcretizeしたら終わり。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <netdb.h>
int auth(char *buf) {
if (strcmp(buf, "ABCD") == 0) {
return 1;
}
return 0;
}
int main() {
int sockfd;
int new_sockfd;
int len;
struct sockaddr_in reader_addr;
struct sockaddr_in client_addr;
char buf[128] = {0};
if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
perror("reader: socket");
exit(1);
}
bzero((char *) &reader_addr, sizeof(reader_addr));
reader_addr.sin_family = PF_INET;
reader_addr.sin_addr.s_addr = htonl(INADDR_ANY);
reader_addr.sin_port = htons(4444);
if (bind(sockfd, (struct sockaddr *)&reader_addr, sizeof(reader_addr)) < 0) {
perror("reader: bind");
exit(1);
}
if (listen(sockfd, 5) < 0) {
perror("reader: listen");
close(sockfd);
exit(1);
}
int aflag = 0;
while(!aflag) {
len = sizeof(client_addr);
new_sockfd = accept(sockfd, (struct sockaddr *)&client_addr, &len);
if(new_sockfd == -1) {
perror("accept");
continue;
}
//printf("from %s port %d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
int rsize;
while((rsize = read(new_sockfd, buf, 128)) > 0) {
int res = auth(buf);
if (res) {
printf("Auth success %s\n", buf);
aflag = 1;
break;
} else {
printf("Noop... %s(%d)\n", buf, rsize);
memset(buf, 0, sizeof(buf));
}
}
close(new_sockfd);
}
close(sockfd);
}
import angr
import sys
base_addr = 0x400000
authed_addr = base_addr + 0xa7d
bin_path = 'test_server'
client_fd = 4
proj = angr.Project(bin_path)
state = proj.factory.entry_state()
# Explore path
simgr = proj.factory.simulation_manager(state)
simgr.explore(find=authed_addr)
state = simgr.found[0]
print(state.posix.dumps(client_fd))
Driller
現状本家のDrillerは残念ながらangrへの入力がstdinに固定されてしまっているので、stdinではなく他のfdから読むスクリプトも書いてみよう。
import angr
import sys
import tracer # For concolic execution
base_addr = 0x400000
auth_addr = base_addr + 0xa5a # An address of auth()
authed_addr = base_addr + 0xa7d
client_fd = 4 # XXX hard coded
bin_path = 'test_server'
pre_data = bytes('AAAA', 'ascii')
# Driller tracer
r = tracer.qemu_runner.QEMURunner(binary=bin_path, input=pre_data)
drill_tech = angr.exploration_techniques.DrillerCore(trace=r.trace)
# Project
proj = angr.Project(bin_path)
# Prepare
#t = angr.exploration_techniques.Tracer(trace=r.trace, crash_addr=r.crash_addr, copy_states=True)
#state = proj.factory.blank_state(addr=auth_addr, remove_options={angr.options.LAZY_SOLVES}) # XXX: Not work
state = proj.factory.entry_state()
simgr = proj.factory.simulation_manager(state)
#simgr.use_technique(t) # concolic tech
simgr.use_technique(drill_tech) # driller tech
# Proceed after socket creation
# XXX: Bit ugly method. But we can't use explore(find=auth_addr) or blank_state(addr=auth_addr)
while state.posix.get_fd(client_fd) == None:
simgr.step()
state = simgr.active[0]
print('Reached', state)
# Preconstrain packet data
state.preconstrainer.preconstrain_file(pre_data, state.posix.get_fd(client_fd).read_storage, True) # get_fd return duplexfd
# Find new path transition (See Driller paper)
while simgr.active:
simgr.step()
if 'diverted' not in simgr.stashes:
continue
while simgr.diverted:
state = simgr.diverted.pop(0)
# Print passed Basick Blocks
print(state.history.bbl_addrs[-1])
break
DrillerはQEMURunnerによるConcolic executionが必須なのだが、残念ながらQEMURunnerはソケットから具体値を受け取ってトレースする方法は提供していないため、サーバープログラムに対してはうまく動かない。(要修正) AFLが生成した入力に対するpreconstrainは本家のdrillerでは、
s.preconstrainer.preconstrain_file(self.input, s.posix.stdin, True)
というようにstdin前提でpreconstrainしてしまっているけど、こっちのスクリプトはstate.posix.get_fd(client_fd).read_storageを使ってソケットを入力として扱ってる。しかしそのせいで以下のような汚いコードを追加するハメになった。
while state.posix.get_fd(client_fd) == None:
simgr.step()
state = simgr.active[0]
実際にsocket()でclient_fdが生成されるまで状態を進めてしまうんだけど常にactive[0]を取って一方向に進み続けるのは良くない。
かといって初期のstateをblank_state(addr=auth_addr)のようにソケット生成後の状態から始めようとしても何故かstate.posix.get_fd(client_fd)がNoneを返すので上のようにしてる。
もっと良い方法があったら教えて欲しい。
目的到達パスが陽に無い場合
Flareon CTFのWrite upが参考になる。文字を入力してエンコーディングして吐くってだけ。エンコードされた文字が正しいかどうかの判定は送信された先のサーバーで行われるでの対象のバイナリ内には"puts("Accepted")"みたいな到達したいエッジがあるわけじゃない。
こういう場合はエンコード終了後に適当な所で正しいエンコーディング文字と比較するような制約をstateに追加してevalで解くとかできる。
http://0x0atang.github.io/reversing/2015/09/18/flareon5-concolic.html
あとhookでバッファをdupしてりもしてる。self modify避け。
更新について
また新しいスクリプトを書いたら載せていく。