BigQuery Sync Overview
Description
The BigQuery Sync component is responsible for synchronizing data from Google BigQuery to the gb_analyzer database. This component ensures that product data, reviews, and review sentences are regularly updated in the local database to match the data in BigQuery. The system uses the BaseBigQuerySyncCommand for data extraction, chunked job processing for efficiency, and status tracking to ensure data integrity.
Overview System Diagram
---
config:
  theme: base
  layout: dagre
  flowchart:
    curve: linear
    htmlLabels: true
  themeVariables:
    edgeLabelBackground: "transparent"
---
flowchart TD
subgraph BigQueryTables["BigQuery Source Tables"]
BQProduct[(products)]
BQReview[(reviews)]
BQReviewSentence[(reviews_sentences)]
end
Commands[BaseBigQuerySyncCommand]
Jobs[BigQuery Sync Jobs]
Redis[(Redis Cache)]
subgraph AnalyzerTables["gb_analyzer Tables"]
Products[(products)]
ProductDetails[(product_details)]
Reviews[(reviews)]
ReviewSentences[(review_sentences)]
end
StatusUpdate[UpdateBigQueryStatus]
BigQueryTables --- Step1[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>1</span>
<p style='margin-top: 8px'>Extract Data</p>
</div>
]
Step1 --> Commands
Commands --- Step2[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>2</span>
<p style='margin-top: 8px'>Create Jobs</p>
</div>
]
Step2 --> Jobs
Jobs --- Step3[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>3</span>
<p style='margin-top: 8px'>Store Data</p>
</div>
]
Step3 --> AnalyzerTables
Jobs --- Step4[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>4</span>
<p style='margin-top: 8px'>Track IDs</p>
</div>
]
Step4 --> Redis
Redis --- Step5[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>5</span>
<p style='margin-top: 8px'>Update Status</p>
</div>
]
Step5 --> StatusUpdate
StatusUpdate --- Step6[
<div style='text-align: center'>
<span style='display: inline-block; background-color: #6699cc !important; color:white; width: 28px; height: 28px; line-height: 28px; border-radius: 50%; font-weight: bold'>6</span>
<p style='margin-top: 8px'>Mark Synced</p>
</div>
]
Step6 --> BigQueryTables
%% Style definitions
style BigQueryTables fill:#d2e3fc,stroke:#4285f4,stroke-width:2px
style AnalyzerTables fill:#d9d9f2,stroke:#6666cc,stroke-width:2px
style Commands fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
style Jobs fill:#fcd9d9,stroke:#cc6666,stroke-width:2px
style Redis fill:#f2d9f2,stroke:#cc66cc,stroke-width:2px
style StatusUpdate fill:#d9f2f2,stroke:#66cccc,stroke-width:2px
style Step1 fill:transparent,stroke:transparent,stroke-width:1px
style Step2 fill:transparent,stroke:transparent,stroke-width:1px
style Step3 fill:transparent,stroke:transparent,stroke-width:1px
style Step4 fill:transparent,stroke:transparent,stroke-width:1px
style Step5 fill:transparent,stroke:transparent,stroke-width:1px
style Step6 fill:transparent,stroke:transparent,stroke-width:1px
Detailed Dataflow Dependencies
The BigQuery Sync component orchestrates a six-step data synchronization pipeline that moves data reliably from Google BigQuery to the local gb_analyzer database. It manages the complete lifecycle of data extraction, transformation, and status tracking, with built-in error handling and monitoring.
Data Flow Architecture
The BigQuery Sync system operates through a carefully orchestrated sequence where each step has explicit dependencies on the previous step's successful completion. The architecture ensures data integrity through Redis-based tracking, asynchronous job processing, and comprehensive status management.
Key Components:
- BaseBigQuerySyncCommand: Core command framework for data extraction
- Specialized Sync Commands: SyncProduct, SyncReview, SyncReviewSentence
- Queue Job System: Asynchronous data processing with SyncProductData, SyncReviewData, SyncReviewSentenceData
- Redis Tracking: Temporary storage for processed record IDs
- Status Management: UpdateBigQueryStatus command for bidirectional sync confirmation
Step 1: Data Extraction from BigQuery
Flow: BigQuery Source Tables → BaseBigQuerySyncCommand
The synchronization process begins with the execution of specialized command classes that inherit from BaseBigQuerySyncCommand. Each command targets specific BigQuery tables and applies intelligent filtering to optimize data retrieval:
Command Execution:
- gcp:sync-products queries products (configured via the GCP_BQ_TB_PRODUCT environment variable)
- gcp:sync-reviews queries reviews (configured via the GCP_BQ_TB_REVIEW environment variable)
- gcp:sync-review_sentences queries reviews_sentences (configured via the GCP_BQ_TB_REVIEW_SENTENCE environment variable)
Filtering Logic:
- Regular Sync (Every 30 minutes): Retrieves records created within the last sync interval using timestamp-based filtering
- Missed Data Sync (Daily with --missed flag): Targets records with status IS NULL or status = 0 to recover failed synchronizations
- Batch Processing: Configurable batch sizes (default: 500 records) prevent memory overflow and optimize BigQuery API usage
Data Validation:
- Schema validation ensures BigQuery data matches expected format
- Null value handling for optional fields
- Data type conversion preparation for local database storage
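To make this step concrete, here is a minimal sketch of how a command built on BaseBigQuerySyncCommand might assemble its filtered extraction query using the google/cloud-bigquery client. The GCP_PROJECT_ID variable, the WHERE clauses, and the created_at column are illustrative assumptions, not the actual implementation:

```php
<?php

use Google\Cloud\BigQuery\BigQueryClient;

// Hypothetical sketch of the Step 1 extraction query.
$bigQuery = new BigQueryClient(['projectId' => env('GCP_PROJECT_ID')]); // assumed env var
$table    = env('GCP_BQ_TB_PRODUCT'); // documented table config, e.g. "dataset.products"
$missed   = false;                    // true when the command runs with --missed

// Regular runs use a timestamp window matching the 30-minute schedule;
// --missed runs recover rows that were never confirmed as synced.
$where = $missed
    ? 'status IS NULL OR status = 0'
    : 'created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)';

$config = $bigQuery->query("SELECT * FROM `{$table}` WHERE {$where} LIMIT 500"); // default batch size

foreach ($bigQuery->runQuery($config) as $row) {
    // Each $row arrives as an associative array, ready for Step 2 job dispatching.
}
```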
Step 2: Job Creation and Queuing
Flow: BaseBigQuerySyncCommand → Specialized Queue Jobs
Extracted BigQuery data undergoes preprocessing and is segmented into manageable chunks for asynchronous processing. This step ensures system responsiveness and enables parallel processing of large datasets:
Job Dispatching:
- SyncProductData Jobs: Handle both products and product_details table synchronization from products data
- SyncReviewData Jobs: Process review content, ratings, and metadata from reviews data
- SyncReviewSentenceData Jobs: Manage sentiment analysis data and sentence extraction from reviews_sentences data
Queue Management:
- Jobs are dispatched to Laravel's queue system for asynchronous execution
- Configurable queue workers handle job processing with retry mechanisms
- Job prioritization ensures critical data types are processed first
- Memory management prevents worker exhaustion during large data processing
Error Handling:
- Failed jobs are automatically retried with exponential backoff
- Dead letter queues capture permanently failed jobs for manual review
- Comprehensive logging tracks job execution status and performance metrics
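A sketch of what one of these jobs and its chunked dispatch could look like; the retry count, backoff values, and queue name are illustrative assumptions, and the real SyncProductData class may be shaped differently:

```php
<?php

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

// Hypothetical job shape for Step 2.
class SyncProductData implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;                 // automatic retries on failure
    public array $backoff = [10, 60, 300]; // exponential-style backoff, in seconds

    public function __construct(public array $rows)
    {
    }

    public function handle(): void
    {
        // Transformation and storage logic (Step 3) runs here.
    }
}

// Segment the extracted rows (from Step 1) into chunks and dispatch one job per chunk.
foreach (array_chunk($extractedRows, 500) as $chunk) {
    SyncProductData::dispatch($chunk)->onQueue('bigquery-sync');
}
```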
Step 3: Data Transformation and Storage
Flow: Queue Jobs → gb_analyzer Database Tables
Queue jobs transform BigQuery record formats into the local database schema. This step carries the core business logic of data normalization and relationship establishment:
Products Data Processing:
- Primary Table (products): Core product information including mall_id, jan_code, input_type, and unique_key generation
- Details Table (product_details): Extended product metadata including pricing, ratings, shop information, and sales data
- Relationship Establishment: Foreign key relationships between the products and product_details tables
Reviews Data Processing:
- Content Processing: Review text normalization and encoding handling
- Metadata Extraction: User demographics, purchase verification, and review classification
- Rating Normalization: Standardization of rating scales across different mall systems
Review Sentences Data Processing:
- Sentence Segmentation: Individual sentence extraction from review content
- Sentiment Analysis: Processing of sentiment scores and classification
- Content Indexing: Preparation for search and analysis operations
Data Integrity Measures:
- Duplicate detection and handling using unique key constraints
- Foreign key validation ensures referential integrity
- Transaction management provides rollback capabilities for failed operations
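These measures can be illustrated with a hedged sketch of a job's handle() body; the model names, column mapping, and the details() relation are assumptions based on the schema documented later on this page:

```php
<?php

use App\Models\Product;             // assumed model name
use Illuminate\Support\Facades\DB;

// Wrap each chunk in a transaction so a failed row rolls back cleanly.
DB::transaction(function () use ($rows) {
    foreach ($rows as $row) {
        // Upserting on unique_key makes redelivered chunks idempotent.
        $product = Product::updateOrCreate(
            ['unique_key' => $row['unique_key']],
            [
                'mall_id'       => $row['mall_id'],
                'jan_code'      => $row['jan_code'] ?? null, // null-safe optional field
                'input_type'    => $row['input_type'],
                'bq_created_at' => $row['created_at'],
            ]
        );

        // Extended metadata lands in product_details, keyed to the parent product.
        $product->details()->updateOrCreate(
            ['unique_key' => $row['unique_key']],
            [
                'title'  => $row['title'] ?? null,
                'price'  => isset($row['price']) ? (float) $row['price'] : null, // type conversion
                'rating' => $row['rating'] ?? null,
            ]
        );
    }
});
```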
Step 4: ID Tracking in Redis
Flow: Queue Jobs → Redis Cache
Successfully processed record IDs are systematically tracked in Redis to enable reliable status updates back to BigQuery. This parallel tracking system provides resilience against system failures and ensures no processed records are lost:
Redis Storage Structure:
- Key Format: bigquery_sync:{data_type}:{batch_id}
- Value Content: JSON arrays containing processed record IDs with metadata
- Expiration: Configurable TTL (default: 24 hours) prevents memory bloat
Tracking Metadata:
- Record ID from BigQuery source table
- Data type identifier (product, review, review_sentence)
- Processing timestamp for audit trails
- Batch identifier for grouped operations
- Success/failure status for each record
Reliability Features:
- Atomic operations ensure consistent Redis state
- Backup mechanisms handle Redis failures
- Monitoring alerts track Redis memory usage and performance
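A minimal sketch of the tracking write, following the key format described above; the exact payload fields are assumptions, and $batchId and $row are supplied by the surrounding job:

```php
<?php

use Illuminate\Support\Facades\Redis;

// Key follows the documented format: bigquery_sync:{data_type}:{batch_id}
$key = sprintf('bigquery_sync:%s:%s', 'product', $batchId);

// Append one JSON entry per successfully processed record.
Redis::rpush($key, json_encode([
    'id'           => $row['id'],               // record ID in the BigQuery source table
    'data_type'    => 'product',
    'processed_at' => now()->toIso8601String(), // audit-trail timestamp
    'status'       => 'success',
]));

// TTL (default: 24 hours) keeps abandoned batches from bloating Redis.
Redis::expire($key, 24 * 60 * 60);
```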
Step 5: Status Update Processing
Flow: Redis Cache → UpdateBigQueryStatus Command
The UpdateBigQueryStatus command operates on a 5-minute schedule to process accumulated record IDs from Redis and update the corresponding BigQuery source tables. This step completes the feedback loop ensuring BigQuery reflects the current synchronization state:
Batch Processing Logic:
- Retrieves batches of processed record IDs from Redis (configurable batch size: 1000 records)
- Groups records by data type for optimized BigQuery update operations
- Implements exponential backoff for BigQuery API rate limiting
Update Operations:
- Status Flag Updates: Sets status = 1 for successfully synchronized records
- Timestamp Updates: Records processing completion time for audit purposes
- Batch Optimization: Minimizes BigQuery API calls through efficient batch updates
Error Recovery:
- Failed status updates are retried in subsequent command executions
- Persistent failures trigger alert notifications
- Redis entries are preserved until successful BigQuery updates are confirmed
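Putting these pieces together, here is a hedged sketch of a single UpdateBigQueryStatus pass. The SQL and the lrange/ltrim pattern are illustrative choices that preserve Redis entries until BigQuery confirms the update, as described above:

```php
<?php

use Google\Cloud\BigQuery\BigQueryClient;
use Illuminate\Support\Facades\Redis;

$bigQuery = new BigQueryClient(['projectId' => env('GCP_PROJECT_ID')]); // assumed env var
$table    = env('GCP_BQ_TB_PRODUCT');
$key      = sprintf('bigquery_sync:%s:%s', 'product', $batchId);

// Read (without removing) up to the 1000-record batch size.
$entries = Redis::lrange($key, 0, 999);
$ids     = array_map(fn ($e) => json_decode($e, true)['id'], $entries);

if ($ids !== []) {
    // One batched UPDATE minimizes BigQuery API calls.
    $config = $bigQuery->query(sprintf(
        'UPDATE `%s` SET status = 1, updated_at = CURRENT_TIMESTAMP() WHERE id IN (%s)',
        $table,
        implode(',', array_map('intval', $ids))
    ));
    $bigQuery->runQuery($config);

    // Trim only after the update succeeds, so failures are retried next run.
    Redis::ltrim($key, count($ids), -1);
}
```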
Step 6: Synchronization Completion
Flow: UpdateBigQueryStatus Command → BigQuery Source Tables
The final step marks the completion of the synchronization cycle by updating BigQuery source tables and cleaning up temporary tracking data. This step ensures system cleanliness and prepares for the next synchronization cycle:
BigQuery Updates:
- Status Marking: Records are marked with status = 1, indicating successful synchronization
- Audit Trail: Update timestamps provide synchronization history
- Query Optimization: Status flags enable efficient filtering for future sync operations
Cleanup Operations:
- Redis Cleanup: Successfully processed record IDs are removed from Redis cache
- Memory Management: Prevents Redis memory accumulation over time
- Log Generation: Comprehensive logging for operational monitoring and troubleshooting
Monitoring and Alerting:
- Success metrics are recorded for performance monitoring
- Failure notifications are sent to configured Slack channels
- System health dashboards reflect synchronization status and performance
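For the alerting side, a small sketch using Laravel's built-in slack log channel (driven by LOG_SLACK_WEBHOOK_URL in config/logging.php); the message and context fields are placeholders, not the actual notification format:

```php
<?php

use Illuminate\Support\Facades\Log;

// Hypothetical failure notification to the configured Slack channel.
Log::channel('slack')->error('BigQuery sync: status update failed', [
    'data_type'  => 'product',
    'batch_id'   => $batchId,
    'failed_ids' => $failedIds,
]);
```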
Critical System Dependencies
Sequential Processing Requirements:
- Each step must complete successfully before the next step can initiate
- Failed steps trigger comprehensive error handling and retry mechanisms
- Data consistency is maintained through transaction management and rollback capabilities
Resource Management:
- Queue Workers: Sufficient worker capacity must be maintained for job processing
- Redis Memory: Adequate memory allocation prevents tracking data loss
- BigQuery Quotas: API rate limits and query quotas must be monitored and managed
Data Integrity Safeguards:
- Duplicate Prevention: Unique constraints and upsert operations prevent data duplication
- Referential Integrity: Foreign key constraints maintain relationship consistency
- Transaction Boundaries: Atomic operations ensure data consistency during failures
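For the duplicate-prevention safeguard, Laravel's batch upsert() is one way to enforce the unique-key constraint in a single statement; the model name and column list here are assumptions:

```php
<?php

use App\Models\Product; // assumed model

// Insert-or-update many rows at once, keyed on the documented unique_key.
Product::upsert(
    $rows,                                                 // column => value arrays
    ['unique_key'],                                        // conflict target
    ['mall_id', 'jan_code', 'input_type', 'bq_created_at'] // columns updated on conflict
);
```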
Monitoring and Observability:
- Performance Metrics: Processing times, throughput rates, and error frequencies
- System Health: Queue depth, Redis memory usage, and BigQuery quota consumption
- Alert Systems: Real-time notifications for failures, performance degradation, and capacity issues
Frequency Overview
Timeline
timeline
    title BigQuery Sync Schedule
    section Regular Operations
        Every 30 minutes<br>(e.g. 08:00) : gcp sync-products
            : gcp sync-reviews
            : gcp sync-review_sentences
    section Status Updates
        Every 5 minutes<br>(e.g. 08:05) : update bigquery-status
    section ...
    section Daily Operations
        Daily<br>(e.g. 00:00) : gcp sync-products --missed
            : gcp sync-reviews --missed
            : gcp sync-review_sentences --missed
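This schedule maps naturally onto Laravel's task scheduler; a sketch of the corresponding Kernel wiring, where the update:bigquery-status signature is an assumption inferred from the timeline above:

```php
<?php

namespace App\Console;

use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    protected function schedule(Schedule $schedule): void
    {
        // Regular sync every 30 minutes
        $schedule->command('gcp:sync-products')->everyThirtyMinutes();
        $schedule->command('gcp:sync-reviews')->everyThirtyMinutes();
        $schedule->command('gcp:sync-review_sentences')->everyThirtyMinutes();

        // Status updates every 5 minutes
        $schedule->command('update:bigquery-status')->everyFiveMinutes(); // assumed signature

        // Daily missed-data recovery
        $schedule->command('gcp:sync-products --missed')->daily();
        $schedule->command('gcp:sync-reviews --missed')->daily();
        $schedule->command('gcp:sync-review_sentences --missed')->daily();
    }
}
```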
Database Schema
erDiagram
products {
bigint id PK
integer mall_id "Mall identifier"
string mall_product_id "Product ID in mall system"
string jan_code "JAN code"
string input_type "Source of data"
string unique_key "System-generated unique key"
string ranking_description "Ranking description"
timestamp crawl_created_at "Timestamp from crawler"
timestamp bq_created_at "Timestamp from BigQuery"
timestamp created_at
timestamp updated_at
}
product_details {
bigint id PK
bigint product_id FK
string image "Product image URL"
string product_url "Product page URL"
string title "Product title"
double rating "Average rating"
integer num_reviews "Number of reviews"
string mall_shop_name "Shop name in mall"
string mall_shop_id "Shop ID in mall"
string maker_name "Manufacturer name"
double base_price "Base price"
double shipping_fee "Shipping cost"
double price "Calculated final price"
integer sales_one_month "Monthly sales"
integer point "Points"
json coupon "Coupon information"
string ranking_description "Ranking description"
string unique_key "System-generated unique key"
timestamp crawl_created_at "Timestamp from crawler"
timestamp bq_created_at "Timestamp from BigQuery"
timestamp created_at
timestamp updated_at
}
reviews {
bigint id PK
integer mall_id "Mall identifier"
integer review_count "Number of reviews by user"
integer vote "Helpful votes"
tinyint type "Review type (1=Non-buyer, 2=Buyer, 3=Vine)"
string variant "Variant options"
string age "Age of buyer"
string gender "Gender of buyer"
text content "Review content"
string mall_product_id "Product ID in mall system"
string jan_code "JAN code"
double rating "Numerical rating"
string shop_name "Shop name"
date post_date "Date of review"
bigint crawl_review_id "Unique review ID from crawler"
timestamp crawl_created_at "Timestamp from crawler"
timestamp bq_created_at "Timestamp from BigQuery"
timestamp created_at
timestamp updated_at
}
review_sentences {
bigint id PK
bigint crawl_review_id "Related review ID"
bigint sentence_id "Unique sentence ID"
string mall_product_id "Product ID in mall system"
string jan_code "JAN code"
longtext content "Individual sentence content"
double sentiment_score "Sentiment analysis score"
date post_date "Date of review"
timestamp crawl_created_at "Timestamp from crawler"
timestamp bq_created_at "Timestamp from BigQuery"
timestamp created_at
timestamp updated_at
}
products ||--o{ product_details : "has many"
reviews ||--o{ review_sentences : "contains"
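The two relationships in the diagram could be expressed as Eloquent relations like the following; note that review_sentences joins on crawl_review_id rather than the primary key. Model names are assumptions:

```php
<?php

use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Relations\HasMany;

class ProductDetail extends Model {}
class ReviewSentence extends Model {}

class Product extends Model
{
    // products ||--o{ product_details
    public function details(): HasMany
    {
        return $this->hasMany(ProductDetail::class);
    }
}

class Review extends Model
{
    // reviews ||--o{ review_sentences, joined on crawl_review_id
    public function sentences(): HasMany
    {
        return $this->hasMany(ReviewSentence::class, 'crawl_review_id', 'crawl_review_id');
    }
}
```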
Table Categories
BigQuery Source Tables
- products: Source product data with sync status tracking (configured via GCP_BQ_TB_PRODUCT)
- reviews: Source review data with sync status tracking (configured via GCP_BQ_TB_REVIEW)
- reviews_sentences: Source review sentence data with sync status tracking (configured via GCP_BQ_TB_REVIEW_SENTENCE)
Local Database Tables (gb_analyzer)
- products: Core product information synchronized from products
- product_details: Extended product details and pricing information synchronized from products
- reviews: Customer review data synchronized from reviews
- review_sentences: Individual sentences extracted from reviews synchronized from reviews_sentences
Expected Outcomes
When the BigQuery sync operations execute successfully, the system delivers:
Data Synchronization
- Product data from BigQuery is copied to the local products and product_details tables
- Review data from BigQuery is copied to the local reviews table
- Review sentence data from BigQuery is copied to the local review_sentences table
- Status tracking ensures records are marked as synced in BigQuery after processing
System Operations
- Regular sync every 30 minutes keeps local database up-to-date with recent BigQuery data
- Daily missed data sync recovers any records that failed during regular sync
- Status updates every 5 minutes mark processed records as synced in BigQuery
- Redis tracking prevents data loss during the sync process
Batch List
| Batch | Description |
|---|---|
| Regular Sync | Commands that run every 30 minutes to sync data from BigQuery |
| Status Updates | Command that runs every 5 minutes to update sync status |
| Missed Data Sync | Commands that run daily to sync missed data from BigQuery |