← ブログに戻る

データサイエンスにおけるJSON

データサイエンスと機械学習でのJSON活用。Pandas、NumPy、分析ワークフローでの使用方法。

Big JSON Team15 分で読めますadvanced
B

Big JSON Team

Technical Writer

Expert in JSON data manipulation, API development, and web technologies. Passionate about creating tools that make developers' lives easier.

15 分読む

# データサイエンスにおけるJSON

データサイエンスと機械学習プロジェクトでJSONを効果的に活用する方法を学びます。

なぜデータサイエンスでJSONか

利点

  • 🌐 API統合 - ウェブAPIから直接データ取得
  • 📊 柔軟な構造 - ネストされた複雑なデータ
  • 🔄 相互運用性 - 異なるシステム間のデータ交換
  • 📦 メタデータ - データと一緒にメタデータを保存
  • 🚀 NoSQLデータベース - MongoDBなどとの親和性

用途

  • APIレスポンスの処理
  • 設定ファイル
  • 機械学習モデルの設定
  • 実験結果の保存
  • データセットのメタデータ

Pandasとの統合

JSONからDataFrameへ

基本的な読み込み

import pandas as pd

# JSONファイルから読み込み

df = pd.read_json('data.json')

# JSON文字列から

json_str = '{"name": ["太郎", "花子"], "age": [30, 25]}'

df = pd.read_json(json_str)

print(df)

# name age

# 0 太郎 30

# 1 花子 25

レコード形式

[

{"name": "田中太郎", "age": 30, "city": "東京"},

{"name": "佐藤花子", "age": 25, "city": "大阪"}

]

df = pd.read_json('users.json')

カラム形式

{

"name": ["田中太郎", "佐藤花子"],

"age": [30, 25],

"city": ["東京", "大阪"]

}

df = pd.read_json('users.json', orient='columns')

ネストされたJSONの処理

json_normalize

from pandas import json_normalize

data = {

"users": [

{

"name": "田中太郎",

"age": 30,

"address": {

"city": "東京",

"zip": "100-0001"

}

},

{

"name": "佐藤花子",

"age": 25,

"address": {

"city": "大阪",

"zip": "530-0001"

}

}

]

}

# ネストを展開

df = json_normalize(data['users'])

print(df)

# name age address.city address.zip

# 0 田中太郎 30 東京 100-0001

# 1 佐藤花子 25 大阪 530-0001

セパレーターのカスタマイズ

df = json_normalize(

data['users'],

sep='_'

)

print(df)

# name age address_city address_zip

# 0 田中太郎 30 東京 100-0001

深いネスト

data = {

"company": {

"name": "Tech Corp",

"employees": [

{

"name": "田中",

"projects": [

{"name": "A", "status": "完了"},

{"name": "B", "status": "進行中"}

]

}

]

}

}

df = json_normalize(

data['company']['employees'],

record_path='projects',

meta=['name']

)

print(df)

# name status name

# 0 A 完了 田中

# 1 B 進行中 田中

DataFrameからJSONへ

# レコード形式

json_str = df.to_json(orient='records', force_ascii=False)

# ファイルに保存

df.to_json('output.json', orient='records', force_ascii=False, indent=2)

# 他の形式

df.to_json('data.json', orient='columns') # カラム形式

df.to_json('data.json', orient='index') # インデックス形式

df.to_json('data.json', orient='split') # 分割形式

NumPyとの統合

NumPy配列とJSON

import numpy as np

import json

# NumPy配列

arr = np.array([[1, 2, 3], [4, 5, 6]])

# JSONに変換(カスタムエンコーダー)

class NumpyEncoder(json.JSONEncoder):

def default(self, obj):

if isinstance(obj, np.ndarray):

return obj.tolist()

if isinstance(obj, np.integer):

return int(obj)

if isinstance(obj, np.floating):

return float(obj)

return super().default(obj)

# 使用

data = {

'array': arr,

'mean': np.mean(arr),

'std': np.std(arr)

}

json_str = json.dumps(data, cls=NumpyEncoder, indent=2)

print(json_str)

JSONからNumPy配列

json_data = '{"array": [[1, 2, 3], [4, 5, 6]]}'

data = json.loads(json_data)

# リストをNumPy配列に

