PythonでSSH接続を行うためにparamikoというライブラリを使うことにしました。今のところ主な用途はコマンド実行だけなので大した内容ではなかったり、正直よくわかっていないのでメモです。
ユースケース
待ち状態が起きうるが少ないデータであればうまくいくexec_command
exec_commandで標準エラー出力が約2MiB(1,900,000Byte)未満の場合は雑に書いても動作します
# 標準出力とエラー出力をそれぞれ10MiB行うコマンド
command = '''bash -c 'dd if=/dev/urandom bs=1024 count=10240 >&1 & dd if=/dev/urandom bs=1024 count=10240 >&2; wait' '''
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) # 未知の公開鍵の場合は無視
client.connect(hostname="127.0.0.1", port=22, username="test", password="test") # 接続の確立
(stdin, stdout, stderr) = client.exec_command(command, timeout=5) # コマンド実行
stdout_data = stdout.read() # 標準出力を全て読み込みbytesで得る(※ここで待ち状態になりタイムアウトする)
stderr_data = stderr.read() # 標準エラー出力を全て読み込みbytesで得る
# 終了コードを受け取る
code = stdout.channel.recv_exit_status()
# 色々処理
ドキュメントを読んだりサンプルコードを探したりするとだいたいこんな感じの例にあたると思います。
しかし上記のコード例をそのまま実行した場合、おそらく待ち状態になりタイムアウトします。説明は後述します。
待ち状態になるのを防ぐ
色々調べて試した感じ、これでとりあえず待ち状態になることは防げます。
ただしこの書き方の場合、コマンドの実行が長くてもタイムアウトによる終了はされないため、実行するコマンドによっては永遠に待ち続けます。もしn秒以上経過したら異常としたい場合は、その処理を書く必要があります。
# 標準出力とエラー出力をそれぞれ10MiB行うコマンド
command = '''bash -c 'dd if=/dev/urandom bs=1024 count=10240 >&1 & dd if=/dev/urandom bs=1024 count=10240 >&2; wait' '''
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) # 未知の公開鍵の場合は無視
client.connect(hostname="127.0.0.1", port=22, username="test", password="test") # 接続の確立
channel = client.get_transport().open_session(timeout=10)
try:
channel.exec_command(command)
RECV_SIZE = 1024 * 32
stdout_data = b''
stderr_data = b''
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
stdout_data += channel.recv(RECV_SIZE)
stderr_data += channel.recv_stderr(RECV_SIZE)
# 終了コードを受け取る
code = channel.recv_exit_status()
# 色々処理
finally:
channel.close()
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
がキモで、リモートからコネクションを閉じる要求が来ていない、もしくは、バッファにデータが溜まっている間は繰り返すようにします。また、channelのrecvはブロッキングしないので安心して使えます。
これで(メモリが許す限りは)データをちゃんと読み取れるはずです。ビジーループなのでCPU使用率は高くなります。
上記の例ではメモリにため込んでいますが、データを受け取った都度なんか処理したい場合はそのように書くこともできます。
なお、このコードに至った後にこのStackoverflowの記事を見つけました。
https://stackoverflow.com/a/32758464
paramikoのこの辺りの動作を理解する助けになるかと思います。
メモ
以下はメモです。
paramikoのログを取りたい
標準のloggingが使えます。paramikoの動作や、特にchannelの動きがわかるので有用です。
import logging
logging.basicConfig()
logging.getLogger("paramiko").setLevel(logging.DEBUG)
内部メモ
SSHはChannelという仕組みを持ち、1つの接続で多重通信が行える。
SSHの RFC 4254 で コマンド実行(exec)を行う通信の定義や、これ以上データを送らないときはSSH_MSG_CHANNEL_EOF
かSSH_MSG_CHANNEL_CLOSE
を送るなどが決まっている。
paramikoのTransportクラスはthreading.Threadのサブクラスなので、通信処理は全部別スレッドで行われる。
- Channelのスレッドは、データの送受信を待ち受けて、バッファがいっぱいになるまではブロッキングせずに読み書き
- 利用(実装者)側のスレッドは、バッファがいっぱいになるまではブロッキングせずに読み書き
おそらく「バッファがいっぱいになるまでは」の閾値がおそらく約2MiB(きっちり2MiBではなくこれを超えると格段に起きやすくなる)
実装を追いきれてないのでよくわかっていない。
Channelのrecv_exit_statusについて
recv_exit_statusはexit statusが得られていない状態もしくはcloseされていない状態で呼び出すとtimeoutの設定を無視してブロッキングします。最低限、channel.exit_status_ready()
がTrueを返す状態で使うべきです。
雑な実装だとデータが読み切れず処理が止まってしまうところの調査
channel.recv_ready()
とchannel.recv_stderr_ready()
はどちらもFalse
を返す場合があります。これらは単純にバッファにデータがあるかどうかをTrue/Falseで返すだけです。
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/buffered_pipe.py#L104
バッファのデータは別スレッドで書き込まれるため、見るタイミングによっては両方に無い場合も十分あり得ます。
他には channel.exit_status_ready()
がよさそうです。経験上「exit status のコードが得られた==コマンドの実行が完了した」と考えて良さそうに感じますが、paramiko(SSH?)においては正しくないようでした。
これはあくまで「exit status のメッセージが得られた」というだけです(一応、closedであるかどうかも見ていますが、通常は先にexit statusのメッセージを受け取るのであまり意味ないと思います)
実際、手元で検証したところたまにデータが受信しきれていない状態になりました。
検証コード
qiitaは`この仮説としては、リモートのバッファにデータまだ残っている状態が存在するが先にexit statusのコードを送ってくることがあるのではないかと考えています。その場合に、リモートのバッファにまだデータはあるのに、exit statusを受け取っている状態かつローカルのバッファにはない状態になりループから抜けてしまうという感じになっているのだと思います。これを考慮する必要がありそうでした。
ちなみに、stackoverflowではexit_status_readyを使った回答がありますが、上記の理由からダメなコードです。コメントでも言われてますね。
https://stackoverflow.com/a/21105626
そうなるとリモートからデータ読み切れてるかどうかを知る必要がありそうです。SSHの仕様であるRFC 4254を見たところ、以下の記述がありました。
https://tools.ietf.org/html/rfc4254
5.3. Closing a Channel より抜粋
A party MAY send SSH_MSG_CHANNEL_CLOSE without having sent or received SSH_MSG_CHANNEL_EOF.
paramikoとしては処理の終了の判断としては、リモート側からSSH_MSG_CHANNEL_EOFのメッセージが送られてくることを期待します。受け取るとparamikoはChannel._handle_eofを実行します。
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/transport.py#L2369
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/channel.py#L1101
これが実行された後であれば、channel.eof_received
がTrueになるので、後はバッファの中身を全て読み出せば、全部読めたということになります。
paramikoのデバッグログで言えば、以下のようなログが出るまでは読み続けたほうが良い、ということになります。
DEBUG:paramiko.transport:[chan 0] EOF received (0)
ただ、上にあるSSHのRFCの記述通りであれば、EOFを送らずいきなりCLOSEを送ってくることが認められているため、それに配慮したほうがよさそうです。そうなるとデータを最後まで受信できたかどうかの判断は、CLOSEを送ってきたかどうかで判断するのがよさそうです。
paramikoではcloseのメッセージを受け取ると以下のような感じで_close_internal
まで処理をしてclosed
をセットします。
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/transport.py#L2370
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/channel.py#L1114
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/channel.py#L1186
https://github.com/paramiko/paramiko/blob/v1.18.1/paramiko/channel.py#L1163
paramiko的にはいきなりCLOSEが来たときは、EOFが来たのと同等の処理をするわけではなさそうで、channel.eof_received
の値が変わらない気がします。なのでchannel.eof_received
を見る方法はダメそうです(本当?)
なので、channel.closed
とするのが無難そうです。
(ここまで正しければ、(channel.closed or channel.eof_received)
にしてもよさそうですが、この背景を説明したうえでないと混乱を招きそうなので「無難」な方にします)
最後にrecv_exit_status
はブロッキングを伴いますが、channel.closed
がTrueになっている状態であれば何の問題もありません。(exit-statusメッセージが来るか、closeメッセージが来ると、状態が切り替わりブロッキングしなくなるはずなので)
受信処理をもう少しどうにかならんか
ノンブロッキングではありますが不必要にrecvを呼んだりビジーループだったりとイケてないです。データを受け取ったら処理してほしいです。
調べたところchannelがselectに対応しているのでselectで非同期処理をする方法ができるようです(今回採用してないのはもう疲れて調べる気力がなくなったため)