← ブログに戻る

大きなJSONファイルの処理方法

大規模JSONファイルの効率的な処理。ストリーミング、メモリ最適化、パフォーマンステクニック。

Big JSON Team14 分で読めますadvanced
B

Big JSON Team

Technical Writer

Expert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.

14 分読む

# 大きなJSONファイルの処理方法

大規模なJSONファイルを効率的に処理する技術とツールを学びます。

問題点

通常の読み込みの問題

// ❌ 大きなファイルでメモリ不足

const data = JSON.parse(fs.readFileSync('huge.json', 'utf8'));

// Error: JavaScript heap out of memory

メモリ使用量

例:
  • JSONファイル: 500MB
  • メモリ使用: 2-3GB (パース後)
  • 問題: メモリ不足、遅いパース

ストリーミング処理

Node.js - JSONStream

インストール

npm install JSONStream

基本的な使い方

const fs = require('fs');

const JSONStream = require('JSONStream');

// 配列の各要素をストリーム処理

fs.createReadStream('large.json')

.pipe(JSONStream.parse('items.'))

.on('data', (item) => {

// 各アイテムを処理

console.log(item);

})

.on('end', () => {

console.log('完了');

});

特定のパスのみ

// users配列のみ処理

fs.createReadStream('data.json')

.pipe(JSONStream.parse('users.'))

.on('data', (user) => {

processUser(user);

});

フィルタリング

const through = require('through2');

fs.createReadStream('large.json')

.pipe(JSONStream.parse('items.'))

.pipe(through.obj((item, enc, callback) => {

// 条件でフィルタ

if (item.price > 1000) {

callback(null, item);

} else {

callback();

}

}))

.pipe(JSONStream.stringify())

.pipe(fs.createWriteStream('filtered.json'));

stream-json(高速代替)

const { parser } = require('stream-json');

const { streamArray } = require('stream-json/streamers/StreamArray');

const pipeline = fs.createReadStream('large.json')

.pipe(parser())

.pipe(streamArray());

pipeline.on('data', ({ key, value }) => {

console.log(Item ${key}:, value);

});

Python でのストリーミング

ijson ライブラリ

インストール

pip install ijson

基本的な使い方

import ijson

# ファイルをストリーミング

with open('large.json', 'rb') as f:

# 'items'配列の各要素

items = ijson.items(f, 'items.item')

for item in items:

# 各アイテムを処理

print(item)

特定のフィールドのみ

import ijson

with open('large.json', 'rb') as f:

# すべてのname フィールド

for name in ijson.items(f, 'users.item.name'):

print(name)

フィルタリングと変換

import ijson

def process_large_file(input_file, output_file):

with open(input_file, 'rb') as inf:

with open(output_file, 'w') as outf:

items = ijson.items(inf, 'items.item')

filtered = [

item for item in items

if item.get('active', False)

]

json.dump(filtered, outf, indent=2)

JSON Lines (.jsonl)

フォーマット

各行が1つの独立したJSONオブジェクト:

{"id": 1, "name": "太郎", "age": 30}

{"id": 2, "name": "花子", "age": 25}

{"id": 3, "name": "次郎", "age": 35}

利点

  • ✅ ストリーミング処理が容易
  • ✅ 部分的な読み取りが簡単
  • ✅ 追記が可能
  • ✅ エラーに強い(1行が壊れても他は読める)

処理(Python)

import json

# 読み込み

with open('data.jsonl', 'r') as f:

for line in f:

item = json.loads(line)

process(item)

# 書き込み

with open('output.jsonl', 'w') as f:

for item in items:

f.write(json.dumps(item) + '\n')

処理(Node.js)

const fs = require('fs');

const readline = require('readline');

const rl = readline.createInterface({

input: fs.createReadStream('data.jsonl')

});

rl.on('line', (line) => {

const item = JSON.parse(line);

console.log(item);

});

Pandas での処理

import pandas as pd

# JSON Linesを読み込み

df = pd.read_json('data.jsonl', lines=True)

# チャンクで読み込み(大きなファイル)

for chunk in pd.read_json('large.jsonl', lines=True, chunksize=1000):

process(chunk)

チャンク処理

Node.js - カスタムチャンク

const fs = require('fs');

