1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

{作成}LLMを使用して自然言語でコマンドを実行してくれるツールを作ってみた

Last updated at Posted at 2024-12-26

はじめに

昨日、理想のコマンド入力システムが作成できたのでLLMにアクションの指示を出したら勝手に実行してくれるスクリプトを作成しました。

内容

  • ユーザーが○○したいみたいな入力をする
  • Geminiが分析してアクションリストを作成する
  • アクションリストをGeminiが解析し、コマンドリストを作成
  • コマンドを順次実行していく
  • 一秒ごとにログを取得して画面に出力する

コード

app.py
import subprocess
import threading
import os
import pty
from datetime import datetime
import re
from flask import Flask, request, jsonify, render_template
import time
from chatt import chatLogic

class ShellLogger:
    def __init__(self, log_file="log.txt"):
        self.log_file = log_file
        # 不要な出力を無視するパターン
        self.ignore_patterns = [
            r"┌──\(kali㉿kali\)-\[.*?\]",
            r"└─\$",
            r"stty: .*",
            r"^\s*$",
            r"\[.*?\] exec:.*"
        ]
        
    def clean_ansi(self, text):
        # ANSIエスケープシーケンスを除去
        ansi_escape = re.compile(r'''
            \x1B  # ESC
            (?:   # 非キャプチャグループ
                [@-Z\\-_]
                |\[
                [0-?]*  # パラメータバイト
                [ -/]*  # 中間バイト
                [@-~]   # 最終バイト
            )
        ''', re.VERBOSE)
        text = re.sub(r'\x1B\][0-9;]*;*[a-zA-Z]', '', text)
        text = re.sub(r'\x1B\[[\?0-9;]*[a-zA-Z]', '', text)
        text = re.sub(r'\x1B\[[\?0-9;]*[mK]', '', text)
        text = ansi_escape.sub('', text)
        text = re.sub(r'\x0f|\x1B\[H|\x1B\[2J|\x1B\[K|\r', '', text)
        return text
        
    def should_ignore(self, message):
        cleaned_message = self.clean_ansi(message)
        return any(re.search(pattern, cleaned_message) for pattern in self.ignore_patterns)
        
    def log(self, message, message_type="INFO"):
        if self.should_ignore(message):
            return
        cleaned_message = self.clean_ansi(message).strip()
        if not cleaned_message:
            return
        log_entry = f"{cleaned_message}\n"
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(log_entry)
        
    def log_command(self, command):
        if not self.should_ignore(command):
            self.log(f"Command: {command}", "CMD")
        
    def log_output(self, output):
        if not self.should_ignore(output):
            self.log(f"{output}", "OUT")

def create_pty_process():
    master, slave = pty.openpty()
    process = subprocess.Popen(
        ["bash"],
        stdin=slave,
        stdout=slave,
        stderr=slave,
        preexec_fn=os.setsid,
        universal_newlines=True
    )
    return process, master, slave

def read_output(master_fd, logger):
    buffer = ""
    while True:
        try:
            data = os.read(master_fd, 1024).decode()
            if data:
                buffer += data
                if '\n' in buffer:
                    lines = buffer.split('\n')
                    for line in lines[:-1]:
                        if line.strip():
                            logger.log_output(line.strip())
                    buffer = lines[-1]
        except (OSError, UnicodeDecodeError):
            buffer = ""
            continue

# Flask app setup
app = Flask(__name__)
logger = ShellLogger()
process, master, slave = create_pty_process()

# 出力読み取りスレッドの開始
threading.Thread(target=read_output, args=(master, logger), daemon=True).start()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/execute', methods=['POST'])
def execute():
    user_input = request.json.get('user_input', "")
    if not user_input:
        return jsonify({"error": "No user_input provided"}), 400

    commands = chatLogic(user_input)
    print(f"Received user_input: {user_input}")
    print(f"Generated commands: {commands}")

    for command in commands:
        command = command.strip()
        if command:
            logger.log_command(command)
            os.write(master, (command + '\n').encode())
            time.sleep(0.1)

    return jsonify({"status": "commands received", "commands": commands})

@app.route('/logs', methods=['GET'])
def logs():
    try:
        if not os.path.exists(logger.log_file):
            return jsonify({"logs": []})
            
        with open(logger.log_file, 'r', encoding='utf-8') as f:
            log_lines = [line.strip() for line in f.readlines() if line.strip()]
        return jsonify({"logs": log_lines})
        
    except Exception as e:
        print(f"Error reading logs: {str(e)}")
        return jsonify({"error": "Failed to read logs", "details": str(e)}), 500

@app.route('/clear-logs', methods=['POST'])
def clear_logs():
    try:
        with open(logger.log_file, 'w', encoding='utf-8') as f:
            f.write('')
        return jsonify({"status": "success", "message": "Logs cleared successfully"})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

if __name__ == "__main__":
    #app.run(debug=True)
    app.run(host='0.0.0.0', port=5000)

chatt.py
from gemini import chat_gem

def extract_bash_commands(text):
    # 入力文字列を行で分割
    lines = text.strip().split('\n')
    
    # 最初と最後の行を除外(スライシング使用)
    commands = lines
    
    # 特定の文字列(例: 'pattern1', 'pattern2')を含む行を除去
    patterns_to_exclude = ['```bash', '```']
    filtered_commands = [
        command for command in commands 
        if not any(pattern in command for pattern in patterns_to_exclude)
    ]
    
    return filtered_commands
def chatLogic(user_input):
    print(f"userinput2   :{user_input}")
    
    instruct = f"""指示:ユーザーのアクションの理想が入力として提供されます。
    入力:{user_input}
    出力:詳細なactionlistを出力する。"""
    
    actionlist1 = chat_gem(instruct)
    print("11111===========================")
    print(actionlist1)
    actionlist = f"""指示:アクションリストが入力として提供される。実際に実行可能な環境に適したコマンドを列挙して下さい。
    入力:{actionlist1}
    コンテキスト:OSはLinuxです。
    出力:コマンドのみを改行刻みで列挙する。"""
    

    
    commandlist = chat_gem(actionlist)
    print("222222===========================")
    print(commandlist)
    list1=extract_bash_commands(commandlist)
    return list1

結果

入力:
testという名前のテキストファイルを作成して。中身は、helloとでも書いておいての

スクリーンショット (21).png
スクリーンショット (19).png
ここから
スクリーンショット (20).png
こう

mkdir -p ~/Documents/
cd ~/Documents/
echo "hello" > test.txt
ls -l test.txt
cat test.txt

入力:
metasploitを起動し、vsftpdに関するモジュールを検索したい

スクリーンショット (15).png
スクリーンショット (18).png
スクリーンショット (16).png
スクリーンショット (17).png

msfconsole
search vsftpd
info <モジュール名>
use <モジュール名>
show options
set <オプション名> <値>
set PAYLOAD <ペイロード名>
run

一応目的は果たせているが余計なことも行われてしまっている。

まとめ

インタラクティブなツールでも実行できており、コマンドの量が多すぎて覚えてられない場合には頼りになるかもしれません。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?