0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

specから理解するinfluxdb-ruby gemの機能 2

Posted at

influxdb-rubyとは
InfluxDBの開発元のinfluxdataが出しているclient libraryです。
そのテストコードを通して、influxdbを少し理解していこうという試みの2回目です。

cases/async_client_spec.rb

非同期のデータ登録

...
  describe "#write_point" do
    it "sends writes to client" do
      post_request = stub_request(:post, stub_url).to_return(status: 204)

      (worker.max_post_points + 100).times do
        subject.write_point('a', {})
      end

      # The timout code is fragile, and heavily dependent on system load
      # (and scheduler decisions). On the CI, the system is less
      # responsive and needs a bit more time.
      timeout_stretch = ENV["TRAVIS"] == "true" ? 10 : 3

      Timeout.timeout(timeout_stretch * worker.sleep_interval) do
        subject.stop!
      end

      worker.threads.each do |t|
        expect(t.stop?).to be true
      end

      # exact times can be 2 or 3 (because we have 3 worker threads),
      # but cannot be less than 2 due to MAX_POST_POINTS limit
      expect(post_request).to have_been_requested.at_least_times(2)
    end
  end
...

ある程度の時間の間(timeout_stretch * worker.sleep_interval)に
writerからrequestをpostされていることを確認

...
  describe "async options" do
    let(:async_options) do
      {
        max_post_points:    10,
        max_queue_size:     100,
        num_worker_threads: 1,
        sleep_interval:     0.5
      }
    end

    subject { worker }
    before { worker.stop! }

    specify { expect(subject.max_post_points).to be 10 }
    specify { expect(subject.max_queue_size).to be 100 }
    specify { expect(subject.num_worker_threads).to be 1 }
    specify { expect(subject.sleep_interval).to be_within(0.0001).of(0.5) }
  end
...

非同期のworkerに
キューやインターバル時間を設定できることを確認
be_within(0.0001).of(0.5)
はインターバル時間が0.5秒でプラスマイナス0.0001秒までは許容するという意味

cases/query_cluster_spec.rb

cluster adminのユーザを作成、一覧参照、削除
(データベースを作成ということとは別)

cases/query_continuous_query_spec.rb

Continuous Queries

...
  describe "#list_continuous_queries" do
    let(:query) { "SHOW CONTINUOUS QUERIES" }
    let(:database) { "testdb" }
    let(:response) do
      { "results" => [{ "series" => [{ "name" => "otherdb", "columns" => %w(name query),
                                       "values" => [["clicks_per_hour", "CREATE CONTINUOUS QUERY clicks_per_hour ON otherdb BEGIN SELECT count(name) INTO \"otherdb\".\"default\".clicksCount_1h FROM \"otherdb\".\"default\".clicks GROUP BY time(1h) END"]] },
                                     { "name" => "testdb", "columns" => %w(name query),
                                       "values" => [["event_counts", "CREATE CONTINUOUS QUERY event_counts ON testdb BEGIN SELECT count(type) INTO \"testdb\".\"default\".typeCount_10m_byType FROM \"testdb\".\"default\".events GROUP BY time(10m), type END"]] }] }] }
    end

    let(:expected_result) do
      [{ "name" => "event_counts", "query" => "CREATE CONTINUOUS QUERY event_counts ON testdb BEGIN SELECT count(type) INTO \"testdb\".\"default\".typeCount_10m_byType FROM \"testdb\".\"default\".events GROUP BY time(10m), type END" }]
    end

    before do
      stub_request(:get, "http://influxdb.test:9999/query").with(
        query: { u: "username", p: "password", q: query }
      ).to_return(body: JSON.generate(response), status: 200)
    end

    it "should GET a list of continuous queries for specified db only" do
      expect(subject.list_continuous_queries(database)).to eq(expected_result)
    end
  end
...

testdbに登録したCONTINUOUS QUERYの定義を
list_continuous_queriesで取得できることを確認