function processInChunks(filePath, chunkSize = 1000) {

return new Promise((resolve, reject) => {

const stream = fs.createReadStream(filePath);

let buffer = '';

let items = [];

stream.on('data', (chunk) => {

buffer += chunk.toString();

// 改行で分割

const lines = buffer.split('\n');

buffer = lines.pop(); // 最後の不完全な行を保持

lines.forEach(line => {

if (line.trim()) {

items.push(JSON.parse(line));

// チャンクサイズに達したら処理

if (items.length >= chunkSize) {

processChunk(items);

items = [];

}

}

});

});

stream.on('end', () => {

if (items.length > 0) {

processChunk(items);

}

resolve();

});

stream.on('error', reject);

});

}

function processChunk(items) {

console.log(Processing ${items.length} items);

// チャンクを処理

}

Python - ジェネレーター

import json

def read_json_chunks(file_path, chunk_size=1000):

"""JSONLファイルをチャンクで読み込み"""

chunk = []

with open(file_path, 'r') as f:

for line in f:

if line.strip():

chunk.append(json.loads(line))

if len(chunk) >= chunk_size:

yield chunk

chunk = []

# 残りのデータ

if chunk:

yield chunk

# 使用

for chunk in read_json_chunks('large.jsonl', chunk_size=1000):

# チャンクを処理

process_chunk(chunk)

メモリマップドファイル

Python - mmap

import mmap

import json

def search_in_large_json(file_path, search_term):

"""大きなJSONファイルから検索"""

with open(file_path, 'r+b') as f:

# メモリマップ

mmapped = mmap.mmap(f.fileno(), 0)

# 検索

position = mmapped.find(search_term.encode())

if position != -1:

# 見つかった位置周辺を読む

start = max(0, position - 100)

end = min(len(mmapped), position + 100)

context = mmapped[start:end].decode('utf-8')

print(context)

mmapped.close()

並列処理

Python - multiprocessing

import json

import multiprocessing as mp

from functools import partial

def process_item(item):

"""各アイテムの処理"""

# 重い処理

return item['value'] 2

def process_large_json_parallel(file_path, num_workers=4):

# ファイルを読み込み

with open(file_path, 'r') as f:

data = json.load(f)

# プールで並列処理

with mp.Pool(processes=num_workers) as pool:

results = pool.map(process_item, data['items'])

return results

# 使用

results = process_large_json_parallel('large.json', num_workers=8)

Node.js - Worker Threads

const { Worker } = require('worker_threads');

const fs = require('fs');

function processInParallel(items, numWorkers = 4) {

return new Promise((resolve) => {

const chunkSize = Math.ceil(items.length / numWorkers);

const workers = [];

const results = [];

for (let i = 0; i < numWorkers; i++) {

const start = i chunkSize;

const end = Math.min(start + chunkSize, items.length);

const chunk = items.slice(start, end);

const worker = new Worker('./worker.js', {

workerData: chunk

});

worker.on('message', (result) => {

results.push(...result);

if (results.length === items.length) {

resolve(results);

}

});

workers.push(worker);

}

});

}

データベース統合

SQLiteへのインポート

import json

import sqlite3

def json_to_sqlite(json_file, db_file, table_name):

# データベース接続

conn = sqlite3.connect(db_file)

cursor = conn.cursor()

# テーブル作成(動的)

# JSON Lines を1行ずつ読んでインポート

with open(json_file, 'r') as f:

first_line = json.loads(f.readline())

columns = list(first_line.keys())

# テーブル作成

cursor.execute(f"""

CREATE TABLE IF NOT EXISTS {table_name} (

{', '.join([f'{col} TEXT' for col in columns])}

)

""")

# データ挿入

f.seek(0)

for line in f:

item = json.loads(line)

values = [item.get(col) for col in columns]

placeholders = ','.join(['?' for _ in columns])

cursor.execute(

f"INSERT INTO {table_name} VALUES ({placeholders})",

values

)

conn.commit()

conn.close()

# 使用

json_to_sqlite('large.jsonl', 'data.db', 'items')

# クエリで取得

conn = sqlite3.connect('data.db')

cursor = conn.cursor()

cursor.execute("SELECT FROM items WHERE price > 1000 LIMIT 10")

results = cursor.fetchall()

圧縮の活用

gzip圧縮されたJSON

Python:
import gzip

import json

# 圧縮ファイルを読む

with gzip.open('data.json.gz', 'rt', encoding='utf-8') as f:

data = json.load(f)

# 圧縮して書く

with gzip.open('output.json.gz', 'wt', encoding='utf-8') as f:

json.dump(data, f)

# ストリーミング + 圧縮

import ijson

