バッチ処理

コマンドシグネチャ

php artisan specific-viewpoint:process-batches {--log-creation-status=} {--dry-run}

目的

specific-viewpoint:process-batchesコマンドは、レビュー文をOpenAIに送信して事前定義された視点基準に対して分類するバッチジョブを作成およびディスパッチします。このコマンドは、アナライザーデータベースから保留中のバッチジョブを特定し、関連するレビュー文と視点定義を取得し、その後、構造化された分類リクエストをOpenAIのバッチ処理APIに送信します。このコマンドは、AI駆動の感情分析と視点カテゴリ化のために大量のレビューデータを効率的に処理することを保証します。

シーケンス図

ステップ1:コマンド初期化とジョブ取得

sequenceDiagram
    participant System
    participant ProcessCommand as specific-viewpoint:process-batches
    participant BatchJobRepo as BatchJobViewpointRepository
    participant AnalyzerDB[(gb_analyzer.batch_jobs_vps)]
    participant Logger
    participant Slack
    
    Note over System,Slack: コマンド初期化(5分毎)
    
    rect rgb(200, 255, 200)
    Note right of System: 正常ケース - コマンド起動
    System->>ProcessCommand: コマンド実行
    ProcessCommand->>Logger: パラメータ付きでコマンド開始をログ
    
    ProcessCommand->>BatchJobRepo: getPendingBatchJobs(log_creation_status)
    BatchJobRepo->>AnalyzerDB: WHERE log_creation_status = 1 (保留中)をクエリ
    AnalyzerDB-->>BatchJobRepo: 保留中のバッチジョブを返す
    BatchJobRepo-->>ProcessCommand: 保留中のバッチジョブコレクションを返す
    ProcessCommand->>Logger: 見つかったジョブ数をログ
    end
    
    rect rgb(255, 200, 200)
    Note right of System: エラーハンドリング
    rect rgb(255, 230, 230)
    alt データベース接続エラー
        BatchJobRepo->>Logger: データベース接続エラーをログ
        BatchJobRepo->>Slack: データベースエラー通知を送信
    else 保留中ジョブなし
        ProcessCommand->>Logger: 処理するジョブがないことをログ
    end
    end
    end

ステップ2:ジョブディスパッチとキュー管理

sequenceDiagram
    participant ProcessCommand as specific-viewpoint:process-batches
    participant BatchJob as CreateBatchPredictionJob
    participant QueueSystem as Laravel Queue
    participant Logger
    participant Slack
    
    Note over ProcessCommand,Slack: バッチジョブディスパッチプロセス
    
    rect rgb(200, 255, 200)
    Note right of ProcessCommand: 正常ケース - ジョブ処理
    
    rect rgb(200, 230, 255)
    Note right of ProcessCommand: ドライラン確認
    alt --dry-runフラグあり
        ProcessCommand->>Logger: ドライランモード有効化をログ
        ProcessCommand->>ProcessCommand: 実行なしでジョブディスパッチをシミュレート
        ProcessCommand->>Logger: シミュレートされた操作をログ
    else 通常実行
        Note right of ProcessCommand: 実際のジョブディスパッチ
        loop 各保留中バッチジョブに対して
            ProcessCommand->>QueueSystem: CreateBatchPredictionJob(batch_job)をディスパッチ
            QueueSystem->>BatchJob: 処理のためにジョブをキューに入れる
            ProcessCommand->>Logger: バッチジョブID付きでジョブディスパッチをログ
        end
    end
    end
    
    ProcessCommand->>Logger: 処理統計をログ
    ProcessCommand->>Slack: コマンド完了サマリーを送信
    end
    
    rect rgb(255, 200, 200)
    Note right of ProcessCommand: エラーハンドリング
    rect rgb(255, 230, 230)
    alt キューシステムエラー
        ProcessCommand->>Logger: キューディスパッチエラーをログ
        ProcessCommand->>Slack: キューエラー通知を送信
    else ジョブ作成エラー
        ProcessCommand->>Logger: ジョブ作成エラーをログ
        ProcessCommand->>Slack: ジョブ作成エラー通知を送信
    end
    end
    end

ステップ3:データ収集と準備

