← Volver al Blog

Trabajar con Archivos JSON Grandes: Técnicas de Streaming y Optimización

Aprende a manejar archivos JSON de varios GB con streaming, procesamiento incremental y optimización de memoria.

Big JSON Team16 min de lecturaAvanzado
B

Big JSON Team

Technical Writer

Expert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.

16 min lectura

# Trabajar con Archivos JSON Grandes: Técnicas de Streaming y Optimización

Procesar archivos JSON de varios gigabytes requiere técnicas especiales. Esta guía completa te enseñará a manejar JSON grande eficientemente sin agotar la memoria.

El Problema con JSON Grande

Enfoque Tradicional (❌ No Escalable)

// ❌ Carga todo en memoria

const fs = require('fs');

const contenido = fs.readFileSync('archivo_enorme.json', 'utf8');

const datos = JSON.parse(contenido);

// OutOfMemory con archivos > 500MB

Problemas:
  • Usa 3-5x el tamaño del archivo en memoria
  • Bloquea el event loop
  • Falla con archivos grandes
  • Sin progreso durante carga

Definiendo "Grande"

Pequeño: < 10 MB
  • Carga normal funciona bien

Mediano: 10-100 MB
  • Considera streaming si procesamiento es pesado

Grande: 100 MB - 1 GB
  • Requiere streaming

Muy Grande: > 1 GB
  • Streaming obligatorio
  • Considera procesamiento por lotes
  • Evalúa bases de datos alternativas

Streaming JSON en Node.js

Opción 1: JSONStream

Instalación:
npm install JSONStream
Uso Básico:
const fs = require('fs');

const JSONStream = require('JSONStream');

// Archivo: usuarios.json con formato:

// { "usuarios": [...] }

const stream = fs.createReadStream('usuarios.json', { encoding: 'utf8' })

.pipe(JSONStream.parse('usuarios.'))

.on('data', (usuario) => {

// Procesar cada usuario individualmente

console.log(usuario.nombre);

})

.on('end', () => {

console.log('Procesamiento completo');

})

.on('error', (error) => {

console.error('Error:', error);

});

Procesar Array en Raíz:
// Archivo: array.json con formato: [...]

const stream = fs.createReadStream('array.json', { encoding: 'utf8' })

.pipe(JSONStream.parse(''))

.on('data', (item) => {

procesarItem(item);

});

Filtrar Durante Streaming:
const JSONStream = require('JSONStream');

const es = require('event-stream');

const fs = require('fs');

fs.createReadStream('usuarios.json')

.pipe(JSONStream.parse('usuarios.'))

.pipe(es.mapSync((usuario) => {

// Filtrar usuarios activos mayores de 30

if (usuario.activo && usuario.edad > 30) {

return usuario;

}

return null;

}))

.pipe(es.through(

function write(data) {

if (data) {

console.log(data);

}

}

));

Opción 2: stream-json

Instalación:
npm install stream-json
Uso:
const fs = require('fs');

const { chain } = require('stream-chain');

const { parser } = require('stream-json');

const { streamArray } = require('stream-json/streamers/StreamArray');

const { pick } = require('stream-json/filters/Pick');

