BigQuery Sync Overview

Description

The BigQuery Sync component synchronizes data from Google BigQuery to the gb_analyzer database, ensuring that product data, reviews, and review sentences in the local database are regularly updated to match BigQuery. The system uses BaseBigQuerySyncCommand for data extraction, chunked queue jobs for efficient processing, and status tracking to ensure data integrity.

System Overview Diagram

---
config:
  theme: base
  layout: dagre
  flowchart:
    curve: linear
    htmlLabels: true
  themeVariables:
    edgeLabelBackground: "transparent"
---
flowchart TD
    subgraph BigQueryTables["BigQuery Source Tables"]
        BQProduct[(products)]
        BQReview[(reviews)]
        BQReviewSentence[(reviews_sentences)]
    end
    
    Commands[BaseBigQuerySyncCommand]
    Jobs[BigQuery Sync Jobs]
    Redis[(Redis Cache)]
    
    subgraph AnalyzerTables["gb_analyzer Tables"]
        Products[(products)]
        ProductDetails[(product_details)]
        Reviews[(reviews)]
        ReviewSentences[(review_sentences)]
    end
    
    StatusUpdate[UpdateBigQueryStatus]
    
    BigQueryTables --- Step1[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>1</span>
            <p style='margin-top: 8px'>Extract Data</p>
        </div>
    ]
    Step1 --> Commands
    
    Commands --- Step2[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>2</span>
            <p style='margin-top: 8px'>Create Jobs</p>
        </div>
    ]
    Step2 --> Jobs
    
    Jobs --- Step3[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>3</span>
            <p style='margin-top: 8px'>Store Data</p>
        </div>
    ]
    Step3 --> AnalyzerTables
    
    Jobs --- Step4[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>4</span>
            <p style='margin-top: 8px'>Track IDs</p>
        </div>
    ]
    Step4 --> Redis
    
    Redis --- Step5[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>5</span>
            <p style='margin-top: 8px'>Update Status</p>
        </div>
    ]
    Step5 --> StatusUpdate
    
    StatusUpdate --- Step6[
        <div style='text-align: center'>
            <span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>6</span>
            <p style='margin-top: 8px'>Mark Synced</p>
        </div>
    ]
    Step6 --> BigQueryTables
    
    %% Style definitions
    style BigQueryTables fill:#d2e3fc,stroke:#4285f4,stroke-width:2px
    style AnalyzerTables fill:#d9d9f2,stroke:#6666cc,stroke-width:2px
    style Commands fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
    style Jobs fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
    style Redis fill:#f2d9f2,stroke:#cc66cc,stroke-width:2px
    style StatusUpdate fill:#d9f2f2,stroke:#66cccc,stroke-width:2px
    style Step1 fill:transparent,stroke:transparent,stroke-width:1px
    style Step2 fill:transparent,stroke:transparent,stroke-width:1px
    style Step3 fill:transparent,stroke:transparent,stroke-width:1px
    style Step4 fill:transparent,stroke:transparent,stroke-width:1px
    style Step5 fill:transparent,stroke:transparent,stroke-width:1px
    style Step6 fill:transparent,stroke:transparent,stroke-width:1px

Detailed Dataflow Dependencies

The BigQuery Sync component orchestrates a six-step data synchronization pipeline that ensures reliable data transfer from Google BigQuery to the local gb_analyzer database. It manages the complete lifecycle of data extraction, transformation, and status tracking, with built-in error handling and monitoring.

Data Flow Architecture

The BigQuery Sync system runs as a strict sequence: each step depends on the successful completion of the one before it. The architecture preserves data integrity through Redis-based tracking, asynchronous job processing, and comprehensive status management.

Key Components:

  • BaseBigQuerySyncCommand: Core command framework for data extraction
  • Specialized Sync Commands: SyncProduct, SyncReview, SyncReviewSentence
  • Queue Job System: Asynchronous data processing with SyncProductData, SyncReviewData, SyncReviewSentenceData
  • Redis Tracking: Temporary storage for processed record IDs
  • Status Management: UpdateBigQueryStatus command for bidirectional sync confirmation

Step 1: Data Extraction from BigQuery

Flow: BigQuery Source Tables → BaseBigQuerySyncCommand

The synchronization process begins with the execution of specialized command classes that inherit from BaseBigQuerySyncCommand. Each command targets a specific BigQuery table and filters its query so that only new or unsynced records are retrieved:

Command Execution:

  • gcp:sync-products queries products (configured via GCP_BQ_TB_PRODUCT environment variable)
  • gcp:sync-reviews queries reviews (configured via GCP_BQ_TB_REVIEW environment variable)
  • gcp:sync-review_sentences queries reviews_sentences (configured via GCP_BQ_TB_REVIEW_SENTENCE environment variable)

Filtering Logic:

  • Regular Sync (every 30 minutes): Retrieves records created within the last sync interval using timestamp-based filtering
  • Missed Data Sync (daily, with the --missed flag): Targets records with status IS NULL or status = 0 to recover failed synchronizations
  • Batch Processing: Configurable batch sizes (default: 500 records) prevent memory overflow and optimize BigQuery API usage; the two query modes are sketched after this list
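
The two query modes can be pictured with a minimal sketch of the logic inside a command's handle() method, assuming the google/cloud-bigquery client and a created_at timestamp column on the source table (that column name and the GCP_PROJECT_ID env key are illustrative assumptions):

<?php

use Google\Cloud\BigQuery\BigQueryClient;

// Sketch: build the extraction query for one product sync run.
// GCP_BQ_TB_PRODUCT holds the BigQuery source table name (see above).
$table  = env('GCP_BQ_TB_PRODUCT');
$missed = $this->option('missed'); // the daily --missed recovery flag

if ($missed) {
    // Recovery run: rows that were never marked as synced.
    $sql = "SELECT * FROM `{$table}` WHERE status IS NULL OR status = 0";
} else {
    // Regular run: rows created within the last 30-minute sync interval.
    $sql = "SELECT * FROM `{$table}`
            WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)";
}

$bigQuery = new BigQueryClient(['projectId' => env('GCP_PROJECT_ID')]); // assumed env key
$rows     = $bigQuery->runQuery($bigQuery->query($sql));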

Data Validation:

  • Schema validation ensures BigQuery data matches expected format
  • Null value handling for optional fields
  • Data type conversion preparation for local database storage

Step 2: Job Creation and Queuing

Flow: BaseBigQuerySyncCommand → Specialized Queue Jobs

Extracted BigQuery data is preprocessed and segmented into manageable chunks for asynchronous processing. This keeps the system responsive and enables parallel processing of large datasets:

Job Dispatching:

  • SyncProductData Jobs: Populate both the products and product_details tables from products rows
  • SyncReviewData Jobs: Process review content, ratings, and metadata from reviews rows
  • SyncReviewSentenceData Jobs: Manage sentiment analysis data and sentence extraction from reviews_sentences rows; a dispatch sketch follows this list
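
A minimal dispatch sketch, assuming the job classes live in App\Jobs, accept a chunk of rows plus a batch identifier, and run on a dedicated queue (the queue name is an assumption):

<?php

use App\Jobs\SyncProductData;
use Illuminate\Support\Str;

// Sketch: split extracted rows into 500-record chunks (the default batch
// size from Step 1) and queue one job per chunk.
$batchId = (string) Str::uuid(); // groups all chunks of this sync run

collect($rows)->chunk(500)->each(function ($chunk) use ($batchId) {
    SyncProductData::dispatch($chunk->values()->all(), $batchId)
        ->onQueue('bigquery-sync'); // assumed queue name
});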

Queue Management:

  • Jobs are dispatched to Laravel's queue system for asynchronous execution
  • Configurable queue workers handle job processing with retry mechanisms
  • Job prioritization ensures critical data types are processed first
  • Memory management prevents worker exhaustion during large data processing

Error Handling:

  • Failed jobs are automatically retried with exponential backoff
  • Dead letter queues capture permanently failed jobs for manual review
  • Comprehensive logging tracks job execution status and performance metrics

Step 3: Data Transformation and Storage

Flow: Queue Jobs → gb_analyzer Database Tables

Queue jobs transform BigQuery data into the local database schema. This step carries the core business logic of data normalization and relationship establishment:

Products Data Processing:

  • Primary Table (products): Core product information including mall_id, jan_code, input_type, and unique_key generation
  • Details Table (product_details): Extended product metadata including pricing, ratings, shop information, and sales data
  • Relationship Establishment: Foreign key relationships between products and product_details tables

Reviews Data Processing:

  • Content Processing: Review text normalization and encoding handling
  • Metadata Extraction: User demographics, purchase verification, and review classification
  • Rating Normalization: Standardization of rating scales across different mall systems

Review Sentences Data Processing:

  • Sentence Segmentation: Individual sentence extraction from review content
  • Sentiment Analysis: Processing of sentiment scores and classification
  • Content Indexing: Preparation for search and analysis operations

Data Integrity Measures:

  • Duplicate detection and handling using unique key constraints
  • Foreign key validation ensures referential integrity
  • Transaction management provides rollback capabilities for failed operations; a condensed handler sketch follows this list
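
Under these constraints a handler can be pictured as below, assuming Eloquent's upsert keyed on unique_key (the model name, key recipe, and abbreviated column lists are illustrative):

<?php

namespace App\Jobs;

use App\Models\Product; // assumed model
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\DB;

class SyncProductData implements ShouldQueue
{
    use Dispatchable, Queueable;

    public $tries = 3; // retried automatically on failure

    public function __construct(private array $rows, private string $batchId) {}

    public function handle(): void
    {
        // One transaction per chunk: the whole chunk lands or none of it.
        DB::transaction(function () {
            Product::upsert(
                array_map(fn ($r) => [
                    'mall_id'    => $r['mall_id'],
                    'jan_code'   => $r['jan_code'],
                    'input_type' => $r['input_type'],
                    'unique_key' => md5($r['mall_id'].'-'.$r['mall_product_id']), // assumed recipe
                ], $this->rows),
                ['unique_key'],                        // duplicate detection
                ['mall_id', 'jan_code', 'input_type']  // columns refreshed on conflict
            );
            // product_details rows would be upserted here, in the same transaction.
        });
    }
}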

Step 4: ID Tracking in Redis

Flow: Queue Jobs → Redis Cache

Successfully processed record IDs are tracked in Redis so that status updates can later be written back to BigQuery. This parallel tracking provides resilience against system failures and ensures that no processed records are lost:

Redis Storage Structure:

  • Key Format: bigquery_sync:{data_type}:{batch_id}
  • Value Content: JSON arrays containing processed record IDs with metadata
  • Expiration: Configurable TTL (default: 24 hours) prevents memory bloat; a write sketch follows this list
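
A minimal write sketch inside the job, after a successful chunk, using the documented key format and a 24-hour TTL (the JSON payload shape anticipates the metadata list below and is otherwise an assumption):

<?php

use Illuminate\Support\Facades\Redis;

// Sketch: record the IDs this job processed, under the documented key format.
$key = "bigquery_sync:product:{$this->batchId}";

Redis::setex($key, 60 * 60 * 24, json_encode([
    'ids'          => $processedIds,            // IDs from the BigQuery source table
    'data_type'    => 'product',
    'processed_at' => now()->toIso8601String(), // audit-trail timestamp
]));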

Tracking Metadata:

  • Record ID from BigQuery source table
  • Data type identifier (product, review, review_sentence)
  • Processing timestamp for audit trails
  • Batch identifier for grouped operations
  • Success/failure status for each record

Reliability Features:

  • Atomic operations ensure consistent Redis state
  • Backup mechanisms handle Redis failures
  • Monitoring alerts track Redis memory usage and performance

Step 5: Status Update Processing

Flow: Redis Cache → UpdateBigQueryStatus Command

The UpdateBigQueryStatus command runs on a 5-minute schedule, draining accumulated record IDs from Redis and updating the corresponding BigQuery source tables. This completes the feedback loop, so BigQuery always reflects the current synchronization state:

Batch Processing Logic:

  • Retrieves batches of processed record IDs from Redis (configurable batch size: 1000 records)
  • Groups records by data type for optimized BigQuery update operations
  • Implements exponential backoff for BigQuery API rate limiting

Update Operations:

  • Status Flag Updates: Sets status = 1 for successfully synchronized records
  • Timestamp Updates: Records processing completion time for audit purposes
  • Batch Optimization: Minimizes BigQuery API calls through efficient batch updates; a DML sketch follows this list
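
The write-back reduces to one parameterized DML statement per data type, as in this sketch (the id and updated_at column names on the source table are assumptions):

<?php

use Google\Cloud\BigQuery\BigQueryClient;

// Sketch: mark one batch of synced records in the BigQuery source table.
$bigQuery = new BigQueryClient(['projectId' => env('GCP_PROJECT_ID')]);
$table    = env('GCP_BQ_TB_PRODUCT');

$query = $bigQuery->query(
    "UPDATE `{$table}`
     SET status = 1, updated_at = CURRENT_TIMESTAMP()
     WHERE id IN UNNEST(@ids)"
)->parameters(['ids' => $ids]); // up to 1000 IDs per batch, per the logic above

$results = $bigQuery->runQuery($query);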

Error Recovery:

  • Failed status updates are retried in subsequent command executions
  • Persistent failures trigger alert notifications
  • Redis entries are preserved until successful BigQuery updates are confirmed

Step 6: Synchronization Completion

Flow: UpdateBigQueryStatus Command → BigQuery Source Tables

The final step closes the synchronization cycle: BigQuery source tables are updated and temporary tracking data is cleaned up, leaving the system ready for the next cycle:

BigQuery Updates:

  • Status Marking: Records are marked with status = 1 indicating successful synchronization
  • Audit Trail: Update timestamps provide synchronization history
  • Query Optimization: Status flags enable efficient filtering for future sync operations

Cleanup Operations:

  • Redis Cleanup: Successfully processed record IDs are removed from the Redis cache (sketched after this list)
  • Memory Management: Prevents Redis memory accumulation over time
  • Log Generation: Comprehensive logging supports operational monitoring and troubleshooting
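
Cleanup happens only after BigQuery confirms the update, so a crash between the two steps leaves the Redis entry in place for the next run; a sketch:

<?php

use Illuminate\Support\Facades\Redis;

// Sketch: delete the tracking key only once the Step 5 update is confirmed.
// $results is the QueryResults object returned by runQuery() in Step 5.
if ($results->isComplete()) {
    Redis::del("bigquery_sync:product:{$batchId}");
}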

Monitoring and Alerting:

  • Success metrics are recorded for performance monitoring
  • Failure notifications are sent to configured Slack channels
  • System health dashboards reflect synchronization status and performance

Critical System Dependencies

Sequential Processing Requirements:

  • Each step must complete successfully before the next can begin
  • Failed steps trigger comprehensive error handling and retry mechanisms
  • Data consistency is maintained through transaction management and rollback capabilities

Resource Management:

  • Queue Workers: Sufficient worker capacity must be maintained for job processing
  • Redis Memory: Adequate memory allocation prevents tracking data loss
  • BigQuery Quotas: API rate limits and query quotas must be monitored and managed

Data Integrity Safeguards:

  • Duplicate Prevention: Unique constraints and upsert operations prevent data duplication
  • Referential Integrity: Foreign key constraints maintain relationship consistency
  • Transaction Boundaries: Atomic operations ensure data consistency during failures

Monitoring and Observability:

  • Performance Metrics: Processing times, throughput rates, and error frequencies
  • System Health: Queue depth, Redis memory usage, and BigQuery quota consumption
  • Alert Systems: Real-time notifications for failures, performance degradation, and capacity issues

Frequency Overview

Timeline

timeline
    title BigQuery Sync Schedule
    section Regular Operations
        Every 30 minutes<br>(e.g. 08.00) : gcp sync-products
                                         : gcp sync-reviews
                                         : gcp sync-review_sentences
    section Status Updates
        Every 5 minutes<br>(e.g. 08.05) : update bigquery-status
    section ...
    section Daily Operations
        Daily<br>(e.g. 00.00) : gcp sync-products --missed
                              : gcp sync-reviews --missed
                              : gcp sync-review_sentences --missed
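
Assuming the schedule is registered through Laravel's console kernel, it would read roughly as follows (the update:bigquery-status signature is an assumption inferred from the timeline; the gcp:* signatures appear in Step 1):

<?php

// app/Console/Kernel.php (sketch)
use Illuminate\Console\Scheduling\Schedule;

protected function schedule(Schedule $schedule): void
{
    // Regular sync: every 30 minutes.
    $schedule->command('gcp:sync-products')->everyThirtyMinutes();
    $schedule->command('gcp:sync-reviews')->everyThirtyMinutes();
    $schedule->command('gcp:sync-review_sentences')->everyThirtyMinutes();

    // Status write-back: every 5 minutes.
    $schedule->command('update:bigquery-status')->everyFiveMinutes();

    // Missed-data recovery: daily.
    $schedule->command('gcp:sync-products --missed')->daily();
    $schedule->command('gcp:sync-reviews --missed')->daily();
    $schedule->command('gcp:sync-review_sentences --missed')->daily();
}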

Database Schema

erDiagram
    products {
        bigint id PK
        integer mall_id "Mall identifier"
        string mall_product_id "Product ID in mall system"
        string jan_code "JAN code"
        string input_type "Source of data"
        string unique_key "System-generated unique key"
        string ranking_description "Ranking description"
        timestamp crawl_created_at "Timestamp from crawler"
        timestamp bq_created_at "Timestamp from BigQuery"
        timestamp created_at
        timestamp updated_at
    }
    
    product_details {
        bigint id PK
        bigint product_id FK
        string image "Product image URL"
        string product_url "Product page URL"
        string title "Product title"
        double rating "Average rating"
        integer num_reviews "Number of reviews"
        string mall_shop_name "Shop name in mall"
        string mall_shop_id "Shop ID in mall"
        string maker_name "Manufacturer name"
        double base_price "Base price"
        double shipping_fee "Shipping cost"
        double price "Calculated final price"
        integer sales_one_month "Monthly sales"
        integer point "Points"
        json coupon "Coupon information"
        string ranking_description "Ranking description"
        string unique_key "System-generated unique key"
        timestamp crawl_created_at "Timestamp from crawler"
        timestamp bq_created_at "Timestamp from BigQuery"
        timestamp created_at
        timestamp updated_at
    }
    
    reviews {
        bigint id PK
        integer mall_id "Mall identifier"
        integer review_count "Number of reviews by user"
        integer vote "Helpful votes"
        tinyint type "Review type (1=Non-buyer, 2=Buyer, 3=Vine)"
        string variant "Variant options"
        string age "Age of buyer"
        string gender "Gender of buyer"
        text content "Review content"
        string mall_product_id "Product ID in mall system"
        string jan_code "JAN code"
        double rating "Numerical rating"
        string shop_name "Shop name"
        date post_date "Date of review"
        bigint crawl_review_id "Unique review ID from crawler"
        timestamp crawl_created_at "Timestamp from crawler"
        timestamp bq_created_at "Timestamp from BigQuery"
        timestamp created_at
        timestamp updated_at
    }
    
    review_sentences {
        bigint id PK
        bigint crawl_review_id "Related review ID"
        bigint sentence_id "Unique sentence ID"
        string mall_product_id "Product ID in mall system"
        string jan_code "JAN code"
        longtext content "Individual sentence content"
        double sentiment_score "Sentiment analysis score"
        date post_date "Date of review"
        timestamp crawl_created_at "Timestamp from crawler"
        timestamp bq_created_at "Timestamp from BigQuery"
        timestamp created_at
        timestamp updated_at
    }
    
    products ||--o{ product_details : "has many"
    reviews ||--o{ review_sentences : "contains"

Table Categories

BigQuery Source Tables

  • products: Source product data with sync status tracking (configured via GCP_BQ_TB_PRODUCT)
  • reviews: Source review data with sync status tracking (configured via GCP_BQ_TB_REVIEW)
  • reviews_sentences: Source review sentence data with sync status tracking (configured via GCP_BQ_TB_REVIEW_SENTENCE)

Local Database Tables (gb_analyzer)

  • products: Core product information synchronized from products
  • product_details: Extended product details and pricing information synchronized from products
  • reviews: Customer review data synchronized from reviews
  • review_sentences: Individual sentences extracted from reviews synchronized from reviews_sentences

Expected Outcomes

When the BigQuery sync operations execute successfully, the system delivers:

Data Synchronization

  • Product data from BigQuery is copied to local products and product_details tables
  • Review data from BigQuery is copied to local reviews table
  • Review sentence data from BigQuery is copied to local review_sentences table
  • Status tracking ensures records are marked as synced in BigQuery after processing

System Operations

  • Regular sync every 30 minutes keeps local database up-to-date with recent BigQuery data
  • Daily missed data sync recovers any records that failed during regular sync
  • Status updates every 5 minutes mark processed records as synced in BigQuery
  • Redis tracking prevents data loss during the sync process

Batch List

  • Regular Sync: Commands that run every 30 minutes to sync data from BigQuery
  • Status Updates: Command that runs every 5 minutes to update sync status
  • Missed Data Sync: Commands that run daily to sync missed data from BigQuery