...
  describe "#create_continuous_query" do
    let(:name)            { "event_counts_per_10m_by_type" }
    let(:database)        { "testdb" }
    let(:query)           { "SELECT COUNT(type) INTO typeCount_10m_byType FROM events GROUP BY time(10m), type" }
    let(:every_interval)  { nil }
    let(:for_interval)    { nil }

    let(:clause) do
      c = "CREATE CONTINUOUS QUERY #{name} ON #{database}"

      if every_interval && for_interval
        c << " RESAMPLE EVERY #{every_interval} FOR #{for_interval}"
      elsif every_interval
        c << " RESAMPLE EVERY #{every_interval}"
      elsif for_interval
        c << " RESAMPLE FOR #{for_interval}"
      end

      c << " BEGIN\n#{query}\nEND"
    end

    before do
      stub_request(:get, "http://influxdb.test:9999/query").with(
        query: { u: "username", p: "password", q: clause }
      )
    end

    context "without resampling" do
      it "should GET to create a new continuous query" do
        expect(subject.create_continuous_query(name, database, query)).to be_a(Net::HTTPOK)
      end
    end

    context "with resampling" do
      context "EVERY <interval>" do
        let(:every_interval) { "10m" }

        it "should GET to create a new continuous query" do
          expect(subject.create_continuous_query(name, database, query, resample_every: every_interval)).to be_a(Net::HTTPOK)
        end
      end

      context "FOR <interval>" do
        let(:for_interval) { "7d" }

        it "should GET to create a new continuous query" do
          expect(subject.create_continuous_query(name, database, query, resample_for: for_interval)).to be_a(Net::HTTPOK)
        end
      end

      context "EVERY <interval> FOR <interval>" do
        let(:every_interval)  { "5m" }
        let(:for_interval)    { "3w" }

        it "should GET to create a new continuous query" do
          expect(subject.create_continuous_query(name, database, query, resample_for: for_interval, resample_every: every_interval)).to be_a(Net::HTTPOK)
        end
      end
    end
  end
...

CONTINUOUS QUERYの登録と実行周期などのオプション設定を確認

cases/query_core_spec.rb

InfluxDB::Query::Coreは
builderやqueryやwrite_pointsなどの基本的なメソッドを持つが、
このspecではあまりテストコードを書かず、
InfluxDB::Query::Core内のメソッドを利用する
他の機能でテストコードを書いている感じ。
ただ、単体テスト的なものはあってもいいような、、、

...
  describe "#query" do
    let(:query) { "SELECT value FROM requests_per_minute WHERE time > 1437019900" }
    let(:response) do
      { "results" => [{ "series" => [{ "name" => "requests_per_minute",
                                       "columns" => %w(time value) }] }] }
    end

    before do
      stub_request(:get, "http://influxdb.test:9999/query")
        .with(query: { db: "database", precision: "s", u: "username", p: "password", q: query })
        .to_return(body: JSON.generate(response), status: 200)
    end

    it "should handle responses with no values" do
      # Some requests (such as trying to retrieve values from the future)
      # return a result with no "values" key set.
      expected_result = [{ "name" => "requests_per_minute", "tags" => nil, "values" => [] }]
      expect(subject.query(query)).to eq(expected_result)
    end
  end
...

influxdbに送るqueryの送信と結果取得を確認

cases/query_database_spec.rb

databaseの作成、削除、一覧参照

...
  describe "#create_database" do
    describe "from param" do
      let(:query) { "CREATE DATABASE foo" }

      it "should GET to create a new database" do
        expect(subject.create_database("foo")).to be_a(Net::HTTPOK)
      end
    end

    describe "from config" do
      let(:query) { "CREATE DATABASE database" }

      it "should GET to create a new database using database name from config" do
        expect(subject.create_database).to be_a(Net::HTTPOK)
      end
    end
  end
...

databaseの作成を確認

cases/query_retention_policy_spec.rb

データの保持期間の設定。
InfluxDBはdatabaseが作成された時にRETENTION POLICYも
autogenを介して、設定する。
この自動生成は/var/lib/influxdb/meta
で無効化することもできる

