Analyzer統合概要

説明

Analyzer統合コンポーネントは、Consoleデータベース(gb_console)とAnalyzerデータベース(gb_analyzer)間の通信を管理します。クローラーに送信されたレコードがAnalyzerデータベース内での存在を確認することで正常に処理されたかを検証し、Consoleデータベース内のステータスを更新します。この双方向検証フローにより、Trend Viewerバックエンドエコシステム全体で正確なステータス追跡が保証されます。

システム概要図

---
config:
  theme: base
  layout: dagre
  flowchart:
    curve: linear
    htmlLabels: true
  themeVariables:
    edgeLabelBackground: "transparent"
---
flowchart TD
    subgraph ConsoleTables["<div style='width: 300px'>Consoleデータベース (gb_console)</div>"]
        direction TB
        
        subgraph SummaryTables["サマリーウィッシュリストテーブル"]
            direction LR
            SummaryProducts[(summary_wishlist_products)]
            SummaryProductReviews[(summary_wishlist_product_reviews)]
            SummaryCategories[(summary_wishlist_categories)]
            SummarySearchQueries[(summary_wishlist_search_queries)]
        end
    end
    
    subgraph AnalyzerTables["<div style='width: 300px'>Analyzerデータベース (gb_analyzer)</div>"]
        direction TB
        
        subgraph ProductTables["製品データテーブル"]
            direction LR
            Products[(products)]
            Reviews[(reviews)]
        end
        
        subgraph RankingTables["ランキングテーブル"]
            direction LR
            TempCategoryRankings[(t_category_rankings)]
            TempSearchQueryRankings[(t_sq_rankings)]
        end
    end
    
    SyncSuccessCmd[analyzerdb:sync-crawl-success-from-analyzer]
    
    SyncSuccessCmd --- SyncSuccessCmdStep1[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>1</span>
            <p style='margin-top: 8px'>レコード取得</p>
        </div>
    ]
    SyncSuccessCmdStep1 --> ConsoleTables
    
    SyncSuccessCmd --- SyncSuccessCmdStep2[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>2</span>
            <p style='margin-top: 8px'>データクエリ</p>
        </div>
    ]
    SyncSuccessCmdStep2 --> AnalyzerTables
    
    SyncSuccessCmd --- SyncSuccessCmdStep3[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>3</span>
            <p style='margin-top: 8px'>成功ステータス更新</p>
        </div>
    ]
    SyncSuccessCmdStep3 --> ConsoleTables
    
    style SyncSuccessCmd fill:#d9f2d9
    style ConsoleTables fill:#fcf3d2,stroke:#339933,stroke-width:1px
    style AnalyzerTables fill:#d2e3fc,stroke:#3333cc,stroke-width:1px
    style SummaryTables fill:#fff9db
    style ProductTables fill:#e6f0ff
    style RankingTables fill:#e6f0ff
    style SyncSuccessCmdStep1 fill:transparent,stroke:transparent,stroke-width:1px
    style SyncSuccessCmdStep2 fill:transparent,stroke:transparent,stroke-width:1px
    style SyncSuccessCmdStep3 fill:transparent,stroke:transparent,stroke-width:1px

詳細データフロー依存関係

Analyzer統合コンポーネントは、図の番号付きステップと一致する3段階の検証フローに従います:

1. レコード取得

最初のステップでは、検証が必要なConsoleデータベースからレコードを取得します:

  • analyzerdb:sync-crawl-success-from-analyzerコマンドが5分毎にトリガーされます
  • 処理するデータのタイプを指定する--data-typeパラメータを受け取ります:
    • SummaryProductsummary_wishlist_productsからレコードを取得
    • SummaryProductReviewsummary_wishlist_product_reviewsからレコードを取得
    • SummaryCategorysummary_wishlist_categoriesからレコードを取得
    • SummarySearchQuerysummary_wishlist_search_queriesからレコードを取得
  • レコードはcrawl_statusでフィルタリングされます(通常は「Crawling」ステータスのレコード)
  • コマンドはリポジトリインターフェースを使用してデータベーステーブルにアクセスします:
    • SummaryWishlistProductRepositoryInterface
    • SummaryWishlistProductReviewRepositoryInterface
    • SummaryWishlistCategoryRepositoryInterface
    • SummaryWishlistSearchQueryRepositoryInterface
  • レコードは設定可能なバッチで取得されます(デフォルト:100)

2. データ検証

