BigQuery定期同期

コマンドシグネチャ

php artisan gcp:sync-products [--items-per-page=] [--all] [--missed]
php artisan gcp:sync-reviews [--items-per-page=] [--all] [--missed]
php artisan gcp:sync-review-sentences [--items-per-page=] [--all] [--missed]

目的

これらのコマンドは、BigQueryからローカルデータベースに商品データ、レビューデータ、レビュー文データを同期します。定期的なスケジュールで実行され、ローカルデータベースが分析と表示のためにBigQueryからの最新情報を含むことを保証します。

シーケンス図

商品同期

sequenceDiagram
    participant System
    participant Products as gcp:sync-products
    participant BigQuery
    participant ProductsTable as products table
    participant ProductDetailsTable as product_details table
    participant Redis
    
    Note over System,Redis: 商品同期フロー
    
    rect rgb(191, 223, 255)
    Note right of System: 30分ごと
    System->>Products: 実行
    Products->>BigQuery: 商品データ取得(最新レコード)
    BigQuery-->>Products: 商品・商品詳細データを返す
    Products->>ProductsTable: 商品レコードの挿入/更新
    Products->>ProductDetailsTable: 商品詳細レコードの挿入/更新
    Products->>Redis: 状態更新用の商品IDを保存
    end

レビュー同期

sequenceDiagram
    participant System
    participant Reviews as gcp:sync-reviews
    participant BigQuery
    participant ReviewsTable as reviews table
    participant Redis
    
    Note over System,Redis: レビュー同期フロー
    
    rect rgb(191, 223, 255)
    Note right of System: 30分ごと
    System->>Reviews: 実行
    Reviews->>BigQuery: レビューデータ取得(最新レコード)
    BigQuery-->>Reviews: レビューデータを返す
    Reviews->>ReviewsTable: レビューレコードの挿入/更新
    Reviews->>Redis: 状態更新用のレビューIDを保存
    end

レビュー文同期

sequenceDiagram
    participant System
    participant Sentences as gcp:sync-review-sentences
    participant BigQuery
    participant ReviewSentencesTable as review_sentences table
    participant Redis
    
    Note over System,Redis: レビュー文同期フロー
    
    rect rgb(191, 223, 255)
    Note right of System: 30分ごと
    System->>Sentences: 実行
    Sentences->>BigQuery: レビュー文データ取得(最新レコード)
    BigQuery-->>Sentences: レビュー文データを返す
    Sentences->>ReviewSentencesTable: レビュー文レコードの挿入/更新
    Sentences->>Redis: 状態更新用の文IDを保存
    end

実装詳細

コマンド構造

すべてのコマンドはBaseBigQuerySyncCommandを継承し、以下を提供します:

  1. 最新レコードの条件付きクエリ構築
  2. バッチ処理のためのデータチャンク化
  3. エラーハンドリングとログ記録
  4. 状態更新のためのRedis追跡

パラメータ

  • --items-per-page: バッチごとに処理するレコード数(デフォルト: 500)
  • --all: 時間制約なしですべてのレコードを処理
  • --missed: null状態のレコードのみを処理

頻度

30分ごと

依存関係

  • Google Cloud Platformアクセス認証情報
  • BigQueryプロジェクトとデータセット設定
  • 処理済みID追跡のためのRedis
  • ローカルデータベース接続
  • バックグラウンド処理のためのキューシステム

出力

データベーススキーマ

erDiagram
    products {
        bigint id PK
        integer mall_id "モール識別子"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        string input_type "データソース"
        string unique_key "システム生成の一意キー"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    product_details {
        bigint id PK
        bigint product_id FK
        string image "商品画像URL"
        string product_url "商品ページURL"
        string title "商品タイトル"
        double rating "平均評価"
        integer num_reviews "レビュー数"
        string mall_shop_name "モールのショップ名"
        string maker_name "メーカー名"
        double base_price "基本価格"
        double shipping_fee "送料"
        double price "計算された最終価格"
        integer sales_one_month "月間売上"
        integer point "ポイント"
        json coupon "クーポン情報"
        string ranking_description "ランキング説明"
        string unique_key "システム生成の一意キー"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    reviews {
        bigint id PK
        integer mall_id "モール識別子"
        integer review_count "ユーザーによるレビュー数"
        integer vote "役立つ票"
        tinyint type "レビュータイプ (1=非購入者, 2=購入者, 3=Vine)"
        string variant "バリアントオプション"
        string age "購入者の年齢"
        string gender "購入者の性別"
        text content "レビューコンテンツ"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        double rating "数値評価"
        string shop_name "ショップ名"
        date post_date "レビュー投稿日"
        bigint crawl_review_id "クローラーからの一意レビューID"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    review_sentences {
        bigint id PK
        bigint crawl_review_id "関連レビューID"
        bigint sentence_id "一意文ID"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        longtext content "個別文コンテンツ"
        double sentiment_score "感情分析スコア"
        date post_date "レビュー投稿日"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    products ||--o{ product_details : "has many"
    reviews ||--o{ review_sentences : "contains"

処理フロー

  1. コマンドがスケジュールで実行される
  2. 適切な条件でBigQueryからデータが取得される
  3. データはバッチにチャンク化され、キュージョブで処理される
  4. 処理済みIDは状態追跡のためにRedisに保存される
  5. 状態更新コマンドが定期的にBigQuery状態を更新する

エラーハンドリング

ログ

  • ジョブ数とレコード数を含む詳細な開始/終了ログ
  • スタックトレース付きのエラーメッセージ
  • 監視のためのパフォーマンスメトリクス

Slack通知

  • 同期中の重要なエラーはSlackチャンネルに報告される
  • 要約統計付きの成功通知

トラブルシューティング

一般的な問題

  1. データ欠損: Redisキューと状態更新コマンドを確認
  2. 同期の遅延: バッチサイズとデータベースインデックスを検証
  3. クエリエラー: BigQueryテーブルスキーマが期待される形式と一致することを検証
  4. キューバックログ: キューワーカーを監視し、必要に応じて容量を増加

検証手順

  1. 正常なジョブ完了のログを確認
  2. Redisに状態更新待ちのIDが含まれていることを確認
  3. ローカルデータベースの更新されたレコードを確認
  4. 更新された状態フラグのBigQueryを監視