Working with large JSON files: streaming, performance, best practices
A complete guide to handling large JSON files: streaming, chunk processing, memory optimization, tools and techniques for big-data JSON.
Big JSON Team • Technical Writer
# Working with large JSON files
JSON files of hundreds of MB or several GB require special techniques. This guide shows you how to process large JSON files efficiently without running out of memory.
The problem
The naive approach (❌)
// ❌ Loads EVERYTHING into memory
const fs = require('fs');
const hugeData = JSON.parse(fs.readFileSync('huge.json', 'utf8'));
// CRASH! Out of memory for files > 500MB
Problems:
- 💥 Out-of-memory errors
- 🐌 Slow parsing
- ⏳ Blocking operations
- 💾 Wasted resources
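To see why loading everything at once hurts, here is a rough, illustrative measurement of how much larger parsed JSON is in memory than its serialized text (the `deep_sizeof` helper is an approximation written for this example, not a library function):

```python
import json
import sys

def deep_sizeof(obj):
    """Rough recursive size of a parsed JSON structure (illustrative only)."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_sizeof(k) + deep_sizeof(v) for k, v in obj.items())
    elif isinstance(obj, list):
        size += sum(deep_sizeof(item) for item in obj)
    return size

# A small synthetic dataset stands in for a huge file
records = [{"id": i, "name": f"user{i}", "age": 30} for i in range(1000)]
text = json.dumps(records)
parsed = json.loads(text)

# The in-memory representation is several times larger than the JSON text
print(f"serialized: {len(text)} bytes, parsed: ~{deep_sizeof(parsed)} bytes")
```

Multiply that overhead by a multi-GB file and the out-of-memory crash above is no surprise.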
Solutions
Streaming JSON
JSON Lines (JSONL)
Format:
{"id": 1, "name": "Marco", "age": 30}
{"id": 2, "name": "Laura", "age": 25}
{"id": 3, "name": "Giovanni", "age": 35}
Each line is a valid JSON object.
Advantages:
- ✅ Process line by line
- ✅ Constant memory usage
- ✅ Easy to append
- ✅ Easy to parallelize
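The "easy to append" point is worth a quick sketch: adding a record to a JSONL file is just writing one more line, with no need to re-parse or rewrite the existing data (the file name and record here are illustrative):

```python
import json

def append_record(path, record):
    """Append one record to a JSONL file as a single line."""
    with open(path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record) + '\n')

# Appending never touches the lines already on disk
append_record('events.jsonl', {"id": 4, "name": "Anna", "age": 28})
```

Compare this with a single big JSON array, where appending means rewriting the closing bracket (or the whole file).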
const fs = require('fs');
const readline = require('readline');
async function processLargeJSONL(filePath) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
let count = 0;
for await (const line of rl) {
// Parse a single line
const record = JSON.parse(line);
// Process record
await processRecord(record);
count++;
if (count % 10000 === 0) {
console.log(`Processed ${count} records`);
}
}
console.log(`Total: ${count} records`);
}
// Usage
processLargeJSONL('huge.jsonl');
Python streaming:
def process_jsonl(filename):
"""Process JSONL file riga per riga"""
with open(filename, 'r') as f:
for line_num, line in enumerate(f, 1):
# Parse single JSON object
record = json.loads(line)
# Process
process_record(record)
# Progress
if line_num % 10000 == 0:
print(f"Processed {line_num} records")
# Usage
process_jsonl('data.jsonl')
Streaming parser
Node.js - JSONStream:
const fs = require('fs');
const JSONStream = require('JSONStream');
// Stream an array of objects
fs.createReadStream('large.json')
.pipe(JSONStream.parse('items.*')) // Path to the array items
.on('data', (item) => {
// Process a single item
processItem(item);
})
.on('end', () => {
console.log('Completed');
});
With a transform stream:
const { Transform } = require('stream');
const processStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Transform data
const processed = {
...chunk,
processed: true,
timestamp: new Date()
};
callback(null, processed);
}
});
fs.createReadStream('input.json')
.pipe(JSONStream.parse('*'))
.pipe(processStream)
.pipe(JSONStream.stringify())
.pipe(fs.createWriteStream('output.json'));
Python - ijson:
import ijson
def stream_large_json(filename):
"""Stream parse JSON grande"""
with open(filename, 'rb') as f:
# Iterate over array items
parser = ijson.items(f, 'items.item')
for item in parser:
# Process the item without loading the whole file
process_item(item)
# With filtering
def stream_filtered(filename, min_value):
"""Stream con filtro"""
with open(filename, 'rb') as f:
parser = ijson.items(f, 'data.item')
for item in parser:
if item.get('value', 0) > min_value:
yield item
# Usage
for item in stream_filtered('large.json', 100):
print(item)
Chunk processing
Pandas chunking
import pandas as pd
# Read in chunks
chunk_size = 10000
chunks = []
for chunk in pd.read_json('large.json', lines=True, chunksize=chunk_size):
# Process chunk
filtered = chunk[chunk['value'] > 100]
aggregated = filtered.groupby('category').sum()
chunks.append(aggregated)
# Combine results
result = pd.concat(chunks)
final = result.groupby(level=0).sum()
Custom chunking
const fs = require('fs');
const readline = require('readline');
// Async generator: yields arrays of parsed records
async function* chunkJSON(filePath, chunkSize = 1000) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({ input: fileStream });
let chunk = [];
for await (const line of rl) {
const record = JSON.parse(line);
chunk.push(record);
if (chunk.length >= chunkSize) {
yield chunk;
chunk = [];
}
}
// Last (possibly smaller) chunk
if (chunk.length > 0) {
yield chunk;
}
}
// Usage
for await (const chunk of chunkJSON('data.jsonl', 5000)) {
console.log(`Processing ${chunk.length} records`);
// Process chunk
const results = await processBatch(chunk);
await saveToDB(results);
}
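The same chunking pattern translates naturally to a Python generator using itertools.islice (a sketch; `process_batch` is left to the caller, as in the Node.js version):

```python
import json
from itertools import islice

def chunk_jsonl(path, chunk_size=1000):
    """Yield lists of parsed JSONL records, chunk_size at a time."""
    with open(path, 'r', encoding='utf-8') as f:
        while True:
            # islice consumes the next chunk_size lines from the file iterator
            chunk = [json.loads(line) for line in islice(f, chunk_size)]
            if not chunk:
                break
            yield chunk

# Usage:
# for chunk in chunk_jsonl('data.jsonl', 5000):
#     process_batch(chunk)
```

Because the file handle is a lazy iterator, memory stays proportional to one chunk, never the whole file.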
Memory optimization
JavaScript
1. Use streams instead of readFile:
❌ Wrong:
const data = JSON.parse(fs.readFileSync('huge.json'));
✅ Right:
const stream = fs.createReadStream('huge.json')
.pipe(JSONStream.parse('*'));
2. Garbage collection hints:
async function processInBatches(items) {
for (let i = 0; i < items.length; i += 1000) {
const batch = items.slice(i, i + 1000);
await processBatch(batch);
// Hint the GC after each batch (requires running node with --expose-gc)
if (global.gc) {
global.gc();
}
}
}
3. WeakMap for caching:
const cache = new WeakMap();
function processWithCache(obj) {
if (cache.has(obj)) {
return cache.get(obj);
}
const result = expensiveOperation(obj);
cache.set(obj, result);
return result;
}
// A WeakMap lets the GC reclaim objects that are no longer referenced
Python
1. Generators instead of lists:
❌ Wrong:
def load_all():
with open('huge.json') as f:
return [json.loads(line) for line in f]
data = load_all()  # Everything in memory!
✅ Right:
def stream_data():
"""Generator - memoria costante"""
with open('huge.json') as f:
for line in f:
yield json.loads(line)
for record in stream_data():
process(record)  # One record at a time
2. Explicit cleanup:
import gc
def process_large_file(filename):
for chunk in read_chunks(filename):
process_chunk(chunk)
# Clear memory
del chunk
gc.collect()
3. Memory profiling:
from memory_profiler import profile
@profile
def process_data(filename):
# Memory usage is reported for each line
data = []
with open(filename) as f:
for line in f:
data.append(json.loads(line))
return data
# Run with: python -m memory_profiler script.py
Parallel processing
Node.js Worker Threads
const { Worker } = require('worker_threads');
const os = require('os');
async function processInParallel(filePath) {
const numWorkers = os.cpus().length;
const workers = [];
// Split the file into chunks (splitFile is a helper whose implementation is omitted)
const chunks = await splitFile(filePath, numWorkers);
// Create workers
for (const chunk of chunks) {
workers.push(new Promise((resolve, reject) => {
const worker = new Worker('./worker.js', {
workerData: { chunk }
});
worker.on('message', resolve);
worker.on('error', reject);
}));
}
// Wait all
const results = await Promise.all(workers);
return mergeResults(results);
}
// worker.js
const { workerData, parentPort } = require('worker_threads');
function processChunk(chunk) {
const results = [];
for (const record of chunk) {
// Process record
const result = process(record);
results.push(result);
}
return results;
}
parentPort.postMessage(processChunk(workerData.chunk));
Python multiprocessing
from multiprocessing import Pool
import json
def process_chunk(chunk_info):
"""Process un chunk del file"""
filename, start_line, end_line = chunk_info
results = []
with open(filename) as f:
# Skip to the chunk's start line
for _ in range(start_line):
next(f)
# Process chunk
for i, line in enumerate(f):
if i >= end_line - start_line:
break
record = json.loads(line)
result = process_record(record)
results.append(result)
return results
def parallel_process(filename, num_workers=4):
"""Process file in parallelo"""
# Count lines
with open(filename) as f:
total_lines = sum(1 for _ in f)
# Split in chunks
chunk_size = total_lines // num_workers
chunks = [
(filename, i * chunk_size, total_lines if i == num_workers - 1 else (i + 1) * chunk_size)
for i in range(num_workers)
]
# Process in parallel
with Pool(num_workers) as pool:
results = pool.map(process_chunk, chunks)
# Merge
return [item for sublist in results for item in sublist]
Alternative formats
Converting to Parquet
Python:
import pandas as pd
import json
# Read JSON in chunks
chunks = []
for chunk in pd.read_json('huge.json', lines=True, chunksize=100000):
chunks.append(chunk)
df = pd.concat(chunks, ignore_index=True)
# Write Parquet (much more efficient!)
df.to_parquet('data.parquet', compression='snappy')
# Read Parquet (10-100x faster)
df = pd.read_parquet('data.parquet')
Size comparison:
- JSON: 1.2 GB
- Parquet (snappy): 350 MB (3.4x smaller)
- Parquet (gzip): 280 MB (4.3x smaller)
- JSON read: 45 seconds
- Parquet read: 3 seconds (15x faster!)
Arrow IPC
import pyarrow as pa
import pyarrow.json as paj
# Read JSON → Arrow
table = paj.read_json('data.json')
# Write Arrow IPC (an efficient binary format)
with pa.OSFile('data.arrow', 'wb') as f:
with pa.RecordBatchFileWriter(f, table.schema) as writer:
writer.write_table(table)
# Read Arrow (very fast!)
with pa.OSFile('data.arrow', 'rb') as f:
reader = pa.ipc.open_file(f)
table = reader.read_all()
Database approach
SQLite
For repeated queries:
import sqlite3
import json
# Create DB
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE records (
id INTEGER PRIMARY KEY,
data JSON,
category TEXT,
value REAL
)
''')
# Import JSON
with open('huge.jsonl') as f:
for line in f:
record = json.loads(line)
cursor.execute(
'INSERT INTO records (data, category, value) VALUES (?, ?, ?)',
(json.dumps(record), record['category'], record['value'])
)
conn.commit()
# Fast queries (add an index on value for large tables)
cursor.execute('SELECT * FROM records WHERE value > 100')
results = cursor.fetchall()
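For large imports, the row-by-row INSERT above becomes a bottleneck; batching with executemany in a single transaction is usually much faster. A sketch of that variant (the table schema matches the one above; an in-memory database is used here only for brevity):

```python
import json
import sqlite3

def batch_import(conn, lines, batch_size=10000):
    """Insert JSONL records in batches using executemany."""
    cursor = conn.cursor()
    sql = 'INSERT INTO records (data, category, value) VALUES (?, ?, ?)'
    batch = []
    for line in lines:
        record = json.loads(line)
        batch.append((json.dumps(record), record['category'], record['value']))
        if len(batch) >= batch_size:
            cursor.executemany(sql, batch)
            batch = []
    # Flush the last partial batch
    if batch:
        cursor.executemany(sql, batch)
    conn.commit()
```

Committing once at the end, instead of per row, avoids a disk sync for every record.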
MongoDB
For complex JSON:
from pymongo import MongoClient
import json
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['records']
# Bulk insert
def bulk_import(filename, batch_size=10000):
batch = []
with open(filename) as f:
for line in f:
record = json.loads(line)
batch.append(record)
if len(batch) >= batch_size:
collection.insert_many(batch)
batch = []
if batch:
collection.insert_many(batch)
# Import
bulk_import('huge.jsonl')
# Query with an index
collection.create_index('category')
results = collection.find({'category': 'electronics', 'value': {'$gt': 100}})
External tools
jq (command line)
Filter large files:
# Filter and compact
jq -c '.[] | select(.value > 100)' huge.json > filtered.jsonl
# Extract fields
jq -c '{id, name, value}' input.json > output.json
# Streaming (--stream for HUGE files)
jq --stream -c 'select(length == 2 and .[0][0] == "items")' huge.json
Miller (mlr)
A Swiss army knife for data:
# JSON → CSV
mlr --j2c cat huge.json > output.csv
# Filter
mlr --json filter '$value > 100' huge.json
# Stats
mlr --json stats1 -a mean,count -f value -g category huge.json
Best practices
1. Choose the right format
| Scenario | Format | Why |
|----------|--------|-----|
| Streaming | JSONL | Line by line |
| Analytics | Parquet | Compressed, columnar |
| Real-time | Arrow | Zero-copy, fast |
| General | JSON | Compatibility |
2. Profile before optimizing
import cProfile
import pstats
# Profile code
profiler = cProfile.Profile()
profiler.enable()
process_large_file('data.json')
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions
3. Monitor memory
// Node.js memory usage
setInterval(() => {
const usage = process.memoryUsage();
console.log(`Memory: ${Math.round(usage.heapUsed / 1024 / 1024)}MB`);
}, 1000);
4. Use compression
# Compress with gzip
gzip huge.json
# huge.json.gz (5-10x smaller)
# Stream decompression + processing
zcat huge.json.gz | jq -c 'select(.value > 100)'
import gzip
import json
# Read gzipped JSON
with gzip.open('data.json.gz', 'rt') as f:
for line in f:
record = json.loads(line)
process(record)
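Writing compressed JSONL works the same way in the other direction; gzip.open streams, so memory stays constant regardless of how many records you write (file name is illustrative):

```python
import gzip
import json

def write_jsonl_gz(path, records):
    """Stream records into a gzip-compressed JSONL file."""
    with gzip.open(path, 'wt', encoding='utf-8') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')

# A generator works too - nothing is materialized in memory
write_jsonl_gz('out.jsonl.gz', ({"id": i} for i in range(3)))
```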
Conclusion
Strategies for large JSON:
- < 100MB: standard JSON.parse is fine
- 100MB - 1GB: streaming or chunking
- > 1GB: JSON Lines + streaming
- > 10GB: a database or Parquet
Recommended toolkit:
- jq - CLI filtering
- ijson/JSONStream - streaming parsing
- Pandas - chunked reading
- Parquet - efficient storage
Don't use JSON for everything! For big data, consider optimized formats like Parquet or Arrow.