0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

玄箱の検索とシステムのインストール

Last updated at Posted at 2025-07-14

玄箱を探す

ap_servdが実行されている玄箱を UDP ブロードキャストを使用して探します。

ifconfigまたはip addrコマンドが実行可能なことを前提としています。

find_kurobox.py
#!/usr/bin/env python3

import argparse
import struct
import socket
import subprocess
import sys
from collections import defaultdict


def ifconfig():
    """Collect per-interface MAC and IPv4 addresses by parsing ``ifconfig``.

    Returns:
        A ``defaultdict(dict)`` keyed by interface name. Every entry has an
        ``'interface'`` key and, when the output contains them, ``'ether'``
        (MAC address text) and ``'inet'`` (IPv4 address text).

    Raises:
        OSError: ``ifconfig`` could not be executed.
        subprocess.CalledProcessError: ``ifconfig`` exited with a non-zero
            status.
    """
    cp = subprocess.run(['ifconfig'], capture_output=True, check=True)
    interfaces = defaultdict(dict)
    curr = None
    for line in cp.stdout.decode().splitlines():
        # Interfaces are separated by blank lines in some ifconfig builds;
        # skip them before indexing line[0], which would raise IndexError.
        if not line.strip():
            continue
        if not line[0].isspace():
            # Unindented line: "<name>: flags=..." starts a new interface.
            ifname = line.split(':', 1)[0]
            curr = interfaces[ifname]
            curr['interface'] = ifname
            continue
        if curr is None:
            # Indented detail line before any interface header.
            continue
        line = line.strip()
        if line.startswith('ether '):
            curr['ether'] = line.split()[1]
        elif line.startswith('inet '):
            curr['inet'] = line.split()[1]
    return interfaces


def ip_addr():
    """Collect per-interface MAC and IPv4 addresses by parsing ``ip addr``.

    Returns:
        A ``defaultdict(dict)`` keyed by interface name. Every entry has an
        ``'interface'`` key and, when the output contains them, ``'ether'``
        (MAC address text) and ``'inet'`` (IPv4 address without the prefix
        length). If an interface lists several ``inet`` lines, the last one
        wins.

    Raises:
        OSError: ``ip`` could not be executed.
        subprocess.CalledProcessError: ``ip addr`` exited with a non-zero
            status.
    """
    cp = subprocess.run(['ip', 'addr'], capture_output=True, check=True)
    interfaces = defaultdict(dict)
    curr = None
    for line in cp.stdout.decode().splitlines():
        if not line.strip():
            # Guard line[0] below against empty lines.
            continue
        if not line[0].isspace():
            # Header line: "<index>: <name>[@<peer>]: <FLAGS> ...".
            fields = line.split(':', 2)
            if len(fields) < 2:
                # Not a recognisable header; ignore its detail lines too.
                curr = None
                continue
            # iproute2 appends "@<peer>" for veth/VLAN links; drop it so the
            # key matches what ifconfig() reports and what -I expects.
            ifname = fields[1].strip().split('@', 1)[0]
            curr = interfaces[ifname]
            curr['interface'] = ifname
            continue
        if curr is None:
            continue
        line = line.strip()
        if line.startswith('link/ether '):
            curr['ether'] = line.split()[1]
        elif line.startswith('inet '):
            # "inet 192.168.1.10/24 ..." -> keep only the address part.
            curr['inet'] = line.split()[1].split('/')[0]
    return interfaces


def get_interfaces():
    """Return the interface table, trying ``ifconfig`` before ``ip addr``.

    Any exception raised on the ``ifconfig`` path is discarded and the
    ``ip addr`` parser is used instead; exceptions from that second attempt
    propagate to the caller.
    """
    try:
        table = ifconfig()
    except Exception:
        # Deliberate best effort: fall through to the ip(8) based parser.
        pass
    else:
        return table
    return ip_addr()


