
I took on 2024's trending One Billion Row Challenge in Python.


1. What is the One Billion Row Challenge?

It is a challenge where you read one billion rows of station;temperature data and process them as fast as possible.

For details, see:

GitHub: https://github.com/gunnarmorling/1brc
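The input is a text file with one station;temperature measurement per line, and the expected output reports min/mean/max per station in alphabetical order. A tiny illustration of the format (the values here are made up):

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
Hamburg;34.2

{Bulawayo=8.9/8.9/8.9, Hamburg=12.0/23.1/34.2, Palembang=38.8/38.8/38.8}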

2. Getting Started

First, we generate one billion rows of data with Python.
This produces measurements.txt.

#!/usr/bin/env python
#
#  Copyright 2023 The original authors
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#


# Based on https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CreateMeasurements.java


import os
import sys
import random
import time



def check_args(file_args):
    """
    Sanity-checks the input and prints usage if it is not a positive integer
    """
    try:
        if len(file_args) != 2 or int(file_args[1]) <= 0:
            raise Exception()
    except:
        print("Usage:  create_measurements.sh <positive integer number of records to create>")
        print("        You can use underscore notation for large number of records.")
        print("        For example:  1_000_000_000 for one billion")
        exit()



def build_weather_station_name_list():
    """
    Grabs the weather station names from example data provided in repo and dedups
    """
    station_names = []
    with open('weather_stations.csv', 'r') as file:
        file_contents = file.read()
    for station in file_contents.splitlines():
        if "#" in station:
            next
        else:
            station_names.append(station.split(';')[0])
    return list(set(station_names))



def convert_bytes(num):
    """
    Convert bytes to a human-readable format (e.g., KiB, MiB, GiB)
    """
    for x in ['bytes', 'KiB', 'MiB', 'GiB']:
        if num < 1024.0:
            return "%3.1f %s" % (num, x)
        num /= 1024.0



def format_elapsed_time(seconds):
    """
    Format elapsed time in a human-readable format
    """
    if seconds < 60:
        return f"{seconds:.3f} seconds"
    elif seconds < 3600:
        minutes, seconds = divmod(seconds, 60)
        return f"{int(minutes)} minutes {int(seconds)} seconds"
    else:
        hours, remainder = divmod(seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        if minutes == 0:
            return f"{int(hours)} hours {int(seconds)} seconds"
        else:
            return f"{int(hours)} hours {int(minutes)} minutes {int(seconds)} seconds"



def estimate_file_size(weather_station_names, num_rows_to_create):
    """
    Tries to estimate how large a file the test data will be
    """
    total_name_bytes = sum(len(s.encode("utf-8")) for s in weather_station_names)
    avg_name_bytes = total_name_bytes / float(len(weather_station_names))


    # avg_temp_bytes = sum(len(str(n / 10.0)) for n in range(-999, 1000)) / 1999
    avg_temp_bytes = 4.400200100050025


    # add 2 for separator and newline
    avg_line_length = avg_name_bytes + avg_temp_bytes + 2


    human_file_size = convert_bytes(num_rows_to_create * avg_line_length)


    return f"Estimated max file size is:  {human_file_size}."



def build_test_data(weather_station_names, num_rows_to_create):
    """
    Generates and writes to file the requested length of test data
    """
    start_time = time.time()
    coldest_temp = -99.9
    hottest_temp = 99.9
    station_names_10k_max = random.choices(weather_station_names, k=10_000)
    batch_size = 10000  # instead of writing line by line, build a batch of rows and write it to disk in one call
    chunks = num_rows_to_create // batch_size
    print('Building test data...')


    try:
        os.makedirs("data", exist_ok=True)  # make sure the output directory exists
        with open("data/measurements.txt", 'w') as file:
            progress = 0
            for chunk in range(chunks):
                
                batch = random.choices(station_names_10k_max, k=batch_size)
                prepped_deviated_batch = '\n'.join([f"{station};{random.uniform(coldest_temp, hottest_temp):.1f}" for station in batch])  # :.1f formatting should be quicker than round() at this scale
                file.write(prepped_deviated_batch + '\n')
                
                # Update progress bar every 1%
                if (chunk + 1) * 100 // chunks != progress:
                    progress = (chunk + 1) * 100 // chunks
                    bars = '=' * (progress // 2)
                    sys.stdout.write(f"\r[{bars:<50}] {progress}%")
                    sys.stdout.flush()
        sys.stdout.write('\n')
    except Exception as e:
        print("Something went wrong. Printing error info and exiting...")
        print(e)
        exit()
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    file_size = os.path.getsize("data/measurements.txt")
    human_file_size = convert_bytes(file_size)
 
    print("Test data successfully written to 1brc/data/measurements.txt")
    print(f"Actual file size:  {human_file_size}")
    print(f"Elapsed time: {format_elapsed_time(elapsed_time)}")



def main():
    """
    main program function
    """
    check_args(sys.argv)
    num_rows_to_create = int(sys.argv[1])
    weather_station_names = build_weather_station_name_list()
    print(estimate_file_size(weather_station_names, num_rows_to_create))
    build_test_data(weather_station_names, num_rows_to_create)
    print("Test data build complete.")



if __name__ == "__main__":
    main()
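Assuming the script above is saved as create_measurements.py (a name I'm using for illustration) next to the weather_stations.csv file from the challenge repository, one billion rows can be generated like this, with the output landing in data/measurements.txt:

python create_measurements.py 1_000_000_000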

3. Writing the Code

We write a program that reads the one billion rows from the txt file, computes the minimum, maximum, and average temperature for each station, and prints the results.

My code:

import os
import threading
from collections import defaultdict
from time import time

FILE = "./data/measurements.txt"
CHUNK_SIZE = 1024 * 1024 * 32  
CORES = os.cpu_count()

class TemperatureAggregation:

    def __init__(self):
        self.sum = 0
        self.count = 0
        self.min_temp = None  
        self.max_temp = None  

    def merge(self, sum, count, min_temp, max_temp):
        self.sum += sum
        self.count += count
        
        if self.min_temp is None or min_temp < self.min_temp:
            self.min_temp = min_temp

        if self.max_temp is None or max_temp > self.max_temp:
            self.max_temp = max_temp

    def __str__(self):
        avg = self.sum / self.count if self.count else 0
        return f"{self.min_temp:.1f}/{avg:.1f}/{self.max_temp:.1f}"

def process_chunk(chunk_start, chunk_end):
    local_results = defaultdict(TemperatureAggregation)

    with open(FILE, "rb") as f:
        # Align the chunk to line boundaries: skip the partial line at the
        # start (the previous chunk finishes it), and read past chunk_end
        # until the last line is complete.
        if chunk_start > 0:
            f.seek(chunk_start - 1)
            f.readline()
        start = f.tell()
        if start >= chunk_end:
            return local_results
        data = f.read(chunk_end - start)
        if not data.endswith(b"\n"):
            data += f.readline()

    for line in data.decode("utf-8", errors="ignore").splitlines():
        if not line:
            continue
        try:
            city, temp_str = line.split(";")
            temp = float(temp_str)
        except ValueError:
            continue

        agg = local_results[city]  # defaultdict creates a fresh entry on first use
        agg.sum += temp
        agg.count += 1
        if agg.min_temp is None or temp < agg.min_temp:
            agg.min_temp = temp
        if agg.max_temp is None or temp > agg.max_temp:
            agg.max_temp = temp

    return local_results


def calculate_stats(filepath):
    file_size = os.path.getsize(filepath)
    # Cover the whole file with CHUNK_SIZE byte ranges.
    ranges = [(start, min(start + CHUNK_SIZE, file_size))
              for start in range(0, file_size, CHUNK_SIZE)]

    thread_results = []
    lock = threading.Lock()

    def worker():
        # Each thread keeps pulling the next unprocessed range until none are left.
        while True:
            with lock:
                if not ranges:
                    return
                chunk_start, chunk_end = ranges.pop()
            thread_results.append(process_chunk(chunk_start, chunk_end))

    threads = [threading.Thread(target=worker) for _ in range(CORES)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Merge the per-thread results into the final aggregates.
    final_results = defaultdict(TemperatureAggregation)
    for local_results in thread_results:
        for city, agg in local_results.items():
            final_results[city].merge(agg.sum, agg.count, agg.min_temp, agg.max_temp)

    return final_results



def print_stats(stats):
    for city, agg in sorted(stats.items()):
        print(f"{city}: {agg}")


if __name__ == "__main__":
    start_time = time()
    results = calculate_stats(FILE)
    print_stats(results)
    print(f"Execution time: {time() - start_time:.2f} seconds")

Explanation

The txt file is covered by 32 MB byte ranges aligned to line boundaries.
One worker thread per core processes the chunks, and the per-thread results are merged at the end.
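One caveat worth noting: because of CPython's GIL, threads cannot run this CPU-bound parsing truly in parallel, so process-based parallelism is the usual way to get a real speedup. Below is a minimal sketch of the same split-then-merge idea using multiprocessing.Pool; calculate_stats_mp is a hypothetical name, and the sketch assumes the FILE, CHUNK_SIZE, CORES, TemperatureAggregation, and process_chunk definitions above, called from under the if __name__ == "__main__" guard.

import multiprocessing as mp

def calculate_stats_mp(filepath):
    file_size = os.path.getsize(filepath)
    ranges = [(start, min(start + CHUNK_SIZE, file_size))
              for start in range(0, file_size, CHUNK_SIZE)]

    # Worker processes parse chunks independently, sidestepping the GIL.
    with mp.Pool(CORES) as pool:
        partials = pool.starmap(process_chunk, ranges)

    # Merge the per-process results exactly as in the threaded version.
    final_results = defaultdict(TemperatureAggregation)
    for local_results in partials:
        for city, agg in local_results.items():
            final_results[city].merge(agg.sum, agg.count, agg.min_temp, agg.max_temp)
    return final_results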

4. Results

(Results screenshot: image.png)

5. Afterword

It made me curious whether other large-scale data processing systems use this same kind of approach.
