大きなJSONファイルの処理方法
大規模JSONファイルの効率的な処理。ストリーミング、メモリ最適化、パフォーマンステクニック。
Big JSON Team
• Technical WriterExpert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.
# 大きなJSONファイルの処理方法
大規模なJSONファイルを効率的に処理する技術とツールを学びます。
問題点
通常の読み込みの問題
// ❌ 大きなファイルでメモリ不足
const data = JSON.parse(fs.readFileSync('huge.json', 'utf8'));
// Error: JavaScript heap out of memory
メモリ使用量
例:- JSONファイル: 500MB
- メモリ使用: 2-3GB (パース後)
- 問題: メモリ不足、遅いパース
ストリーミング処理
Node.js - JSONStream
インストール
npm install JSONStream
基本的な使い方
const fs = require('fs');
const JSONStream = require('JSONStream');
// 配列の各要素をストリーム処理
fs.createReadStream('large.json')
.pipe(JSONStream.parse('items.'))
.on('data', (item) => {
// 各アイテムを処理
console.log(item);
})
.on('end', () => {
console.log('完了');
});
特定のパスのみ
// users配列のみ処理
fs.createReadStream('data.json')
.pipe(JSONStream.parse('users.'))
.on('data', (user) => {
processUser(user);
});
フィルタリング
const through = require('through2');
fs.createReadStream('large.json')
.pipe(JSONStream.parse('items.'))
.pipe(through.obj((item, enc, callback) => {
// 条件でフィルタ
if (item.price > 1000) {
callback(null, item);
} else {
callback();
}
}))
.pipe(JSONStream.stringify())
.pipe(fs.createWriteStream('filtered.json'));
stream-json(高速代替)
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');
const pipeline = fs.createReadStream('large.json')
.pipe(parser())
.pipe(streamArray());
pipeline.on('data', ({ key, value }) => {
console.log(Item ${key}:, value);
});
Python でのストリーミング
ijson ライブラリ
インストール
pip install ijson
基本的な使い方
import ijson
# ファイルをストリーミング
with open('large.json', 'rb') as f:
# 'items'配列の各要素
items = ijson.items(f, 'items.item')
for item in items:
# 各アイテムを処理
print(item)
特定のフィールドのみ
import ijson
with open('large.json', 'rb') as f:
# すべてのname フィールド
for name in ijson.items(f, 'users.item.name'):
print(name)
フィルタリングと変換
import ijson
def process_large_file(input_file, output_file):
with open(input_file, 'rb') as inf:
with open(output_file, 'w') as outf:
items = ijson.items(inf, 'items.item')
filtered = [
item for item in items
if item.get('active', False)
]
json.dump(filtered, outf, indent=2)
JSON Lines (.jsonl)
フォーマット
各行が1つの独立したJSONオブジェクト:
{"id": 1, "name": "太郎", "age": 30}
{"id": 2, "name": "花子", "age": 25}
{"id": 3, "name": "次郎", "age": 35}
利点
- ✅ ストリーミング処理が容易
- ✅ 部分的な読み取りが簡単
- ✅ 追記が可能
- ✅ エラーに強い(1行が壊れても他は読める)
処理(Python)
import json
# 読み込み
with open('data.jsonl', 'r') as f:
for line in f:
item = json.loads(line)
process(item)
# 書き込み
with open('output.jsonl', 'w') as f:
for item in items:
f.write(json.dumps(item) + '\n')
処理(Node.js)
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('data.jsonl')
});
rl.on('line', (line) => {
const item = JSON.parse(line);
console.log(item);
});
Pandas での処理
import pandas as pd
# JSON Linesを読み込み
df = pd.read_json('data.jsonl', lines=True)
# チャンクで読み込み(大きなファイル)
for chunk in pd.read_json('large.jsonl', lines=True, chunksize=1000):
process(chunk)
チャンク処理
Node.js - カスタムチャンク
const fs = require('fs');
function processInChunks(filePath, chunkSize = 1000) {
return new Promise((resolve, reject) => {
const stream = fs.createReadStream(filePath);
let buffer = '';
let items = [];
stream.on('data', (chunk) => {
buffer += chunk.toString();
// 改行で分割
const lines = buffer.split('\n');
buffer = lines.pop(); // 最後の不完全な行を保持
lines.forEach(line => {
if (line.trim()) {
items.push(JSON.parse(line));
// チャンクサイズに達したら処理
if (items.length >= chunkSize) {
processChunk(items);
items = [];
}
}
});
});
stream.on('end', () => {
if (items.length > 0) {
processChunk(items);
}
resolve();
});
stream.on('error', reject);
});
}
function processChunk(items) {
console.log(Processing ${items.length} items);
// チャンクを処理
}
Python - ジェネレーター
import json
def read_json_chunks(file_path, chunk_size=1000):
"""JSONLファイルをチャンクで読み込み"""
chunk = []
with open(file_path, 'r') as f:
for line in f:
if line.strip():
chunk.append(json.loads(line))
if len(chunk) >= chunk_size:
yield chunk
chunk = []
# 残りのデータ
if chunk:
yield chunk
# 使用
for chunk in read_json_chunks('large.jsonl', chunk_size=1000):
# チャンクを処理
process_chunk(chunk)
メモリマップドファイル
Python - mmap
import mmap
import json
def search_in_large_json(file_path, search_term):
"""大きなJSONファイルから検索"""
with open(file_path, 'r+b') as f:
# メモリマップ
mmapped = mmap.mmap(f.fileno(), 0)
# 検索
position = mmapped.find(search_term.encode())
if position != -1:
# 見つかった位置周辺を読む
start = max(0, position - 100)
end = min(len(mmapped), position + 100)
context = mmapped[start:end].decode('utf-8')
print(context)
mmapped.close()
並列処理
Python - multiprocessing
import json
import multiprocessing as mp
from functools import partial
def process_item(item):
"""各アイテムの処理"""
# 重い処理
return item['value'] 2
def process_large_json_parallel(file_path, num_workers=4):
# ファイルを読み込み
with open(file_path, 'r') as f:
data = json.load(f)
# プールで並列処理
with mp.Pool(processes=num_workers) as pool:
results = pool.map(process_item, data['items'])
return results
# 使用
results = process_large_json_parallel('large.json', num_workers=8)
Node.js - Worker Threads
const { Worker } = require('worker_threads');
const fs = require('fs');
function processInParallel(items, numWorkers = 4) {
return new Promise((resolve) => {
const chunkSize = Math.ceil(items.length / numWorkers);
const workers = [];
const results = [];
for (let i = 0; i < numWorkers; i++) {
const start = i chunkSize;
const end = Math.min(start + chunkSize, items.length);
const chunk = items.slice(start, end);
const worker = new Worker('./worker.js', {
workerData: chunk
});
worker.on('message', (result) => {
results.push(...result);
if (results.length === items.length) {
resolve(results);
}
});
workers.push(worker);
}
});
}
データベース統合
SQLiteへのインポート
import json
import sqlite3
def json_to_sqlite(json_file, db_file, table_name):
# データベース接続
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# テーブル作成(動的)
# JSON Lines を1行ずつ読んでインポート
with open(json_file, 'r') as f:
first_line = json.loads(f.readline())
columns = list(first_line.keys())
# テーブル作成
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
{', '.join([f'{col} TEXT' for col in columns])}
)
""")
# データ挿入
f.seek(0)
for line in f:
item = json.loads(line)
values = [item.get(col) for col in columns]
placeholders = ','.join(['?' for _ in columns])
cursor.execute(
f"INSERT INTO {table_name} VALUES ({placeholders})",
values
)
conn.commit()
conn.close()
# 使用
json_to_sqlite('large.jsonl', 'data.db', 'items')
# クエリで取得
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute("SELECT FROM items WHERE price > 1000 LIMIT 10")
results = cursor.fetchall()
圧縮の活用
gzip圧縮されたJSON
Python:import gzip
import json
# 圧縮ファイルを読む
with gzip.open('data.json.gz', 'rt', encoding='utf-8') as f:
data = json.load(f)
# 圧縮して書く
with gzip.open('output.json.gz', 'wt', encoding='utf-8') as f:
json.dump(data, f)
# ストリーミング + 圧縮
import ijson
with gzip.open('large.json.gz', 'rb') as f:
items = ijson.items(f, 'items.item')
for item in items:
process(item)
Node.js:
const zlib = require('zlib');
const fs = require('fs');
// 読み込み
fs.createReadStream('data.json.gz')
.pipe(zlib.createGunzip())
.pipe(JSONStream.parse('*'))
.on('data', (data) => {
console.log(data);
});
// 書き込み
fs.createReadStream('input.json')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('output.json.gz'));
最適化テクニック
1. 高速JSONライブラリ
Python - ujson
pip install ujson
import ujson
# 標準jsonより高速
data = ujson.load(open('data.json'))
ujson.dump(data, open('output.json', 'w'))
Python - orjson(最速)
pip install orjson
import orjson
# 読み込み
with open('data.json', 'rb') as f:
data = orjson.loads(f.read())
# 書き込み
with open('output.json', 'wb') as f:
f.write(orjson.dumps(data))
ベンチマーク:
- json: 1.0x
- ujson: 2-3x 速い
- orjson: 3-5x 速い
2. インデックス作成
import json
def create_index(json_file, key_field):
"""大きなJSONファイルのインデックス作成"""
index = {}
position = 0
with open(json_file, 'r') as f:
for line in f:
item = json.loads(line)
index[item[key_field]] = position
position = f.tell()
# インデックス保存
with open(f'{json_file}.index', 'w') as f:
json.dump(index, f)
return index
def lookup_by_key(json_file, index, key):
"""インデックスを使って高速検索"""
position = index.get(key)
if position is not None:
with open(json_file, 'r') as f:
f.seek(position)
return json.loads(f.readline())
return None
3. 部分的な読み込み
def read_partial_json(file_path, start_line, num_lines):
"""JSONLファイルの一部のみ読む"""
items = []
with open(file_path, 'r') as f:
# 開始位置まで移動
for _ in range(start_line):
f.readline()
# 必要な行数だけ読む
for _ in range(num_lines):
line = f.readline()
if not line:
break
items.append(json.loads(line))
return items
# 使用(ページネーション)
page_1 = read_partial_json('data.jsonl', 0, 100)
page_2 = read_partial_json('data.jsonl', 100, 100)
ツール
jq(コマンドライン)
# 大きなファイルからフィルタ
jq '.items[] | select(.price > 1000)' large.json
# ストリーミングモード
jq -cn --stream 'fromstream(1|truncate_stream([[0]]))' large.json
# 最初の10件のみ
jq '.items[:10]' large.json
# 特定のフィールドのみ
jq '.items[] | {name, price}' large.json > filtered.json
BigJSON.online
- ブラウザで大きなファイルを開く
- ストリーミングビュー
- メモリ効率的
- 検索とフィルタ
ベストプラクティス
1. 適切な形式を選択
- 大きな配列 → JSON Lines
- ネストされた構造 → ストリーミング
- 頻繁なアクセス → データベース
2. 増分処理
def process_incrementally(file_path, batch_size=1000):
"""バッチで処理して結果を保存"""
results = []
for chunk in read_json_chunks(file_path, batch_size):
processed = process_batch(chunk)
results.extend(processed)
# 定期的に保存
if len(results) >= 10000:
save_results(results)
results = []
# 残りを保存
if results:
save_results(results)
3. メモリ監視
import psutil
import os
def monitor_memory():
"""メモリ使用量を監視"""
process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024 / 1024 # MB
print(f'Memory usage: {mem:.2f} MB')
# 処理中に定期的に呼び出し
まとめ
サイズ別推奨
| ファイルサイズ | 方法 |
|--------------|------|
| < 10MB | 通常のJSON.parse |
| 10MB - 100MB | チャンク処理 |
| 100MB - 1GB | ストリーミング/JSON Lines |
| > 1GB | データベース/専用ツール |
選択フローチャート
- YES → ストリーミング + 処理
- NO → 部分読み込み
- YES → データベースへインポート
- NO → ストリーミング処理
- YES → JSON Lines + 増分処理
- NO → バッチ処理
大きなJSONファイルも、適切な技術で効率的に処理できます!