...
  describe "#create_retention_policy" do
    context "default" do
      let(:query) { "CREATE RETENTION POLICY \"1h.cpu\" ON foo DURATION 1h REPLICATION 2 DEFAULT" }

      it "should GET to create a new database" do
        expect(subject.create_retention_policy('1h.cpu', 'foo', '1h', 2, true)).to be_a(Net::HTTPOK)
      end
    end

    context "non-default" do
      let(:query) { "CREATE RETENTION POLICY \"1h.cpu\" ON foo DURATION 1h REPLICATION 2" }

      it "should GET to create a new database" do
        expect(subject.create_retention_policy('1h.cpu', 'foo', '1h', 2)).to be_a(Net::HTTPOK)
      end
    end
  end
...

RETENTION POLICYの作成を確認。
databaseのデフォルトにする場合、第5引数にtrueを設定

cases/query_series_spec.rb

seriesの作成など(?)
テスト未作成

cases/query_shard_space_spec.rb

shard関連(?)
テスト未作成

cases/query_shard_spec.rb

shard関連(?)
テスト未作成

cases/query_user_spec.rb

ユーザの作成、削除、一覧参照や権限変更

...
  describe "#create_database_user" do
    let(:user) { 'useruser' }
    let(:pass) { 'passpass' }
    let(:db) { 'foo' }
    let(:query) { "CREATE user #{user} WITH PASSWORD '#{pass}'; GRANT ALL ON #{db} TO #{user}" }

    context "without specifying permissions" do
      it "should GET to create a new database user with all permissions" do
        expect(subject.create_database_user(db, user, pass)).to be_a(Net::HTTPOK)
      end
    end

    context "with passing permission as argument" do
      let(:permission) { :read }
      let(:query) { "CREATE user #{user} WITH PASSWORD '#{pass}'; GRANT #{permission.to_s.upcase} ON #{db} TO #{user}" }

      it "should GET to create a new database user with permission set" do
        expect(subject.create_database_user(db, user, pass, permissions: permission)).to be_a(Net::HTTPOK)
      end
    end
  end
...

ユーザの作成(権限設定付きのケースもあり)

cases/query_with_params_spec.rb

queryへのパラメータの設定

...
  describe "#query_with_params" do
    let(:query)           { "select * from foo where bar > %{param}" }
    let(:compiled_query)  { subject.builder.build(query, query_params) }

    context "with empty params hash" do
      let(:query_params) { {} }
      it { expect { compiled_query }.to raise_error ArgumentError }
    end

    context "with empty params array" do
      let(:query_params) { [] }
      it { expect { compiled_query }.to raise_error ArgumentError }
    end

    context "with empty params" do
      let(:query_params) { nil }
      it { expect { compiled_query }.to raise_error ArgumentError }
    end

    context "with simple params" do
      let(:query_params) { { param: 42 } }
      it { expect(compiled_query).to eq "select * from foo where bar > 42" }
    end

    context "string escaping" do
      let(:query_params) { { param: "string" } }
      it { expect(compiled_query).to eq "select * from foo where bar > 'string'" }
    end
  end
...

hash,arrayは設定できず、
stringは「'」で括られる
https://github.com/influxdata/influxdb-ruby/blob/master/lib/influxdb/query/builder.rb#L18-L27

cases/querying_issue_7000_spec.rb

https://github.com/influxdata/influxdb/issues/7000
で対応した内容へのテストコード。
https://github.com/mhodson-qxbranch/influxdb-ruby/commit/044a350dc91997b4d690a9293c7048ce90b1ea48
chunk_sizeパラメータが追加された