レコードが取得されると、システムはAnalyzerデータベース内での存在を検証します:

  • 各レコードバッチに対して、専用のジョブがディスパッチされます:
    • SummaryProductJob:Analyzerデータベースのproductsテーブルをチェック
    • SummaryProductReviewJob:Analyzerデータベースのreviewsテーブルをチェック
    • SummaryCategoryJob:Analyzerデータベースのt_category_rankingsテーブルをチェック
    • SummarySearchQueryJob:Analyzerデータベースのt_sq_rankingsテーブルをチェック
  • 各ジョブはレコードをマッチングするための専用ルックアップキーを作成します:
    • 製品の場合:{mall_id}_{input_type}_{input}(input_typeはJan、Asin、またはRakutenId)
    • レビューの場合:関連製品の識別子に基づくキー
    • カテゴリの場合:{mall_id}_{category_id}
    • 検索クエリの場合:{mall_id}_{keyword}
  • データの新鮮さを保証するために時間制約が適用されます:
    • Consoleレコードのupdated_at後のAnalyzerデータベースのタイムスタンプを持つレコードのみが有効とみなされます
    • これにより、ウィッシュリストレコードが更新された後に処理されたデータのみがマッチとみなされることが保証されます

3. ステータス更新

検証後、システムは検証されたレコードのステータスを更新します:

  • Analyzerデータベースで見つかったレコードに対して、ジョブは以下を更新します:
    • 関連するサマリーウィッシュリストテーブルのcrawl_statusフィールドを2 (Success)
    • これはupdateByConditionsupdateBySummaryWishlistProductIdsなどのリポジトリメソッドを使用して行われます
  • 効率性のために複数のレコードをバッチ操作で更新できます
  • ステータス更新は一貫性を保証するためにデータベーストランザクション内で適用されます
  • 信頼性のためにデッドロック検出と再試行メカニズムが実装されています
  • 成功とエラー通知がSlackを通じて送信されます
  • 処理および更新されたレコード数に関する詳細情報を含むログが生成されます

この3段階のプロセスにより、2つのデータベース間で正確なステータス追跡が保証され、システムはクローラーとアナライザーコンポーネントによって正常に処理されたレコードを知ることができます。

データタイプとテーブル

Consoleデータベーステーブル(gb_console)

  • summary_wishlist_products:クローラーに送信された製品データを追跡

    • キーフィールド:id、mall_id、input_type、input、sending_status、crawl_status
    • 更新者SummaryProductJob
    • ルックアップキー:mall_id、input_type(Jan、Asin、RakutenId)、input値に基づく
  • summary_wishlist_product_reviews:製品レビューデータを追跡

    • キーフィールド:id、summary_wishlist_product_id、sending_status、crawl_status
    • 更新者SummaryProductReviewJob
    • 関係:summary_wishlist_productsへの外部キー
  • summary_wishlist_categories:カテゴリデータを追跡

    • キーフィールド:id、mall_id、category_id、sending_status、crawl_status
    • 更新者SummaryCategoryJob
    • ルックアップキー:mall_idとcategory_idに基づく
  • summary_wishlist_search_queries:検索クエリデータを追跡

    • キーフィールド:id、mall_id、keyword、sending_status、crawl_status
    • 更新者SummarySearchQueryJob
    • ルックアップキー:mall_idとkeywordに基づく

Analyzerデータベーステーブル(gb_analyzer)

  • products:処理された製品データを含む

    • キーフィールド:mall_id、mall_product_id、jan_code、input_type、unique_key
    • クエリ者SummaryProductJob
    • 時間制約:crawl_created_atはsummary_wishlist_product.updated_at後である必要
  • reviews:処理されたレビューデータを含む

    • キーフィールド:mall_id、mall_product_id/jan_code
    • クエリ者SummaryProductReviewJob
    • 時間制約:crawl_created_atはsummary_wishlist_product_review.updated_at後である必要
  • t_category_rankings:クローラーからの一時的なカテゴリランキング

    • キーフィールド:mall_id、category_id
    • クエリ者SummaryCategoryJob
    • 時間制約:crawl_created_atはsummary_wishlist_category.updated_at後である必要
  • t_sq_rankings:クローラーからの一時的な検索クエリランキング

    • キーフィールド:mall_id、keyword
    • クエリ者SummarySearchQueryJob
    • 時間制約:crawl_created_atはsummary_wishlist_search_query.updated_at後である必要

ステータス管理

システムは処理状態を追跡するためにCrawlStatus enumを使用します:

  • NotCrawled:新しいレコードの初期状態
  • Crawling:レコードが現在クローラーによって処理中
  • Success:処理が正常に完了(このコンポーネントによって更新)
  • Failed:処理がエラーで失敗(Crawler統合コンポーネントによって処理)
  • Error:処理中にシステムエラーが発生