def packet(cmd, smac, tmac=None):
    """Build a request datagram for command byte *cmd*.

    Layout: a 16-byte little-endian header (the fixed values 32, 8 and 1,
    then *cmd*, 0x80 and two zero fields), followed by *smac*, then *tmac*,
    then four zero bytes. When *tmac* is None, six 0xff bytes are used in
    its place.

    Returns:
        The assembled ``bytes`` (32 bytes when both MACs are 6 bytes long).
    """
    header = struct.pack('<L2HBBHL', 32, 8, 1, cmd, 0x80, 0, 0)
    target = b'\xff' * 6 if tmac is None else tmac
    trailer = b'\0' * 4
    return header + smac + target + trailer


def command(sock, cmd, ip=None):
    """Send *cmd* to UDP port 22936 and gather the replies.

    Args:
        sock: UDP socket with a timeout set; the timeout ends the wait.
        cmd: Datagram payload to send.
        ip: Destination address. When falsy the datagram goes to
            255.255.255.255 and replies are collected until the socket
            times out; otherwise the function returns after the first
            datagram received.

    Returns:
        A ``defaultdict`` mapping each sender IP to the last payload
        received from it.
    """
    destination = ip or '255.255.255.255'
    sock.sendto(cmd, (destination, 22936))
    replies = defaultdict(list)
    while True:
        try:
            payload, sender = sock.recvfrom(1024)
        except socket.timeout:
            # No more replies within the socket timeout.
            break
        replies[sender[0]] = payload
        if ip:
            break
    return replies


def unpack_u16be(data):
    """Decode *data* as consecutive big-endian unsigned 16-bit words.

    Returns a tuple of ints. ``struct.error`` is raised when ``len(data)``
    is odd.
    """
    word_count = len(data) // 2
    return struct.unpack('>%dH' % word_count, data)


def unpack_u16le(data):
    """Decode *data* as consecutive little-endian unsigned 16-bit words.

    Returns a tuple of ints. ``struct.error`` is raised when ``len(data)``
    is odd.
    """
    word_count = len(data) // 2
    return struct.unpack('<%dH' % word_count, data)


def pack_u16le(data):
    """Serialise a sequence of ints as little-endian unsigned 16-bit words."""
    layout = '<%dH' % len(data)
    return struct.pack(layout, *data)


def pack_u16les(data):
    """Render a sequence of 16-bit words as a trimmed text string.

    The words are serialised little-endian with ``pack_u16le``, decoded as
    UTF-8, and stripped of NUL padding and surrounding whitespace.

    The bytes come from a remote device, so undecodable sequences are
    replaced with U+FFFD rather than raising ``UnicodeDecodeError`` and
    aborting the caller. Valid input decodes exactly as before.
    """
    return pack_u16le(data).decode(errors='replace').strip('\0').strip()


def kbscan(sock, mac):
    """Broadcast a command-0x20 request carrying *mac* as the source MAC.

    Returns whatever ``command`` collects: a mapping of sender IP to reply
    payload. ``main`` uses this as its device discovery step.
    """
    request = packet(0x20, mac)
    return command(sock, request)


def kbhdd(sock, smac, tmac, ip):
    """Send a command-0x90 request from *smac* to *tmac* at address *ip*.

    Returns whatever ``command`` collects: a mapping of sender IP to reply
    payload. ``main`` parses that payload as drive information.
    """
    request = packet(0x90, smac, tmac)
    return command(sock, request, ip)


def _decode_field(raw):
    """Decode a NUL-padded text field; undecodable bytes become U+FFFD."""
    return raw.decode(errors='replace').rstrip('\0')


def _format_ipv4(raw):
    """Render address bytes, stored in reverse octet order, as dotted decimal."""
    return '.'.join(f'{n}' for n in reversed(raw))