arr = np.array(data['array'])

print(arr)

API データの取得と処理

requestsとPandas

import requests

import pandas as pd

# APIからデータ取得

response = requests.get('https://api.example.com/users')

data = response.json()

# DataFrameに変換

df = pd.DataFrame(data['results'])

# データクリーニング

df = df.dropna()

df['created_at'] = pd.to_datetime(df['created_at'])

# 分析

print(df.describe())

print(df.groupby('city').size())

ページネーション処理

import requests

import pandas as pd

def fetch_all_pages(base_url):

all_data = []

page = 1

while True:

response = requests.get(f'{base_url}?page={page}')

data = response.json()

if not data['results']:

break

all_data.extend(data['results'])

page += 1

return pd.DataFrame(all_data)

df = fetch_all_pages('https://api.example.com/users')

機械学習での使用

データセットのメタデータ

import json

# データセットのメタデータ

metadata = {

"name": "顧客データセット",

"version": "1.0",

"created_at": "2026-01-16",

"features": [

{

"name": "age",

"type": "numeric",

"description": "年齢",

"min": 18,

"max": 80

},

{

"name": "income",

"type": "numeric",

"description": "年収(万円)"

},

{

"name": "category",

"type": "categorical",

"values": ["A", "B", "C"]

}

],

"target": "purchased",

"samples": 10000

}

# 保存

with open('dataset_metadata.json', 'w', encoding='utf-8') as f:

json.dump(metadata, f, ensure_ascii=False, indent=2)

モデル設定

# ハイパーパラメータ設定

config = {

"model_type": "random_forest",

"parameters": {

"n_estimators": 100,

"max_depth": 10,

"min_samples_split": 2,

"random_state": 42

},

"features": ["age", "income", "education"],

"target": "purchased",

"cv_folds": 5

}

# モデルトレーニング

from sklearn.ensemble import RandomForestClassifier

model = RandomForestClassifier(*config['parameters'])

実験結果の記録

import json

from datetime import datetime

# 実験結果

experiment = {

"id": "exp_001",

"timestamp": datetime.now().isoformat(),

"model": "RandomForest",

"parameters": {

"n_estimators": 100,

"max_depth": 10

},

"metrics": {

"accuracy": 0.85,

"precision": 0.82,

"recall": 0.88,

"f1_score": 0.85

},

"confusion_matrix": [[100, 20], [15, 165]],

"feature_importance": {

"age": 0.35,

"income": 0.45,

"education": 0.20

}

}

# 保存

with open(f'experiments/{experiment["id"]}.json', 'w') as f:

json.dump(experiment, f, indent=2)

時系列データ

JSON Lines形式

import pandas as pd

# JSON Lines (.jsonl) - 1行に1つのJSONオブジェクト

data = '''

{"timestamp": "2026-01-16T10:00:00", "value": 100, "sensor": "A"}

{"timestamp": "2026-01-16T10:01:00", "value": 105, "sensor": "A"}

{"timestamp": "2026-01-16T10:02:00", "value": 103, "sensor": "A"}

'''

# 読み込み

df = pd.read_json(data, lines=True)

df['timestamp'] = pd.to_datetime(df['timestamp'])

df = df.set_index('timestamp')

print(df)

ストリーミングデータ

import json

def process_streaming_data(file_path):

"""大きなJSONLファイルをストリーム処理"""

with open(file_path, 'r') as f:

for line in f:

data = json.loads(line)

# 各レコードを処理

yield process_record(data)

def process_record(record):

# データ処理ロジック

return {

'timestamp': record['timestamp'],

'processed_value': record['value'] 2

}

# 使用

for processed in process_streaming_data('sensor_data.jsonl'):

print(processed)

ビッグデータ処理

Daskでの並列処理

import dask.dataframe as dd

# 大きなJSONファイルをDaskで読み込み

ddf = dd.read_json('large_data.json', lines=True)

# 遅延実行

result = ddf.groupby('category')['value'].mean()

# 実際の計算

print(result.compute())

チャンク処理

import pandas as pd

def process_large_json(file_path, chunksize=1000):

"""JSONファイルをチャンクで処理"""

chunks = []

with open(file_path, 'r') as f:

chunk = []

for i, line in enumerate(f):

