1. What is the One Billion Row Challenge?
The challenge is to read a file containing one billion rows of station;temperature measurements and process them as fast as possible.
For details:
GitHub: https://github.com/gunnarmorling/1brc
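Each line of the input file holds one weather station name and one temperature reading with a single fractional digit, separated by a semicolon. For example (sample lines taken from the challenge's README):

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8

The goal is to compute the minimum, mean, and maximum temperature for every station and print the results, as fast as possible.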
2. Getting Started
First, we use Python to generate the one billion rows of test data.
Running the script below produces measurements.txt.
#!/usr/bin/env python
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based on https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CreateMeasurements.java
import os
import sys
import random
import time
def check_args(file_args):
    """
    Sanity-checks the input and prints usage if it is not a positive integer
    """
    try:
        if len(file_args) != 2 or int(file_args[1]) <= 0:
            raise Exception()
    except:
        print("Usage: create_measurements.sh <positive integer number of records to create>")
        print("       You can use underscore notation for large number of records.")
        print("       For example: 1_000_000_000 for one billion")
        exit()


def build_weather_station_name_list():
    """
    Grabs the weather station names from the example data provided in the repo and dedups them
    """
    station_names = []
    with open('weather_stations.csv', 'r') as file:
        file_contents = file.read()
    for station in file_contents.splitlines():
        if "#" in station:
            continue  # skip comment lines
        station_names.append(station.split(';')[0])
    return list(set(station_names))


def convert_bytes(num):
    """
    Convert bytes to a human-readable format (e.g., KiB, MiB, GiB)
    """
    for x in ['bytes', 'KiB', 'MiB', 'GiB']:
        if num < 1024.0:
            return "%3.1f %s" % (num, x)
        num /= 1024.0


def format_elapsed_time(seconds):
    """
    Format elapsed time in a human-readable format
    """
    if seconds < 60:
        return f"{seconds:.3f} seconds"
    elif seconds < 3600:
        minutes, seconds = divmod(seconds, 60)
        return f"{int(minutes)} minutes {int(seconds)} seconds"
    else:
        hours, remainder = divmod(seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        if minutes == 0:
            return f"{int(hours)} hours {int(seconds)} seconds"
        else:
            return f"{int(hours)} hours {int(minutes)} minutes {int(seconds)} seconds"


def estimate_file_size(weather_station_names, num_rows_to_create):
    """
    Tries to estimate how large a file the test data will be
    """
    total_name_bytes = sum(len(s.encode("utf-8")) for s in weather_station_names)
    avg_name_bytes = total_name_bytes / float(len(weather_station_names))
    # avg_temp_bytes = sum(len(str(n / 10.0)) for n in range(-999, 1000)) / 1999
    avg_temp_bytes = 4.400200100050025
    # add 2 for separator and newline
    avg_line_length = avg_name_bytes + avg_temp_bytes + 2
    human_file_size = convert_bytes(num_rows_to_create * avg_line_length)
    return f"Estimated max file size is: {human_file_size}."


def build_test_data(weather_station_names, num_rows_to_create):
    """
    Generates and writes to file the requested number of rows of test data
    """
    start_time = time.time()
    coldest_temp = -99.9
    hottest_temp = 99.9
    station_names_10k_max = random.choices(weather_station_names, k=10_000)
    batch_size = 10000  # instead of writing line by line, build a batch of rows and write it to disk at once
    chunks = num_rows_to_create // batch_size
    print('Building test data...')
    try:
        with open("data/measurements.txt", 'w') as file:
            progress = 0
            for chunk in range(chunks):
                batch = random.choices(station_names_10k_max, k=batch_size)
                # :.1f should be quicker than round() at this scale, because round() performs a real mathematical operation
                prepped_deviated_batch = '\n'.join([f"{station};{random.uniform(coldest_temp, hottest_temp):.1f}" for station in batch])
                file.write(prepped_deviated_batch + '\n')
                # Update progress bar every 1%
                if (chunk + 1) * 100 // chunks != progress:
                    progress = (chunk + 1) * 100 // chunks
                    bars = '=' * (progress // 2)
                    sys.stdout.write(f"\r[{bars:<50}] {progress}%")
                    sys.stdout.flush()
        sys.stdout.write('\n')
    except Exception as e:
        print("Something went wrong. Printing error info and exiting...")
        print(e)
        exit()
    end_time = time.time()
    elapsed_time = end_time - start_time
    file_size = os.path.getsize("data/measurements.txt")
    human_file_size = convert_bytes(file_size)
    print("Test data successfully written to 1brc/data/measurements.txt")
    print(f"Actual file size: {human_file_size}")
    print(f"Elapsed time: {format_elapsed_time(elapsed_time)}")


def main():
    """
    main program function
    """
    check_args(sys.argv)
    num_rows_to_create = int(sys.argv[1])
    weather_station_names = build_weather_station_name_list()
    print(estimate_file_size(weather_station_names, num_rows_to_create))
    build_test_data(weather_station_names, num_rows_to_create)
    print("Test data build complete.")


if __name__ == "__main__":
    main()
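Assuming the script is saved as create_measurements.py (the usage string mentions create_measurements.sh, but the file itself is Python) next to the repo's weather_stations.csv, one billion rows can be generated with:

python create_measurements.py 1_000_000_000

The underscore notation is ordinary Python integer syntax and is accepted by int(). The output lands in data/measurements.txt; for one billion rows it comes out on the order of 13 GB.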
3. Writing the Code
Now we write code that reads the one billion rows from the txt file, computes the minimum, maximum, and mean temperature for each station, and prints the results.
Here is my code:
import os
import queue
import threading
from collections import defaultdict
from time import time

FILE = "./data/measurements.txt"
CHUNK_SIZE = 1024 * 1024 * 32  # 32 MiB per chunk
CORES = os.cpu_count()


class TemperatureAggregation:
    def __init__(self):
        self.sum = 0
        self.count = 0
        self.min_temp = None
        self.max_temp = None

    def merge(self, sum, count, min_temp, max_temp):
        self.sum += sum
        self.count += count
        if self.min_temp is None or min_temp < self.min_temp:
            self.min_temp = min_temp
        if self.max_temp is None or max_temp > self.max_temp:
            self.max_temp = max_temp

    def __str__(self):
        avg = self.sum / self.count if self.count else 0
        return f"{self.min_temp:.1f}/{avg:.1f}/{self.max_temp:.1f}"


def process_chunk(chunk_start, chunk_end):
    local_results = defaultdict(TemperatureAggregation)
    # Binary mode, so seek/tell work on byte offsets (text mode counts characters).
    with open(FILE, "rb") as f:
        if chunk_start != 0:
            f.seek(chunk_start - 1)
            # If the offset lands mid-line, skip the remainder: that line
            # belongs to the previous chunk, which reads past its own end.
            if f.read(1) != b"\n":
                f.readline()
        else:
            f.seek(chunk_start)
        # Process every line that starts inside [chunk_start, chunk_end);
        # the last line may extend past chunk_end and is read in full here.
        while f.tell() < chunk_end:
            line = f.readline()
            if not line:
                break
            try:
                city, temp_str = line.decode("utf-8").rstrip("\n").split(";")
                temp = float(temp_str)
            except ValueError:
                continue  # skip malformed lines
            agg = local_results[city]
            agg.sum += temp
            agg.count += 1
            if agg.min_temp is None or temp < agg.min_temp:
                agg.min_temp = temp
            if agg.max_temp is None or temp > agg.max_temp:
                agg.max_temp = temp
    return local_results


def calculate_stats(filepath):
    file_size = os.path.getsize(filepath)
    # Enqueue every 32 MiB byte range so the whole file is covered,
    # not just the first CORES chunks.
    chunk_queue = queue.Queue()
    for chunk_start in range(0, file_size, CHUNK_SIZE):
        chunk_queue.put((chunk_start, min(chunk_start + CHUNK_SIZE, file_size)))

    thread_results = []

    def worker():
        # Each thread drains chunk ranges from the queue until none remain.
        while True:
            try:
                chunk_start, chunk_end = chunk_queue.get_nowait()
            except queue.Empty:
                return
            # list.append is atomic under the GIL, so no extra lock is needed
            thread_results.append(process_chunk(chunk_start, chunk_end))

    threads = [threading.Thread(target=worker) for _ in range(CORES)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    final_results = defaultdict(TemperatureAggregation)
    for local_results in thread_results:
        for city, agg in local_results.items():
            final_results[city].merge(agg.sum, agg.count, agg.min_temp, agg.max_temp)
    return final_results


def print_stats(stats):
    for city, agg in sorted(stats.items()):
        print(f"{city}: {agg}")


if __name__ == "__main__":
    start_time = time()
    results = calculate_stats(FILE)
    print_stats(results)
    print(f"Execution time: {time() - start_time:.2f} seconds")
Explanation
The txt file is divided into 32 MiB byte ranges, a queue of ranges is drained by one worker thread per core, and each thread's per-station aggregates are merged at the end.
Because a byte offset can land in the middle of a line, each worker first aligns itself to a line boundary: it skips a partial first line (which the previous chunk finishes by reading past its own end), so every line is counted exactly once.
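One caveat: CPython's GIL lets only one thread execute Python bytecode at a time, so the threads above mainly overlap file I/O rather than parallelize the parsing itself. A process-based variant sidesteps this; below is a minimal sketch, not part of my measured solution, that reuses the same process_chunk, CHUNK_SIZE, and CORES with multiprocessing.Pool (the function name calculate_stats_mp is mine):

from multiprocessing import Pool

def calculate_stats_mp(filepath):
    file_size = os.path.getsize(filepath)
    ranges = [(start, min(start + CHUNK_SIZE, file_size))
              for start in range(0, file_size, CHUNK_SIZE)]
    final_results = defaultdict(TemperatureAggregation)
    # Each process parses its ranges independently; only the small
    # per-city aggregates travel back to the parent for merging.
    with Pool(CORES) as pool:
        for local_results in pool.starmap(process_chunk, ranges):
            for city, agg in local_results.items():
                final_results[city].merge(agg.sum, agg.count, agg.min_temp, agg.max_temp)
    return final_results

Since process_chunk is a top-level function and the aggregates are plain objects, they pickle cleanly across process boundaries.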
4. Results
5. Afterword
It made me curious whether other large-scale data processing is done in a similar way.