def _print_hdd_info(hres, indent):
    """Print drive details parsed from one command-0x90 reply."""
    incits529 = hres[32:]
    # The highest word read below is index 61, i.e. 62 words = 124 bytes.
    if len(incits529) < 124:
        print(f'{indent}HDD info reply too short ({len(hres)} bytes)',
              file=sys.stderr)
        return
    # Drop a trailing odd byte so the 16-bit unpack cannot raise.
    incits529 = incits529[:len(incits529) & ~1]
    # NOTE(review): the word indices look like the ATA IDENTIFY DEVICE
    # layout (geometry 1/3/6, serial 10-19, firmware 23-26, model 27-46,
    # 28-bit sector count 60-61) - confirm against the device protocol.
    hdd_be = unpack_u16be(incits529)
    hdd_le = unpack_u16le(incits529)
    chs = [hdd_le[n] for n in (1, 3, 6)]
    serial = pack_u16les(hdd_be[10:20])
    fwrev = pack_u16les(hdd_be[23:27])
    model = pack_u16les(hdd_be[27:47])
    lba = hdd_le[60] + (hdd_le[61] << 16)
    print(f'{indent}ドライブ名: {model}')
    # lba counts 512-byte sectors: >> 11 gives MiB, * 512 // 10**6 gives MB.
    print(f'{indent}容量   : {lba >> 11} MiB ({lba * 512 // 1000000} MB)')
    print(f'{indent}S/N    : {serial}')
    print(f'{indent}F/W Rev  : {fwrev}')
    print(f'{indent}C/H/S   : {chs[0]}/{chs[1]}/{chs[2]}')


def main():
    """Scan for KuroBox devices by UDP broadcast and print what they report.

    Command-line options:
        -I/--interface  use only the named interface's MAC address
        -H/--hdd-info   also query each device for drive details

    Returns:
        None, so ``sys.exit(main())`` exits with status 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-I', '--interface')
    parser.add_argument('-H', '--hdd-info', action='store_true')
    args = parser.parse_args()

    indent = ' ' * 4

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind(('0.0.0.0', 0))
        sock.settimeout(1)

        interfaces = get_interfaces()
        kbres = {}
        names = [args.interface] if args.interface else list(interfaces)
        for ifname in names:
            if ifname in ('lo', 'lo0', 'lo1'):
                continue
            # .get() avoids inserting an empty entry for an unknown -I name.
            nif = interfaces.get(ifname, {})
            ether, inet = nif.get('ether'), nif.get('inet')
            if not (inet and ether):
                if args.interface:
                    # An explicitly requested interface should not be
                    # skipped without telling the user why.
                    print(f'{ifname}: no MAC/IPv4 address found',
                          file=sys.stderr)
                continue
            mac = bytes(int(n, 16) for n in ether.split(':'))
            print(f'{ifname}: {inet}')
            # The request is sent twice (presumably to tolerate UDP loss);
            # replies are merged by sender address.
            for _ in range(2):
                kbres |= kbscan(sock, mac)

        for kaddr, sres in kbres.items():
            sdata = sres[32:]
            if len(sdata) < 8:
                # Too short to carry the address fields. An empty address
                # would also turn the HDD query below into a broadcast.
                continue
            ipaddr = _format_ipv4(sdata[:4])
            ipmask = _format_ipv4(sdata[4:8])
            mode = _decode_field(sdata[16:32])
            wgrp = _decode_field(sdata[32:48])
            name = _decode_field(sdata[48:80])
            print(f'{kaddr} : {mode} : {name}' + (f' : {wgrp}' if wgrp else ''))
            print(f'{indent}IP: addr={ipaddr}, mask={ipmask}')
            if not args.hdd_info:
                continue
            # Bytes 16-22 and 22-28 of the reply are swapped into the
            # source/target MAC slots of the follow-up request.
            hdres = kbhdd(sock, sres[22:28], sres[16:22], ipaddr)
            # Only a reply from the same sender as the scan reply is used.
            hres = hdres.get(kaddr)
            if hres is not None:
                _print_hdd_info(hres, indent)


# Run the scan only when executed as a script, so that importing this
# module (e.g. to reuse the helpers) does not start scanning or exit.
if __name__ == '__main__':
    sys.exit(main())
実行したときの表示パターン
$ python3 find_kurobox.py -I en0 -H
en0: xxx.xxx.xxx.xxx
xxx.xxx.xxx.xxx : KURO-BOX-EM : KURO-BOX/HG(IESHIGE)
    IP: addr=xxx.xxx.xxx.xxx, mask=mmm.mmm.mmm.mmm
    ドライブ名: DDDDDDDDDDDD
    容量   : YYYYY MiB (yyyyy MB)
    S/N    : ZZZZZZZZZZZZ
    F/W Rev  : RRRR
    C/H/S   : CCCCC/HH/SS

システムをインストールする

玄箱に取り付けた HDD をフォーマットしてシステムをインストールします。

システム構築の試行錯誤用です。EMモードにしてから実行してください。

ftplib, telnetlib(telnetlib は Python 3.13 で標準ライブラリから削除されたため、Python 3.12 以前が必要です)を使って

  • HDD のフォーマット
    • KuroBoxSetup.exeと同じ形式でフォーマットします
  • システム(IMAGE)のインストール
    • .zip拡張子ならばimage.zipとして展開
    • .tgz(.tar.gz)拡張子ならばtmpimage.tgzとして展開
  • 通常モードへの切り替え(write_okの実行)
  • 再起動

を実行します。

kuroboxsetup.py
#!/usr/bin/env python3

import argparse
import io
import os
import sys
import time
from ftplib import FTP
from telnetlib import Telnet


# Shell script that main() uploads as /tmp/format.sh and runs with `sh`.
# It unmounts /mnt and /mnt2, runs mfdisk on /dev/hda and then the device's
# own /sbin/mkfilesystem.sh.
# NOTE(review): the literal starts with a newline, so "#!/bin/sh" is on the
# second line and acts as a plain comment; this is harmless because the
# script is always invoked through `sh`.
FORMAT_SCRIPT = """
#!/bin/sh