...
    context "with multiple series with different tags" do
      let(:args) { { chunk_size: 100 } }
      let(:extra_params) { { chunked: "true", chunk_size: "100" } }

      let(:response_line_1) do
        { "results" => [{ "series" => [{ "name" => "access_times.service_1", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "columns" => %w(time value), "values" => [["2015-07-08T07:15:22Z", 327]] }] }] }
      end
      let(:response_line_2) do
        { "results" => [{ "series" => [{ "name" => "access_times.service_1", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "columns" => %w(time value), "values" => [["2015-07-08T06:15:22Z", 873]] }] }] }
      end
      let(:response_line_3) do
        { "results" => [{ "series" => [{ "name" => "access_times.service_2", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "columns" => %w(time value), "values" => [["2015-07-08T07:15:22Z", 943]] }] }] }
      end
      let(:response_line_4) do
        { "results" => [{ "series" => [{ "name" => "access_times.service_2", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "columns" => %w(time value), "values" => [["2015-07-08T06:15:22Z", 606]] }] }] }
      end
      let(:response) do
        JSON.generate(response_line_1) + "\n" + JSON.generate(response_line_2) + "\n" + JSON.generate(response_line_3) + "\n" + JSON.generate(response_line_4)
      end
      let(:expected_result) do
        [{ "name" => "access_times.service_1", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "values" => [{ "time" => "2015-07-08T07:15:22Z", "value" => 327 }] },
         { "name" => "access_times.service_1", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "values" => [{ "time" => "2015-07-08T06:15:22Z", "value" => 873 }] },
         { "name" => "access_times.service_2", "tags" => { "code" => "200", "result" => "failure", "status" => "OK" }, "values" => [{ "time" => "2015-07-08T07:15:22Z", "value" => 943 }] },
         { "name" => "access_times.service_2", "tags" => { "code" => "500", "result" => "failure", "status" => "Internal Server Error" }, "values" => [{ "time" => "2015-07-08T06:15:22Z", "value" => 606 }] }]
      end
      let(:query) { "SELECT * FROM /access_times.*/" }

      it "should return array with 4 elements grouped by name and tags" do
        expect(subject.query(query)).to eq(expected_result)
      end
    end
...

1つのレスポンスデータで複数tagで
正規表現指定の複数テーブルからの結果を取得できる

cases/querying_spec.rb

リクエストのqueryとそのレスポンス

...
  before do
    stub_request(:get, "http://influxdb.test:9999/query")
      .with(query: { q: query, u: "username", p: "password", precision: 's', db: database }.merge(extra_params))
      .to_return(body: JSON.generate(response))
  end
...
    context "with single series with multiple points" do
      let(:response) do
        { "results" => [{ "series" => [{ "name" => "cpu", "tags" => { "region" => "us" },
                                         "columns" => %w(time temp value),
                                         "values" => [["2015-07-07T14:58:37Z", 92, 0.3445], ["2015-07-07T14:59:09Z", 68, 0.8787]] }] }] }
      end
      let(:expected_result) do
        [{ "name" => "cpu", "tags" => { "region" => "us" },
           "values" => [{ "time" => "2015-07-07T14:58:37Z", "temp" => 92, "value" => 0.3445 },
                        { "time" => "2015-07-07T14:59:09Z", "temp" => 68, "value" => 0.8787 }] }]
      end
      let(:query) { 'SELECT * FROM cpu' }

      it "should return array with single hash containing multiple values" do
        expect(subject.query(query)).to eq(expected_result)
      end
    end
...

queryメソッドが想定通りに動くかどうかの確認。
JSON.generate(response)とexpected_resultを突き合わせているので、
resultsから1つのcpuというseriesのデータがjsonから
取り出せているかの確認ぐらいなので、
どちらかというとレスポンスデータの扱いに関するテストにも見える。
このテストコード以降の

  • with series with different tags
  • with multiple series with different tags
  • with multiple series for explicit value only
    なども
    responseとexpected_resultを切り替えているだけなので、そう見える
...
    context "with epoch set to seconds" do
      let(:args) { { epoch: 's' } }
      let(:extra_params) { { epoch: 's' } }

      let(:response) do
        { "results" => [{ "series" => [{ "name" => "cpu", "tags" => { "region" => "pl" }, "columns" => %w(time temp value), "values" => [[1_438_580_576, 34, 0.343443]] },
                                       { "name" => "cpu", "tags" => { "region" => "us" }, "columns" => %w(time temp value), "values" => [[1_438_612_976, 92, 0.3445], [1_438_612_989, 68, 0.8787]] }] }] }
      end
      let(:expected_result) do
        [{ "name" => "cpu", "tags" => { "region" => "pl" },
           "values" => [{ "time" => 1_438_580_576, "temp" => 34, "value" => 0.343443 }] },
         { "name" => "cpu", "tags" => { "region" => "us" },
           "values" => [{ "time" => 1_438_612_976, "temp" => 92, "value" => 0.3445 },
                        { "time" => 1_438_612_989, "temp" => 68, "value" => 0.8787 }] }]
      end
      let(:query) { 'SELECT * FROM cpu' }

      it "should return results with integer timestamp" do
        expect(subject.query(query)).to eq(expected_result)
      end
    end