これらのステータス値により以下が可能になります:

  • データ処理ステータスの正確なダッシュボードレポート
  • トラブルシューティングのための問題のあるレコードの特定
  • パイプライン全体でのデータ整合性検証
  • ダウンストリーム操作の適切なシーケンシング

データベーススキーマ

Consoleデータベーステーブル(gb_console)

erDiagram
    summary_wishlist_products {
        bigint id PK
        string input "製品の入力"
        string input_type "入力のタイプ: jan, asin, rakuten_id"
        bigint mall_id FK "mallsテーブルへの外部キー"
        integer crawl_status "クロールのステータス(デフォルト: New)"
        integer status "製品のステータス(デフォルト: New)"
        timestamp updated_at
    }
    
    summary_wishlist_product_reviews {
        bigint id PK
        bigint summary_wishlist_product_id FK "summary_wishlist_productsへの外部キー"
        integer crawl_status "クロールのステータス(デフォルト: New)"
        integer status "製品のステータス(デフォルト: New)"
        timestamp updated_at
    }
    
    summary_wishlist_categories {
        bigint id PK
        string category_id "モール内のカテゴリID"
        bigint mall_id FK "mallsテーブルへの外部キー"
        integer crawl_status "クロールのステータス(デフォルト: New)"
        integer status "製品のステータス(デフォルト: New)"
        timestamp updated_at
    }
    
    summary_wishlist_search_queries {
        bigint id PK
        bigint mall_id FK "モールのID"
        string keyword "検索キーワード"
        integer crawl_status "クロールのステータス(デフォルト: New)"
        integer status "製品のステータス(デフォルト: New)"
        timestamp updated_at
    }
    
    summary_wishlist_products ||--o| summary_wishlist_product_reviews : "has one"

Analyzerデータベーステーブル(gb_analyzer)

erDiagram
    products {
        bigint id PK
        integer mall_id "モール識別子"
        string mall_product_id "モールからの製品ID"
        string jan_code "JANコード(nullable)"
        string input_type "入力のタイプ: asin, jan, rakuten_id(デフォルト: asin)"
        timestamp crawl_created_at "製品がクロールされた時刻(nullable)"
    }
    
    reviews {
        bigint id PK
        integer mall_id "モール識別子(nullable)"
        string mall_product_id "モールからの製品ID(nullable)"
        string jan_code "JANコード(nullable、最大50文字)"
        timestamp crawl_created_at "レビューがクロールされた時刻(nullable)"
    }
    
    t_category_rankings {
        bigint id PK
        integer mall_id "モール識別子(nullable)"
        string category_id "カテゴリ識別子(nullable)"
        timestamp crawl_created_at "ランキングがクロールされた時刻(nullable)"
    }
    
    t_sq_rankings {
        bigint id PK
        integer mall_id "モール識別子(nullable)"
        string keyword "検索キーワード(nullable、最大100文字)"
        timestamp crawl_created_at "ランキングがクロールされた時刻(nullable)"
    }

頻度概要

タイムライン

timeline
    title Analyzer統合スケジュール
    section 成功同期操作
        5分毎<br>(例: 08.00, 08.05, など) : analyzerdb sync crawl success from analyzer

注意: このコマンドは各データタイプ(SummaryProduct、SummaryProductReview、SummaryCategory、SummarySearchQuery)に対して実行されます

期待される結果

これらのコマンドが正常に実行されると、システムは以下を提供します:

  • 自動ステータス検証:ConsoleとAnalyzerデータベース間でのクロール成功ステータスのリアルタイム検証により、正確な処理状態追跡を保証
  • クロスデータベースデータ整合性:時間制約検証を伴うgb_consoleとgb_analyzerデータベース間の一貫性を維持する双方向検証フロー
  • 効率的なバッチ処理:設定可能なバッチサイズと各データタイプ用の専用ジョブクラスを持つ非同期ジョブキュー処理
  • 堅牢なエラーハンドリング:指数バックオフ、デッドロック検出、運用信頼性のための包括的ログを伴う自動再試行メカニズム
  • ステータス同期:CrawlingからSuccessステートへの正確なcrawl_status更新により、適切なダウンストリーム処理とデータセット作成を可能に
  • マルチデータタイプサポート:タイプ固有のルックアップキー生成を伴う製品、レビュー、カテゴリ、検索クエリの同時処理
  • パフォーマンス最適化:効率的なリソース利用のためのバッチ更新操作とデータベーストランザクション管理を伴うリポジトリパターン実装

バッチリスト

名前 説明
成功同期 レコードがAnalyzerシステムによって正常に処理されたかを検証し、そのステータスを更新するために5分毎に実行されるコマンド