sequenceDiagram
    participant BatchJob as CreateBatchPredictionJob
    participant ReviewRepo as ReviewSentenceRepository
    participant ViewpointRepo as WishlistSpecificViewpointRepository
    participant AnalyzerDB[(gb_analyzer)]
    participant ConsoleDB[(gb_console)]
    participant Logger
    participant Slack
    
    Note over BatchJob,Slack: データ収集フェーズ
    
    rect rgb(200, 255, 200)
    Note right of BatchJob: 正常ケース - データ取得
    
    BatchJob->>Logger: バッチジョブ詳細付きでジョブ開始をログ
    
    rect rgb(200, 230, 255)
    Note right of BatchJob: レビュー文収集
    BatchJob->>ReviewRepo: getReviewSentences(product_ids, last_sentence_id)
    ReviewRepo->>AnalyzerDB: WHERE product_id IN (ids)でreview_sentencesをクエリ
    AnalyzerDB-->>ReviewRepo: レビュー文コレクションを返す
    ReviewRepo-->>BatchJob: メタデータ付きでレビュー文を返す
    BatchJob->>Logger: レビュー文数をログ
    end
    
    rect rgb(200, 230, 255)
    Note right of BatchJob: 視点定義収集
    BatchJob->>ViewpointRepo: getViewpointDefinitions(wl_spec_vp_ids)
    ViewpointRepo->>ConsoleDB: wl_cat_vp_details付きでwl_spec_vpsをクエリ
    ConsoleDB-->>ViewpointRepo: 視点定義を返す
    ViewpointRepo-->>BatchJob: カテゴリ付きで視点データを返す
    BatchJob->>Logger: 視点定義数をログ
    end
    
    rect rgb(255, 255, 200)
    Note right of BatchJob: データ準備
    BatchJob->>BatchJob: preparePromptsAndViewpoints()
    BatchJob->>BatchJob: validateDataCompleteness()
    BatchJob->>Logger: データ準備完了をログ
    end
    end
    
    rect rgb(255, 200, 200)
    Note right of BatchJob: エラーハンドリング
    rect rgb(255, 230, 230)
    alt レビューデータ不足
        BatchJob->>Logger: レビュー文不足をログ
        BatchJob->>BatchJob: バッチジョブ処理をスキップ
    else 視点定義なし
        BatchJob->>Logger: 視点データ不足をログ
        BatchJob->>Slack: 視点設定エラーを送信
    else データ検証エラー
        BatchJob->>Logger: データ検証失敗をログ
        BatchJob->>Slack: データ検証エラー通知を送信
    end
    end
    end

ステップ4:OpenAI API連携

sequenceDiagram
    participant BatchJob as CreateBatchPredictionJob
    participant ViewpointAPI as ViewpointClassifyProcessingInterface
    participant OpenAI as OpenAI Batch API
    participant Logger
    participant Slack
    
    Note over BatchJob,Slack: OpenAIバッチ処理統合
    
    rect rgb(200, 255, 200)
    Note right of BatchJob: 正常ケース - API通信
    
    rect rgb(200, 230, 255)
    Note right of BatchJob: バッチリクエスト準備
    BatchJob->>ViewpointAPI: prepareStructuredPrompts(sentences, viewpoints)
    ViewpointAPI->>ViewpointAPI: formatBatchRequest(prompts)
    ViewpointAPI->>ViewpointAPI: validateRequestStructure()
    ViewpointAPI-->>BatchJob: フォーマットされたバッチリクエストを返す
    BatchJob->>Logger: バッチリクエスト準備をログ
    end
    
    rect rgb(255, 255, 200)
    Note right of BatchJob: API送信
    BatchJob->>ViewpointAPI: submitBatchPredictionRequest(batch_request)
    ViewpointAPI->>OpenAI: POST /v1/batches(バッチ処理リクエスト)
    OpenAI-->>ViewpointAPI: batch_idとステータスを返す
    ViewpointAPI-->>BatchJob: OpenAI batch_idを返す
    BatchJob->>Logger: batch_id付きでAPI送信成功をログ
    end
    end
    
    rect rgb(255, 200, 200)
    Note right of BatchJob: エラーハンドリング
    rect rgb(255, 230, 230)
    alt API認証エラー
        ViewpointAPI->>Logger: 認証失敗をログ
        ViewpointAPI->>Slack: API認証エラーを送信
        ViewpointAPI->>BatchJob: 認証エラーを返す
    else レート制限エラー
        ViewpointAPI->>Logger: リトライ情報付きでレート制限をログ
        ViewpointAPI->>ViewpointAPI: 指数バックオフを実装
        ViewpointAPI->>OpenAI: 遅延後にリクエストを再試行
    else APIサービスエラー
        ViewpointAPI->>Logger: レスポンス付きでAPIサービスエラーをログ
        ViewpointAPI->>Slack: APIサービスエラー通知を送信
        ViewpointAPI->>BatchJob: サービスエラーを返す
    else リクエスト検証エラー
        ViewpointAPI->>Logger: リクエスト検証失敗をログ
        ViewpointAPI->>Slack: リクエスト形式エラーを送信
        ViewpointAPI->>BatchJob: 検証エラーを返す
    end
    end
    end