umount /mnt /mnt2
/sbin/mfdisk -e /dev/hda
sh /sbin/mkfilesystem.sh
"""

# Shell script that main() uploads as /tmp/install.sh and runs with `sh`
# after the image has been stored in /mnt2.
# `unzip image.zip` runs unconditionally and its status is not checked, so
# when the upload was stored as tmpimage.tgz the script simply carries on.
# A failing `tar` aborts before write_ok and reboot.
# NOTE(review): presumably image.zip contains tmpimage.tgz - confirm.
INSTALL_SCRIPT = """
#!/bin/sh

cd /mnt2
unzip image.zip
tar -C /mnt -zxvf tmpimage.tgz || exit $?
/usr/bin/write_ok
reboot
"""


def noop(*_args, **_kwargs):
    """Accept any arguments and do nothing."""


def message(msg):
    """Print *msg* to standard output."""
    print(msg)


# Progress hook called by KuroBoxSetup. main() rebinds it to message() when
# -v is given. It defaults to noop (not None) so the class can be used
# without running main() first; calling None would raise TypeError.
verbose = noop


class KuroBoxSetup:
    """Upload files to and run commands on a KuroBox as ``root``.

    Files go over FTP and commands over Telnet, both to ``target``.

    NOTE: ``telnetlib`` was deprecated in Python 3.11 and removed in 3.13,
    so ``cmd`` needs an interpreter that still ships it.
    """

    def __init__(self, target, passwd):
        """Remember the host (*target*) and the root password (*passwd*)."""
        self.target = target
        self.username = 'root'
        self.password = passwd

    def put_data(self, fp, tdir, name):
        """Upload binary file object *fp* as *name* in remote directory *tdir*.

        Raises:
            ftplib.all_errors members on connection, login or transfer
            failure.
        """
        verbose(f'FTP: {self.target}')
        # The context manager sends QUIT on success and always closes the
        # socket, including when login/cwd/stor raises.
        with FTP(self.target) as ftp:
            ftp.login(self.username, self.password)
            ftp.set_pasv(True)
            verbose(f'CWD: {tdir}')
            ftp.cwd(tdir)
            verbose(f'PUT: {name}')
            ftp.storbinary(f'STOR {name}', fp)
            verbose('FTP: end')

    def put_file(self, sfile, tdir, name=None):
        """Upload local file *sfile* into *tdir*.

        The remote name is *name*, or the base name of *sfile* when *name*
        is None.
        """
        if name is None:
            name = os.path.basename(sfile)
        with open(sfile, 'rb') as fp:
            self.put_data(fp, tdir, name)

    def cmd(self, line):
        """Log in over Telnet, run *line* and echo its output to stdout.

        Output is decoded as EUC-JP. The method returns once the remote
        side closes the connection.
        """
        import codecs

        verbose(f'Telnet: {self.target}')
        # Decode incrementally: a read may end in the middle of a multibyte
        # EUC-JP character, which a per-chunk bytes.decode() cannot handle.
        decoder = codecs.getincrementaldecoder('euc-jp')(errors='replace')
        with Telnet(self.target) as rh:
            try:
                rh.read_until(b'login: ')
                rh.write(self.username.encode() + b'\n')
                rh.read_until(b'Password: ')
                rh.write(self.password.encode() + b'\n')
                time.sleep(1)
                verbose(f'Comand: {line}')
                rh.read_until(b'# ')
                rh.write(line.encode() + b'\n')
                # Keep relaying output and sending EOT (Ctrl-D), presumably
                # so the remote shell logs out once the command finishes.
                # The loop ends when the connection drops.
                while True:
                    time.sleep(1/25)
                    sys.stdout.write(decoder.decode(rh.read_very_eager()))
                    sys.stdout.flush()
                    rh.write(b'\x04')
            except (ConnectionError, EOFError):
                # Remote side closed the session: the normal way out.
                # ConnectionError also covers BrokenPipeError from write().
                pass
        # Emit anything still buffered in the decoder.
        sys.stdout.write(decoder.decode(b'', final=True))
        sys.stdout.write('\n')
        sys.stdout.flush()


def main():
    """Format the KuroBox disk, install IMAGE and reboot the device.

    Positional arguments are TARGET (host name or IP address) and IMAGE
    (a ``.zip``, ``.tgz`` or ``.tar.gz`` file). ``--HG`` selects the
    password used for the HG model and ``-v`` enables progress messages.

    Returns:
        0 on success, 2 when IMAGE has an unknown extension or cannot be
        opened.
    """
    global verbose

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true', help='冗長モード')
    parser.add_argument('--HG', action='store_true', help='HGのとき指定')
    parser.add_argument('target', metavar='TARGET', help='玄箱のホスト名/IPアドレスを指定')
    parser.add_argument('image', metavar='IMAGE', help='image.zip/tmpimage.tgz 相当を指定')

    args = parser.parse_args()

    image = args.image
    # The remote file name tells the install script how to unpack it.
    if image.endswith('.zip'):
        iname = 'image.zip'
    elif image.endswith(('.tgz', '.tar.gz')):
        iname = 'tmpimage.tgz'
    else:
        print(f'unknown image format: {image}')
        return 2

    # Open the image BEFORE touching the device: the first remote step
    # reformats the disk, so a mistyped path must fail while the disk is
    # still intact.
    try:
        fp = open(image, 'rb')
    except OSError as exc:
        print(f'cannot open image: {exc}')
        return 2

    with fp:
        verbose = message if args.verbose else noop

        setup = KuroBoxSetup(args.target, 'kuroadmin' if args.HG else 'kuro')
        setup.put_data(io.BytesIO(FORMAT_SCRIPT.encode()), '/tmp', 'format.sh')
        setup.cmd('sh /tmp/format.sh')
        setup.put_data(fp, '/mnt2', iname)
        setup.put_data(io.BytesIO(INSTALL_SCRIPT.encode()), '/tmp', 'install.sh')
        setup.cmd('sh /tmp/install.sh')
    return 0


# Run the installer only when executed as a script, so that importing this
# module (e.g. to reuse KuroBoxSetup) does not format a device or exit.
if __name__ == '__main__':
    sys.exit(main())
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?