BigQuery同期概要

説明

BigQuery同期コンポーネントは、Google BigQueryからgb_analyzerデータベースへのデータ同期を担当します。このコンポーネントは、商品データ、レビュー、レビュー文がローカルデータベースでBigQueryのデータと一致するように定期的に更新されることを保証します。システムは、データ抽出にBaseBigQuerySyncCommandを使用し、効率性のためのチャンク化ジョブ処理と、データ整合性を確保するための状態追跡を使用します。

システム概要図

---
config:
  theme: base
  layout: dagre
  flowchart:
    curve: linear
    htmlLabels: true
  themeVariables:
    edgeLabelBackground: "transparent"
---
flowchart TD
    subgraph BigQueryTables["BigQueryソーステーブル"]
        BQProduct[(products)]
        BQReview[(reviews)]
        BQReviewSentence[(reviews_sentences)]
    end
    
    Commands[BaseBigQuerySyncCommand]
    Jobs[BigQuery同期ジョブ]
    Redis[(Redisキャッシュ)]
    
    subgraph AnalyzerTables["gb_analyzerテーブル"]
        Products[(products)]
        ProductDetails[(product_details)]
        Reviews[(reviews)]
        ReviewSentences[(review_sentences)]
    end
    
    StatusUpdate[UpdateBigQueryStatus]
    
    BigQueryTables --- Step1[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>1</span>
            <p style='margin-top: 8px'>データ抽出</p>
        </div>
    ]
    Step1 --> Commands
    
    Commands --- Step2[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>2</span>
            <p style='margin-top: 8px'>ジョブ作成</p>
        </div>
    ]
    Step2 --> Jobs
    
    Jobs --- Step3[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>3</span>
            <p style='margin-top: 8px'>データ保存</p>
        </div>
    ]
    Step3 --> AnalyzerTables
    
    Jobs --- Step4[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>4</span>
            <p style='margin-top: 8px'>ID追跡</p>
        </div>
    ]
    Step4 --> Redis
    
    Redis --- Step5[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>5</span>
            <p style='margin-top: 8px'>状態更新</p>
        </div>
    ]
    Step5 --> StatusUpdate
    
    StatusUpdate --- Step6[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>6</span>
            <p style='margin-top: 8px'>同期完了マーク</p>
        </div>
    ]
    Step6 --> BigQueryTables
    
    %% Style definitions
    style BigQueryTables fill:#d2e3fc,stroke:#4285f4,stroke-width:2px
    style AnalyzerTables fill:#d9d9f2,stroke:#6666cc,stroke-width:2px
    style Commands fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
    style Jobs fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
    style Redis fill:#f2d9f2,stroke:#cc66cc,stroke-width:2px
    style StatusUpdate fill:#d9f2f2,stroke:#66cccc,stroke-width:2px
    style Step1 fill:transparent,stroke:transparent,stroke-width:1px
    style Step2 fill:transparent,stroke:transparent,stroke-width:1px
    style Step3 fill:transparent,stroke:transparent,stroke-width:1px
    style Step4 fill:transparent,stroke:transparent,stroke-width:1px
    style Step5 fill:transparent,stroke:transparent,stroke-width:1px
    style Step6 fill:transparent,stroke:transparent,stroke-width:1px

詳細データフロー依存関係

BigQuery同期コンポーネントは、Google BigQueryからローカルgb_analyzerデータベースへの信頼性の高いデータ転送を確保する洗練された6ステップのデータ同期パイプラインを調整します。このコンポーネントは、組み込みのエラーハンドリングと監視機能を備えたデータ抽出、変換、状態追跡の完全なライフサイクルを管理します。

データフローアーキテクチャ

BigQuery同期システムは、各ステップが前のステップの正常完了に明示的に依存する慎重に調整されたシーケンスを通じて動作します。アーキテクチャは、Redisベースの追跡、非同期ジョブ処理、包括的な状態管理を通じてデータ整合性を確保します。

主要コンポーネント:

  • BaseBigQuerySyncCommand: データ抽出のためのコアコマンドフレームワーク
  • 専門同期コマンド: SyncProduct, SyncReview, SyncReviewSentence
  • キュージョブシステム: SyncProductData, SyncReviewData, SyncReviewSentenceDataによる非同期データ処理
  • Redis追跡: 処理されたレコードIDの一時保存
  • 状態管理: 双方向同期確認のためのUpdateBigQueryStatusコマンド