chunk.append(json.loads(line))

if (i + 1) % chunksize == 0:

df_chunk = pd.DataFrame(chunk)

# チャンクごとに処理

processed = process_chunk(df_chunk)

chunks.append(processed)

chunk = []

# 残りのデータ

if chunk:

df_chunk = pd.DataFrame(chunk)

processed = process_chunk(df_chunk)

chunks.append(processed)

return pd.concat(chunks, ignore_index=True)

データ可視化

PlotlyでのJSON使用

import plotly.graph_objects as go

import json

# 可視化設定をJSONとして保存

plot_config = {

"type": "scatter",

"x": [1, 2, 3, 4, 5],

"y": [10, 15, 13, 17, 20],

"mode": "lines+markers",

"name": "データ1"

}

# 設定から可視化作成

fig = go.Figure(data=[go.Scatter(plot_config)])

fig.show()

# 図をJSONとして保存

fig.write_json('plot.json')

データパイプライン

ETLパイプライン

import pandas as pd

import json

class DataPipeline:

def __init__(self, config_file):

with open(config_file, 'r') as f:

self.config = json.load(f)

def extract(self):

"""データ抽出"""

source = self.config['source']

if source['type'] == 'api':

return self.extract_from_api(source['url'])

elif source['type'] == 'file':

return pd.read_json(source['path'])

def transform(self, df):

"""データ変換"""

transforms = self.config['transforms']

for transform in transforms:

if transform['type'] == 'filter':

df = df[df[transform['column']] > transform['value']]

elif transform['type'] == 'rename':

df = df.rename(columns=transform['mapping'])

return df

def load(self, df):

"""データロード"""

destination = self.config['destination']

df.to_json(destination['path'], orient='records', indent=2)

def run(self):

"""パイプライン実行"""

df = self.extract()

df = self.transform(df)

self.load(df)

return df

# 設定ファイル (pipeline_config.json)

config = {

"source": {

"type": "api",

"url": "https://api.example.com/data"

},

"transforms": [

{"type": "filter", "column": "value", "value": 100},

{"type": "rename", "mapping": {"old_name": "new_name"}}

],

"destination": {

"path": "output.json"

}

}

# 使用

pipeline = DataPipeline('pipeline_config.json')

result = pipeline.run()

ベストプラクティス

1. 効率的な読み書き

# 良い - JSON Lines(大きなデータ向け)

df.to_json('data.jsonl', orient='records', lines=True)

# 悪い - 1つの大きな配列

df.to_json('data.json', orient='records')

2. メモリ管理

# チャンクで読み込み

for chunk in pd.read_json('large.jsonl', lines=True, chunksize=1000):

process(chunk)

3. データ型の指定

# 明示的にデータ型を指定

df = pd.read_json(

'data.json',

dtype={

'id': 'int64',

'value': 'float64',

'category': 'category'

}

)

4. エラーハンドリング

import json

def safe_load_json(file_path):

try:

with open(file_path, 'r', encoding='utf-8') as f:

return json.load(f)

except json.JSONDecodeError as e:

print(f'JSON解析エラー: {e}')

return None

except FileNotFoundError:

print(f'ファイルが見つかりません: {file_path}')

return None

パフォーマンス最適化

ujsonの使用

import ujson  # 高速JSONライブラリ

# 通常のjsonより高速

data = ujson.load(open('data.json'))

ujson.dump(data, open('output.json', 'w'))

orjsonの使用

import orjson  # 最速のJSONライブラリ

# 読み込み

with open('data.json', 'rb') as f:

data = orjson.loads(f.read())

# 書き込み

with open('output.json', 'wb') as f:

f.write(orjson.dumps(data))

まとめ

データサイエンスでのJSON活用

  • API統合 - requestsとPandas
  • データ変換 - json_normalizeでフラット化
  • 実験管理 - 設定と結果をJSON保存
  • ビッグデータ - JSON LinesとDask
  • パイプライン - JSON設定ファイル
  • 推奨ツール

    • pandas - データフレーム操作
    • json_normalize - ネスト解除
    • ujson/orjson - 高速処理
    • Dask** - ビッグデータ

    データサイエンスワークフローでJSONを効果的に活用しましょう!

    Share:

    関連記事

    Read in English