ステップ5:バッチログ作成とステータス更新

sequenceDiagram
    participant BatchJob as CreateBatchPredictionJob
    participant BatchLogRepo as BatchJobViewpointLogRepository
    participant BatchJobRepo as BatchJobViewpointRepository
    participant AnalyzerDB[(gb_analyzer)]
    participant Logger
    participant Slack
    
    Note over BatchJob,Slack: バッチログ管理
    
    rect rgb(200, 255, 200)
    Note right of BatchJob: 正常ケース - ログ作成
    
    rect rgb(200, 230, 255)
    Note right of BatchJob: バッチログ作成
    BatchJob->>BatchLogRepo: createBatchLog(batch_id, sentences_count, metadata)
    BatchLogRepo->>AnalyzerDB: INSERT INTO batch_jobs_vp_logs
    AnalyzerDB-->>BatchLogRepo: ログレコードIDを返す
    BatchLogRepo-->>BatchJob: ID付きでログ作成を確認
    BatchJob->>Logger: バッチログ作成成功をログ
    end
    
    rect rgb(200, 230, 255)
    Note right of BatchJob: ステータス更新
    BatchJob->>BatchJobRepo: updateLogCreationStatus(batch_job_id, PROCESSING)
    BatchJobRepo->>AnalyzerDB: UPDATE batch_jobs_vps SET log_creation_status = 2
    AnalyzerDB-->>BatchJobRepo: ステータス更新を確認
    BatchJobRepo-->>BatchJob: 更新確認を返す
    BatchJob->>Logger: ステータス更新完了をログ
    end
    
    rect rgb(230, 200, 255)
    Note right of BatchJob: 成功監視
    BatchJob->>Logger: 統計付きでバッチ作成成功をログ
    BatchJob->>Slack: 詳細付きでバッチ作成通知を送信
    end
    end
    
    rect rgb(255, 200, 200)
    Note right of BatchJob: エラーハンドリング
    rect rgb(255, 230, 230)
    alt データベーストランザクションエラー
        BatchLogRepo->>Logger: データベーストランザクションエラーをログ
        BatchLogRepo->>AnalyzerDB: トランザクションをROLLBACK
        BatchLogRepo->>Slack: データベースエラー通知を送信
        BatchLogRepo->>BatchJob: トランザクションエラーを返す
    else ログ作成失敗
        BatchJob->>Logger: バッチ詳細付きでログ作成失敗をログ
        BatchJob->>BatchJobRepo: バッチジョブを失敗としてマーク(status = 9)
        BatchJob->>Slack: ログ作成失敗通知を送信
    else ステータス更新失敗
        BatchJob->>Logger: ステータス更新失敗をログ
        BatchJob->>Slack: ステータス更新エラー通知を送信
    end
    end
    end

ステップ6:ジョブ完了と監視

sequenceDiagram
    participant BatchJob as CreateBatchPredictionJob
    participant ProcessCommand as specific-viewpoint:process-batches
    participant QueueSystem as Laravel Queue
    participant Logger
    participant Slack
    
    Note over BatchJob,Slack: ジョブ完了とシステム監視
    
    rect rgb(200, 255, 200)
    Note right of BatchJob: 正常ケース - 正常完了
    
    rect rgb(230, 200, 255)
    Note right of BatchJob: ジョブ最終化
    BatchJob->>Logger: 処理メトリクス付きでジョブ完了をログ
    BatchJob->>Logger: OpenAIバッチ送信統計をログ
    BatchJob->>QueueSystem: ジョブを完了としてマーク
    QueueSystem-->>ProcessCommand: ジョブ完了ステータスを更新
    end
    
    rect rgb(230, 200, 255)
    Note right of ProcessCommand: コマンドサマリー
    ProcessCommand->>Logger: 全体統計付きでコマンド完了をログ
    ProcessCommand->>Logger: 処理されたジョブの合計と成功率をログ
    ProcessCommand->>Slack: メトリクス付きでコマンド完了サマリーを送信
    end
    end
    
    rect rgb(255, 200, 200)
    Note right of BatchJob: エラーハンドリングと回復
    rect rgb(255, 230, 230)
    alt ジョブ処理失敗
        BatchJob->>Logger: エラー詳細付きでジョブ失敗をログ
        BatchJob->>QueueSystem: 再試行のためにジョブを失敗としてマーク
        BatchJob->>Slack: ジョブ失敗通知を送信
    else メモリ/タイムアウト問題
        BatchJob->>Logger: リソース制限エラーをログ
        BatchJob->>QueueSystem: 遅延付きで再試行のためにジョブをリリース
        BatchJob->>Slack: リソースエラー通知を送信
    else 重大なシステムエラー
        BatchJob->>Logger: 重大なシステムエラーをログ
        BatchJob->>Slack: 重大なエラーアラートを送信
        BatchJob->>ProcessCommand: 以降の処理を停止
    end
    end
    end