ステップ1: BigQueryからのデータ抽出

フロー: BigQueryソーステーブル → BaseBigQuerySyncCommand

同期プロセスは、BaseBigQuerySyncCommandを継承する専門コマンドクラスの実行から始まります。各コマンドは特定のBigQueryテーブルをターゲットとし、データ取得を最適化するためのインテリジェントフィルタリングを適用します:

コマンド実行:

  • gcp:sync-productsproductsをクエリ(GCP_BQ_TB_PRODUCT環境変数で設定)
  • gcp:sync-reviewsreviewsをクエリ(GCP_BQ_TB_REVIEW環境変数で設定)
  • gcp:sync-review_sentencesreviews_sentencesをクエリ(GCP_BQ_TB_REVIEW_SENTENCE環境変数で設定)

フィルタリングロジック:

  • 定期同期(30分ごと): タイムスタンプベースのフィルタリングを使用して最後の同期間隔内に作成されたレコードを取得
  • 欠損データ同期(--missedフラグ付きで日次): 失敗した同期を復旧するためにstatus IS NULLまたはstatus = 0のレコードをターゲット
  • バッチ処理: 設定可能なバッチサイズ(デフォルト: 500レコード)でメモリオーバーフローを防ぎ、BigQuery API使用を最適化

データ検証:

  • スキーマ検証により、BigQueryデータが期待される形式と一致することを確保
  • オプションフィールドのNull値処理
  • ローカルデータベース保存のためのデータ型変換準備

ステップ2: ジョブ作成とキューイング

フロー: BaseBigQuerySyncCommand → 専門キュージョブ

抽出されたBigQueryデータは前処理を受け、非同期処理のための管理可能なチャンクに分割されます。このステップは、システムの応答性を確保し、大規模データセットの並列処理を可能にします:

ジョブディスパッチ:

  • SyncProductDataジョブ: productsデータからproductsproduct_detailsテーブルの両方の同期を処理
  • SyncReviewDataジョブ: reviewsデータからレビューコンテンツ、評価、メタデータを処理
  • SyncReviewSentenceDataジョブ: reviews_sentencesデータから感情分析データと文抽出を管理

キュー管理:

  • ジョブは非同期実行のためにLaravelのキューシステムにディスパッチされる
  • 設定可能なキューワーカーが再試行メカニズムでジョブ処理を処理
  • ジョブ優先順位付けにより、重要なデータタイプが最初に処理される
  • メモリ管理により、大規模データ処理中のワーカー枯渇を防ぐ

エラーハンドリング:

  • 失敗したジョブは指数バックオフで自動的に再試行される
  • デッドレターキューは手動レビューのために永続的に失敗したジョブをキャプチャ
  • 包括的なログ記録により、ジョブ実行状態とパフォーマンスメトリクスを追跡

ステップ3: データ変換と保存

フロー: キュージョブ → gb_analyzerデータベーステーブル

キュージョブは、BigQueryデータ形式をローカルデータベーススキーマに変換するための洗練されたデータ変換ロジックを実行します。このステップは、データ正規化と関係確立のコアビジネスロジックを処理します:

商品データ処理:

  • プライマリテーブル(products: mall_id、jan_code、input_type、unique_key生成を含むコア商品情報
  • 詳細テーブル(product_details: 価格、評価、ショップ情報、売上データを含む拡張商品メタデータ
  • 関係確立: productsとproduct_detailsテーブル間の外部キー関係

レビューデータ処理:

  • コンテンツ処理: レビューテキストの正規化とエンコーディング処理
  • メタデータ抽出: ユーザー人口統計、購入確認、レビュー分類
  • 評価正規化: 異なるモールシステム間での評価スケールの標準化

レビュー文データ処理:

  • 文分割: レビューコンテンツからの個別文抽出
  • 感情分析: 感情スコアと分類の処理
  • コンテンツインデックス: 検索と分析操作の準備

データ整合性対策:

  • 一意キー制約を使用した重複検出と処理
  • 外部キー検証により参照整合性を確保
  • トランザクション管理により失敗した操作のロールバック機能を提供

ステップ4: RedisでのID追跡

フロー: キュージョブ → Redisキャッシュ

正常に処理されたレコードIDは、BigQueryへの信頼性の高い状態更新を可能にするためにRedisで体系的に追跡されます。この並列追跡システムは、システム障害に対する回復力を提供し、処理されたレコードが失われないことを保証します:

Redis保存構造:

  • キー形式: bigquery_sync:{data_type}:{batch_id}
  • 値コンテンツ: メタデータ付きの処理されたレコードIDを含むJSON配列
  • 有効期限: 設定可能なTTL(デフォルト: 24時間)でメモリ膨張を防ぐ

追跡メタデータ:

  • BigQueryソーステーブルからのレコードID
  • データタイプ識別子(product、review、review_sentence)
  • 監査証跡のための処理タイムスタンプ
  • グループ化された操作のためのバッチ識別子
  • 各レコードの成功/失敗状態

信頼性機能:

  • アトミック操作により一貫したRedis状態を確保
  • バックアップメカニズムによりRedis障害を処理
  • 監視アラートによりRedisメモリ使用量とパフォーマンスを追跡

ステップ5: 状態更新処理

フロー: Redisキャッシュ → UpdateBigQueryStatusコマンド

UpdateBigQueryStatusコマンドは5分スケジュールで動作し、Redisから蓄積されたレコードIDを処理し、対応するBigQueryソーステーブルを更新します。このステップは、BigQueryが現在の同期状態を反映することを確保するフィードバックループを完成させます:

バッチ処理ロジック:

  • Redisから処理されたレコードIDのバッチを取得(設定可能なバッチサイズ: 1000レコード)
  • 最適化されたBigQuery更新操作のためにデータタイプ別にレコードをグループ化
  • BigQuery APIレート制限のための指数バックオフを実装

更新操作:

  • 状態フラグ更新: 正常に同期されたレコードにstatus = 1を設定
  • タイムスタンプ更新: 監査目的で処理完了時間を記録
  • バッチ最適化: 効率的なバッチ更新によりBigQuery API呼び出しを最小化

エラー復旧:

  • 失敗した状態更新は後続のコマンド実行で再試行される
  • 持続的な失敗はアラート通知をトリガー
  • Redis エントリは正常なBigQuery更新が確認されるまで保持される

ステップ6: 同期完了

フロー: UpdateBigQueryStatusコマンド → BigQueryソーステーブル

最終ステップは、BigQueryソーステーブルを更新し、一時追跡データをクリーンアップすることで同期サイクルの完了をマークします。このステップは、システムの清潔性を確保し、次の同期サイクルの準備をします:

BigQuery更新:

  • 状態マーキング: レコードは正常な同期を示すstatus = 1でマークされる
  • 監査証跡: 更新タイムスタンプは同期履歴を提供
  • クエリ最適化: 状態フラグにより将来の同期操作の効率的なフィルタリングが可能

クリーンアップ操作:

  • Redisクリーンアップ: 正常に処理されたレコードIDがRedisキャッシュから削除される
  • メモリ管理: 時間の経過とともにRedisメモリの蓄積を防ぐ
  • ログ生成: 運用監視とトラブルシューティングのための包括的なログ記録

監視とアラート:

  • 成功メトリクスはパフォーマンス監視のために記録される
  • 失敗通知は設定されたSlackチャンネルに送信される
  • システムヘルスダッシュボードは同期状態とパフォーマンスを反映

重要なシステム依存関係

順次処理要件:

  • 各ステップは次のステップが開始される前に正常に完了する必要がある
  • 失敗したステップは包括的なエラーハンドリングと再試行メカニズムをトリガー
  • データ一貫性はトランザクション管理とロールバック機能により維持される

リソース管理:

  • キューワーカー: ジョブ処理のために十分なワーカー容量を維持する必要がある
  • Redisメモリ: 追跡データ損失を防ぐために適切なメモリ割り当てが必要
  • BigQueryクォータ: APIレート制限とクエリクォータを監視・管理する必要がある

データ整合性保護:

  • 重複防止: 一意制約とupsert操作によりデータ重複を防ぐ
  • 参照整合性: 外部キー制約により関係の一貫性を維持
  • トランザクション境界: アトミック操作により障害時のデータ一貫性を確保

監視と可観測性:

  • パフォーマンスメトリクス: 処理時間、スループット率、エラー頻度
  • システムヘルス: キュー深度、Redisメモリ使用量、BigQueryクォータ消費
  • アラートシステム: 障害、パフォーマンス低下、容量問題のリアルタイム通知

頻度概要

タイムライン

timeline
    title BigQuery同期スケジュール
    section 定期操作
        30分ごと<br>(例: 08.00) : gcp sync-products
                                         : gcp sync-reviews
                                         : gcp sync-review_sentences
    section 状態更新
        5分ごと<br>(例: 08.05) : update bigquery-status
    section ...
	
    section 日次操作
        日次<br>(例: 00.00) : gcp sync-products --missed
                              : gcp sync-reviews --missed
                              : gcp sync-review_sentences --missed

データベーススキーマ

erDiagram
    products {
        bigint id PK
        integer mall_id "モール識別子"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        string input_type "データソース"
        string unique_key "システム生成の一意キー"
        string ranking_description "ランキング説明"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    product_details {
        bigint id PK
        bigint product_id FK
        string image "商品画像URL"
        string product_url "商品ページURL"
        string title "商品タイトル"
        double rating "平均評価"
        integer num_reviews "レビュー数"
        string mall_shop_name "モールのショップ名"
        string mall_shop_id "モールのショップID"
        string maker_name "メーカー名"
        double base_price "基本価格"
        double shipping_fee "送料"
        double price "計算された最終価格"
        integer sales_one_month "月間売上"
        integer point "ポイント"
        json coupon "クーポン情報"
        string ranking_description "ランキング説明"
        string unique_key "システム生成の一意キー"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    reviews {
        bigint id PK
        integer mall_id "モール識別子"
        integer review_count "ユーザーによるレビュー数"
        integer vote "役立つ票"
        tinyint type "レビュータイプ (1=非購入者, 2=購入者, 3=Vine)"
        string variant "バリアントオプション"
        string age "購入者の年齢"
        string gender "購入者の性別"
        text content "レビューコンテンツ"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        double rating "数値評価"
        string shop_name "ショップ名"
        date post_date "レビュー投稿日"
        bigint crawl_review_id "クローラーからの一意レビューID"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    review_sentences {
        bigint id PK
        bigint crawl_review_id "関連レビューID"
        bigint sentence_id "一意文ID"
        string mall_product_id "モールシステムの商品ID"
        string jan_code "JANコード"
        longtext content "個別文コンテンツ"
        double sentiment_score "感情分析スコア"
        date post_date "レビュー投稿日"
        timestamp crawl_created_at "クローラーからのタイムスタンプ"
        timestamp bq_created_at "BigQueryからのタイムスタンプ"
        timestamp created_at
        timestamp updated_at
    }
    
    products ||--o{ product_details : "has many"
    reviews ||--o{ review_sentences : "contains"

テーブルカテゴリ

BigQueryソーステーブル

  • products: 同期状態追跡付きのソース商品データ(GCP_BQ_TB_PRODUCTで設定)
  • reviews: 同期状態追跡付きのソースレビューデータ(GCP_BQ_TB_REVIEWで設定)
  • reviews_sentences: 同期状態追跡付きのソースレビュー文データ(GCP_BQ_TB_REVIEW_SENTENCEで設定)

ローカルデータベーステーブル(gb_analyzer)

  • products: productsから同期されたコア商品情報
  • product_details: productsから同期された拡張商品詳細と価格情報
  • reviews: reviewsから同期された顧客レビューデータ
  • review_sentences: reviews_sentencesから同期されたレビューから抽出された個別文

期待される結果

BigQuery同期操作が正常に実行されると、システムは以下を提供します:

データ同期

  • BigQueryからの商品データがローカルのproductsproduct_detailsテーブルにコピーされる
  • BigQueryからのレビューデータがローカルのreviewsテーブルにコピーされる
  • BigQueryからのレビュー文データがローカルのreview_sentencesテーブルにコピーされる
  • 状態追跡により、処理後にレコードがBigQueryで同期済みとしてマークされる

システム操作

  • 30分ごとの定期同期により、ローカルデータベースが最新のBigQueryデータで最新状態に保たれる
  • 日次の欠損データ同期により、定期同期中に失敗したレコードが復旧される
  • 5分ごとの状態更新により、処理されたレコードがBigQueryで同期済みとしてマークされる
  • Redis追跡により、同期プロセス中のデータ損失が防がれる

バッチリスト

バッチ 説明
定期同期 BigQueryからデータを同期するために30分ごとに実行されるコマンド
状態更新 同期状態を更新するために5分ごとに実行されるコマンド
欠損データ同期 BigQueryから欠損データを同期するために日次で実行されるコマンド