LoginSignup
5
5

More than 5 years have passed since last update.

SizedQueue を使った無駄撃ちのない地理院タイルダウンローダ(Ruby)

Last updated at Posted at 2015-02-24

ttp://cyberjapandata.gsi.go.jp/xyz/std/mokuroku.csv.gz をダウンロードしたフォルダで、下記スクリプト(qdl.rb)を実行する。あとはあとで書く。LICENSEはCC0で。

require 'open-uri'
require 'digest/md5'
require 'fileutils'
require 'zlib'
require 'thread'

Z_EXTENT = (18..18)
T = 'std'
N_THREADS = 8
Q_SIZE = 200
WAIT = 5

$threads = Array.new(N_THREADS)
$status = {:skip => 0, :ok => 0, :ng => 0, :path => nil}
$q = SizedQueue.new(Q_SIZE)

$threads.size.times {|i|
  $threads[i] = Thread.new(i) do
    while o = $q.pop
      buf = open(o[:url]).read
      buf_md5 = Digest::MD5.hexdigest(buf)
      if o[:md5] != buf_md5
        $status[:ng] += 1
        FileUtils.rm(o[:path]) if File.exist?(o[:path])
      else
        [File.dirname(o[:path])].each{|it|
          FileUtils.mkdir_p(it) unless File.directory?(it)
        }
        File.open("#{o[:path]}", 'w') {|w| w.print buf}
        $status[:ok] += 1
      end
    end
  end
}
watcher = Thread.new do
  while $threads.reduce(false) {|any_alive, t| any_alive or t.alive?}
    last_status = $status.clone
    sleep WAIT
    print <<-EOS
#{Time.now.iso8601[11..18]} #{$status[:path]} #{$q.size} \
#{%w{skip ok ng}.map{|k| ($status[k.to_sym] - last_status[k.to_sym]) / WAIT}}\
/s #{%w{skip ok ng}.map{|k| $status[k.to_sym]}}
    EOS
  end
end

Zlib::GzipReader.open('mokuroku.csv.gz').each_line {|l|
  (path, date, size, md5) = l.strip.split(',')
  url = "http://cyberjapandata.gsi.go.jp/xyz/#{T}/#{path}"
  $status[:path] = path
  if (!Z_EXTENT.include?(path.split('/')[0].to_i)) or
      (File.exist?("#{path}") && Digest::MD5.file(path) == md5)
    $status[:skip] += 1
    next
  end
  $q.push({:url => url, :md5 => md5, :path => path})
}

$threads.size.times {|i| $q.push(nil)}

$threads.each {|t| t.join}
watcher.join

パフォーマンスチェック

手元の計算機環境はMacBook Air 11-inch, Mid 2011 OS X Yosemite。使用しているディスクはUSB接続の安売りのUSB 3.0ハードディスク。

  • Z_EXTENT 外のパスをスキップするフェーズではだいたい8万レコード/s
  • md5sum チェックして全部スキップするフェーズではだいたい200レコード/s
  • ダウンロードするフェーズではだいたい80レコード/s(但しz=18)なお、OS Xのアクティビティモニタレベルで、毎秒データ受信は調子が良いときで350KB程度。また、ダウンロードするファイルがある「鉱脈」にぶつかると、一瞬で SizedQueue はいっぱいになる。つまり、mokuroku をチェックする速度は、ダウンロードを行う速度に比べて十分に速い。

std の mokuroku.csv.gz のレコード数は4227万8138だから、

  • Z_EXTENT 外のパスをスキップする速度で528秒(9分弱)
  • md5sum チェックしてスキップする速度で21万1390秒(60時間=2日半)
  • ダウンロードする速度で52万8476秒(150時間=6日6時間)

課題

  • tmp を多く使ってしまう感じ(未確認)。
  • タイルあたりファイルサイズが大きいズームレベルでは、ややネットワーク帯域に負担を与えすぎてしまう可能性が残る。

行数指定で再開できる改造

ダウンロードも後期段階に入ってくると、行数指定で再開する必要が発生するので、その改造を行った。定数 CONTINUE に行数を入れると、そこまでは無条件でスキップするようにした。これから先の改造は、GitHub で行っていくことが適当かと思っている。

require 'open-uri'
require 'digest/md5'
require 'fileutils'
require 'zlib'
require 'thread'

Z_EXTENT = (18..18)
T = 'std'
N_THREADS = 8
Q_SIZE = 200
WAIT = 5
CONTINUE = 23549708

$threads = Array.new(N_THREADS)
$status = {:skip => 0, :ok => 0, :ng => 0, :path => nil}
$q = SizedQueue.new(Q_SIZE)

$threads.size.times {|i|
  $threads[i] = Thread.new(i) do
    while o = $q.pop
      buf = open(o[:url]).read
      buf_md5 = Digest::MD5.hexdigest(buf)
      if o[:md5] != buf_md5
        $status[:ng] += 1
        FileUtils.rm(o[:path]) if File.exist?(o[:path])
      else
        [File.dirname(o[:path])].each{|it|
          FileUtils.mkdir_p(it) unless File.directory?(it)
        }
        File.open("#{o[:path]}", 'w') {|w| w.print buf}
        $status[:ok] += 1
      end
    end
  end
}
watcher = Thread.new do
  while $threads.reduce(false) {|any_alive, t| any_alive or t.alive?}
    last_status = $status.clone
    sleep WAIT
    print <<-EOS
#{Time.now.iso8601[11..18]} #{$status[:path]} #{$q.size} \
#{%w{skip ok ng}.map{|k| ($status[k.to_sym] - last_status[k.to_sym]) / WAIT}}\
/s #{%w{skip ok ng}.map{|k| $status[k.to_sym]}} #{$count}
    EOS
  end
end

$count = 0
Zlib::GzipReader.open('mokuroku.csv.gz').each_line {|l|
  $count += 1
  (path, date, size, md5) = l.strip.split(',')
  url = "http://cyberjapandata.gsi.go.jp/xyz/#{T}/#{path}"
  $status[:path] = path
  if ((CONTINUE ? $count < CONTINUE : false) || 
      !Z_EXTENT.include?(path.split('/')[0].to_i)) ||
      (File.exist?("#{path}") && Digest::MD5.file(path) == md5)
    $status[:skip] += 1
    next
  end
  $q.push({:url => url, :md5 => md5, :path => path})
}

$threads.size.times {|i| $q.push(nil)}

$threads.each {|t| t.join}
watcher.join

スレッド数についての実験

N_THREADS は 8 としたが、これを 16 にすると、USB 3.0 HDD で実施している場合ではあるが、かえって通信量の時系列がいびつになる結果となった。多分USB HDDの性能を追い越しているのだろう。高性能なHDDをつなぐなどしない限り、N_THREADS を上げることで帯域を食いつくすようなダウンロードはできないことが分かった。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5