詳細

パラメータ

  • {--log-creation-status=}: ログ作成ステータスによってバッチジョブをフィルタリングするためのオプションパラメータ

    • 1: 保留中(指定されない場合のデフォルト)
    • 2: 処理中
    • 3: 完了
    • 9: 失敗
    • 省略された場合、保留中ステータスのジョブの処理がデフォルト
    • 選択的処理と特定のジョブ状態のデバッグに使用
  • {--dry-run}: 実際にジョブをディスパッチせずに実行するためのオプションフラグ

    • 提供された場合、コマンドは実際のバッチジョブを作成せずにプロセスをシミュレート
    • ジョブ選択ロジックのテストと検証に役立つ
    • 副作用なしで実行される操作をすべてログ記録

頻度

  • スケジュール実行: 5分毎
    • Laravelのスケジューラを使用してroutes/console.phpで設定
    • 例: Schedule::command('specific-viewpoint:process-batches')->everyFiveMinutes();
  • 手動実行: 特定のログ作成ステータス用に手動でトリガー可能
    • デバッグや特定のジョブ状態の処理に使用

依存関係

データベース依存関係:

  • バッチジョブとレビュー文用のアナライザーデータベース(gb_analyzer)接続
  • 視点定義用のコンソールデータベース(gb_console)接続
  • 指定された商品の処理に十分なレビュー文データ
  • コンソールデータベースで適切に定義された視点カテゴリ

外部サービス依存関係:

  • OpenAI APIサービスの可用性と認証
  • 環境変数で設定された有効なAPI認証情報
  • OpenAIアカウントで有効化されたバッチ処理機能
  • バッチ操作とレート制限準拠のための十分なAPIクォータ

