UNIXのパイプの延長で、
- 書き込み側プロセスはひとつ
- 読み込み側プロセスは複数
- 書き込み側プロセスで行を書き込むと、読み込み側プロセス全て書き込んだ行が出力される
- 読み込み側プロセスは任意のタイミングで読み込みを開始できる(読み込みを開始した以降に書き込みされた行を読み込む)
という挙動のものがほしい。
(pub/subとはちょっと違うかもしれない)
これはtail -f -n 0
で達成できますが、何らかの方法でファイルをときどき消すなどしてやらないとファイルが無限にでかくなっていきます。
UNIXドメインソケットを使うと良いかもと思ったのですが、UNIXドメインソケットを読み書きできるいい感じのコマンドがなかったのでgoで実装してみました。
コード
pub
pub.go
package main
import (
"bufio"
"io"
"log"
"net"
"os"
)
var connections = make(map[net.Conn]bool)
func main() {
go acceptConnections()
stdin := bufio.NewReader(os.Stdin)
for {
b, err := stdin.ReadByte()
if err == io.EOF {
break
}
for conn := range connections {
_, err := conn.Write([]byte{b})
if err != nil {
log.Println("socket write error:", err)
conn.Close()
delete(connections, conn)
log.Println("connections count: ", len(connections))
}
}
//log.Println("wrote:", b)
}
}
func acceptConnections() {
listener, err := net.Listen("unix", "./sock")
if err != nil {
log.Println("Listen error: ", err)
return
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
log.Println("Accept error: ", err)
continue
}
log.Println("Accepted")
connections[conn] = true
log.Println("connections count: ", len(connections))
}
}
sub
sub.go
package main
import (
"fmt"
"net"
)
func main() {
conn, err := net.Dial("unix", "./sock")
if err != nil {
fmt.Printf("Dial error: %s\n", err)
return
}
defer conn.Close()
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if n == 0 {
break
}
if err != nil {
fmt.Printf("Read error: %s\n", err)
}
fmt.Print(string(buf[:n]))
}
}
使ってみる
ビルドします。
ビルド
$ go build pub.go
$ go build sub.go
書き込み側プロセスを起動します。
1秒ごとに時刻を出力するワンライナーの出力をパイプでつないでみます。
送信側
$ while :; do date; sleep 1; done | ./pub
読み込み側のプロセスを起動すると、date
の結果がずらずらと流れてきます。
受信側
$ ./sub
2019年 1月27日 日曜日 05時47分16秒 JST
2019年 1月27日 日曜日 05時47分17秒 JST
2019年 1月27日 日曜日 05時47分19秒 JST
2019年 1月27日 日曜日 05時47分20秒 JST
別のターミナルを起動してもうひとつ./sub
をたちあげると、起動したタイミング以降に書き込まれたdate
の結果がずらずらと流れてきます。
ToDo
- 終了処理をちゃんとやる
- UNIXドメインソケットのパスを引数で指定できるようにする
- 入出力が逆バージョンもつくったら便利なのでは、複数のセンサの値を読んでひとつのログに書き出したりとかに使えそう
- コマンド名はやはりpub/subではない気がするし、なんか考える