データサイエンスにおけるJSON
データサイエンスと機械学習でのJSON活用。Pandas、NumPy、分析ワークフローでの使用方法。
Big JSON Team
• Technical WriterExpert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.
# データサイエンスにおけるJSON
データサイエンスと機械学習プロジェクトでJSONを効果的に活用する方法を学びます。
なぜデータサイエンスでJSONか
利点
- 🌐 API統合 - ウェブAPIから直接データ取得
- 📊 柔軟な構造 - ネストされた複雑なデータ
- 🔄 相互運用性 - 異なるシステム間のデータ交換
- 📦 メタデータ - データと一緒にメタデータを保存
- 🚀 NoSQLデータベース - MongoDBなどとの親和性
用途
- APIレスポンスの処理
- 設定ファイル
- 機械学習モデルの設定
- 実験結果の保存
- データセットのメタデータ
Pandasとの統合
JSONからDataFrameへ
基本的な読み込み
import pandas as pd
# JSONファイルから読み込み
df = pd.read_json('data.json')
# JSON文字列から
json_str = '{"name": ["太郎", "花子"], "age": [30, 25]}'
df = pd.read_json(json_str)
print(df)
# name age
# 0 太郎 30
# 1 花子 25
レコード形式
[
{"name": "田中太郎", "age": 30, "city": "東京"},
{"name": "佐藤花子", "age": 25, "city": "大阪"}
]
df = pd.read_json('users.json')
カラム形式
{
"name": ["田中太郎", "佐藤花子"],
"age": [30, 25],
"city": ["東京", "大阪"]
}
df = pd.read_json('users.json', orient='columns')
ネストされたJSONの処理
json_normalize
from pandas import json_normalize
data = {
"users": [
{
"name": "田中太郎",
"age": 30,
"address": {
"city": "東京",
"zip": "100-0001"
}
},
{
"name": "佐藤花子",
"age": 25,
"address": {
"city": "大阪",
"zip": "530-0001"
}
}
]
}
# ネストを展開
df = json_normalize(data['users'])
print(df)
# name age address.city address.zip
# 0 田中太郎 30 東京 100-0001
# 1 佐藤花子 25 大阪 530-0001
セパレーターのカスタマイズ
df = json_normalize(
data['users'],
sep='_'
)
print(df)
# name age address_city address_zip
# 0 田中太郎 30 東京 100-0001
深いネスト
data = {
"company": {
"name": "Tech Corp",
"employees": [
{
"name": "田中",
"projects": [
{"name": "A", "status": "完了"},
{"name": "B", "status": "進行中"}
]
}
]
}
}
df = json_normalize(
data['company']['employees'],
record_path='projects',
meta=['name']
)
print(df)
# name status name
# 0 A 完了 田中
# 1 B 進行中 田中
DataFrameからJSONへ
# レコード形式
json_str = df.to_json(orient='records', force_ascii=False)
# ファイルに保存
df.to_json('output.json', orient='records', force_ascii=False, indent=2)
# 他の形式
df.to_json('data.json', orient='columns') # カラム形式
df.to_json('data.json', orient='index') # インデックス形式
df.to_json('data.json', orient='split') # 分割形式
NumPyとの統合
NumPy配列とJSON
import numpy as np
import json
# NumPy配列
arr = np.array([[1, 2, 3], [4, 5, 6]])
# JSONに変換(カスタムエンコーダー)
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
return super().default(obj)
# 使用
data = {
'array': arr,
'mean': np.mean(arr),
'std': np.std(arr)
}
json_str = json.dumps(data, cls=NumpyEncoder, indent=2)
print(json_str)
JSONからNumPy配列
json_data = '{"array": [[1, 2, 3], [4, 5, 6]]}'
data = json.loads(json_data)
# リストをNumPy配列に
arr = np.array(data['array'])
print(arr)
API データの取得と処理
requestsとPandas
import requests
import pandas as pd
# APIからデータ取得
response = requests.get('https://api.example.com/users')
data = response.json()
# DataFrameに変換
df = pd.DataFrame(data['results'])
# データクリーニング
df = df.dropna()
df['created_at'] = pd.to_datetime(df['created_at'])
# 分析
print(df.describe())
print(df.groupby('city').size())
ページネーション処理
import requests
import pandas as pd
def fetch_all_pages(base_url):
all_data = []
page = 1
while True:
response = requests.get(f'{base_url}?page={page}')
data = response.json()
if not data['results']:
break
all_data.extend(data['results'])
page += 1
return pd.DataFrame(all_data)
df = fetch_all_pages('https://api.example.com/users')
機械学習での使用
データセットのメタデータ
import json
# データセットのメタデータ
metadata = {
"name": "顧客データセット",
"version": "1.0",
"created_at": "2026-01-16",
"features": [
{
"name": "age",
"type": "numeric",
"description": "年齢",
"min": 18,
"max": 80
},
{
"name": "income",
"type": "numeric",
"description": "年収(万円)"
},
{
"name": "category",
"type": "categorical",
"values": ["A", "B", "C"]
}
],
"target": "purchased",
"samples": 10000
}
# 保存
with open('dataset_metadata.json', 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
モデル設定
# ハイパーパラメータ設定
config = {
"model_type": "random_forest",
"parameters": {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 2,
"random_state": 42
},
"features": ["age", "income", "education"],
"target": "purchased",
"cv_folds": 5
}
# モデルトレーニング
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(*config['parameters'])
実験結果の記録
import json
from datetime import datetime
# 実験結果
experiment = {
"id": "exp_001",
"timestamp": datetime.now().isoformat(),
"model": "RandomForest",
"parameters": {
"n_estimators": 100,
"max_depth": 10
},
"metrics": {
"accuracy": 0.85,
"precision": 0.82,
"recall": 0.88,
"f1_score": 0.85
},
"confusion_matrix": [[100, 20], [15, 165]],
"feature_importance": {
"age": 0.35,
"income": 0.45,
"education": 0.20
}
}
# 保存
with open(f'experiments/{experiment["id"]}.json', 'w') as f:
json.dump(experiment, f, indent=2)
時系列データ
JSON Lines形式
import pandas as pd
# JSON Lines (.jsonl) - 1行に1つのJSONオブジェクト
data = '''
{"timestamp": "2026-01-16T10:00:00", "value": 100, "sensor": "A"}
{"timestamp": "2026-01-16T10:01:00", "value": 105, "sensor": "A"}
{"timestamp": "2026-01-16T10:02:00", "value": 103, "sensor": "A"}
'''
# 読み込み
df = pd.read_json(data, lines=True)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
print(df)
ストリーミングデータ
import json
def process_streaming_data(file_path):
"""大きなJSONLファイルをストリーム処理"""
with open(file_path, 'r') as f:
for line in f:
data = json.loads(line)
# 各レコードを処理
yield process_record(data)
def process_record(record):
# データ処理ロジック
return {
'timestamp': record['timestamp'],
'processed_value': record['value'] 2
}
# 使用
for processed in process_streaming_data('sensor_data.jsonl'):
print(processed)
ビッグデータ処理
Daskでの並列処理
import dask.dataframe as dd
# 大きなJSONファイルをDaskで読み込み
ddf = dd.read_json('large_data.json', lines=True)
# 遅延実行
result = ddf.groupby('category')['value'].mean()
# 実際の計算
print(result.compute())
チャンク処理
import pandas as pd
def process_large_json(file_path, chunksize=1000):
"""JSONファイルをチャンクで処理"""
chunks = []
with open(file_path, 'r') as f:
chunk = []
for i, line in enumerate(f):
chunk.append(json.loads(line))
if (i + 1) % chunksize == 0:
df_chunk = pd.DataFrame(chunk)
# チャンクごとに処理
processed = process_chunk(df_chunk)
chunks.append(processed)
chunk = []
# 残りのデータ
if chunk:
df_chunk = pd.DataFrame(chunk)
processed = process_chunk(df_chunk)
chunks.append(processed)
return pd.concat(chunks, ignore_index=True)
データ可視化
PlotlyでのJSON使用
import plotly.graph_objects as go
import json
# 可視化設定をJSONとして保存
plot_config = {
"type": "scatter",
"x": [1, 2, 3, 4, 5],
"y": [10, 15, 13, 17, 20],
"mode": "lines+markers",
"name": "データ1"
}
# 設定から可視化作成
fig = go.Figure(data=[go.Scatter(plot_config)])
fig.show()
# 図をJSONとして保存
fig.write_json('plot.json')
データパイプライン
ETLパイプライン
import pandas as pd
import json
class DataPipeline:
def __init__(self, config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
def extract(self):
"""データ抽出"""
source = self.config['source']
if source['type'] == 'api':
return self.extract_from_api(source['url'])
elif source['type'] == 'file':
return pd.read_json(source['path'])
def transform(self, df):
"""データ変換"""
transforms = self.config['transforms']
for transform in transforms:
if transform['type'] == 'filter':
df = df[df[transform['column']] > transform['value']]
elif transform['type'] == 'rename':
df = df.rename(columns=transform['mapping'])
return df
def load(self, df):
"""データロード"""
destination = self.config['destination']
df.to_json(destination['path'], orient='records', indent=2)
def run(self):
"""パイプライン実行"""
df = self.extract()
df = self.transform(df)
self.load(df)
return df
# 設定ファイル (pipeline_config.json)
config = {
"source": {
"type": "api",
"url": "https://api.example.com/data"
},
"transforms": [
{"type": "filter", "column": "value", "value": 100},
{"type": "rename", "mapping": {"old_name": "new_name"}}
],
"destination": {
"path": "output.json"
}
}
# 使用
pipeline = DataPipeline('pipeline_config.json')
result = pipeline.run()
ベストプラクティス
1. 効率的な読み書き
# 良い - JSON Lines(大きなデータ向け)
df.to_json('data.jsonl', orient='records', lines=True)
# 悪い - 1つの大きな配列
df.to_json('data.json', orient='records')
2. メモリ管理
# チャンクで読み込み
for chunk in pd.read_json('large.jsonl', lines=True, chunksize=1000):
process(chunk)
3. データ型の指定
# 明示的にデータ型を指定
df = pd.read_json(
'data.json',
dtype={
'id': 'int64',
'value': 'float64',
'category': 'category'
}
)
4. エラーハンドリング
import json
def safe_load_json(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
print(f'JSON解析エラー: {e}')
return None
except FileNotFoundError:
print(f'ファイルが見つかりません: {file_path}')
return None
パフォーマンス最適化
ujsonの使用
import ujson # 高速JSONライブラリ
# 通常のjsonより高速
data = ujson.load(open('data.json'))
ujson.dump(data, open('output.json', 'w'))
orjsonの使用
import orjson # 最速のJSONライブラリ
# 読み込み
with open('data.json', 'rb') as f:
data = orjson.loads(f.read())
# 書き込み
with open('output.json', 'wb') as f:
f.write(orjson.dumps(data))
まとめ
データサイエンスでのJSON活用
推奨ツール
- pandas - データフレーム操作
- json_normalize - ネスト解除
- ujson/orjson - 高速処理
- Dask** - ビッグデータ
データサイエンスワークフローでJSONを効果的に活用しましょう!