システム依存関係:

  • ジョブ処理用のキューシステム設定(config/queue.php
  • 一時的なバッチ処理ファイル用のストレージ
  • 処理制限のためのconfig/specific_viewpoint.phpの設定
  • 適切に設定されたエラー処理と通知システム

出力

テーブル

プロセスバッチコマンドは複数のデータベーステーブルと相互作用します。完全なテーブル構造、フィールド定義、および関係については、データベーススキーマセクションを参照してください。

主要出力テーブル:

  • batch_jobs_vp_logs: OpenAIバッチIDを含む新しいログレコードを作成
  • batch_jobs_vps: 処理されたジョブのログ作成ステータスを更新

コマンド固有の操作:

  • 作成: OpenAIバッチIDと文数を含むbatch_jobs_vp_logsに新しいレコードを作成
  • 更新: batch_jobs_vpsのログ作成ステータスを保留中から処理中に更新
  • 読み取り: review_sentencesからレビュー文とwl_spec_vpsから視点定義を読み取り

サービス

OpenAI統合:

  • ViewpointClassifyProcessingInterface: OpenAIバッチ処理APIと通信
  • レビュー文と視点定義を含む構造化プロンプトを準備
  • バッチリクエストを送信しAPIレスポンス検証を処理
  • レート制限のための指数バックオフを含むリトライロジックを実装

リポジトリサービス:

  • BatchJobViewpointRepositoryInterface: フィルタリングによる保留中バッチジョブの取得
  • ReviewSentenceRepositoryInterface: 商品IDによるレビュー文の取得
  • WishlistSpecificViewpointRepositoryInterface: 視点定義の読み込み
  • BatchJobViewpointLogRepositoryInterface: バッチ処理ログの作成

バックグラウンドジョブサービス:

  • CreateBatchPredictionJob: 処理を処理するために各バッチジョブにディスパッチ
  • 非同期バッチ作成のためのキュー管理
  • 自動リトライメカニズムによるジョブ失敗処理

エラーハンドリング

ログ

システムはバッチ作成の問題をトラブルシューティングするための包括的なログを生成します:

バッチ作成エラー:

  • 完全なリクエスト/レスポンス詳細を含むOpenAI API通信の失敗
  • 特定のレビュー文と視点情報を含むデータ検証エラー
  • ロールバック詳細と影響を受けたレコードを含むデータベーストランザクションの失敗
  • 不足パラメータ情報と提案を含む設定エラー

ジョブ処理エラー:

  • 商品ID詳細と可用性チェックを含む不十分なレビュー文データ
  • 特定の視点ID検証による無効な視点設定
  • ジョブディスパッチ詳細とリトライ情報を含むキューシステムの失敗
  • 大規模バッチ処理中のメモリやタイムアウトの問題

ログの場所:

  • コンテキスト的なバッチジョブ情報を持つアプリケーションログ: storage/logs/laravel.log
  • 実行統計と処理メトリクスを持つコマンド固有のログ
  • 完全なスタックトレースとOpenAI APIリクエスト/レスポンス詳細を持つエラーログ

Slack

自動Slack通知は運用監視のためにAIBatchPredictSlackChannelを介して送信されます:

成功通知:

  • 処理統計とタイミングを持つバッチ作成完了
  • 成功/失敗カウントとパフォーマンスメトリクスを持つジョブディスパッチサマリー
  • バッチカウントと文ボリュームを含むOpenAI API使用統計

エラー通知:

  • エラーコードと提案された解決手順を持つOpenAI API通信の失敗
  • 影響を受けたバッチジョブ詳細とロールバック情報を持つデータベース操作の失敗
  • 解決提案とドキュメントリンクを含む設定の問題
  • 即時管理者注意を必要とする重大なシステムエラー

通知形式:

  • 運用追跡用のコマンド名と実行タイムスタンプ
  • インシデント優先順位付けのためのエラータイプと重大度レベル
  • 調査用の影響を受けたバッチジョブIDと商品詳細
  • 提案されたトラブルシューティング手順とドキュメント参照

トラブルシューティング

データを確認

保留中バッチジョブの検証:

-- ログ作成を待っているバッチジョブを確認
SELECT id, product_ids, wl_spec_vp_ids, wl_cat_vp_detail_id, 
       last_review_sentence_id, status, log_creation_status, created_at
FROM batch_jobs_vps
WHERE log_creation_status = 1 -- 保留中
ORDER BY created_at ASC;

レビュー文の可用性を検証:

-- 特定の商品のレビュー文を確認
SELECT COUNT(*) as sentence_count, product_id
FROM review_sentences
WHERE product_id IN (SELECT JSON_UNQUOTE(JSON_EXTRACT(product_ids, '$[0]')) FROM batch_jobs_vps WHERE id = BATCH_JOB_ID)
GROUP BY product_id;

視点定義を確認:

-- 視点定義が存在することを確認
SELECT wsvp.id, wsvp.name, wsvp.description, wcvd.detail
FROM wl_spec_vps wsvp
JOIN wl_cat_vp_details wcvd ON wsvp.wl_cat_vp_detail_id = wcvd.id
WHERE wsvp.id IN (SELECT JSON_UNQUOTE(JSON_EXTRACT(wl_spec_vp_ids, '$[0]')) FROM batch_jobs_vps WHERE id = BATCH_JOB_ID);

ログを確認

アプリケーションログ:

# 最近のprocess-batchesコマンドログを確認
tail -f storage/logs/laravel.log | grep -E "specific-viewpoint:process-batches"

# バッチジョブ作成ログを確認
grep "CreateBatchPredictionJob" storage/logs/laravel.log | tail -20

# OpenAI API通信ログを確認
grep "ViewpointClassifyProcessingInterface" storage/logs/laravel.log | tail -10

# エラーパターンを確認
grep -E "(ERROR|CRITICAL)" storage/logs/laravel.log | grep "process-batches" | tail -10

データベースログ:

-- 最近のバッチログ作成を確認
SELECT bjvl.*, bjv.product_ids
FROM batch_jobs_vp_logs bjvl
JOIN batch_jobs_vps bjv ON bjvl.batch_job_id = bjv.id
WHERE bjvl.created_at > DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY bjvl.created_at DESC;

-- 失敗したバッチジョブを確認
SELECT id, product_ids, wl_spec_vp_ids, status, log_creation_status, updated_at
FROM batch_jobs_vps
WHERE log_creation_status = 9 -- 失敗
AND updated_at > DATE_SUB(NOW(), INTERVAL 1 DAY);

OpenAI API検証:

# OpenAI API接続をテスト
curl -X GET "https://api.openai.com/v1/batches" \
  -H "Authorization: Bearer YOUR_OPENAI_API_KEY"

パフォーマンス監視:

  • パフォーマンス低下について命令実行時間を監視
  • ジョブディスパッチ効率についてキュー処理パフォーマンスを確認
  • OpenAI APIレスポンス時間とタイムアウト設定を検証
  • 最適化機会のためにバッチ準備フェーズ中のメモリ使用量を確認