玄箱を探す
ap_servdが実行されている玄箱を UDP ブロードキャストを使用して探します。
ifconfigまたはip addrコマンドが実行可能なことを前提としています。
find_kurobox.py
#!/usr/bin/env python3
import argparse
import struct
import socket
import subprocess
import sys
from collections import defaultdict
def ifconfig():
cp = subprocess.run(['ifconfig'], capture_output=True, check=True)
interfaces = defaultdict(dict)
curr = None
for line in cp.stdout.decode().splitlines():
if not line[0].isspace():
ifname = line.split(':', 1)[0]
curr = interfaces[ifname]
curr['interface'] = ifname
continue
if not curr:
continue
line = line.strip()
if line[:6] == 'ether ':
curr['ether'] = line.split()[1]
elif line[:5] == 'inet ':
curr['inet'] = line.split()[1]
return interfaces
def ip_addr():
cp = subprocess.run(['ip', 'addr'], capture_output=True, check=True)
interfaces = defaultdict(dict)
curr = None
for line in cp.stdout.decode().splitlines():
if not line[0].isspace():
ifname = line.split(':', 1)[1].split(':')[0].strip()
curr = interfaces[ifname]
curr['interface'] = ifname
continue
if not curr:
continue
line = line.strip()
if line[:11] == 'link/ether ':
curr['ether'] = line.split()[1]
elif line[:5] == 'inet ':
curr['inet'] = line.split()[1].split('/')[0]
return interfaces
def get_interfaces():
try:
return ifconfig()
except Exception as e:
# print(f'ifconfig: {e}')
pass
return ip_addr()
def packet(cmd, smac, tmac=None):
pkt = struct.pack('<L2HBBHL', 32, 8, 1, cmd, 0x80, 0, 0)
pkt += smac
if tmac is None:
tmac = b'\xff' * 6
pkt += tmac
pkt += b'\0' * 4
return pkt
def command(sock, cmd, ip=None):
sock.sendto(cmd, (ip if ip else '255.255.255.255', 22936))
recv = defaultdict(list)
try:
while True:
data, addr = sock.recvfrom(1024)
recv[addr[0]] = data
if ip:
break
except socket.timeout:
pass
return recv
def unpack_u16be(data):
return struct.unpack(f'>{len(data) >> 1}H', data)
def unpack_u16le(data):
return struct.unpack(f'<{len(data) >> 1}H', data)
def pack_u16le(data):
return struct.pack(f'<{len(data)}H', *data)
def pack_u16les(data):
return pack_u16le(data).decode().strip('\0').strip()
def kbscan(sock, mac):
return command(sock, packet(0x20, mac))
def kbhdd(sock, smac, tmac, ip):
return command(sock, packet(0x90, smac, tmac), ip)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-I', '--interface')
parser.add_argument('-H', '--hdd-info', action='store_true')
args = parser.parse_args()
indent = ' ' * 4
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(('0.0.0.0', 0))
sock.settimeout(1)
interfaces = get_interfaces()
kbres = {}
for ifname in [args.interface] if args.interface else interfaces.keys():
if ifname in ('lo', 'lo0', 'lo1'):
continue
nif = interfaces[ifname]
ether, inet = nif.get('ether'), nif.get('inet')
if not (inet and ether):
continue
mac = bytes(int(n, 16) for n in ether.split(':'))
print(f'{ifname}: {inet}')
for _ in range(2):
kbres |= kbscan(sock, mac)
for kaddr, sres in kbres.items():
sdata = sres[32:]
ipaddr = '.'.join(f'{n}' for n in reversed(sdata[:4]))
ipmask = '.'.join(f'{n}' for n in reversed(sdata[4:8]))
mode = sdata[16:32].decode().rstrip('\0')
wgrp = sdata[32:48].decode().rstrip('\0')
name = sdata[48:80].decode().rstrip('\0')
print(f'{kaddr} : {mode} : {name}' + (f' : {wgrp}' if wgrp else ''))
print(f'{indent}IP: addr={ipaddr}, mask={ipmask}')
if args.hdd_info:
hdres = kbhdd(sock, sres[22:28], sres[16:22], ipaddr)
for haddr, hres in hdres.items():
if kaddr != haddr:
continue
incits529 = hres[32:]
hdd_be = unpack_u16be(incits529)
hdd_le = unpack_u16le(incits529)
chs = [hdd_le[n] for n in (1, 3, 6)]
serial = pack_u16les(hdd_be[10:20])
fwrev = pack_u16les(hdd_be[23:27])
model = pack_u16les(hdd_be[27:47])
lba = hdd_le[60] + (hdd_le[61] << 16)
print(f'{indent}ドライブ名: {model}')
print(f'{indent}容量 : {lba >> 11} MiB ({lba * 512 // 1000000} MB)')
print(f'{indent}S/N : {serial}')
print(f'{indent}F/W Rev : {fwrev}')
print(f'{indent}C/H/S : {chs[0]}/{chs[1]}/{chs[2]}')
sys.exit(main())
実行したときの表示パターン
$ python3 find_kurobox.py -I en0 -H
en0: xxx.xxx.xxx.xxx
xxx.xxx.xxx.xxx : KURO-BOX-EM : KURO-BOX/HG(IESHIGE)
IP: addr=xxx.xxx.xxx.xxx, mask=mmm.mmm.mmm.mmm
ドライブ名: DDDDDDDDDDDD
容量 : YYYYY MiB (yyyyy MB)
S/N : ZZZZZZZZZZZZ
F/W Rev : RRRR
C/H/S : CCCCC/HH/SS
システムをインストールする
玄箱に取り付けた HDD をフォーマットしてシステムをインストールします。
システム構築の試行錯誤用です。EMモードにしてから実行してください。
ftplib,telnetlibを使って
- HDD のフォーマット
-
KuroBoxSetup.exeと同じ形式でフォーマットします
-
- システム(IMAGE)のインストール
-
.zip拡張子ならばimage.zipとして展開 -
.tgz(.tar.gz)拡張子ならばtmpimage.tgzとして展開
-
- 通常モードへの切り替え(
write_okの実行) - 再起動
を実行します。
kuroboxsetup.py
#!/usr/bin/env python3
import argparse
import io
import os
import sys
import time
from ftplib import FTP
from telnetlib import Telnet
FORMAT_SCRIPT = """
#!/bin/sh
umount /mnt /mnt2
/sbin/mfdisk -e /dev/hda
sh /sbin/mkfilesystem.sh
"""
INSTALL_SCRIPT = """
#!/bin/sh
cd /mnt2
unzip image.zip
tar -C /mnt -zxvf tmpimage.tgz || exit $?
/usr/bin/write_ok
reboot
"""
verbose = None
def noop(*_args, **_kwargs): pass
def message(msg): print(msg)
class KuroBoxSetup:
def __init__(self, target, passwd):
self.target = target
self.username = 'root'
self.password = passwd
def put_data(self, fp, tdir, name):
verbose(f'FTP: {self.target}')
ftp = FTP(self.target)
ftp.login(self.username, self.password)
ftp.set_pasv(True)
verbose(f'CWD: {tdir}')
ftp.cwd(tdir)
verbose(f'PUT: {name}')
ftp.storbinary(f'STOR {name}', fp)
verbose(f'FTP: end')
ftp.quit()
def put_file(self, sfile, tdir, name=None):
if name is None:
name = os.path.basename(sfile)
with open(sfile, 'rb') as fp:
self.put_data(fp, tdir, name)
def cmd(self, line):
verbose(f'Telnet: {self.target}')
rh = Telnet(self.target)
try:
rh.read_until(b'login: ')
rh.write(self.username.encode() + b'\n')
rh.read_until(b'Password: ')
rh.write(self.password.encode() + b'\n')
time.sleep(1)
verbose(f'Comand: {line}')
rh.read_until(b'# ')
rh.write(line.encode() + b'\n')
while True:
time.sleep(1/25)
sys.stdout.write(rh.read_very_eager().decode('euc-jp'))
sys.stdout.flush()
rh.write(b'\x04')
except ConnectionResetError:
rh.close()
except EOFError:
rh.close()
finally:
rh.close()
sys.stdout.write('\n')
sys.stdout.flush()
def main():
global verbose
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help='冗長モード')
parser.add_argument('--HG', action='store_true', help='HGのとき指定')
parser.add_argument('target', metavar='TARGET', help='玄箱のホスト名/IPアドレスを指定')
parser.add_argument('image', metavar='IMAGE', help='image.zip/tmpimage.tgz 相当を指定')
args = parser.parse_args()
verbose = message if args.verbose else noop
image = args.image
iname = None
if image[-4:] == '.zip':
iname = 'image.zip'
elif image[-4:] == '.tgz' or image[-7:] == '.tar.gz':
iname = 'tmpimage.tgz'
else:
print(f'unknown image format: {image}')
return 2
setup = KuroBoxSetup(args.target, 'kuroadmin' if args.HG else 'kuro')
setup.put_data(io.BytesIO(FORMAT_SCRIPT.encode()), '/tmp', 'format.sh')
setup.cmd('sh /tmp/format.sh')
setup.put_file(image, '/mnt2', iname)
setup.put_data(io.BytesIO(INSTALL_SCRIPT.encode()), '/tmp', 'install.sh')
setup.cmd('sh /tmp/install.sh')
return 0
sys.exit(main())