...

Epoch timeの指定もできる

Epoch time is the amount of time that has elapsed since 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970.
sは秒

cases/retry_requests_spec.rb

リクエストのリトライ

...
    context "when retry is 'n'" do
      let(:args) { { retry: 3 } }

      it "raise error after 'n' attemps" do
        expect(client).to receive(:sleep).exactly(3).times
        expect { subject }.to raise_error(InfluxDB::ConnectionError) do |e|
          expect(e.cause).to be_an_instance_of(Timeout::Error)
        end
      end
    end
...

指定回数分のリトライを行ったかを確認

...
    context "when retry is -1" do
      let(:args) { { retry: -1 } }
      before do
        stub_request(:post, "http://influxdb.test:9999/write")
          .with(
            query: { u: "username", p: "password", precision: 's', db: database },
            headers: { "Content-Type" => "application/octet-stream" },
            body: body
          )
          .to_raise(Timeout::Error).then
          .to_raise(Timeout::Error).then
          .to_raise(Timeout::Error).then
          .to_raise(Timeout::Error).then
          .to_return(status: 204)
      end

      it "keep trying until get the connection" do
        expect(client).to receive(:sleep).exactly(4).times
        expect { subject }.to_not raise_error
      end
    end
...

retryオプションに-1を設定した場合は
retryし続けることを確認

cases/udp_client_spec.rb

udpでデータ登録

...
  let(:client) { described_class.new(udp: { host: "localhost", port: 44_444 }) }

  specify { expect(client.writer).to be_a(InfluxDB::Writer::UDP) }

  describe "#write" do
    let(:message) { 'responses,region=eu value=5i' }

    it "sends a UPD packet" do
      s = UDPSocket.new
      s.bind("localhost", 44_444)

      client.write_point("responses", values: { value: 5 }, tags: { region: 'eu' })

      rec_message = s.recvfrom(30).first
      expect(rec_message).to eq message
    end
  end
...

ローカルにあけたudpソケットを介して、
UDPでデータ登録ができそうかを確認

cases/write_points_spec.rb

値の登録

...
  describe "#write_point" do
    let(:series) { "cpu" }
    let(:data) do
      { tags: { region: 'us', host: 'server_1' },
        values: { temp: 88, value: 54 } }
    end
    let(:body) do
      InfluxDB::PointValue.new(data.merge(series: series)).dump
    end

    before do
      stub_request(:post, "http://influxdb.test:9999/write").with(
        query: { u: "username", p: "password", precision: 's', db: database },
        headers: { "Content-Type" => "application/octet-stream" },
        body: body
      ).to_return(status: 204)
    end

    it "should POST to add single point" do
      expect(subject.write_point(series, data)).to be_a(Net::HTTPNoContent)
    end

    it "should not mutate data object" do
      original_data = data
      subject.write_point(series, data)
      expect(data[:series]).to be_nil
      expect(original_data).to eql(data)
    end
  end
...

1つのデータだけが登録できるwrite_pointの確認

...
describe "#write_points" do
    context "with multiple series" do
      let(:data) do
        [{ series: 'cpu',
           tags: { region: 'us', host: 'server_1' },
           values: { temp: 88, value: 54 } },
         { series: 'gpu',
           tags: { region: 'uk', host: 'server_5' },
           values: { value: 0.5435345 } }]
      end
      let(:body) do
        data.map do |point|
          InfluxDB::PointValue.new(point).dump
        end.join("\n")
      end

      before do
        stub_request(:post, "http://influxdb.test:9999/write").with(
          query: { u: "username", p: "password", precision: 's', db: database },
          headers: { "Content-Type" => "application/octet-stream" },
          body: body
        ).to_return(status: 204)
      end

      it "should POST multiple points" do
        expect(subject.write_points(data)).to be_a(Net::HTTPNoContent)
      end
    end
  ...
...

複数seriesへのデータ登録を確認

終わりに

influxdb自体のテストではなく、
influxdbを利用するクライアント側のテストですので、
stubを利用して想定通りにパラメータやconfigの設定ができる、
という確認が多かったように思えます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?