const pipeline = chain([

fs.createReadStream('datos.json'),

parser(),

pick({ filter: 'usuarios' }),

streamArray(),

(data) => {

const { key, value } = data;

console.log(Usuario #${key}: ${value.nombre});

return value;

}

]);

pipeline.on('data', (usuario) => {

procesarUsuario(usuario);

});

pipeline.on('end', () => console.log('Completo'));

Transformar y Escribir:
const { chain } = require('stream-chain');

const { parser } = require('stream-json');

const { streamArray } = require('stream-json/streamers/StreamArray');

const { stringer } = require('stream-json/Stringer');

chain([

fs.createReadStream('entrada.json'),

parser(),

streamArray(),

(data) => {

// Transformar cada item

data.value.procesado = true;

data.value.timestamp = Date.now();

return data;

},

stringer(),

fs.createWriteStream('salida.json')

]);

Opción 3: oboe.js (Navegador y Node)

Instalación:
npm install oboe
Node.js:
const oboe = require('oboe');

oboe(fs.createReadStream('datos.json'))

.node('usuarios.', (usuario) => {

console.log('Usuario:', usuario.nombre);

// Retornar oboe.drop para liberar memoria

return oboe.drop;

})

.done(() => {

console.log('Completado');

})

.fail((error) => {

console.error('Error:', error);

});

Navegador (Fetch API):
oboe('/api/usuarios-grandes')

.node('usuarios.', (usuario) => {

añadirALista(usuario);

return oboe.drop;

})

.done(() => {

console.log('Todos los usuarios cargados');

});

Python: Streaming JSON

Opción 1: ijson

Instalación:
pip install ijson
Uso Básico:
import ijson

# Iterar sobre items

with open('usuarios.json', 'rb') as archivo:

usuarios = ijson.items(archivo, 'usuarios.item')

for usuario in usuarios:

print(f"Usuario: {usuario['nombre']}")

Procesar con Filtro:
import ijson

def procesar_usuarios_grandes(archivo_path):

contador = 0

with open(archivo_path, 'rb') as archivo:

parser = ijson.items(archivo, 'usuarios.item')

for usuario in parser:

if usuario.get('edad', 0) > 30:

procesar(usuario)

contador += 1

# Progreso cada 1000 usuarios

if contador % 1000 == 0:

print(f"Procesados: {contador}")

return contador

Extraer Valores Específicos:
import ijson

# Solo extraer nombres

with open('usuarios.json', 'rb') as archivo:

nombres = ijson.items(archivo, 'usuarios.item.nombre')

for nombre in nombres:

print(nombre)

Opción 2: json-stream

from json_stream import streamable_list

from json_stream.requests import load

# Desde archivo

with open('datos.json') as archivo:

datos = streamable_list(archivo)

for item in datos:

procesar(item)

# Desde HTTP

respuesta = requests.get('https://api.ejemplo.com/datos-grandes', stream=True)

datos = load(respuesta.raw)

for item in datos['items']:

procesar(item)

Pandas con Chunks

import pandas as pd

# Leer JSON en chunks

chunks = []

tamano_chunk = 10000

for chunk in pd.read_json(

'datos.jsonl',

lines=True,

chunksize=tamano_chunk

):

# Procesar cada chunk

chunk_procesado = chunk[chunk['edad'] > 30]

chunks.append(chunk_procesado)

print(f"Procesado chunk de {len(chunk)} filas")

# Combinar resultados

resultado = pd.concat(chunks, ignore_index=True)

JSONLines para Big Data

¿Qué es JSONLines?

Formato donde cada línea es un JSON válido:

{"id": 1, "nombre": "Ana", "edad": 28}

{"id": 2, "nombre": "Carlos", "edad": 35}

{"id": 3, "nombre": "María", "edad": 31}

Ventajas

  • ✅ Streaming trivial (línea por línea)
  • ✅ Fácil de agregar datos
  • ✅ Tolerante a errores parciales
  • ✅ Compatible con herramientas Unix
  • ✅ Ideal para logs

Procesar JSONLines (Node.js)

const fs = require('fs');

const readline = require('readline');

const rl = readline.createInterface({

input: fs.createReadStream('datos.jsonl'),

crlfDelay: Infinity

});

let contador = 0;

rl.on('line', (linea) => {

try {

const obj = JSON.parse(linea);

procesarObjeto(obj);

contador++;

if (contador % 10000 === 0) {

console.log(Procesadas ${contador} líneas);

}

} catch (error) {

console.error('Error en línea:', error);

}

});

rl.on('close', () => {

console.log(Total procesadas: ${contador});

});

Procesar JSONLines (Python)

import json

def procesar_jsonlines(archivo_path):

contador = 0

with open(archivo_path, 'r') as archivo:

for linea in archivo:

try:

obj = json.loads(linea)

procesar(obj)

contador += 1

if contador % 10000 == 0:

print(f"Procesadas {contador} líneas")

except json.JSONDecodeError as e:

print(f"Error en línea {contador}: {e}")

return contador

Convertir JSON a JSONLines

import json

def json_a_jsonlines(entrada, salida):

with open(entrada, 'r') as f_entrada:

datos = json.load(f_entrada)

with open(salida, 'w') as f_salida:

# Si es array

if isinstance(datos, list):

for item in datos:

f_salida.write(json.dumps(item) + '\n')

# Si tiene array anidado

elif 'items' in datos:

for item in datos['items']:

f_salida.write(json.dumps(item) + '\n')

# Uso

json_a_jsonlines('usuarios.json', 'usuarios.jsonl')

Optimización de Memoria

1. Procesar en Batches

const fs = require('fs');

const JSONStream = require('JSONStream');

const es = require('event-stream');

let batch = [];

const BATCH_SIZE = 100;

fs.createReadStream('datos.json')

.pipe(JSONStream.parse('items.'))

.pipe(es.through(

function write(data) {

batch.push(data);

if (batch.length >= BATCH_SIZE) {

procesarBatch(batch);

batch = [];

}

},

function end() {

// Procesar batch restante

if (batch.length > 0) {

procesarBatch(batch);

}

this.emit('end');

}

));

function procesarBatch(items) {

// Procesar 100 items a la vez

const resultado = items.map(transformar);

guardarEnBD(resultado);

}

2. Liberar Memoria Explícitamente

const v8 = require('v8');

function procesarConLiberacion() {

let procesados = 0;

stream.on('data', (item) => {

procesarItem(item);

procesados++;

// Liberar cada 10000 items

if (procesados % 10000 === 0) {

if (global.gc) {

global.gc();

}

console.log(Heap usado: ${v8.getHeapStatistics().used_heap_size / 1024 / 1024}MB);

}

});

}

// Ejecutar con: node --expose-gc script.js

3. Usar Workers para Paralelizar

const { Worker } = require('worker_threads');

const fs = require('fs');

const readline = require('readline');

const NUM_WORKERS = 4;

const workers = [];

let tareaId = 0;

// Crear workers

for (let i = 0; i < NUM_WORKERS; i++) {

const worker = new Worker('./worker.js');

workers.push(worker);

}

// Leer y distribuir

const rl = readline.createInterface({

input: fs.createReadStream('datos.jsonl')

});

rl.on('line', (linea) => {

const workerIndex = tareaId % NUM_WORKERS;

workers[workerIndex].postMessage({ id: tareaId, data: linea });

tareaId++;

});

// worker.js

const { parentPort } = require('worker_threads');

parentPort.on('message', ({ id, data }) => {

const obj = JSON.parse(data);

const resultado = procesarPesado(obj);

parentPort.postMessage({ id, resultado });

});

Herramientas de Línea de Comandos

jq para Filtrado

# Extraer campo específico

jq '.usuarios[].nombre' archivo_grande.json

# Filtrar

jq '.usuarios[] | select(.edad > 30)' archivo_grande.json

# Streaming mode para archivos grandes

jq --stream 'fromstream(1|truncate_stream([[0]]))' archivo.json

# Procesar JSONLines

cat datos.jsonl | jq -c 'select(.activo == true)'

jq con Streaming

# Modo streaming para JSON muy grande

jq --stream -c 'fromstream(1|truncate_stream([[2]]))' enorme.json

# Extraer valores sin cargar todo

jq -c --stream 'select(.[0][0] == "usuarios" and .[0][2] == "nombre") | .[1]' datos.json

GNU Parallel con jq

# Dividir y procesar en paralelo

cat datos.jsonl | parallel --pipe -N1000 'jq -c "select(.edad > 30)"' > filtrado.jsonl

# Procesar chunks

split -l 10000 datos.jsonl chunk_

ls chunk_ | parallel 'jq -c "select(.activo)" {} > {}.filtered'

cat .filtered > resultado.jsonl

Compresión

Leer JSON Comprimido

Node.js:
const fs = require('fs');

const zlib = require('zlib');

const JSONStream = require('JSONStream');

fs.createReadStream('datos.json.gz')

.pipe(zlib.createGunzip())

.pipe(JSONStream.parse('usuarios.'))

.on('data', (usuario) => {

console.log(usuario);

});

Python:
import gzip

import json

# Leer JSON gzip

with gzip.open('datos.json.gz', 'rt', encoding='utf-8') as archivo:

datos = json.load(archivo)

# Streaming con gzip

import ijson

with gzip.open('datos.json.gz', 'rb') as archivo:

for item in ijson.items(archivo, 'usuarios.item'):

procesar(item)

Escribir Comprimido

const fs = require('fs');

const zlib = require('zlib');

const JSONStream = require('JSONStream');

// Leer, transformar, y escribir comprimido

fs.createReadStream('entrada.json')

.pipe(JSONStream.parse(''))

.pipe(JSONStream.stringify())

.pipe(zlib.createGzip())

.pipe(fs.createWriteStream('salida.json.gz'));

Visualizar JSON Grande

Big JSON Viewer

URL: bigjson.online Características:
✓ Carga incremental

✓ Maneja archivos > 100 MB

✓ Vista de árbol navegable

✓ Búsqueda sin cargar todo

✓ No requiere instalación

VS Code con Streaming

Extensión: "Large File Viewer"
- Carga solo partes visibles
  • Búsqueda incremental
  • Resaltado de sintaxis

Mejores Prácticas

1. Usa el Formato Correcto

JSONLines (.jsonl) - Mejor para:

✓ Logs

✓ Eventos

✓ Datos que crecen continuamente

✓ Procesamiento paralelo

JSON estándar - Mejor para:

✓ Configuración

✓ APIs

✓ Documentos pequeños/medianos

2. Monitorea Uso de Memoria

const used = process.memoryUsage();

console.log(

RSS: ${Math.round(used.rss / 1024 / 1024)} MB

Heap Total: ${Math.round(used.heapTotal / 1024 / 1024)} MB

Heap Used: ${Math.round(used.heapUsed / 1024 / 1024)} MB

);

3. Procesa Incrementalmente

def procesar_incremental(archivo_path, callback, tamano_batch=1000):

"""Procesa JSON en batches con callback"""

batch = []

with open(archivo_path, 'r') as f:

for linea in f:

item = json.loads(linea)

batch.append(item)

if len(batch) >= tamano_batch:

callback(batch)

batch = []

# Procesar restante

if batch:

callback(batch)

# Uso

def guardar_en_db(items):

db.insert_many(items)

procesar_incremental('datos.jsonl', guardar_en_db)

4. Considera Alternativas

Para datos > 10 GB, considera:

  • MongoDB - JSON nativo
  • PostgreSQL JSONB - Consultas SQL en JSON
  • Parquet - Formato columnar comprimido
  • Apache Arrow - Procesamiento en memoria eficiente

Conclusión

Manejar JSON grande requiere streaming y procesamiento incremental. Nunca cargues archivos grandes completamente en memoria.

Checklist

  • ✓ Usa streaming para archivos > 100 MB
  • ✓ Considera JSONLines para datos continuos
  • ✓ Procesa en batches, no todo a la vez
  • ✓ Monitorea uso de memoria
  • ✓ Comprime cuando sea posible
  • ✓ Usa herramientas apropiadas (jq, ijson, JSONStream)

¡Trabaja con JSON de cualquier tamaño eficientemente!

Share:

Artículos Relacionados

Read in English