パイプとかファイルディスクリプタとか、そーゆー低レイヤーのはなし | |
---|---|
boost::processの子プロセスの入力を閉じる | https://qiita.com/cielavenir/items/9219162170cf2dd8b144 |
サブプロセス処理をパイプを使わずに行う | https://qiita.com/cielavenir/items/0e69848b705fafec86e1 |
fstreamのファイルディスクリプタを取得する ムーブコンストラクタしかないクラスのprotectedメンバを呼び出す |
https://qiita.com/cielavenir/items/fe892c564e6b12983776 |
事情によりboostを使ってプログラムを書いているのだが、今回、パイプ処理が必要になり、boost::processを使うことにした。Debian stretchなのでboostは1.62であり、processは使用できない。1.64からprocessディレクトリを拝借できた。header-onlyで本当に助かった…。
さて、子プロセスとストリームデータをパイプでやり取りするには、入力が終わり次第入力パイプを閉じなければならない。そうでないとデータの終端がわからないからだ。
PythonでいうPopen.stdin.close()、RubyでいうIO.close_write()が相当する。
boost::processではどうやるのか調べたところ、https://stackoverflow.com/questions/44239499/close-the-stdin-of-boostprocess-child に行き着いた。opstream::pipe().close()
すれば良いらしい。
ここまでわかったところでサンプルを作ってみた(明らかに当時のサンプルではなく記事向けのサンプルだが気にしないこと)。
※本題とは少し離れるが、読み取りか書き込みのどちらかを別スレッドにしているのは、子プロセスと大容量のデータをパイプでやり取りするとブロックしてしまうからである。
// g++ zstd.cpp -lboost_thread -lboost_system
// zstd < input_file | ./a.out (and compare printed string to original SHA1)
# include <iostream>
# include <boost/uuid/sha1.hpp>
# include <boost/algorithm/hex.hpp>
# include <boost/process/system.hpp>
# include <boost/process/io.hpp>
# include <boost/process/pipe.hpp>
# include <boost/iostreams/copy.hpp>
# include <boost/thread.hpp>
unsigned int byteswap32(unsigned int n){
const unsigned int m8=0x00ff00ff;
const unsigned int m16=0x0000ffff;
n=((n&m8)<<8)|((n>>8)&m8);
n=((n&m16)<<16)|((n>>16)&m16);
return n;
}
int main(){
std::stringstream ss;
{
boost::process::opstream enc;
boost::process::ipstream dec;
boost::process::child c("zstd -d", boost::process::std_in < enc, boost::process::std_out > dec);
# if 1
auto fpush=[&]{
/// write to enc ///
std::string content;
content.assign((std::istreambuf_iterator<char>(std::cin)),std::istreambuf_iterator<char>());
enc.write(content.data(),content.size());
/// flush enc ///
enc.flush(); // flush is immune to opstream::rdbuf() issue.
//enc.rdbuf()->sync();
enc.pipe().close();
};
boost::thread(boost::ref(fpush));
boost::iostreams::copy(dec, ss);
# else
auto fpop=[&]{
boost::iostreams::copy(dec, ss);
};
boost::thread(boost::ref(fpop));
std::string content;
content.assign((std::istreambuf_iterator<char>(std::cin)),std::istreambuf_iterator<char>());
enc.write(content.data(),content.size());
enc.flush();
enc.pipe().close();
# endif
c.wait();
}
{
boost::uuids::detail::sha1 hash;
unsigned int digest[5];
hash.process_bytes(ss.str().data(), ss.str().size());
hash.get_digest((boost::uuids::detail::sha1::digest_type)digest);
for(int i=0;i<5;i++)digest[i]=byteswap32(digest[i]);
std::string result;
boost::algorithm::hex((char*)digest, ((char*)digest) + 20, std::back_inserter(result));
std::cout << result << std::endl;
}
return 0;
}
ところが、(初版のものは)データが短絡してしまった。ストリームに書き込んだのが実際に子プロセスに送られる前に、パイプを閉じてしまったのである。子プロセスに送られたことを保証しなければならない。
pipe.hppを見てみたところ、basic_pipebufにこういうものが見つかった。
///Synchronizes the buffers with the associated character sequence
int sync() override { return this->_write_impl() ? 0 : -1; }
しかし、これを呼ぶには、opstreamの変数_bufを取り出さなければならない。rdbuf()メソッドで取り出せそうだったが、バグのため取り出すことができなかった。
このバグは1.70で修正されており、まとめて書くと次のようなものだ。
--- a/boost/process/pipe.hpp
+++ b/boost/process/pipe.hpp
@@ -250,7 +250,7 @@ template<
>
class basic_ipstream : public std::basic_istream<CharT, Traits>
{
- basic_pipebuf<CharT, Traits> _buf;
+ mutable basic_pipebuf<CharT, Traits> _buf;
public:
typedef basic_pipe<CharT, Traits> pipe_type;
@@ -262,7 +262,7 @@ public:
typedef typename Traits::off_type off_type ;
///Get access to the underlying stream_buf
- basic_pipebuf<CharT, Traits>* rdbuf() const {return _buf;};
+ basic_pipebuf<CharT, Traits>* rdbuf() const {return &_buf;};
///Default constructor.
basic_ipstream() : std::basic_istream<CharT, Traits>(nullptr)
@@ -326,7 +326,7 @@ template<
>
class basic_opstream : public std::basic_ostream<CharT, Traits>
{
- basic_pipebuf<CharT, Traits> _buf;
+ mutable basic_pipebuf<CharT, Traits> _buf;
public:
typedef basic_pipe<CharT, Traits> pipe_type;
@@ -338,7 +338,7 @@ public:
///Get access to the underlying stream_buf
- basic_pipebuf<CharT, Traits>* rdbuf() const {return _buf;};
+ basic_pipebuf<CharT, Traits>* rdbuf() const {return &_buf;};
///Default constructor.
basic_opstream() : std::basic_ostream<CharT, Traits>(nullptr)
@@ -401,7 +401,7 @@ template<
>
class basic_pstream : public std::basic_iostream<CharT, Traits>
{
- basic_pipebuf<CharT, Traits> _buf;
+ mutable basic_pipebuf<CharT, Traits> _buf;
public:
typedef basic_pipe<CharT, Traits> pipe_type;
@@ -413,7 +413,7 @@ public:
///Get access to the underlying stream_buf
- basic_pipebuf<CharT, Traits>* rdbuf() const {return _buf;};
+ basic_pipebuf<CharT, Traits>* rdbuf() const {return &_buf;};
///Default constructor.
basic_pstream() : std::basic_iostream<CharT, Traits>(nullptr)
これを踏まえて、opstream::rdbuf()->sync();
すれば良いことを見出した。
しかし、なぜこうも長期間バグが放置されていたのか気になってしまう。rdbuf()は実は不要なのではないか?
ここで私はあることを思い出した。opstream::flush()
である。競技プログラミングのリアクティブ形式で問題になるあれである(余談だがstd::endlは内部的にflushも行う(のであまり意識されないかもしれないが))。
https://cpprefjp.github.io/reference/ostream/basic_ostream/flush.html によると、安全にopstream::rdbuf()->sync()
を呼び出すという仕様のようだ。
こうして最終的にはboostに手を加えることなく解決することができた。
(先のサンプルでboost::iostreams::copy(std::cin, enc);
とあえて書かなかったのは、copyは内部的にflush相当のことを行っているらしいのでサンプルとしてふさわしくないから、という理由が明らかになるのである。)
なお、boost 1.71でopstream::close()
が追加されたが、直後のopstream::pipe().close()
は依然として必要という状況であった。パイプまで閉じてほしかった…。
ところでなぜpipe.hppのrdbuf()は壊れているにもかかわらずflush()は動作していたのだろうか。ここで、basic_streambuf::rdbufはvirtualでなく、basic_streambuf::syncはvirtualであることが意味をなしてくる。
iostreamは内部的にbasic_streambufを保持している。pipe.hppでいう_buf
の他に、例えばlibstdc++では、bits/basic_ios.hに_M_streambuf
がある。これはprotectedであるが、ostream::flush()
からは問題なく呼び出せたわけだ。