Trabajar con Archivos JSON Grandes: Técnicas de Streaming y Optimización
Aprende a manejar archivos JSON de varios GB con streaming, procesamiento incremental y optimización de memoria.
Big JSON Team
• Technical WriterExpert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.
# Trabajar con Archivos JSON Grandes: Técnicas de Streaming y Optimización
Procesar archivos JSON de varios gigabytes requiere técnicas especiales. Esta guía completa te enseñará a manejar JSON grande eficientemente sin agotar la memoria.
El Problema con JSON Grande
Enfoque Tradicional (❌ No Escalable)
// ❌ Carga todo en memoria
const fs = require('fs');
const contenido = fs.readFileSync('archivo_enorme.json', 'utf8');
const datos = JSON.parse(contenido);
// OutOfMemory con archivos > 500MB
Problemas:
- Usa 3-5x el tamaño del archivo en memoria
- Bloquea el event loop
- Falla con archivos grandes
- Sin progreso durante carga
Definiendo "Grande"
Pequeño: < 10 MB- Carga normal funciona bien
- Considera streaming si procesamiento es pesado
- Requiere streaming
- Streaming obligatorio
- Considera procesamiento por lotes
- Evalúa bases de datos alternativas
Streaming JSON en Node.js
Opción 1: JSONStream
Instalación:npm install JSONStream
Uso Básico:
const fs = require('fs');
const JSONStream = require('JSONStream');
// Archivo: usuarios.json con formato:
// { "usuarios": [...] }
const stream = fs.createReadStream('usuarios.json', { encoding: 'utf8' })
.pipe(JSONStream.parse('usuarios.'))
.on('data', (usuario) => {
// Procesar cada usuario individualmente
console.log(usuario.nombre);
})
.on('end', () => {
console.log('Procesamiento completo');
})
.on('error', (error) => {
console.error('Error:', error);
});
Procesar Array en Raíz:
// Archivo: array.json con formato: [...]
const stream = fs.createReadStream('array.json', { encoding: 'utf8' })
.pipe(JSONStream.parse(''))
.on('data', (item) => {
procesarItem(item);
});
Filtrar Durante Streaming:
const JSONStream = require('JSONStream');
const es = require('event-stream');
const fs = require('fs');
fs.createReadStream('usuarios.json')
.pipe(JSONStream.parse('usuarios.'))
.pipe(es.mapSync((usuario) => {
// Filtrar usuarios activos mayores de 30
if (usuario.activo && usuario.edad > 30) {
return usuario;
}
return null;
}))
.pipe(es.through(
function write(data) {
if (data) {
console.log(data);
}
}
));
Opción 2: stream-json
Instalación:npm install stream-json
Uso:
const fs = require('fs');
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { pick } = require('stream-json/filters/Pick');
const pipeline = chain([
fs.createReadStream('datos.json'),
parser(),
pick({ filter: 'usuarios' }),
streamArray(),
(data) => {
const { key, value } = data;
console.log(Usuario #${key}: ${value.nombre});
return value;
}
]);
pipeline.on('data', (usuario) => {
procesarUsuario(usuario);
});
pipeline.on('end', () => console.log('Completo'));
Transformar y Escribir:
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { stringer } = require('stream-json/Stringer');
chain([
fs.createReadStream('entrada.json'),
parser(),
streamArray(),
(data) => {
// Transformar cada item
data.value.procesado = true;
data.value.timestamp = Date.now();
return data;
},
stringer(),
fs.createWriteStream('salida.json')
]);
Opción 3: oboe.js (Navegador y Node)
Instalación:npm install oboe
Node.js:
const oboe = require('oboe');
oboe(fs.createReadStream('datos.json'))
.node('usuarios.', (usuario) => {
console.log('Usuario:', usuario.nombre);
// Retornar oboe.drop para liberar memoria
return oboe.drop;
})
.done(() => {
console.log('Completado');
})
.fail((error) => {
console.error('Error:', error);
});
Navegador (Fetch API):
oboe('/api/usuarios-grandes')
.node('usuarios.', (usuario) => {
añadirALista(usuario);
return oboe.drop;
})
.done(() => {
console.log('Todos los usuarios cargados');
});
Python: Streaming JSON
Opción 1: ijson
Instalación:pip install ijson
Uso Básico:
import ijson
# Iterar sobre items
with open('usuarios.json', 'rb') as archivo:
usuarios = ijson.items(archivo, 'usuarios.item')
for usuario in usuarios:
print(f"Usuario: {usuario['nombre']}")
Procesar con Filtro:
import ijson
def procesar_usuarios_grandes(archivo_path):
contador = 0
with open(archivo_path, 'rb') as archivo:
parser = ijson.items(archivo, 'usuarios.item')
for usuario in parser:
if usuario.get('edad', 0) > 30:
procesar(usuario)
contador += 1
# Progreso cada 1000 usuarios
if contador % 1000 == 0:
print(f"Procesados: {contador}")
return contador
Extraer Valores Específicos:
import ijson
# Solo extraer nombres
with open('usuarios.json', 'rb') as archivo:
nombres = ijson.items(archivo, 'usuarios.item.nombre')
for nombre in nombres:
print(nombre)
Opción 2: json-stream
from json_stream import streamable_list
from json_stream.requests import load
# Desde archivo
with open('datos.json') as archivo:
datos = streamable_list(archivo)
for item in datos:
procesar(item)
# Desde HTTP
respuesta = requests.get('https://api.ejemplo.com/datos-grandes', stream=True)
datos = load(respuesta.raw)
for item in datos['items']:
procesar(item)
Pandas con Chunks
import pandas as pd
# Leer JSON en chunks
chunks = []
tamano_chunk = 10000
for chunk in pd.read_json(
'datos.jsonl',
lines=True,
chunksize=tamano_chunk
):
# Procesar cada chunk
chunk_procesado = chunk[chunk['edad'] > 30]
chunks.append(chunk_procesado)
print(f"Procesado chunk de {len(chunk)} filas")
# Combinar resultados
resultado = pd.concat(chunks, ignore_index=True)
JSONLines para Big Data
¿Qué es JSONLines?
Formato donde cada línea es un JSON válido:
{"id": 1, "nombre": "Ana", "edad": 28}
{"id": 2, "nombre": "Carlos", "edad": 35}
{"id": 3, "nombre": "María", "edad": 31}
Ventajas
- ✅ Streaming trivial (línea por línea)
- ✅ Fácil de agregar datos
- ✅ Tolerante a errores parciales
- ✅ Compatible con herramientas Unix
- ✅ Ideal para logs
Procesar JSONLines (Node.js)
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('datos.jsonl'),
crlfDelay: Infinity
});
let contador = 0;
rl.on('line', (linea) => {
try {
const obj = JSON.parse(linea);
procesarObjeto(obj);
contador++;
if (contador % 10000 === 0) {
console.log(Procesadas ${contador} líneas);
}
} catch (error) {
console.error('Error en línea:', error);
}
});
rl.on('close', () => {
console.log(Total procesadas: ${contador});
});
Procesar JSONLines (Python)
import json
def procesar_jsonlines(archivo_path):
contador = 0
with open(archivo_path, 'r') as archivo:
for linea in archivo:
try:
obj = json.loads(linea)
procesar(obj)
contador += 1
if contador % 10000 == 0:
print(f"Procesadas {contador} líneas")
except json.JSONDecodeError as e:
print(f"Error en línea {contador}: {e}")
return contador
Convertir JSON a JSONLines
import json
def json_a_jsonlines(entrada, salida):
with open(entrada, 'r') as f_entrada:
datos = json.load(f_entrada)
with open(salida, 'w') as f_salida:
# Si es array
if isinstance(datos, list):
for item in datos:
f_salida.write(json.dumps(item) + '\n')
# Si tiene array anidado
elif 'items' in datos:
for item in datos['items']:
f_salida.write(json.dumps(item) + '\n')
# Uso
json_a_jsonlines('usuarios.json', 'usuarios.jsonl')
Optimización de Memoria
1. Procesar en Batches
const fs = require('fs');
const JSONStream = require('JSONStream');
const es = require('event-stream');
let batch = [];
const BATCH_SIZE = 100;
fs.createReadStream('datos.json')
.pipe(JSONStream.parse('items.'))
.pipe(es.through(
function write(data) {
batch.push(data);
if (batch.length >= BATCH_SIZE) {
procesarBatch(batch);
batch = [];
}
},
function end() {
// Procesar batch restante
if (batch.length > 0) {
procesarBatch(batch);
}
this.emit('end');
}
));
function procesarBatch(items) {
// Procesar 100 items a la vez
const resultado = items.map(transformar);
guardarEnBD(resultado);
}
2. Liberar Memoria Explícitamente
const v8 = require('v8');
function procesarConLiberacion() {
let procesados = 0;
stream.on('data', (item) => {
procesarItem(item);
procesados++;
// Liberar cada 10000 items
if (procesados % 10000 === 0) {
if (global.gc) {
global.gc();
}
console.log(Heap usado: ${v8.getHeapStatistics().used_heap_size / 1024 / 1024}MB);
}
});
}
// Ejecutar con: node --expose-gc script.js
3. Usar Workers para Paralelizar
const { Worker } = require('worker_threads');
const fs = require('fs');
const readline = require('readline');
const NUM_WORKERS = 4;
const workers = [];
let tareaId = 0;
// Crear workers
for (let i = 0; i < NUM_WORKERS; i++) {
const worker = new Worker('./worker.js');
workers.push(worker);
}
// Leer y distribuir
const rl = readline.createInterface({
input: fs.createReadStream('datos.jsonl')
});
rl.on('line', (linea) => {
const workerIndex = tareaId % NUM_WORKERS;
workers[workerIndex].postMessage({ id: tareaId, data: linea });
tareaId++;
});
// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', ({ id, data }) => {
const obj = JSON.parse(data);
const resultado = procesarPesado(obj);
parentPort.postMessage({ id, resultado });
});
Herramientas de Línea de Comandos
jq para Filtrado
# Extraer campo específico
jq '.usuarios[].nombre' archivo_grande.json
# Filtrar
jq '.usuarios[] | select(.edad > 30)' archivo_grande.json
# Streaming mode para archivos grandes
jq --stream 'fromstream(1|truncate_stream([[0]]))' archivo.json
# Procesar JSONLines
cat datos.jsonl | jq -c 'select(.activo == true)'
jq con Streaming
# Modo streaming para JSON muy grande
jq --stream -c 'fromstream(1|truncate_stream([[2]]))' enorme.json
# Extraer valores sin cargar todo
jq -c --stream 'select(.[0][0] == "usuarios" and .[0][2] == "nombre") | .[1]' datos.json
GNU Parallel con jq
# Dividir y procesar en paralelo
cat datos.jsonl | parallel --pipe -N1000 'jq -c "select(.edad > 30)"' > filtrado.jsonl
# Procesar chunks
split -l 10000 datos.jsonl chunk_
ls chunk_ | parallel 'jq -c "select(.activo)" {} > {}.filtered'
cat .filtered > resultado.jsonl
Compresión
Leer JSON Comprimido
Node.js:const fs = require('fs');
const zlib = require('zlib');
const JSONStream = require('JSONStream');
fs.createReadStream('datos.json.gz')
.pipe(zlib.createGunzip())
.pipe(JSONStream.parse('usuarios.'))
.on('data', (usuario) => {
console.log(usuario);
});
Python:
import gzip
import json
# Leer JSON gzip
with gzip.open('datos.json.gz', 'rt', encoding='utf-8') as archivo:
datos = json.load(archivo)
# Streaming con gzip
import ijson
with gzip.open('datos.json.gz', 'rb') as archivo:
for item in ijson.items(archivo, 'usuarios.item'):
procesar(item)
Escribir Comprimido
const fs = require('fs');
const zlib = require('zlib');
const JSONStream = require('JSONStream');
// Leer, transformar, y escribir comprimido
fs.createReadStream('entrada.json')
.pipe(JSONStream.parse(''))
.pipe(JSONStream.stringify())
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('salida.json.gz'));
Visualizar JSON Grande
Big JSON Viewer
URL: bigjson.online Características:✓ Carga incremental
✓ Maneja archivos > 100 MB
✓ Vista de árbol navegable
✓ Búsqueda sin cargar todo
✓ No requiere instalación
VS Code con Streaming
Extensión: "Large File Viewer"- Carga solo partes visibles
- Búsqueda incremental
- Resaltado de sintaxis
Mejores Prácticas
1. Usa el Formato Correcto
JSONLines (.jsonl) - Mejor para:
✓ Logs
✓ Eventos
✓ Datos que crecen continuamente
✓ Procesamiento paralelo
JSON estándar - Mejor para:
✓ Configuración
✓ APIs
✓ Documentos pequeños/medianos
2. Monitorea Uso de Memoria
const used = process.memoryUsage();
console.log(
RSS: ${Math.round(used.rss / 1024 / 1024)} MB
Heap Total: ${Math.round(used.heapTotal / 1024 / 1024)} MB
Heap Used: ${Math.round(used.heapUsed / 1024 / 1024)} MB
);
3. Procesa Incrementalmente
def procesar_incremental(archivo_path, callback, tamano_batch=1000):
"""Procesa JSON en batches con callback"""
batch = []
with open(archivo_path, 'r') as f:
for linea in f:
item = json.loads(linea)
batch.append(item)
if len(batch) >= tamano_batch:
callback(batch)
batch = []
# Procesar restante
if batch:
callback(batch)
# Uso
def guardar_en_db(items):
db.insert_many(items)
procesar_incremental('datos.jsonl', guardar_en_db)
4. Considera Alternativas
Para datos > 10 GB, considera:
- MongoDB - JSON nativo
- PostgreSQL JSONB - Consultas SQL en JSON
- Parquet - Formato columnar comprimido
- Apache Arrow - Procesamiento en memoria eficiente
Conclusión
Manejar JSON grande requiere streaming y procesamiento incremental. Nunca cargues archivos grandes completamente en memoria.
Checklist
- ✓ Usa streaming para archivos > 100 MB
- ✓ Considera JSONLines para datos continuos
- ✓ Procesa en batches, no todo a la vez
- ✓ Monitorea uso de memoria
- ✓ Comprime cuando sea posible
- ✓ Usa herramientas apropiadas (jq, ijson, JSONStream)
¡Trabaja con JSON de cualquier tamaño eficientemente!
Artículos Relacionados
Las Mejores Herramientas JSON Online en 2024
Descubre las mejores herramientas JSON online para formatear, validar y trabajar con datos JSON. Comparativa completa incluyendo Big JSON Viewer.
Buscador de Rutas JSON: Guía Completa de JSONPath y jq
Aprende a navegar y consultar datos JSON complejos usando JSONPath y jq. Incluye ejemplos prácticos y mejores prácticas.
JSON en Ciencia de Datos: Pandas, NumPy y Análisis de Datos
Aprende a usar JSON en ciencia de datos con Python. Incluye Pandas, NumPy, visualización y casos de uso prácticos.