
Working with Large JSON Files: Streaming, Performance, Best Practices

A complete guide to handling large JSON files: streaming, chunk processing, memory optimization, and the tools and techniques for big-data JSON.

Big JSON Team · Technical Writer · 15 min read · advanced

# Working with Large JSON Files

JSON files of hundreds of MB or several GB call for special techniques. This guide shows how to process large JSON files efficiently without running out of memory.

## The problem

### The naive approach (❌)

```javascript
// ❌ Loads EVERYTHING into memory
const fs = require('fs');

const hugeData = JSON.parse(fs.readFileSync('huge.json', 'utf8'));
// CRASH! Out of memory on files > 500MB
```

Problems:
  • 💥 Out-of-memory errors (measured in the sketch below)
  • 🐌 Slow parsing
  • ⏳ Blocking operations
  • 💾 Wasted resources
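To see the problem concretely, here is a minimal sketch (the file name is hypothetical; exact numbers depend on your data and runtime) that uses Python's tracemalloc to compare a file's size on disk with the peak memory needed to parse it in one shot:

```python
import json
import os
import tracemalloc

def measure_parse(path):
    """Measure the peak memory cost of parsing a JSON file in one shot."""
    tracemalloc.start()
    with open(path) as f:
        data = json.load(f)  # the entire parsed structure lives in memory
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    file_mb = os.path.getsize(path) / 1024 / 1024
    print(f"File on disk: {file_mb:.1f} MB, peak heap: {peak / 1024 / 1024:.1f} MB")
    return data

measure_parse('huge.json')
```

Parsed objects typically take several times their on-disk size, which is why the naive approach fails well before the file size matches your RAM.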

## Solutions

  • Streaming - process data incrementally
  • Chunking - split into manageable parts
  • JSON Lines - one object per line
  • Database - use the right tool for repeated queries
  • Alternative formats - Parquet, Arrow

## Streaming JSON

### JSON Lines (JSONL)

The format:

```json
{"id": 1, "name": "Marco", "age": 30}
{"id": 2, "name": "Laura", "age": 25}
{"id": 3, "name": "Giovanni", "age": 35}
```

Each line is a valid JSON object.

Advantages:
  • ✅ Process line by line
  • ✅ Constant memory
  • ✅ Easy appends (see the sketch below)
  • ✅ Easy to parallelize
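The "easy appends" point deserves a concrete one-liner: because every record is its own line, adding data never requires re-parsing or rewriting the file. A minimal sketch (the record contents are just an example):

```python
import json

def append_record(path, record):
    """Append one record to a JSONL file without rewriting existing lines."""
    with open(path, 'a') as f:
        f.write(json.dumps(record) + '\n')

append_record('data.jsonl', {"id": 4, "name": "Anna", "age": 28})
```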

Node.js streaming:

```javascript
const fs = require('fs');
const readline = require('readline');

async function processLargeJSONL(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  let count = 0;
  for await (const line of rl) {
    // Parse a single line
    const record = JSON.parse(line);

    // Process the record
    await processRecord(record);

    count++;
    if (count % 10000 === 0) {
      console.log(`Processed ${count} records`);
    }
  }

  console.log(`Total: ${count} records`);
}

// Usage
processLargeJSONL('huge.jsonl');
```

Python streaming:

```python
import json

def process_jsonl(filename):
    """Process a JSONL file line by line."""
    with open(filename, 'r') as f:
        for line_num, line in enumerate(f, 1):
            # Parse a single JSON object
            record = json.loads(line)

            # Process it
            process_record(record)

            # Progress
            if line_num % 10000 == 0:
                print(f"Processed {line_num} records")

# Usage
process_jsonl('data.jsonl')
```

### Streaming parsers

Node.js - JSONStream:

```javascript
const fs = require('fs');
const JSONStream = require('JSONStream');

// Stream an array of objects
fs.createReadStream('large.json')
  .pipe(JSONStream.parse('items.*')) // Path to the array's items
  .on('data', (item) => {
    // Process a single item
    processItem(item);
  })
  .on('end', () => {
    console.log('Completed');
  });
```

With a transform stream:

```javascript
const { Transform } = require('stream');

const processStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Transform the data
    const processed = {
      ...chunk,
      processed: true,
      timestamp: new Date()
    };
    callback(null, processed);
  }
});

fs.createReadStream('input.json')
  .pipe(JSONStream.parse('*'))
  .pipe(processStream)
  .pipe(JSONStream.stringify())
  .pipe(fs.createWriteStream('output.json'));
```

Python - ijson:

```python
import ijson

def stream_large_json(filename):
    """Stream-parse a large JSON file."""
    with open(filename, 'rb') as f:
        # Iterate over the array items
        parser = ijson.items(f, 'items.item')
        for item in parser:
            # Process each item without loading the whole file
            process_item(item)

# With filtering
def stream_filtered(filename, min_value):
    """Stream with a filter."""
    with open(filename, 'rb') as f:
        parser = ijson.items(f, 'data.item')
        for item in parser:
            if item.get('value', 0) > min_value:
                yield item

# Usage
for item in stream_filtered('large.json', 100):
    print(item)
```

## Chunk processing

### Pandas chunking

```python
import pandas as pd

# Read in chunks
chunk_size = 10000
chunks = []

for chunk in pd.read_json('large.json', lines=True, chunksize=chunk_size):
    # Process each chunk
    filtered = chunk[chunk['value'] > 100]
    aggregated = filtered.groupby('category').sum()
    chunks.append(aggregated)

# Combine the partial results
result = pd.concat(chunks)
final = result.groupby(level=0).sum()
```

### Custom chunking

```javascript
const fs = require('fs');
const readline = require('readline');

// Async generator: yields arrays of up to chunkSize records
async function* chunkJSON(filePath, chunkSize = 1000) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({ input: fileStream });

  let chunk = [];
  for await (const line of rl) {
    const record = JSON.parse(line);
    chunk.push(record);

    if (chunk.length >= chunkSize) {
      yield chunk;
      chunk = [];
    }
  }

  // Last partial chunk
  if (chunk.length > 0) {
    yield chunk;
  }
}

// Usage
for await (const chunk of chunkJSON('data.jsonl', 5000)) {
  console.log(`Processing ${chunk.length} records`);
  // Process the chunk
  const results = await processBatch(chunk);
  await saveToDB(results);
}
```

## Memory optimization

### JavaScript

1. Use streams instead of readFile:

Wrong:

```javascript
const data = JSON.parse(fs.readFileSync('huge.json'));
```

Right:

```javascript
const stream = fs.createReadStream('huge.json')
  .pipe(JSONStream.parse('*'));
```

2. Garbage collection hints:

```javascript
async function processInBatches(items) {
  for (let i = 0; i < items.length; i += 1000) {
    const batch = items.slice(i, i + 1000);
    await processBatch(batch);

    // Hint the GC after each batch (requires running node with --expose-gc)
    if (global.gc) {
      global.gc();
    }
  }
}
```

3. WeakMap for caching:

```javascript
const cache = new WeakMap();

function processWithCache(obj) {
  if (cache.has(obj)) {
    return cache.get(obj);
  }

  const result = expensiveOperation(obj);
  cache.set(obj, result);
  return result;
}

// A WeakMap lets the GC collect objects that are no longer referenced elsewhere
```

### Python

1. Generators instead of lists:

Wrong:

```python
def load_all():
    with open('huge.json') as f:
        return [json.loads(line) for line in f]

data = load_all()  # Everything in memory!
```

Right:

```python
def stream_data():
    """Generator - constant memory."""
    with open('huge.json') as f:
        for line in f:
            yield json.loads(line)

for record in stream_data():
    process(record)  # One record at a time
```

2. Explicit cleanup:

```python
import gc

def process_large_file(filename):
    for chunk in read_chunks(filename):
        process_chunk(chunk)

        # Free memory between chunks
        del chunk
        gc.collect()
```

3. Memory profiling:

```python
from memory_profiler import profile

@profile
def process_data(filename):
    # Memory usage is reported line by line
    data = []
    with open(filename) as f:
        for line in f:
            data.append(json.loads(line))
    return data

# Run with: python -m memory_profiler script.py
```

## Parallel processing

### Node.js Worker Threads

```javascript
const { Worker } = require('worker_threads');
const os = require('os');

async function processInParallel(filePath) {
  const numWorkers = os.cpus().length;
  const workers = [];

  // Split the file into chunks (splitFile is a helper defined elsewhere)
  const chunks = await splitFile(filePath, numWorkers);

  // Create the workers
  for (const chunk of chunks) {
    workers.push(new Promise((resolve, reject) => {
      const worker = new Worker('./worker.js', {
        workerData: { chunk }
      });
      worker.on('message', resolve);
      worker.on('error', reject);
    }));
  }

  // Wait for all of them
  const results = await Promise.all(workers);
  return mergeResults(results);
}
```

```javascript
// worker.js
const { workerData, parentPort } = require('worker_threads');

function processChunk(chunk) {
  const results = [];
  for (const record of chunk) {
    // Process each record (processRecord is your own logic;
    // named to avoid shadowing Node's global `process`)
    const result = processRecord(record);
    results.push(result);
  }
  return results;
}

parentPort.postMessage(processChunk(workerData.chunk));
```

### Python multiprocessing

```python
from multiprocessing import Pool
import json

def process_chunk(chunk_info):
    """Process one chunk of the file."""
    filename, start_line, end_line = chunk_info
    results = []

    with open(filename) as f:
        # Skip ahead to the start line
        for _ in range(start_line):
            next(f)

        # Process this chunk's lines
        for i, line in enumerate(f):
            if i >= end_line - start_line:
                break
            record = json.loads(line)
            result = process_record(record)
            results.append(result)

    return results

def parallel_process(filename, num_workers=4):
    """Process a file in parallel."""
    # Count the lines
    with open(filename) as f:
        total_lines = sum(1 for _ in f)

    # Split into line ranges
    chunk_size = total_lines // num_workers
    chunks = [
        (filename, i * chunk_size, (i + 1) * chunk_size)
        for i in range(num_workers)
    ]

    # Process in parallel
    with Pool(num_workers) as pool:
        results = pool.map(process_chunk, chunks)

    # Merge the partial results
    return [item for sublist in results for item in sublist]
```

## Alternative formats

### Converting to Parquet

Python:

```python
import pandas as pd

# Read the JSON in chunks
chunks = []
for chunk in pd.read_json('huge.json', lines=True, chunksize=100000):
    chunks.append(chunk)

df = pd.concat(chunks, ignore_index=True)

# Write Parquet (far more efficient!)
df.to_parquet('data.parquet', compression='snappy')

# Read Parquet back (10-100x faster)
df = pd.read_parquet('data.parquet')
```

Size comparison:
  • JSON: 1.2 GB
  • Parquet (snappy): 350 MB (3.4x smaller)
  • Parquet (gzip): 280 MB (4.3x smaller)

Performance comparison (see the sketch below to reproduce it on your own data):
  • JSON read: 45 seconds
  • Parquet read: 3 seconds (15x faster!)
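Figures like these vary a lot with the data and the hardware, so treat them as indicative. A small sketch to run the comparison yourself (assumes a line-delimited huge.json and pyarrow installed for Parquet support):

```python
import time
import pandas as pd

def timed(label, fn):
    """Run fn once and print the elapsed wall-clock time."""
    start = time.perf_counter()
    result = fn()
    print(f"{label}: {time.perf_counter() - start:.1f}s")
    return result

df = timed("JSON read", lambda: pd.read_json('huge.json', lines=True))
df.to_parquet('data.parquet', compression='snappy')
timed("Parquet read", lambda: pd.read_parquet('data.parquet'))
```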

### Arrow IPC

```python
import pyarrow as pa
import pyarrow.json as paj

# Read JSON → Arrow
table = paj.read_json('data.json')

# Write Arrow IPC (an efficient binary format)
with pa.OSFile('data.arrow', 'wb') as f:
    with pa.RecordBatchFileWriter(f, table.schema) as writer:
        writer.write_table(table)

# Read Arrow back (extremely fast!)
with pa.OSFile('data.arrow', 'rb') as f:
    reader = pa.ipc.open_file(f)
    table = reader.read_all()
```

## Database approach

### SQLite

For repeated queries:

```python
import sqlite3
import json

# Create the DB
conn = sqlite3.connect('data.db')
cursor = conn.cursor()

cursor.execute('''
    CREATE TABLE records (
        id INTEGER PRIMARY KEY,
        data JSON,
        category TEXT,
        value REAL
    )
''')

# Import the JSON
with open('huge.jsonl') as f:
    for line in f:
        record = json.loads(line)
        cursor.execute(
            'INSERT INTO records (data, category, value) VALUES (?, ?, ?)',
            (json.dumps(record), record['category'], record['value'])
        )

conn.commit()

# Fast queries!
cursor.execute('SELECT * FROM records WHERE value > 100')
results = cursor.fetchall()
```

### MongoDB

For complex JSON:

```python
from pymongo import MongoClient
import json

client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['records']

# Bulk insert
def bulk_import(filename, batch_size=10000):
    batch = []
    with open(filename) as f:
        for line in f:
            record = json.loads(line)
            batch.append(record)

            if len(batch) >= batch_size:
                collection.insert_many(batch)
                batch = []

    # Flush the final partial batch
    if batch:
        collection.insert_many(batch)

# Import
bulk_import('huge.jsonl')

# Query with an index
collection.create_index('category')
results = collection.find({'category': 'electronics', 'value': {'$gt': 100}})
```

## External tools

### jq (command line)

Filter large files:

```bash
# Filter and compact
jq -c '.[] | select(.value > 100)' huge.json > filtered.jsonl

# Extract fields
jq -c '{id, name, value}' input.json > output.json

# Streaming mode (--stream, for HUGE files: emits [path, value] pairs instead of whole documents)
jq --stream -c 'select(length == 2 and .[0][0] == "items")' huge.json
```

### Miller (mlr)

A Swiss army knife for data:

```bash
# JSON → CSV
mlr --j2c cat huge.json > output.csv

# Filter
mlr --json filter '$value > 100' huge.json

# Stats
mlr --json stats1 -a mean,count -f value -g category huge.json
```

## Best practices

### 1. Choose the right format

| Scenario | Format | Why |
|----------|--------|-----|
| Streaming | JSONL | Line by line |
| Analytics | Parquet | Compressed, columnar |
| Real-time | Arrow | Zero-copy, fast |
| General use | JSON | Compatibility |

### 2. Profile before optimizing

```python
import cProfile
import pstats

# Profile the code
profiler = cProfile.Profile()
profiler.enable()

process_large_file('data.json')

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions
```

### 3. Monitor memory

```javascript
// Log Node.js memory usage every second
setInterval(() => {
  const usage = process.memoryUsage();
  console.log(`Memory: ${Math.round(usage.heapUsed / 1024 / 1024)}MB`);
}, 1000);
```

### 4. Use compression

```bash
# Compress with gzip
gzip huge.json
# huge.json.gz (5-10x smaller)

# Stream decompression + processing
zcat huge.json.gz | jq -c 'select(.value > 100)'
```

```python
import gzip
import json

# Read gzipped JSONL without decompressing to disk
with gzip.open('data.json.gz', 'rt') as f:
    for line in f:
        record = json.loads(line)
        process(record)
```

## Conclusion

Strategies for large JSON files:
  • < 100MB: standard JSON.parse is fine
  • 100MB - 1GB: streaming or chunking
  • > 1GB: JSON Lines + streaming
  • > 10GB: database or Parquet

Recommended workflow (a compact sketch follows the lists):
  • Receive: JSON from an API
  • Convert: → JSONL for processing
  • Store: → Parquet for analytics
  • Query: use a database when needed

Essential tools:
  • jq - CLI filtering
  • ijson/JSONStream - streaming parsing
  • Pandas - chunked reading
  • Parquet - efficient storage
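A compact sketch of that workflow, under stated assumptions: the incoming file has a top-level items array, the file names are hypothetical, and ijson is version 3.1+ so the use_float option is available:

```python
import json
import ijson
import pandas as pd

# 1. Convert: stream a large JSON array ({"items": [...]}) to JSONL.
#    use_float=True makes ijson return plain floats instead of Decimal,
#    so json.dumps can serialize each item directly.
with open('incoming.json', 'rb') as src, open('data.jsonl', 'w') as dst:
    for item in ijson.items(src, 'items.item', use_float=True):
        dst.write(json.dumps(item) + '\n')

# 2. Store: read the JSONL in chunks and write a Parquet file for analytics.
#    (For data that doesn't fit in memory, write Parquet chunk by chunk
#    with pyarrow instead of concatenating everything here.)
chunks = pd.read_json('data.jsonl', lines=True, chunksize=100000)
df = pd.concat(chunks, ignore_index=True)
df.to_parquet('data.parquet', compression='snappy')
```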

Don't use JSON for everything! For big data, consider optimized formats like Parquet or Arrow.