with gzip.open('large.json.gz', 'rb') as f:

items = ijson.items(f, 'items.item')

for item in items:

process(item)

Node.js:
const zlib = require('zlib');

const fs = require('fs');

// 読み込み

fs.createReadStream('data.json.gz')

.pipe(zlib.createGunzip())

.pipe(JSONStream.parse('*'))

.on('data', (data) => {

console.log(data);

});

// 書き込み

fs.createReadStream('input.json')

.pipe(zlib.createGzip())

.pipe(fs.createWriteStream('output.json.gz'));

最適化テクニック

1. 高速JSONライブラリ

Python - ujson

pip install ujson
import ujson

# 標準jsonより高速

data = ujson.load(open('data.json'))

ujson.dump(data, open('output.json', 'w'))

Python - orjson(最速)

pip install orjson
import orjson

# 読み込み

with open('data.json', 'rb') as f:

data = orjson.loads(f.read())

# 書き込み

with open('output.json', 'wb') as f:

f.write(orjson.dumps(data))

ベンチマーク:
  • json: 1.0x
  • ujson: 2-3x 速い
  • orjson: 3-5x 速い

2. インデックス作成

import json

def create_index(json_file, key_field):

"""大きなJSONファイルのインデックス作成"""

index = {}

position = 0

with open(json_file, 'r') as f:

for line in f:

item = json.loads(line)

index[item[key_field]] = position

position = f.tell()

# インデックス保存

with open(f'{json_file}.index', 'w') as f:

json.dump(index, f)

return index

def lookup_by_key(json_file, index, key):

"""インデックスを使って高速検索"""

position = index.get(key)

if position is not None:

with open(json_file, 'r') as f:

f.seek(position)

return json.loads(f.readline())

return None

3. 部分的な読み込み

def read_partial_json(file_path, start_line, num_lines):

"""JSONLファイルの一部のみ読む"""

items = []

with open(file_path, 'r') as f:

# 開始位置まで移動

for _ in range(start_line):

f.readline()

# 必要な行数だけ読む

for _ in range(num_lines):

line = f.readline()

if not line:

break

items.append(json.loads(line))

return items

# 使用(ページネーション)

page_1 = read_partial_json('data.jsonl', 0, 100)

page_2 = read_partial_json('data.jsonl', 100, 100)

ツール

jq(コマンドライン)

# 大きなファイルからフィルタ

jq '.items[] | select(.price > 1000)' large.json

# ストリーミングモード

jq -cn --stream 'fromstream(1|truncate_stream([[0]]))' large.json

# 最初の10件のみ

jq '.items[:10]' large.json

# 特定のフィールドのみ

jq '.items[] | {name, price}' large.json > filtered.json

BigJSON.online

  • ブラウザで大きなファイルを開く
  • ストリーミングビュー
  • メモリ効率的
  • 検索とフィルタ

ベストプラクティス

1. 適切な形式を選択

  • 大きな配列 → JSON Lines
  • ネストされた構造 → ストリーミング
  • 頻繁なアクセス → データベース

2. 増分処理

def process_incrementally(file_path, batch_size=1000):

"""バッチで処理して結果を保存"""

results = []

for chunk in read_json_chunks(file_path, batch_size):

processed = process_batch(chunk)

results.extend(processed)

# 定期的に保存

if len(results) >= 10000:

save_results(results)

results = []

# 残りを保存

if results:

save_results(results)

3. メモリ監視

import psutil

import os

def monitor_memory():

"""メモリ使用量を監視"""

process = psutil.Process(os.getpid())

mem = process.memory_info().rss / 1024 / 1024 # MB

print(f'Memory usage: {mem:.2f} MB')

# 処理中に定期的に呼び出し

まとめ

サイズ別推奨

| ファイルサイズ | 方法 |

|--------------|------|

| < 10MB | 通常のJSON.parse |

| 10MB - 100MB | チャンク処理 |

| 100MB - 1GB | ストリーミング/JSON Lines |

| > 1GB | データベース/専用ツール |

選択フローチャート

  • ファイル全体が必要?
  • - YES → ストリーミング + 処理

    - NO → 部分読み込み

  • 頻繁なクエリ?
  • - YES → データベースへインポート

    - NO → ストリーミング処理

  • リアルタイム処理?
  • - YES → JSON Lines + 増分処理

    - NO → バッチ処理

    大きなJSONファイルも、適切な技術で効率的に処理できます!

    Share:

    関連記事

    Read in English