BigQuery同期概要
説明
BigQuery同期コンポーネントは、Google BigQueryからgb_analyzerデータベースへのデータ同期を担当します。このコンポーネントは、商品データ、レビュー、レビュー文がローカルデータベースでBigQueryのデータと一致するように定期的に更新されることを保証します。システムは、データ抽出にBaseBigQuerySyncCommandを使用し、効率性のためのチャンク化ジョブ処理と、データ整合性を確保するための状態追跡を使用します。
システム概要図
---
config:
theme: base
layout: dagre
flowchart:
curve: linear
htmlLabels: true
themeVariables:
edgeLabelBackground: "transparent"
---
flowchart TD
subgraph BigQueryTables["BigQueryソーステーブル"]
BQProduct[(products)]
BQReview[(reviews)]
BQReviewSentence[(reviews_sentences)]
end
Commands[BaseBigQuerySyncCommand]
Jobs[BigQuery同期ジョブ]
Redis[(Redisキャッシュ)]
subgraph AnalyzerTables["gb_analyzerテーブル"]
Products[(products)]
ProductDetails[(product_details)]
Reviews[(reviews)]
ReviewSentences[(review_sentences)]
end
StatusUpdate[UpdateBigQueryStatus]
BigQueryTables --- Step1[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>1</span>
<p style='margin-top: 8px'>データ抽出</p>
</div>
]
Step1 --> Commands
Commands --- Step2[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>2</span>
<p style='margin-top: 8px'>ジョブ作成</p>
</div>
]
Step2 --> Jobs
Jobs --- Step3[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>3</span>
<p style='margin-top: 8px'>データ保存</p>
</div>
]
Step3 --> AnalyzerTables
Jobs --- Step4[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>4</span>
<p style='margin-top: 8px'>ID追跡</p>
</div>
]
Step4 --> Redis
Redis --- Step5[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>5</span>
<p style='margin-top: 8px'>状態更新</p>
</div>
]
Step5 --> StatusUpdate
StatusUpdate --- Step6[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>6</span>
<p style='margin-top: 8px'>同期完了マーク</p>
</div>
]
Step6 --> BigQueryTables
%% Style definitions
style BigQueryTables fill:#d2e3fc,stroke:#4285f4,stroke-width:2px
style AnalyzerTables fill:#d9d9f2,stroke:#6666cc,stroke-width:2px
style Commands fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
style Jobs fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
style Redis fill:#f2d9f2,stroke:#cc66cc,stroke-width:2px
style StatusUpdate fill:#d9f2f2,stroke:#66cccc,stroke-width:2px
style Step1 fill:transparent,stroke:transparent,stroke-width:1px
style Step2 fill:transparent,stroke:transparent,stroke-width:1px
style Step3 fill:transparent,stroke:transparent,stroke-width:1px
style Step4 fill:transparent,stroke:transparent,stroke-width:1px
style Step5 fill:transparent,stroke:transparent,stroke-width:1px
style Step6 fill:transparent,stroke:transparent,stroke-width:1px
詳細データフロー依存関係
BigQuery同期コンポーネントは、Google BigQueryからローカルgb_analyzerデータベースへの信頼性の高いデータ転送を確保する洗練された6ステップのデータ同期パイプラインを調整します。このコンポーネントは、組み込みのエラーハンドリングと監視機能を備えたデータ抽出、変換、状態追跡の完全なライフサイクルを管理します。
データフローアーキテクチャ
BigQuery同期システムは、各ステップが前のステップの正常完了に明示的に依存する慎重に調整されたシーケンスを通じて動作します。アーキテクチャは、Redisベースの追跡、非同期ジョブ処理、包括的な状態管理を通じてデータ整合性を確保します。
主要コンポーネント:
- BaseBigQuerySyncCommand: データ抽出のためのコアコマンドフレームワーク
- 専門同期コマンド:
SyncProduct,SyncReview,SyncReviewSentence - キュージョブシステム:
SyncProductData,SyncReviewData,SyncReviewSentenceDataによる非同期データ処理 - Redis追跡: 処理されたレコードIDの一時保存
- 状態管理: 双方向同期確認のための
UpdateBigQueryStatusコマンド
ステップ1: BigQueryからのデータ抽出
フロー: BigQueryソーステーブル → BaseBigQuerySyncCommand
同期プロセスは、BaseBigQuerySyncCommandを継承する専門コマンドクラスの実行から始まります。各コマンドは特定のBigQueryテーブルをターゲットとし、データ取得を最適化するためのインテリジェントフィルタリングを適用します:
コマンド実行:
gcp:sync-productsはproductsをクエリ(GCP_BQ_TB_PRODUCT環境変数で設定)gcp:sync-reviewsはreviewsをクエリ(GCP_BQ_TB_REVIEW環境変数で設定)gcp:sync-review_sentencesはreviews_sentencesをクエリ(GCP_BQ_TB_REVIEW_SENTENCE環境変数で設定)
フィルタリングロジック:
- 定期同期(30分ごと): タイムスタンプベースのフィルタリングを使用して最後の同期間隔内に作成されたレコードを取得
- 欠損データ同期(--missedフラグ付きで日次): 失敗した同期を復旧するために
status IS NULLまたはstatus = 0のレコードをターゲット - バッチ処理: 設定可能なバッチサイズ(デフォルト: 500レコード)でメモリオーバーフローを防ぎ、BigQuery API使用を最適化
データ検証:
- スキーマ検証により、BigQueryデータが期待される形式と一致することを確保
- オプションフィールドのNull値処理
- ローカルデータベース保存のためのデータ型変換準備
ステップ2: ジョブ作成とキューイング
フロー: BaseBigQuerySyncCommand → 専門キュージョブ
抽出されたBigQueryデータは前処理を受け、非同期処理のための管理可能なチャンクに分割されます。このステップは、システムの応答性を確保し、大規模データセットの並列処理を可能にします:
ジョブディスパッチ:
- SyncProductDataジョブ:
productsデータからproductsとproduct_detailsテーブルの両方の同期を処理 - SyncReviewDataジョブ:
reviewsデータからレビューコンテンツ、評価、メタデータを処理 - SyncReviewSentenceDataジョブ:
reviews_sentencesデータから感情分析データと文抽出を管理
キュー管理:
- ジョブは非同期実行のためにLaravelのキューシステムにディスパッチされる
- 設定可能なキューワーカーが再試行メカニズムでジョブ処理を処理
- ジョブ優先順位付けにより、重要なデータタイプが最初に処理される
- メモリ管理により、大規模データ処理中のワーカー枯渇を防ぐ
エラーハンドリング:
- 失敗したジョブは指数バックオフで自動的に再試行される
- デッドレターキューは手動レビューのために永続的に失敗したジョブをキャプチャ
- 包括的なログ記録により、ジョブ実行状態とパフォーマンスメトリクスを追跡
ステップ3: データ変換と保存
フロー: キュージョブ → gb_analyzerデータベーステーブル
キュージョブは、BigQueryデータ形式をローカルデータベーススキーマに変換するための洗練されたデータ変換ロジックを実行します。このステップは、データ正規化と関係確立のコアビジネスロジックを処理します:
商品データ処理:
- プライマリテーブル(
products): mall_id、jan_code、input_type、unique_key生成を含むコア商品情報 - 詳細テーブル(
product_details): 価格、評価、ショップ情報、売上データを含む拡張商品メタデータ - 関係確立: productsとproduct_detailsテーブル間の外部キー関係
レビューデータ処理:
- コンテンツ処理: レビューテキストの正規化とエンコーディング処理
- メタデータ抽出: ユーザー人口統計、購入確認、レビュー分類
- 評価正規化: 異なるモールシステム間での評価スケールの標準化
レビュー文データ処理:
- 文分割: レビューコンテンツからの個別文抽出
- 感情分析: 感情スコアと分類の処理
- コンテンツインデックス: 検索と分析操作の準備
データ整合性対策:
- 一意キー制約を使用した重複検出と処理
- 外部キー検証により参照整合性を確保
- トランザクション管理により失敗した操作のロールバック機能を提供
ステップ4: RedisでのID追跡
フロー: キュージョブ → Redisキャッシュ
正常に処理されたレコードIDは、BigQueryへの信頼性の高い状態更新を可能にするためにRedisで体系的に追跡されます。この並列追跡システムは、システム障害に対する回復力を提供し、処理されたレコードが失われないことを保証します:
Redis保存構造:
- キー形式:
bigquery_sync:{data_type}:{batch_id} - 値コンテンツ: メタデータ付きの処理されたレコードIDを含むJSON配列
- 有効期限: 設定可能なTTL(デフォルト: 24時間)でメモリ膨張を防ぐ
追跡メタデータ:
- BigQueryソーステーブルからのレコードID
- データタイプ識別子(product、review、review_sentence)
- 監査証跡のための処理タイムスタンプ
- グループ化された操作のためのバッチ識別子
- 各レコードの成功/失敗状態
信頼性機能:
- アトミック操作により一貫したRedis状態を確保
- バックアップメカニズムによりRedis障害を処理
- 監視アラートによりRedisメモリ使用量とパフォーマンスを追跡
ステップ5: 状態更新処理
フロー: Redisキャッシュ → UpdateBigQueryStatusコマンド
UpdateBigQueryStatusコマンドは5分スケジュールで動作し、Redisから蓄積されたレコードIDを処理し、対応するBigQueryソーステーブルを更新します。このステップは、BigQueryが現在の同期状態を反映することを確保するフィードバックループを完成させます:
バッチ処理ロジック:
- Redisから処理されたレコードIDのバッチを取得(設定可能なバッチサイズ: 1000レコード)
- 最適化されたBigQuery更新操作のためにデータタイプ別にレコードをグループ化
- BigQuery APIレート制限のための指数バックオフを実装
更新操作:
- 状態フラグ更新: 正常に同期されたレコードに
status = 1を設定 - タイムスタンプ更新: 監査目的で処理完了時間を記録
- バッチ最適化: 効率的なバッチ更新によりBigQuery API呼び出しを最小化
エラー復旧:
- 失敗した状態更新は後続のコマンド実行で再試行される
- 持続的な失敗はアラート通知をトリガー
- Redis エントリは正常なBigQuery更新が確認されるまで保持される
ステップ6: 同期完了
フロー: UpdateBigQueryStatusコマンド → BigQueryソーステーブル
最終ステップは、BigQueryソーステーブルを更新し、一時追跡データをクリーンアップすることで同期サイクルの完了をマークします。このステップは、システムの清潔性を確保し、次の同期サイクルの準備をします:
BigQuery更新:
- 状態マーキング: レコードは正常な同期を示す
status = 1でマークされる - 監査証跡: 更新タイムスタンプは同期履歴を提供
- クエリ最適化: 状態フラグにより将来の同期操作の効率的なフィルタリングが可能
クリーンアップ操作:
- Redisクリーンアップ: 正常に処理されたレコードIDがRedisキャッシュから削除される
- メモリ管理: 時間の経過とともにRedisメモリの蓄積を防ぐ
- ログ生成: 運用監視とトラブルシューティングのための包括的なログ記録
監視とアラート:
- 成功メトリクスはパフォーマンス監視のために記録される
- 失敗通知は設定されたSlackチャンネルに送信される
- システムヘルスダッシュボードは同期状態とパフォーマンスを反映
重要なシステム依存関係
順次処理要件:
- 各ステップは次のステップが開始される前に正常に完了する必要がある
- 失敗したステップは包括的なエラーハンドリングと再試行メカニズムをトリガー
- データ一貫性はトランザクション管理とロールバック機能により維持される
リソース管理:
- キューワーカー: ジョブ処理のために十分なワーカー容量を維持する必要がある
- Redisメモリ: 追跡データ損失を防ぐために適切なメモリ割り当てが必要
- BigQueryクォータ: APIレート制限とクエリクォータを監視・管理する必要がある
データ整合性保護:
- 重複防止: 一意制約とupsert操作によりデータ重複を防ぐ
- 参照整合性: 外部キー制約により関係の一貫性を維持
- トランザクション境界: アトミック操作により障害時のデータ一貫性を確保
監視と可観測性:
- パフォーマンスメトリクス: 処理時間、スループット率、エラー頻度
- システムヘルス: キュー深度、Redisメモリ使用量、BigQueryクォータ消費
- アラートシステム: 障害、パフォーマンス低下、容量問題のリアルタイム通知
頻度概要
タイムライン
timeline
title BigQuery同期スケジュール
section 定期操作
30分ごと<br>(例: 08.00) : gcp sync-products
: gcp sync-reviews
: gcp sync-review_sentences
section 状態更新
5分ごと<br>(例: 08.05) : update bigquery-status
section ...
section 日次操作
日次<br>(例: 00.00) : gcp sync-products --missed
: gcp sync-reviews --missed
: gcp sync-review_sentences --missed
データベーススキーマ
erDiagram
products {
bigint id PK
integer mall_id "モール識別子"
string mall_product_id "モールシステムの商品ID"
string jan_code "JANコード"
string input_type "データソース"
string unique_key "システム生成の一意キー"
string ranking_description "ランキング説明"
timestamp crawl_created_at "クローラーからのタイムスタンプ"
timestamp bq_created_at "BigQueryからのタイムスタンプ"
timestamp created_at
timestamp updated_at
}
product_details {
bigint id PK
bigint product_id FK
string image "商品画像URL"
string product_url "商品ページURL"
string title "商品タイトル"
double rating "平均評価"
integer num_reviews "レビュー数"
string mall_shop_name "モールのショップ名"
string mall_shop_id "モールのショップID"
string maker_name "メーカー名"
double base_price "基本価格"
double shipping_fee "送料"
double price "計算された最終価格"
integer sales_one_month "月間売上"
integer point "ポイント"
json coupon "クーポン情報"
string ranking_description "ランキング説明"
string unique_key "システム生成の一意キー"
timestamp crawl_created_at "クローラーからのタイムスタンプ"
timestamp bq_created_at "BigQueryからのタイムスタンプ"
timestamp created_at
timestamp updated_at
}
reviews {
bigint id PK
integer mall_id "モール識別子"
integer review_count "ユーザーによるレビュー数"
integer vote "役立つ票"
tinyint type "レビュータイプ (1=非購入者, 2=購入者, 3=Vine)"
string variant "バリアントオプション"
string age "購入者の年齢"
string gender "購入者の性別"
text content "レビューコンテンツ"
string mall_product_id "モールシステムの商品ID"
string jan_code "JANコード"
double rating "数値評価"
string shop_name "ショップ名"
date post_date "レビュー投稿日"
bigint crawl_review_id "クローラーからの一意レビューID"
timestamp crawl_created_at "クローラーからのタイムスタンプ"
timestamp bq_created_at "BigQueryからのタイムスタンプ"
timestamp created_at
timestamp updated_at
}
review_sentences {
bigint id PK
bigint crawl_review_id "関連レビューID"
bigint sentence_id "一意文ID"
string mall_product_id "モールシステムの商品ID"
string jan_code "JANコード"
longtext content "個別文コンテンツ"
double sentiment_score "感情分析スコア"
date post_date "レビュー投稿日"
timestamp crawl_created_at "クローラーからのタイムスタンプ"
timestamp bq_created_at "BigQueryからのタイムスタンプ"
timestamp created_at
timestamp updated_at
}
products ||--o{ product_details : "has many"
reviews ||--o{ review_sentences : "contains"
テーブルカテゴリ
BigQueryソーステーブル
- products: 同期状態追跡付きのソース商品データ(GCP_BQ_TB_PRODUCTで設定)
- reviews: 同期状態追跡付きのソースレビューデータ(GCP_BQ_TB_REVIEWで設定)
- reviews_sentences: 同期状態追跡付きのソースレビュー文データ(GCP_BQ_TB_REVIEW_SENTENCEで設定)
ローカルデータベーステーブル(gb_analyzer)
- products: productsから同期されたコア商品情報
- product_details: productsから同期された拡張商品詳細と価格情報
- reviews: reviewsから同期された顧客レビューデータ
- review_sentences: reviews_sentencesから同期されたレビューから抽出された個別文
期待される結果
BigQuery同期操作が正常に実行されると、システムは以下を提供します:
データ同期
- BigQueryからの商品データがローカルの
productsとproduct_detailsテーブルにコピーされる - BigQueryからのレビューデータがローカルの
reviewsテーブルにコピーされる - BigQueryからのレビュー文データがローカルの
review_sentencesテーブルにコピーされる - 状態追跡により、処理後にレコードがBigQueryで同期済みとしてマークされる
システム操作
- 30分ごとの定期同期により、ローカルデータベースが最新のBigQueryデータで最新状態に保たれる
- 日次の欠損データ同期により、定期同期中に失敗したレコードが復旧される
- 5分ごとの状態更新により、処理されたレコードがBigQueryで同期済みとしてマークされる
- Redis追跡により、同期プロセス中のデータ損失が防がれる
バッチリスト
| バッチ | 説明 |
|---|---|
| 定期同期 | BigQueryからデータを同期するために30分ごとに実行されるコマンド |
| 状態更新 | 同期状態を更新するために5分ごとに実行されるコマンド |
| 欠損データ同期 | BigQueryから欠損データを同期するために日次で実行されるコマンド |