大規模CSVデータを効率的にElasticsearchにロードする

Elasticsearchは大規模データの高速検索に適した検索エンジンだが、大量のデータを投入する際には効率的な方法を選ぶ必要がある。本稿では、Pythonを用いて大規模なCSVファイルからElasticsearchにデータを効率的にロードする方法について解説する。

問題点

Elasticsearchにデータを投入する際、1度に1ドキュメントずつ処理を行うと、ネットワークオーバーヘッドが大きくなり、処理速度が低下する。特に、大規模なCSVファイルを扱う場合、この問題は顕著になる。

解決策

上記の課題を解決するために、以下の3つの技術を組み合わせる。

  1. バルクAPI: ElasticsearchのバルクAPIを利用することで、複数のドキュメントをまとめてインサートし、ネットワークオーバーヘッドを削減する。
  2. ジェネレータ: CSVファイルを読み込む際に、Pythonのジェネレータ機能を利用する。これにより、メモリ使用量を抑えながら巨大なCSVファイルにも対応可能になる。
  3. 並列処理 (オプション): Elasticsearchへのバルクインサート処理を並列化することで、処理速度を向上させることができる。

実装例

以下に、上記技術を用いたPythonコードの例を示す。

import csv
from elasticsearch import Elasticsearch, helpers
import os
import argparse
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

def create_es_client(host, port):
    """Elasticsearchクライアントを作成する"""
    return Elasticsearch([f"http://{host}:{port}"])

def generate_actions(csv_file, index_name, id_field, has_header=True):
    """CSVファイルを読み込み、Elasticsearchに投入するためのアクションを生成するジェネレータ"""
    with open(csv_file, "r", encoding="utf-8") as f:
        if has_header:
            reader = csv.DictReader(f)
        else:
            # ヘッダーがない場合は、先頭行からカラム名を推測する
            first_line = f.readline().strip().split(',')
            f.seek(0)  # ファイルポインタを先頭に戻す
            reader = csv.DictReader(f, fieldnames=first_line)

        for row in reader:
            try:
                if not row.get(id_field):
                    print(f"Warning: ID field '{id_field}' missing in row: {row}, skipping")
                    continue
                doc_id = row.pop(id_field)
                yield {
                    "_index": index_name,
                    "_id": doc_id,
                    "_source": row
                }
            except Exception as e:
                print(f"Error processing row {row}: {e}")

def upload_csv_to_es(es_client, csv_file, index_name, id_field, has_header=True, chunk_size=500, thread_num=1):
    """CSVファイルをElasticsearchにアップロードする"""

    actions = generate_actions(csv_file, index_name, id_field, has_header)

    if thread_num > 1:
        with ThreadPoolExecutor(max_workers=thread_num) as executor:
            for success, result in helpers.parallel_bulk(es_client, actions, chunk_size=chunk_size, thread_count=thread_num, raise_on_error=False):
                if not success:
                    print(f"Error: Bulk insert failed: {result}")
    else:
        for success, result in helpers.bulk(es_client, actions, chunk_size=chunk_size, raise_on_error=False):
            if not success:
                print(f"Error: Bulk insert failed: {result}")


def parse_args():
    """コマンドライン引数を解析する"""
    parser = argparse.ArgumentParser(description="CSVファイルをElasticsearchにアップロードする")
    parser.add_argument("csv_file", help="アップロードするCSVファイルのパス")
    parser.add_argument("--host", default="localhost", help="Elasticsearchホスト (デフォルト: localhost)")
    parser.add_argument("--port", type=int, default=9200, help="Elasticsearchポート (デフォルト: 9200)")
    parser.add_argument("--index", required=True, help="Elasticsearchインデックス名")
    parser.add_argument("--id_field", default="id", help="ドキュメントIDとして使用するCSVカラム名 (デフォルト: 'id')")
    parser.add_argument("--no_header", action="store_true", help="CSVファイルにヘッダーがない場合はこのオプションを指定")
    parser.add_argument("--chunk_size", type=int, default=500, help="バルクインサート時のチャンクサイズ (デフォルト: 500)")
    parser.add_argument("--thread_num", type=int, default=1, help="並列実行スレッド数 (デフォルト: 1)")

    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()

    start_time = datetime.now()

    if not os.path.exists(args.csv_file):
        print(f"Error: CSV file not found: {args.csv_file}")
        exit(1)

    es = create_es_client(args.host, args.port)
    try:
        upload_csv_to_es(es, args.csv_file, args.index, args.id_field, not args.no_header, args.chunk_size, args.thread_num)
    except Exception as e:
        print(f"Error during upload: {e}")
        exit(1)

    end_time = datetime.now()
    print("CSV upload completed.")
    print(f"Started at: {start_time}")
    print(f"Finished at: {end_time}")
    print(f"Elapsed time: {end_time - start_time}")

コード解説

  1. create_es_client(host, port): Elasticsearchクライアントを作成する関数。
  2. generate_actions(csv_file, index_name, id_field, has_header=True): CSVファイルを読み込み、Elasticsearchに投入するためのアクションを生成するジェネレータ関数。各行を辞書に変換し、_index, _id, _source を設定する。
  3. upload_csv_to_es(es_client, csv_file, index_name, id_field, has_header=True, chunk_size=500, thread_num=1): CSVファイルをElasticsearchにアップロードする関数。generate_actions関数で生成されたアクションをhelpers.bulkまたはhelpers.parallel_bulk関数に渡すことで、バルクインサートを実行する。
  4. parse_args(): コマンドライン引数を解析する関数。
  5. if __name__ == "__main__":: スクリプトが直接実行された場合に実行される部分。コマンドライン引数を解析し、upload_csv_to_es関数を呼び出す。

コマンドライン実行

python csv_to_es.py example.csv --host localhost --port 9200 --index test_index --id_field product_id

まとめ

本稿では、Pythonを用いて大規模なCSVファイルからElasticsearchにデータを効率的にロードする方法について解説した。バルクAPI、ジェネレータ、並列処理を活用することで、高速かつメモリ効率の高いデータロードが可能になる。

Previous Article

法人情報検索API

Next Article

目標達成型日報サンプル

Write a Comment

Leave a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

Subscribe to our Newsletter

Subscribe to our email newsletter to get the latest posts delivered right to your email.
Pure inspiration, zero spam ✨