influxdb-rubyとは
InfluxDBの開発元のinfluxdataが出しているclient libraryです。
そのテストコードを通して、influxdbを少し理解していこうという試みの2回目です。
cases/async_client_spec.rb
非同期のデータ登録
...
# Checks that points queued through the async client end up as POST requests
# against the stubbed endpoint. `subject`, `worker` and `stub_url` are defined
# outside this excerpt.
describe "#write_point" do
it "sends writes to client" do
# Stub POSTs to stub_url with a 204 response; the stub handle is kept so the
# number of requests can be asserted at the end.
post_request = stub_request(:post, stub_url).to_return(status: 204)
# Queue 100 more points than worker.max_post_points.
(worker.max_post_points + 100).times do
subject.write_point('a', {})
end
# The timeout code is fragile, and heavily dependent on system load
# (and scheduler decisions). On the CI, the system is less
# responsive and needs a bit more time.
timeout_stretch = ENV["TRAVIS"] == "true" ? 10 : 3
# stop! has to return within timeout_stretch * sleep_interval; otherwise
# Timeout.timeout raises and the example fails.
Timeout.timeout(timeout_stretch * worker.sleep_interval) do
subject.stop!
end
# After stop!, every worker thread must report stop?.
worker.threads.each do |t|
expect(t.stop?).to be true
end
# exact times can be 2 or 3 (because we have 3 worker threads),
# but cannot be less than 2 due to MAX_POST_POINTS limit
expect(post_request).to have_been_requested.at_least_times(2)
end
end
...
一定時間(timeout_stretch * worker.sleep_interval 秒)以内にworkerを停止(stop!)でき、
writerからrequestが2回以上postされていることを確認
...
# Checks that custom async options are readable back from the worker.
# `worker` is defined outside this excerpt; presumably it is built from
# async_options - TODO confirm against the full spec file.
describe "async options" do
let(:async_options) do
{
max_post_points: 10,
max_queue_size: 100,
num_worker_threads: 1,
sleep_interval: 0.5
}
end
subject { worker }
# Stop the worker before each example runs.
before { worker.stop! }
specify { expect(subject.max_post_points).to be 10 }
specify { expect(subject.max_queue_size).to be 100 }
specify { expect(subject.num_worker_threads).to be 1 }
# The float value is compared with a tolerance of 0.0001 rather than by identity.
specify { expect(subject.sleep_interval).to be_within(0.0001).of(0.5) }
end
...
非同期のworkerに
キューやインターバル時間を設定できることを確認
be_within(0.0001).of(0.5)
はインターバル時間が0.5秒でプラスマイナス0.0001秒までは許容するという意味
cases/query_cluster_spec.rb
cluster adminユーザの作成、一覧参照、削除
(データベースの作成とは別の操作)
cases/query_continuous_query_spec.rb
Continuous Queries
...
# Checks that list_continuous_queries(database) returns only the continuous
# queries of the requested database, as name/query hashes.
describe "#list_continuous_queries" do
let(:query) { "SHOW CONTINUOUS QUERIES" }
let(:database) { "testdb" }
# Stubbed server answer: one series per database ("otherdb" and "testdb"),
# each with name/query columns and a single continuous query.
let(:response) do
{ "results" => [{ "series" => [{ "name" => "otherdb", "columns" => %w(name query),
"values" => [["clicks_per_hour", "CREATE CONTINUOUS QUERY clicks_per_hour ON otherdb BEGIN SELECT count(name) INTO \"otherdb\".\"default\".clicksCount_1h FROM \"otherdb\".\"default\".clicks GROUP BY time(1h) END"]] },
{ "name" => "testdb", "columns" => %w(name query),
"values" => [["event_counts", "CREATE CONTINUOUS QUERY event_counts ON testdb BEGIN SELECT count(type) INTO \"testdb\".\"default\".typeCount_10m_byType FROM \"testdb\".\"default\".events GROUP BY time(10m), type END"]] }] }] }
end
# Only the "testdb" entry is expected; the "otherdb" series must be filtered out.
let(:expected_result) do
[{ "name" => "event_counts", "query" => "CREATE CONTINUOUS QUERY event_counts ON testdb BEGIN SELECT count(type) INTO \"testdb\".\"default\".typeCount_10m_byType FROM \"testdb\".\"default\".events GROUP BY time(10m), type END" }]
end
# Stub the GET /query call carrying the credentials and the SHOW statement.
before do
stub_request(:get, "http://influxdb.test:9999/query").with(
query: { u: "username", p: "password", q: query }
).to_return(body: JSON.generate(response), status: 200)
end
it "should GET a list of continuous queries for specified db only" do
expect(subject.list_continuous_queries(database)).to eq(expected_result)
end
end
...
testdbに登録したCONTINUOUS QUERYの定義を
list_continuous_queriesで取得できることを確認
...
# Checks that create_continuous_query issues the expected
# CREATE CONTINUOUS QUERY statement, with and without RESAMPLE options.
describe "#create_continuous_query" do
let(:name) { "event_counts_per_10m_by_type" }
let(:database) { "testdb" }
let(:query) { "SELECT COUNT(type) INTO typeCount_10m_byType FROM events GROUP BY time(10m), type" }
# Both intervals default to nil; nested contexts override them.
let(:every_interval) { nil }
let(:for_interval) { nil }
# Expected statement text. A RESAMPLE part is appended only for the
# interval(s) that are set, and the query is wrapped in BEGIN ... END.
let(:clause) do
c = "CREATE CONTINUOUS QUERY #{name} ON #{database}"
if every_interval && for_interval
c << " RESAMPLE EVERY #{every_interval} FOR #{for_interval}"
elsif every_interval
c << " RESAMPLE EVERY #{every_interval}"
elsif for_interval
c << " RESAMPLE FOR #{for_interval}"
end
c << " BEGIN\n#{query}\nEND"
end
# Stub the GET /query call whose q parameter equals the expected clause.
# No to_return is given, so the stub's default response is used.
before do
stub_request(:get, "http://influxdb.test:9999/query").with(
query: { u: "username", p: "password", q: clause }
)
end
context "without resampling" do
it "should GET to create a new continuous query" do
expect(subject.create_continuous_query(name, database, query)).to be_a(Net::HTTPOK)
end
end
context "with resampling" do
# Only resample_every is passed.
context "EVERY <interval>" do
let(:every_interval) { "10m" }
it "should GET to create a new continuous query" do
expect(subject.create_continuous_query(name, database, query, resample_every: every_interval)).to be_a(Net::HTTPOK)
end
end
# Only resample_for is passed.
context "FOR <interval>" do
let(:for_interval) { "7d" }
it "should GET to create a new continuous query" do
expect(subject.create_continuous_query(name, database, query, resample_for: for_interval)).to be_a(Net::HTTPOK)
end
end
# Both options are passed; the expected clause puts EVERY before FOR.
context "EVERY <interval> FOR <interval>" do
let(:every_interval) { "5m" }
let(:for_interval) { "3w" }
it "should GET to create a new continuous query" do
expect(subject.create_continuous_query(name, database, query, resample_for: for_interval, resample_every: every_interval)).to be_a(Net::HTTPOK)
end
end
end
end
...
CONTINUOUS QUERYの登録と実行周期などのオプション設定を確認
cases/query_core_spec.rb
InfluxDB::Query::Coreは
builderやqueryやwrite_pointsなどの基本的なメソッドを持つが、
このspecではあまりテストコードを書かず、
InfluxDB::Query::Core内のメソッドを利用する
他の機能でテストコードを書いている感じ。
ただ、単体テスト的なものはあってもいいような、、、
...
# Checks that #query copes with a series that has no "values" key.
describe "#query" do
let(:query) { "SELECT value FROM requests_per_minute WHERE time > 1437019900" }
# Stubbed answer: a series with a name and columns but no "values" entry.
let(:response) do
{ "results" => [{ "series" => [{ "name" => "requests_per_minute",
"columns" => %w(time value) }] }] }
end
# Stub the GET /query call including db, precision and credentials.
before do
stub_request(:get, "http://influxdb.test:9999/query")
.with(query: { db: "database", precision: "s", u: "username", p: "password", q: query })
.to_return(body: JSON.generate(response), status: 200)
end
it "should handle responses with no values" do
# Some requests (such as trying to retrieve values from the future)
# return a result with no "values" key set.
# The expected result has nil tags and an empty values array.
expected_result = [{ "name" => "requests_per_minute", "tags" => nil, "values" => [] }]
expect(subject.query(query)).to eq(expected_result)
end
end
...
influxdbに送るqueryの送信と結果取得を確認
cases/query_database_spec.rb
databaseの作成、削除、一覧参照
...
# Checks create_database with an explicit name and with no argument.
# `query` is presumably matched by a request stub defined outside this
# excerpt - TODO confirm against the full spec file.
describe "#create_database" do
# Database name passed as an argument.
describe "from param" do
let(:query) { "CREATE DATABASE foo" }
it "should GET to create a new database" do
expect(subject.create_database("foo")).to be_a(Net::HTTPOK)
end
end
# No argument: the expected statement uses the name "database".
describe "from config" do
let(:query) { "CREATE DATABASE database" }
it "should GET to create a new database using database name from config" do
expect(subject.create_database).to be_a(Net::HTTPOK)
end
end
end
...
databaseの作成を確認
cases/query_retention_policy_spec.rb
データの保持期間の設定。
InfluxDBはdatabaseの作成時に、autogenという名前のRETENTION POLICYも
自動的に作成して設定する。
この自動生成は、設定ファイルの[meta]セクション(メタデータの保存先の既定値は/var/lib/influxdb/meta)にある
retention-autocreateをfalseにすることで無効化することもできる
...
# Checks create_retention_policy with and without the DEFAULT flag.
# `query` is presumably matched by a request stub defined outside this
# excerpt - TODO confirm against the full spec file.
# NOTE(review): both example descriptions say "create a new database" although
# the call under test creates a retention policy.
describe "#create_retention_policy" do
# A fifth argument of true is expected to add DEFAULT to the statement.
context "default" do
let(:query) { "CREATE RETENTION POLICY \"1h.cpu\" ON foo DURATION 1h REPLICATION 2 DEFAULT" }
it "should GET to create a new database" do
expect(subject.create_retention_policy('1h.cpu', 'foo', '1h', 2, true)).to be_a(Net::HTTPOK)
end
end
# Without the fifth argument, the expected statement has no DEFAULT.
context "non-default" do
let(:query) { "CREATE RETENTION POLICY \"1h.cpu\" ON foo DURATION 1h REPLICATION 2" }
it "should GET to create a new database" do
expect(subject.create_retention_policy('1h.cpu', 'foo', '1h', 2)).to be_a(Net::HTTPOK)
end
end
end
...
RETENTION POLICYの作成を確認。
databaseのデフォルトにする場合、第5引数にtrueを設定
cases/query_series_spec.rb
seriesの作成など(?)
テスト未作成
cases/query_shard_space_spec.rb
shard関連(?)
テスト未作成
cases/query_shard_spec.rb
shard関連(?)
テスト未作成
cases/query_user_spec.rb
ユーザの作成、削除、一覧参照や権限変更
...
# Checks create_database_user with default and explicit permissions.
# `query` is presumably matched by a request stub defined outside this
# excerpt - TODO confirm against the full spec file.
describe "#create_database_user" do
let(:user) { 'useruser' }
let(:pass) { 'passpass' }
let(:db) { 'foo' }
# Default expectation: create the user, then GRANT ALL on the database.
let(:query) { "CREATE user #{user} WITH PASSWORD '#{pass}'; GRANT ALL ON #{db} TO #{user}" }
context "without specifying permissions" do
it "should GET to create a new database user with all permissions" do
expect(subject.create_database_user(db, user, pass)).to be_a(Net::HTTPOK)
end
end
context "with passing permission as argument" do
let(:permission) { :read }
# The permission symbol appears upcased (READ) in the GRANT part.
let(:query) { "CREATE user #{user} WITH PASSWORD '#{pass}'; GRANT #{permission.to_s.upcase} ON #{db} TO #{user}" }
it "should GET to create a new database user with permission set" do
expect(subject.create_database_user(db, user, pass, permissions: permission)).to be_a(Net::HTTPOK)
end
end
end
...
ユーザの作成(権限設定付きのケースもあり)
cases/query_with_params_spec.rb
queryへのパラメータの設定
...
# Checks how the query builder substitutes %{param} placeholders.
describe "#query_with_params" do
let(:query) { "select * from foo where bar > %{param}" }
# Evaluated lazily, so the error cases can wrap it in an expect block.
let(:compiled_query) { subject.builder.build(query, query_params) }
# An empty hash, an empty array and nil all raise ArgumentError.
context "with empty params hash" do
let(:query_params) { {} }
it { expect { compiled_query }.to raise_error ArgumentError }
end
context "with empty params array" do
let(:query_params) { [] }
it { expect { compiled_query }.to raise_error ArgumentError }
end
context "with empty params" do
let(:query_params) { nil }
it { expect { compiled_query }.to raise_error ArgumentError }
end
# A hash with a matching key is substituted; the integer is inserted unquoted.
context "with simple params" do
let(:query_params) { { param: 42 } }
it { expect(compiled_query).to eq "select * from foo where bar > 42" }
end
# String values are wrapped in single quotes.
context "string escaping" do
let(:query_params) { { param: "string" } }
it { expect(compiled_query).to eq "select * from foo where bar > 'string'" }
end
end
...
空のhash、空のarray、nilを渡すとArgumentErrorになり、
hashで渡した値はプレースホルダに埋め込まれる(数値はそのまま、stringは「'」で括られる)
https://github.com/influxdata/influxdb-ruby/blob/master/lib/influxdb/query/builder.rb#L18-L27
cases/querying_issue_7000_spec.rb
https://github.com/influxdata/influxdb/issues/7000
で対応した内容へのテストコード。
https://github.com/mhodson-qxbranch/influxdb-ruby/commit/044a350dc91997b4d690a9293c7048ce90b1ea48
chunk_sizeパラメータが追加された
...
# Checks #query against a chunked response: four JSON documents separated by
# newlines, each holding one series. `args` and `extra_params` are presumably
# consumed by setup outside this excerpt - TODO confirm.
context "with multiple series with different tags" do
let(:args) { { chunk_size: 100 } }
let(:extra_params) { { chunked: "true", chunk_size: "100" } }
# service_1, code 200
let(:response_line_1) do
{ "results" => [{ "series" => [{ "name" => "access_times.service_1", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "columns" => %w(time value), "values" => [["2015-07-08T07:15:22Z", 327]] }] }] }
end
# service_1, code 500
let(:response_line_2) do
{ "results" => [{ "series" => [{ "name" => "access_times.service_1", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "columns" => %w(time value), "values" => [["2015-07-08T06:15:22Z", 873]] }] }] }
end
# service_2, code 200
let(:response_line_3) do
{ "results" => [{ "series" => [{ "name" => "access_times.service_2", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "columns" => %w(time value), "values" => [["2015-07-08T07:15:22Z", 943]] }] }] }
end
# service_2, code 500
let(:response_line_4) do
{ "results" => [{ "series" => [{ "name" => "access_times.service_2", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "columns" => %w(time value), "values" => [["2015-07-08T06:15:22Z", 606]] }] }] }
end
# The body is the four JSON documents joined by "\n".
let(:response) do
JSON.generate(response_line_1) + "\n" + JSON.generate(response_line_2) + "\n" + JSON.generate(response_line_3) + "\n" + JSON.generate(response_line_4)
end
# One entry per name/tags combination, with each row turned into a
# column-name => value hash.
let(:expected_result) do
[{ "name" => "access_times.service_1", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "values" => [{ "time" => "2015-07-08T07:15:22Z", "value" => 327 }] },
{ "name" => "access_times.service_1", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "values" => [{ "time" => "2015-07-08T06:15:22Z", "value" => 873 }] },
{ "name" => "access_times.service_2", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "values" => [{ "time" => "2015-07-08T07:15:22Z", "value" => 943 }] },
{ "name" => "access_times.service_2", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "values" => [{ "time" => "2015-07-08T06:15:22Z", "value" => 606 }] }]
end
# The measurement is selected with a regular expression.
let(:query) { "SELECT * FROM /access_times.*/" }
it "should return array with 4 elements grouped by name and tags" do
expect(subject.query(query)).to eq(expected_result)
end
end
...
chunk分割された(改行区切りの)レスポンスでも、
正規表現で指定した複数のseries・複数のtagの結果を、nameとtagの組ごとにまとめて取得できる
cases/querying_spec.rb
リクエストのqueryとそのレスポンス
...
# Shared stub for the querying examples: GET /query with credentials,
# precision and db, plus whatever each context supplies as extra_params.
# The body is the context's `response` serialized as JSON.
# `database` is defined outside this excerpt.
before do
stub_request(:get, "http://influxdb.test:9999/query")
.with(query: { q: query, u: "username", p: "password", precision: 's', db: database }.merge(extra_params))
.to_return(body: JSON.generate(response))
end
...
# Checks that one series with two rows becomes one result hash whose
# "values" holds one column-name => value hash per row.
context "with single series with multiple points" do
# Stubbed answer: series "cpu" (tag region=us), columns time/temp/value, two rows.
let(:response) do
{ "results" => [{ "series" => [{ "name" => "cpu", "tags" => { "region" => "us" },
"columns" => %w(time temp value),
"values" => [["2015-07-07T14:58:37Z", 92, 0.3445], ["2015-07-07T14:59:09Z", 68, 0.8787]] }] }] }
end
# Same data with each row keyed by column name; "columns" is not part of the result.
let(:expected_result) do
[{ "name" => "cpu", "tags" => { "region" => "us" },
"values" => [{ "time" => "2015-07-07T14:58:37Z", "temp" => 92, "value" => 0.3445 },
{ "time" => "2015-07-07T14:59:09Z", "temp" => 68, "value" => 0.8787 }] }]
end
let(:query) { 'SELECT * FROM cpu' }
it "should return array with single hash containing multiple values" do
expect(subject.query(query)).to eq(expected_result)
end
end
...
queryメソッドが想定通りに動くかどうかの確認。
JSON.generate(response)とexpected_resultを突き合わせているので、
resultsから1つのcpuというseriesのデータがjsonから
取り出せているかの確認ぐらいなので、
どちらかというとレスポンスデータの扱いに関するテストにも見える。
このテストコード以降の
- with series with different tags
- with multiple series with different tags
- with multiple series for explicit value only
なども
responseとexpected_resultを切り替えているだけなので、そう見える
...
# Checks #query when epoch: 's' is requested: timestamps are integers
# and are passed through unchanged.
# `args` is presumably consumed by setup outside this excerpt - TODO confirm.
context "with epoch set to seconds" do
let(:args) { { epoch: 's' } }
# Merged into the stubbed request's query parameters by the shared before block.
let(:extra_params) { { epoch: 's' } }
# Two "cpu" series (region=pl and region=us) with integer time values.
let(:response) do
{ "results" => [{ "series" => [{ "name" => "cpu", "tags" => { "region" => "pl" }, "columns" => %w(time temp value), "values" => [[1_438_580_576, 34, 0.343443]] },
{ "name" => "cpu", "tags" => { "region" => "us" }, "columns" => %w(time temp value), "values" => [[1_438_612_976, 92, 0.3445], [1_438_612_989, 68, 0.8787]] }] }] }
end
# Rows keyed by column name; "time" stays an integer.
let(:expected_result) do
[{ "name" => "cpu", "tags" => { "region" => "pl" },
"values" => [{ "time" => 1_438_580_576, "temp" => 34, "value" => 0.343443 }] },
{ "name" => "cpu", "tags" => { "region" => "us" },
"values" => [{ "time" => 1_438_612_976, "temp" => 92, "value" => 0.3445 },
{ "time" => 1_438_612_989, "temp" => 68, "value" => 0.8787 }] }]
end
let(:query) { 'SELECT * FROM cpu' }
it "should return results with integer timestamp" do
expect(subject.query(query)).to eq(expected_result)
end
end
...
Epoch timeの指定もできる
Epoch time is the amount of time that has elapsed since 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970.
sは秒
cases/retry_requests_spec.rb
リクエストのリトライ
...
# Checks the bounded retry case. `client` and `subject` are defined outside
# this excerpt; presumably the request is stubbed to time out - TODO confirm.
context "when retry is 'n'" do
let(:args) { { retry: 3 } }
it "raise error after 'n' attemps" do
# One sleep call is expected per retry: exactly 3.
expect(client).to receive(:sleep).exactly(3).times
# The final error is a ConnectionError whose cause is the Timeout::Error.
expect { subject }.to raise_error(InfluxDB::ConnectionError) do |e|
expect(e.cause).to be_an_instance_of(Timeout::Error)
end
end
end
...
指定回数分のリトライを行ったかを確認
...
# Checks the retry: -1 case: the client keeps retrying until a request succeeds.
# `client`, `subject`, `database` and `body` are defined outside this excerpt.
context "when retry is -1" do
let(:args) { { retry: -1 } }
# The stubbed POST /write raises Timeout::Error four times in a row and
# then answers with 204.
before do
stub_request(:post, "http://influxdb.test:9999/write")
.with(
query: { u: "username", p: "password", precision: 's', db: database },
headers: { "Content-Type" => "application/octet-stream" },
body: body
)
.to_raise(Timeout::Error).then
.to_raise(Timeout::Error).then
.to_raise(Timeout::Error).then
.to_raise(Timeout::Error).then
.to_return(status: 204)
end
it "keep trying until get the connection" do
# Four failures mean four sleep calls before the fifth attempt succeeds.
expect(client).to receive(:sleep).exactly(4).times
expect { subject }.to_not raise_error
end
end
...
retryオプションに-1を設定した場合は
retryし続けることを確認
cases/udp_client_spec.rb
udpでデータ登録
...
# Client configured with UDP host/port options.
let(:client) { described_class.new(udp: { host: "localhost", port: 44_444 }) }
# With the udp option set, the client's writer is a UDP writer.
specify { expect(client.writer).to be_a(InfluxDB::Writer::UDP) }
# Checks that write_point sends the expected payload as a UDP datagram.
describe "#write" do
# Expected payload: series, tag, and the integer value written as "5i".
let(:message) { 'responses,region=eu value=5i' }
it "sends a UPD packet" do
# Bind a local UDP socket on the port the client is configured to send to.
# NOTE(review): the socket is not closed within this example.
s = UDPSocket.new
s.bind("localhost", 44_444)
client.write_point("responses", values: { value: 5 }, tags: { region: 'eu' })
# Read up to 30 bytes; recvfrom returns [data, sender_info], so take the data.
rec_message = s.recvfrom(30).first
expect(rec_message).to eq message
end
end
...
ローカルにあけたudpソケットを介して、
UDPでデータ登録ができそうかを確認
cases/write_points_spec.rb
値の登録
...
# Checks write_point for a single point. `database` is defined outside this excerpt.
describe "#write_point" do
let(:series) { "cpu" }
let(:data) do
{ tags: { region: 'us', host: 'server_1' },
values: { temp: 88, value: 54 } }
end
# Expected request body: the point (data plus the series name) as dumped
# by InfluxDB::PointValue.
let(:body) do
InfluxDB::PointValue.new(data.merge(series: series)).dump
end
# Stub POST /write for exactly this body, answering 204.
before do
stub_request(:post, "http://influxdb.test:9999/write").with(
query: { u: "username", p: "password", precision: 's', db: database },
headers: { "Content-Type" => "application/octet-stream" },
body: body
).to_return(status: 204)
end
it "should POST to add single point" do
expect(subject.write_point(series, data)).to be_a(Net::HTTPNoContent)
end
it "should not mutate data object" do
# NOTE(review): plain assignment does not copy, so original_data refers to
# the same object as data and the final eql check compares the object with
# itself. Only the data[:series] check can detect a mutation - confirm
# whether a copy (e.g. dup) was intended.
original_data = data
subject.write_point(series, data)
# write_point must not have added a :series key to the caller's hash.
expect(data[:series]).to be_nil
expect(original_data).to eql(data)
end
end
...
1つのデータだけが登録できるwrite_pointの確認
...
describe "#write_points" do
# Checks write_points with points for two different series.
# `database` is defined outside this excerpt.
context "with multiple series" do
# Two points, each naming its own series ('cpu' and 'gpu').
let(:data) do
[{ series: 'cpu',
tags: { region: 'us', host: 'server_1' },
values: { temp: 88, value: 54 } },
{ series: 'gpu',
tags: { region: 'uk', host: 'server_5' },
values: { value: 0.5435345 } }]
end
# Expected request body: one dumped point per line, joined by "\n".
let(:body) do
data.map do |point|
InfluxDB::PointValue.new(point).dump
end.join("\n")
end
# Stub POST /write for exactly this body, answering 204.
before do
stub_request(:post, "http://influxdb.test:9999/write").with(
query: { u: "username", p: "password", precision: 's', db: database },
headers: { "Content-Type" => "application/octet-stream" },
body: body
).to_return(status: 204)
end
it "should POST multiple points" do
expect(subject.write_points(data)).to be_a(Net::HTTPNoContent)
end
end
...
...
複数seriesへのデータ登録を確認
終わりに
influxdb自体のテストではなく、
influxdbを利用するクライアント側のテストですので、
stubを利用して想定通りにパラメータやconfigの設定ができる、
という確認が多かったように思えます。