Data Chain
Data Chain is a TrackVision AI feature that automatically propagates and synchronizes data across related collections. When source data changes, dependent records are updated automatically, which keeps data consistent along a defined chain of dependencies.
Overview
Data Chain automates the process of maintaining data consistency across related collections by defining rules that specify how changes to one collection should cascade to other collections. This eliminates manual data synchronization tasks and reduces the risk of data inconsistencies.
Key benefits:
- Automatic data synchronization across collections
- Real-time propagation of changes
- Configurable transformation rules
- Conflict resolution strategies
- Audit trails for all data changes
- Performance optimization for large datasets
Core Concepts
Chain Links
Chain links define the relationship between source and target collections, specifying how data should be propagated.
Link Types
- Direct Copy: Copy field values directly from source to target
- Computed Value: Calculate target values based on source data
- Aggregation: Summarize data from multiple source records
- Lookup: Retrieve related data from other collections or external sources
- Transformation: Apply custom logic to modify data during propagation
Link Configuration
{
"name": "Product to Inventory Sync",
"source_collection": "products",
"target_collection": "inventory",
"trigger_events": ["create", "update"],
"mapping": {
"product_id": "{{ source.id }}",
"product_name": "{{ source.name }}",
"category": "{{ source.category.name }}",
"reorder_level": "{{ source.minimum_stock }}",
"last_updated": "{{ now() }}"
},
"conditions": {
"source_filter": {
"status": "active",
"track_inventory": true
},
"target_filter": {
"type": "physical_product"
}
}
}
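One way to read the mapping block above is as a set of templates resolved against the source record. The sketch below is illustrative only: resolveMapping and getPath are hypothetical helpers, not TrackVision AI APIs, and the sketch handles only `{{ source.<path> }}` and `{{ now() }}` expressions.

```javascript
// Hypothetical helper: walk a dotted path such as "category.name" on an object.
function getPath(obj, path) {
  return path.split('.').reduce(
    (value, key) => (value == null ? undefined : value[key]),
    obj
  );
}

// Hypothetical resolver for mapping values (an assumption, not the product's engine).
// Supports "{{ source.<path> }}" and "{{ now() }}"; anything else is kept as a literal.
function resolveMapping(mapping, source, now = () => new Date().toISOString()) {
  const result = {};
  for (const [targetField, template] of Object.entries(mapping)) {
    const match = /^\{\{\s*(.+?)\s*\}\}$/.exec(String(template));
    if (!match) {
      result[targetField] = template; // plain literal value
    } else if (match[1] === 'now()') {
      result[targetField] = now();
    } else if (match[1].startsWith('source.')) {
      result[targetField] = getPath(source, match[1].slice('source.'.length));
    } else {
      result[targetField] = template; // unsupported expression, left untouched
    }
  }
  return result;
}

const product = { id: 'p1', name: 'Widget', category: { name: 'Tools' }, minimum_stock: 5 };
const mapped = resolveMapping(
  {
    product_id: '{{ source.id }}',
    category: '{{ source.category.name }}',
    reorder_level: '{{ source.minimum_stock }}'
  },
  product
);
console.log(mapped);
```

Nested paths such as source.category.name resolve to undefined rather than throwing when an intermediate object is missing, which is one possible choice for handling incomplete source records.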
Chain Rules
Rules define the logic for data propagation, including when and how data should be synchronized.
Trigger Conditions
- Field Changes: React to specific field modifications
- Record States: Trigger based on record status or lifecycle
- Business Logic: Custom conditions based on business rules
- Time-based: Schedule-based synchronization
- External Events: React to external system changes
Propagation Strategies
- Immediate: Real-time synchronization as changes occur
- Batched: Collect changes and synchronize in groups
- Scheduled: Synchronize at predetermined intervals
- Manual: User-initiated synchronization
- Conditional: Synchronize only when specific conditions are met
Data Transformations
Apply transformations to data as it flows through the chain.
Built-in Transformations
- Field Mapping: Map source fields to target fields
- Data Type Conversion: Convert between different data types
- String Manipulation: Format, concatenate, or extract string data
- Mathematical Operations: Perform calculations on numeric data
- Date/Time Formatting: Convert and format date/time values
- Array Operations: Process array and list data
Custom Transformations
// Custom transformation function: derives the discounted price for a source record
function calculateDiscountedPrice(sourceData) {
  const basePrice = sourceData.price;
  const discountPercent = sourceData.discount_percentage || 0;
  const customerTier = sourceData.customer?.tier || 'standard';
  let finalDiscount = discountPercent;
  // Apply tier-based additional discount
  if (customerTier === 'premium') {
    finalDiscount += 5;
  } else if (customerTier === 'vip') {
    finalDiscount += 10;
  }
  // Cap the combined discount at 100% so the final price cannot go negative
  finalDiscount = Math.min(finalDiscount, 100);
  return {
    original_price: basePrice,
    discount_applied: finalDiscount,
    final_price: basePrice * (1 - finalDiscount / 100)
  };
}
Chain Configuration
Simple Chain Setup
Configure a basic data chain between two collections.
{
"chain_id": "customer_to_profile",
"description": "Sync customer data to user profiles",
"source": {
"collection": "customers",
"events": ["create", "update"],
"fields": ["name", "email", "phone", "company"]
},
"target": {
"collection": "user_profiles",
"action": "upsert",
"match_field": "customer_id"
},
"mapping": {
"customer_id": "{{ source.id }}",
"display_name": "{{ source.name }}",
"contact_email": "{{ source.email }}",
"phone_number": "{{ source.phone }}",
"organization": "{{ source.company.name }}"
},
"options": {
"create_if_missing": true,
"update_existing": true,
"delete_on_source_delete": false
}
}
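The interaction between match_field, create_if_missing, and update_existing can be sketched with an in-memory upsert. This is an assumption about how those options combine, not a description of the actual engine; the upsert function and its return values are hypothetical.

```javascript
// Hypothetical in-memory upsert mirroring the "target" and "options" blocks above.
// Returns 'created', 'updated', or 'skipped' so the outcome is easy to inspect.
function upsert(targetRecords, mapped, matchField, options) {
  const index = targetRecords.findIndex((r) => r[matchField] === mapped[matchField]);
  if (index === -1) {
    if (options.create_if_missing) {
      targetRecords.push({ ...mapped });
      return 'created';
    }
    return 'skipped';
  }
  if (options.update_existing) {
    // Merge so target fields that are not part of the mapping are preserved.
    targetRecords[index] = { ...targetRecords[index], ...mapped };
    return 'updated';
  }
  return 'skipped';
}

const profiles = [{ customer_id: 'c1', display_name: 'Old Name', theme: 'dark' }];
const options = { create_if_missing: true, update_existing: true };

const first = upsert(profiles, { customer_id: 'c1', display_name: 'New Name' }, 'customer_id', options);
const second = upsert(profiles, { customer_id: 'c2', display_name: 'Second' }, 'customer_id', options);
console.log(first, second, profiles);
```

Merging instead of replacing the matched record is a design choice in this sketch: it keeps target-only fields (here, theme) intact when the chain writes its mapped fields.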
Multi-Target Chains
Propagate data to multiple target collections from a single source.
{
"chain_id": "order_multi_sync",
"description": "Sync order data to multiple systems",
"source": {
"collection": "orders",
"events": ["create", "update"]
},
"targets": [
{
"collection": "accounting_entries",
"mapping": {
"order_id": "{{ source.id }}",
"amount": "{{ source.total_amount }}",
"transaction_date": "{{ source.created_at }}",
"account_code": "4000"
}
},
{
"collection": "inventory_transactions",
"mapping": {
"order_id": "{{ source.id }}",
"items": "{{ source.order_items }}",
"transaction_type": "sale"
}
},
{
"collection": "customer_analytics",
"mapping": {
"customer_id": "{{ source.customer_id }}",
"order_value": "{{ source.total_amount }}",
"order_date": "{{ source.created_at }}"
}
}
]
}
Conditional Chains
Apply different rules based on data conditions.
{
"chain_id": "product_tier_sync",
"description": "Sync products to different collections based on category",
"source": {
"collection": "products",
"events": ["create", "update"]
},
"conditional_targets": [
{
"condition": "{{ source.category === 'electronics' }}",
"target": {
"collection": "electronics_catalog",
"mapping": {
"product_id": "{{ source.id }}",
"model_number": "{{ source.sku }}",
"specifications": "{{ source.tech_specs }}",
"warranty_period": "{{ source.warranty_months }}"
}
}
},
{
"condition": "{{ source.category === 'clothing' }}",
"target": {
"collection": "fashion_catalog",
"mapping": {
"product_id": "{{ source.id }}",
"style_code": "{{ source.sku }}",
"sizes_available": "{{ source.size_options }}",
"care_instructions": "{{ source.care_info }}"
}
}
}
]
}
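Conditional routing of this kind amounts to evaluating each condition against the source record and keeping the targets that match. In the sketch below, conditions are ordinary predicate functions standing in for the `{{ ... }}` expressions, since the expression engine itself is not described here; selectTargets is a hypothetical name.

```javascript
// Predicates stand in for the "{{ ... }}" condition expressions (an assumption).
const conditionalTargets = [
  { condition: (source) => source.category === 'electronics', collection: 'electronics_catalog' },
  { condition: (source) => source.category === 'clothing', collection: 'fashion_catalog' }
];

// Return the names of the collections whose condition matches the source record.
function selectTargets(source, targets) {
  return targets.filter((t) => t.condition(source)).map((t) => t.collection);
}

const clothingTargets = selectTargets({ id: 'p1', category: 'clothing' }, conditionalTargets);
const toyTargets = selectTargets({ id: 'p2', category: 'toys' }, conditionalTargets);
console.log(clothingTargets, toyTargets);
```

A record that matches no condition produces an empty list, so in this sketch it is simply not propagated.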
Advanced Features
Aggregation Chains
Automatically calculate and maintain aggregate values across collections.
Sum Aggregation
{
"chain_id": "order_total_aggregation",
"description": "Calculate customer total orders",
"source": {
"collection": "orders",
"events": ["create", "update", "delete"]
},
"target": {
"collection": "customers",
"match_field": "id",
"match_expression": "{{ source.customer_id }}"
},
"aggregation": {
"type": "sum",
"field": "total_amount",
"target_field": "total_order_value",
"filter": {
"status": "completed"
}
}
}
Count Aggregation
{
"chain_id": "product_review_count",
"description": "Maintain product review counts",
"source": {
"collection": "product_reviews",
"events": ["create", "update", "delete"]
},
"target": {
"collection": "products",
"match_field": "id",
"match_expression": "{{ source.product_id }}"
},
"aggregation": {
"type": "count",
"target_field": "review_count",
"filter": {
"status": "approved"
}
}
}
Average Aggregation
{
"chain_id": "product_rating_average",
"description": "Calculate average product ratings",
"source": {
"collection": "product_reviews",
"events": ["create", "update", "delete"]
},
"target": {
"collection": "products",
"match_field": "id",
"match_expression": "{{ source.product_id }}"
},
"aggregation": {
"type": "average",
"field": "rating",
"target_field": "average_rating",
"filter": {
"status": "approved"
}
}
}
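The three aggregation types differ only in how the filtered source records are reduced. As a rough sketch, assuming the filter is a set of exact-match field values and that aggregates are recomputed from the full set of matching records, a hypothetical aggregate function could look like this:

```javascript
// Hypothetical recomputation of one aggregate value from the source records.
// "filter" is treated as exact-match field values (an assumption).
function aggregate(records, { type, field, filter = {} }) {
  const matching = records.filter((r) =>
    Object.entries(filter).every(([key, value]) => r[key] === value)
  );
  if (type === 'count') return matching.length;
  const total = matching.reduce((sum, r) => sum + r[field], 0);
  if (type === 'sum') return total;
  if (type === 'average') return matching.length === 0 ? null : total / matching.length;
  throw new Error(`Unsupported aggregation type: ${type}`);
}

const reviews = [
  { product_id: 'p1', rating: 4, status: 'approved' },
  { product_id: 'p1', rating: 2, status: 'approved' },
  { product_id: 'p1', rating: 1, status: 'pending' }
];
const approved = { status: 'approved' };

const reviewCount = aggregate(reviews, { type: 'count', filter: approved });
const ratingSum = aggregate(reviews, { type: 'sum', field: 'rating', filter: approved });
const averageRating = aggregate(reviews, { type: 'average', field: 'rating', filter: approved });
console.log(reviewCount, ratingSum, averageRating);
```

Returning null for the average of an empty set avoids a division by zero; whether the target field should instead hold 0 or stay unchanged is a decision for the chain's owner.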
Lookup Chains
Enrich data by looking up related information from other collections or external sources.
Internal Lookup
{
"chain_id": "order_customer_enrichment",
"description": "Enrich orders with customer information",
"source": {
"collection": "orders",
"events": ["create"]
},
"target": {
"collection": "order_summaries"
},
"lookups": [
{
"source_field": "customer_id",
"lookup_collection": "customers",
"lookup_field": "id",
"return_fields": ["name", "tier", "discount_rate"],
"target_prefix": "customer_"
},
{
"source_field": "shipping_address_id",
"lookup_collection": "addresses",
"lookup_field": "id",
"return_fields": ["city", "state", "country"],
"target_prefix": "shipping_"
}
]
}
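To make the role of source_field, lookup_field, return_fields, and target_prefix concrete, here is a minimal sketch of an internal lookup against in-memory collections. The applyLookups function and the shape of the collections object are assumptions for illustration.

```javascript
// Hypothetical enrichment step: for each lookup, find the related record and
// copy the requested fields onto the result under the configured prefix.
function applyLookups(source, lookups, collections) {
  const enriched = {};
  for (const lookup of lookups) {
    const rows = collections[lookup.lookup_collection] || [];
    const match = rows.find((row) => row[lookup.lookup_field] === source[lookup.source_field]);
    if (!match) continue; // related record missing: leave these fields unset
    for (const field of lookup.return_fields) {
      enriched[lookup.target_prefix + field] = match[field];
    }
  }
  return enriched;
}

const collections = {
  customers: [{ id: 'c1', name: 'Acme', tier: 'vip', discount_rate: 10 }],
  addresses: [{ id: 'a1', city: 'Leeds', state: 'West Yorkshire', country: 'UK' }]
};
const lookups = [
  { source_field: 'customer_id', lookup_collection: 'customers', lookup_field: 'id',
    return_fields: ['name', 'tier'], target_prefix: 'customer_' },
  { source_field: 'shipping_address_id', lookup_collection: 'addresses', lookup_field: 'id',
    return_fields: ['city', 'country'], target_prefix: 'shipping_' }
];

const summary = applyLookups({ customer_id: 'c1', shipping_address_id: 'a1' }, lookups, collections);
console.log(summary);
```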
External Lookup
{
"chain_id": "product_external_enrichment",
"description": "Enrich products with external data",
"source": {
"collection": "products",
"events": ["create", "update"]
},
"target": {
"collection": "enriched_products"
},
"external_lookups": [
{
"api_endpoint": "https://api.supplier.com/products/{{ source.supplier_sku }}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{ env.SUPPLIER_API_KEY }}"
},
"mapping": {
"supplier_price": "{{ response.wholesale_price }}",
"availability": "{{ response.stock_status }}",
"lead_time": "{{ response.delivery_days }}"
}
}
]
}
Cascade Chains
Create hierarchical data synchronization with parent-child relationships.
{
"chain_id": "category_product_cascade",
"description": "Cascade category changes to products",
"source": {
"collection": "categories",
"events": ["update"]
},
"cascade_targets": [
{
"collection": "products",
"relationship_field": "category_id",
"mapping": {
"category_name": "{{ source.name }}",
"category_path": "{{ source.full_path }}",
"tax_rate": "{{ source.default_tax_rate }}"
},
"propagate_to_children": true,
"child_collections": [
{
"collection": "product_variants",
"relationship_field": "product_id",
"mapping": {
"category_tax_rate": "{{ source.default_tax_rate }}"
}
}
]
}
]
}
Conflict Resolution
Define how a chain resolves conflicting or invalid data encountered during synchronization.
Conflict Types
Data Conflicts
- Concurrent Updates: Multiple sources updating the same target
- Data Type Mismatches: Source and target field type incompatibilities
- Validation Failures: Target data fails validation rules
- Missing Dependencies: Required related records don't exist
- Circular Dependencies: Chains create infinite loops
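Circular dependencies can be caught before deployment by treating each chain as a directed edge from its source collection to its target collection and searching for a cycle. The sketch below uses a depth-first search; findCycle and the simplified { source, target } chain shape are assumptions for illustration.

```javascript
// Each chain is reduced to a source -> target edge between collections.
// Returns the first cycle found as a list of collection names, or null.
function findCycle(chains) {
  const graph = new Map();
  for (const { source, target } of chains) {
    if (!graph.has(source)) graph.set(source, []);
    graph.get(source).push(target);
  }
  const visiting = new Set(); // nodes on the current DFS path
  const done = new Set();     // nodes fully explored without finding a cycle

  function visit(node, path) {
    if (visiting.has(node)) return [...path.slice(path.indexOf(node)), node];
    if (done.has(node)) return null;
    visiting.add(node);
    for (const next of graph.get(node) || []) {
      const cycle = visit(next, [...path, node]);
      if (cycle) return cycle;
    }
    visiting.delete(node);
    done.add(node);
    return null;
  }

  for (const node of graph.keys()) {
    const cycle = visit(node, []);
    if (cycle) return cycle;
  }
  return null;
}

const cyclic = findCycle([
  { source: 'products', target: 'inventory' },
  { source: 'inventory', target: 'reports' },
  { source: 'reports', target: 'products' }
]);
const acyclic = findCycle([
  { source: 'orders', target: 'accounting_entries' },
  { source: 'orders', target: 'customer_analytics' }
]);
console.log(cyclic, acyclic);
```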
Resolution Strategies
Last Writer Wins
{
"conflict_resolution": {
"strategy": "last_writer_wins",
"timestamp_field": "updated_at",
"log_conflicts": true
}
}
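In a last-writer-wins strategy, the record with the more recent timestamp_field value is kept. A minimal sketch, assuming timestamps are ISO 8601 strings and using a hypothetical lastWriterWins function:

```javascript
// Keep whichever version carries the later timestamp.
// Ties keep the current record, so re-applying the same change is a no-op.
function lastWriterWins(current, incoming, timestampField = 'updated_at') {
  const currentTime = Date.parse(current[timestampField]);
  const incomingTime = Date.parse(incoming[timestampField]);
  return incomingTime > currentTime ? incoming : current;
}

const stored = { name: 'Old', updated_at: '2024-01-15T10:30:00Z' };
const update = { name: 'New', updated_at: '2024-01-15T10:31:00Z' };

const winner = lastWriterWins(stored, update);
console.log(winner.name);
```

This strategy depends on the writers' clocks being reasonably synchronized; with skewed clocks, an older change can overwrite a newer one.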
Field Priority
{
"conflict_resolution": {
"strategy": "field_priority",
"priorities": {
"name": ["manual_entry", "import", "api_sync"],
"price": ["pricing_system", "manual_entry", "supplier_api"],
"inventory": ["warehouse_system", "manual_adjustment"]
}
}
}
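With field priority, each field lists its data origins from most to least trusted, and the value from the highest-ranked origin wins. The sketch below assumes each candidate value is tagged with an origin label; pickByPriority and the { origin, value } shape are hypothetical.

```javascript
const priorities = {
  price: ['pricing_system', 'manual_entry', 'supplier_api']
};

// Pick the candidate whose origin appears earliest in the field's priority list.
// Origins not listed for the field rank last. Assumes at least one candidate.
function pickByPriority(field, candidates, priorityMap) {
  const order = priorityMap[field] || [];
  const rank = (origin) => {
    const position = order.indexOf(origin);
    return position === -1 ? Number.MAX_SAFE_INTEGER : position;
  };
  return candidates.reduce((best, candidate) =>
    rank(candidate.origin) < rank(best.origin) ? candidate : best
  );
}

const chosen = pickByPriority('price', [
  { origin: 'supplier_api', value: 9.5 },
  { origin: 'manual_entry', value: 10 }
], priorities);
console.log(chosen);
```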
Custom Resolution
function resolveConflict(sourceData, targetData, conflictField) {
  const sourceValue = sourceData[conflictField];
  const targetValue = targetData[conflictField];
  // Custom business logic for resolution
  if (conflictField === 'price') {
    // Always use the higher price for safety
    return Math.max(sourceValue, targetValue);
  } else if (conflictField === 'status') {
    // Use priority order: active > pending > inactive; unknown statuses rank lowest
    const statusPriority = { active: 3, pending: 2, inactive: 1 };
    const rank = (status) => statusPriority[status] ?? 0;
    return rank(sourceValue) >= rank(targetValue) ? sourceValue : targetValue;
  }
  // Default to source value
  return sourceValue;
}
Error Handling
Manage errors that occur during data chain execution.
Retry Mechanisms
{
"error_handling": {
"retry_policy": {
"max_attempts": 3,
"delay_seconds": [1, 5, 15],
"retry_conditions": [
"network_timeout",
"temporary_lock",
"rate_limit_exceeded"
]
},
"fallback_actions": [
{
"condition": "validation_error",
"action": "log_and_skip"
},
{
"condition": "missing_dependency",
"action": "create_placeholder"
}
]
}
}
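The retry policy can be read as a loop that re-runs a failed operation only for retryable error conditions, waiting according to delay_seconds between attempts. The sketch below is synchronous and takes the sleep function as a parameter so the schedule can be observed without waiting. It assumes max_attempts counts total attempts (so only the first two delays are used with three attempts) and that errors carry a code property; both are assumptions, as is the runWithRetry name.

```javascript
// Hypothetical retry loop. "sleep" is injected; a real implementation would wait asynchronously.
function runWithRetry(operation, policy, sleep) {
  let lastError;
  for (let attempt = 1; attempt <= policy.max_attempts; attempt++) {
    try {
      return operation(attempt);
    } catch (error) {
      lastError = error;
      const retryable = policy.retry_conditions.includes(error.code);
      if (!retryable || attempt === policy.max_attempts) break;
      sleep(policy.delay_seconds[attempt - 1]);
    }
  }
  throw lastError;
}

const policy = {
  max_attempts: 3,
  delay_seconds: [1, 5, 15],
  retry_conditions: ['network_timeout', 'temporary_lock', 'rate_limit_exceeded']
};

let calls = 0;
const delays = [];
const result = runWithRetry(
  () => {
    calls += 1;
    if (calls < 3) {
      const error = new Error('timed out');
      error.code = 'network_timeout';
      throw error;
    }
    return 'ok';
  },
  policy,
  (seconds) => delays.push(seconds)
);
console.log(result, delays);
```

Errors that are not listed in retry_conditions, such as a validation error, are rethrown immediately in this sketch so that a fallback action can handle them.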
Dead Letter Queue
{
"error_handling": {
"dead_letter_queue": {
"enabled": true,
"collection": "failed_chain_operations",
"retention_days": 30,
"notification": {
"email": "admin@company.com",
"threshold": 10
}
}
}
}
Performance Optimization
Batching and Queuing
Optimize performance for high-volume data synchronization.
Batch Processing
{
"performance": {
"batching": {
"enabled": true,
"batch_size": 100,
"batch_timeout_seconds": 30,
"parallel_batches": 5
}
}
}
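The effect of batch_size can be illustrated by splitting a list of pending changes into fixed-size groups. This sketch covers only the grouping; batch_timeout_seconds and parallel_batches are not modelled, and toBatches is a hypothetical helper.

```javascript
// Split pending changes into groups of at most batchSize items.
function toBatches(items, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

const pending = Array.from({ length: 250 }, (_, i) => ({ id: i }));
const batches = toBatches(pending, 100);
console.log(batches.map((batch) => batch.length));
```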
Queue Management
{
"performance": {
"queue": {
"priority_levels": 3,
"max_queue_size": 10000,
"processing_threads": 10,
"priority_rules": [
{
"condition": "{{ source.priority === 'critical' }}",
"priority": 1
},
{
"condition": "{{ source.collection === 'orders' }}",
"priority": 2
}
]
}
}
}
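Priority rules of this form can be read as "first matching rule wins". The sketch below assumes that lower numbers mean higher priority and that operations matching no rule fall to the lowest level (3, given priority_levels of 3); predicates again stand in for the `{{ ... }}` expressions, and assignPriority is a hypothetical name.

```javascript
// Predicates stand in for the "{{ ... }}" conditions (an assumption).
const priorityRules = [
  { condition: (source) => source.priority === 'critical', priority: 1 },
  { condition: (source) => source.collection === 'orders', priority: 2 }
];

// Return the priority of the first matching rule, or the default level.
function assignPriority(source, rules, defaultPriority) {
  const rule = rules.find((r) => r.condition(source));
  return rule ? rule.priority : defaultPriority;
}

const criticalOrder = assignPriority({ collection: 'orders', priority: 'critical' }, priorityRules, 3);
const normalOrder = assignPriority({ collection: 'orders' }, priorityRules, 3);
const other = assignPriority({ collection: 'products' }, priorityRules, 3);
console.log(criticalOrder, normalOrder, other);
```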
Incremental Synchronization
Optimize by only synchronizing changed data.
{
"incremental_sync": {
"enabled": true,
"change_tracking": {
"method": "timestamp",
"field": "updated_at"
},
"checkpoint": {
"storage": "database",
"table": "sync_checkpoints"
},
"full_sync_schedule": "0 2 * * 0"
}
}
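Timestamp-based change tracking boils down to selecting records modified after the stored checkpoint and then advancing the checkpoint. The sketch below assumes updated_at values are UTC ISO 8601 strings in a single format, which compare correctly as plain strings; incrementalSync is a hypothetical function and the checkpoint is held in memory rather than in a database table.

```javascript
// Select records changed since the checkpoint and compute the next checkpoint.
function incrementalSync(records, checkpoint, field = 'updated_at') {
  const changed = records.filter((r) => r[field] > checkpoint);
  const nextCheckpoint = changed.reduce(
    (latest, r) => (r[field] > latest ? r[field] : latest),
    checkpoint
  );
  return { changed, nextCheckpoint };
}

const records = [
  { id: 1, updated_at: '2024-01-15T10:00:00Z' },
  { id: 2, updated_at: '2024-01-15T11:00:00Z' },
  { id: 3, updated_at: '2024-01-15T12:00:00Z' }
];

const firstRun = incrementalSync(records, '2024-01-15T10:30:00Z');
const secondRun = incrementalSync(records, firstRun.nextCheckpoint);
console.log(firstRun.changed.map((r) => r.id), firstRun.nextCheckpoint, secondRun.changed.length);
```

Because the comparison is strict, a record written with exactly the checkpoint timestamp after the checkpoint was stored would be missed, which is one reason a periodic full sync is a useful safety net.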
Indexing Strategy
Optimize database performance for chain operations.
-- Indexes for chain source collections
CREATE INDEX idx_products_updated_at ON products(updated_at);
CREATE INDEX idx_orders_customer_id_status ON orders(customer_id, status);
-- Indexes for chain target collections
CREATE INDEX idx_inventory_product_id ON inventory(product_id);
CREATE INDEX idx_customer_analytics_customer_id ON customer_analytics(customer_id);
-- Composite indexes for complex queries
CREATE INDEX idx_chain_operations_status_created ON chain_operations(status, created_at);
Monitoring and Analytics
Chain Performance Metrics
Track the performance and health of data chains.
Key Metrics
- Synchronization Latency: Time between source change and target update
- Throughput: Number of records processed per unit time
- Error Rate: Percentage of failed synchronization attempts
- Queue Depth: Number of pending operations
- Resource Utilization: CPU, memory, and database usage
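Two of these metrics, error rate and average latency, can be derived directly from a list of operation records. The sketch below assumes each record has status and processing_time_ms fields, matching the column names used in the dashboard queries; summarize is a hypothetical helper.

```javascript
// Derive error rate and average latency from operation records.
function summarize(operations) {
  if (operations.length === 0) {
    return { error_rate_percent: 0, average_latency_ms: 0 };
  }
  const failed = operations.filter((op) => op.status === 'failed').length;
  const totalMs = operations.reduce((sum, op) => sum + op.processing_time_ms, 0);
  return {
    error_rate_percent: (failed / operations.length) * 100,
    average_latency_ms: totalMs / operations.length
  };
}

const metrics = summarize([
  { status: 'completed', processing_time_ms: 100 },
  { status: 'completed', processing_time_ms: 200 },
  { status: 'failed', processing_time_ms: 300 },
  { status: 'completed', processing_time_ms: 400 }
]);
console.log(metrics);
```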
Performance Dashboard
{
"dashboard": {
"widgets": [
{
"type": "metric",
"title": "Average Sync Latency",
"query": "SELECT AVG(processing_time_ms) FROM chain_operations WHERE created_at >= NOW() - INTERVAL '1 hour'"
},
{
"type": "chart",
"title": "Sync Volume Over Time",
"query": "SELECT DATE_TRUNC('minute', created_at) as time, COUNT(*) as operations FROM chain_operations GROUP BY time ORDER BY time"
},
{
"type": "alert",
"title": "Failed Operations",
"query": "SELECT COUNT(*) FROM chain_operations WHERE status = 'failed' AND created_at >= NOW() - INTERVAL '15 minutes'",
"threshold": 10
}
]
}
}
Data Lineage Tracking
Maintain visibility into data flow and transformations.
Lineage Records
{
"lineage_record": {
"operation_id": "chain_op_12345",
"chain_id": "customer_to_profile",
"source": {
"collection": "customers",
"record_id": "cust_789",
"version": 5
},
"targets": [
{
"collection": "user_profiles",
"record_id": "profile_456",
"operation": "update"
}
],
"transformations": [
{
"field": "display_name",
"source_value": "John Smith",
"target_value": "John Smith",
"transformation": "direct_copy"
}
],
"execution_time": "2024-01-15T10:30:00Z",
"duration_ms": 125
}
}
Audit and Compliance
Maintain comprehensive audit trails for regulatory compliance.
Audit Configuration
{
"audit": {
"enabled": true,
"retention_days": 2555,
"fields_to_track": [
"all_changes",
"source_data",
"target_data",
"transformation_rules",
"user_context"
],
"compliance_tags": [
"gdpr",
"hipaa",
"sox"
]
}
}
Best Practices
Chain Design
Single Responsibility
- Focused Purpose: Each chain should serve one specific business need
- Clear Boundaries: Define clear source and target scope
- Minimal Dependencies: Reduce complex interdependencies between chains
- Modular Design: Create reusable chain components
- Documentation: Maintain clear documentation of chain purpose and logic
Data Consistency
- Idempotent Operations: Ensure chains can be safely re-executed
- Transaction Boundaries: Define appropriate transaction scope
- Rollback Procedures: Plan for handling failed operations
- Data Validation: Validate data at each transformation step
- Consistency Checks: Implement regular data consistency verification
Performance and Scalability
Optimization Strategies
- Batch Processing: Group operations for efficiency
- Async Execution: Use asynchronous processing for non-critical chains
- Resource Management: Monitor and optimize resource usage
- Index Optimization: Ensure proper database indexing
- Caching: Cache frequently accessed data and transformation rules
Scalability Planning
- Load Testing: Test chains with realistic data volumes
- Capacity Planning: Plan for data growth and increased load
- Horizontal Scaling: Design chains to scale across multiple servers
- Resource Monitoring: Monitor resource usage and performance trends
- Bottleneck Identification: Identify and address performance bottlenecks
Security and Compliance
Data Protection
- Access Controls: Implement proper user permissions for chains
- Data Encryption: Encrypt sensitive data during transmission
- Audit Logging: Maintain comprehensive audit trails
- Data Minimization: Only synchronize necessary data
- Retention Policies: Implement appropriate data retention policies
Compliance Requirements
- Regulatory Compliance: Follow relevant data protection regulations
- Data Governance: Implement data governance policies
- Change Management: Document and approve chain changes
- Security Reviews: Conduct regular security assessments of chains
- Incident Response: Plan for handling security incidents
Maintenance and Operations
Monitoring and Alerting
- Health Monitoring: Monitor chain health and performance
- Error Alerting: Set up alerts for chain failures
- Performance Tracking: Track key performance metrics
- Capacity Alerts: Alert on resource usage thresholds
- Business Impact Monitoring: Monitor business impact of chain operations
Change Management
- Version Control: Maintain versions of chain configurations
- Testing Procedures: Test chain changes in development environments
- Rollback Plans: Plan for reverting problematic changes
- Documentation Updates: Keep documentation current with changes
- Impact Assessment: Assess impact of changes before deployment