
Data Chain

Data Chain is a TrackVision AI feature that propagates and synchronizes data across related collections. When source data changes, it updates the dependent records automatically, so related collections stay consistent throughout your system.

Overview

With Data Chain, you define rules that specify how changes to one collection cascade to other collections. The rules are then applied automatically, which removes manual synchronization tasks and reduces the risk of inconsistent data.

Key benefits:

  • Automatic data synchronization across collections
  • Real-time propagation of changes
  • Configurable transformation rules
  • Conflict resolution strategies
  • Audit trails for all data changes
  • Performance optimization for large datasets

Core Concepts

Chain Links

Chain links define the relationship between source and target collections and specify how data is propagated between them. A link can propagate data in the following ways:

  • Direct Copy: Copy field values directly from source to target
  • Computed Value: Calculate target values based on source data
  • Aggregation: Summarize data from multiple source records
  • Lookup: Retrieve related data from external sources
  • Transformation: Apply custom logic to modify data during propagation

{
  "name": "Product to Inventory Sync",
  "source_collection": "products",
  "target_collection": "inventory",
  "trigger_events": ["create", "update"],
  "mapping": {
    "product_id": "{{ source.id }}",
    "product_name": "{{ source.name }}",
    "category": "{{ source.category.name }}",
    "reorder_level": "{{ source.minimum_stock }}",
    "last_updated": "{{ now() }}"
  },
  "conditions": {
    "source_filter": {
      "status": "active",
      "track_inventory": true
    },
    "target_filter": {
      "type": "physical_product"
    }
  }
}
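
To make the mapping syntax above more concrete, here is a minimal sketch of how placeholders such as {{ source.category.name }} could be resolved against a source record. The helper names resolveTemplate and applyMapping are hypothetical, and only dotted source paths and now() are handled; this is an illustration, not TrackVision's actual template engine.

```javascript
// Hypothetical helper: resolves a "{{ ... }}" placeholder against a source record.
// Only "source.<dotted.path>" and "now()" are supported in this sketch.
function resolveTemplate(template, source, now = () => new Date().toISOString()) {
  if (typeof template !== 'string') return template;
  const match = template.match(/^\{\{\s*(.+?)\s*\}\}$/);
  if (!match) return template; // literal value such as "4000"
  const expression = match[1];
  if (expression === 'now()') return now();
  if (!expression.startsWith('source.')) {
    throw new Error(`Unsupported expression: ${expression}`);
  }
  // Walk the dotted path; a missing segment yields undefined instead of throwing.
  return expression
    .slice('source.'.length)
    .split('.')
    .reduce((value, key) => (value == null ? undefined : value[key]), source);
}

// Builds a target record by resolving every entry of the mapping.
function applyMapping(mapping, source, now) {
  const target = {};
  for (const [field, template] of Object.entries(mapping)) {
    target[field] = resolveTemplate(template, source, now);
  }
  return target;
}

const product = { id: 'p1', name: 'Widget', category: { name: 'Tools' }, minimum_stock: 5 };
const inventoryRow = applyMapping(
  {
    product_id: '{{ source.id }}',
    product_name: '{{ source.name }}',
    category: '{{ source.category.name }}',
    reorder_level: '{{ source.minimum_stock }}',
    last_updated: '{{ now() }}',
  },
  product,
  () => '2024-01-15T10:30:00Z' // fixed clock so the example is repeatable
);
console.log(inventoryRow);
```

Passing the clock as a parameter keeps the sketch deterministic, which is convenient when testing mappings.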

Chain Rules

Rules define the logic for data propagation, including when and how data should be synchronized.

Trigger Conditions

  • Field Changes: React to specific field modifications
  • Record States: Trigger based on record status or lifecycle
  • Business Logic: Custom conditions based on business rules
  • Time-based: Schedule-based synchronization
  • External Events: React to external system changes
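
As an illustration of the field-change trigger listed above, the following sketch decides whether an update should start a chain. The rule shape (events, watchFields) and the event shape (type, before, after) are assumptions made for this example, not a documented format.

```javascript
// Hypothetical helper: lists top-level fields whose values differ between two versions.
// Values are compared by their JSON form, so object key order matters in this sketch.
function changedFields(before, after) {
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  return [...keys].filter(
    (key) => JSON.stringify(before[key]) !== JSON.stringify(after[key])
  );
}

// A rule fires when the event type matches and, for updates, a watched field changed.
function shouldTrigger(rule, event) {
  if (!rule.events.includes(event.type)) return false;
  if (event.type !== 'update' || !rule.watchFields) return true;
  const changed = changedFields(event.before, event.after);
  return rule.watchFields.some((field) => changed.includes(field));
}

const rule = { events: ['create', 'update'], watchFields: ['price', 'status'] };
const priceChange = {
  type: 'update',
  before: { id: 1, price: 10, notes: 'a' },
  after: { id: 1, price: 12, notes: 'a' },
};
const notesChange = {
  type: 'update',
  before: { id: 1, price: 10, notes: 'a' },
  after: { id: 1, price: 10, notes: 'b' },
};
console.log(shouldTrigger(rule, priceChange)); // true
console.log(shouldTrigger(rule, notesChange)); // false
```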

Propagation Strategies

  • Immediate: Real-time synchronization as changes occur
  • Batched: Collect changes and synchronize in groups
  • Scheduled: Synchronize at predetermined intervals
  • Manual: User-initiated synchronization
  • Conditional: Synchronize only when specific conditions are met

Data Transformations

Apply transformations to data as it flows through the chain.

Built-in Transformations

  • Field Mapping: Map source fields to target fields
  • Data Type Conversion: Convert between different data types
  • String Manipulation: Format, concatenate, or extract string data
  • Mathematical Operations: Perform calculations on numeric data
  • Date/Time Formatting: Convert and format date/time values
  • Array Operations: Process array and list data

Custom Transformations

// Custom transformation function
function calculateDiscountedPrice(sourceData) {
  const basePrice = sourceData.price;
  const discountPercent = sourceData.discount_percentage || 0;
  const customerTier = sourceData.customer?.tier || 'standard';

  let finalDiscount = discountPercent;

  // Apply tier-based additional discount
  if (customerTier === 'premium') {
    finalDiscount += 5;
  } else if (customerTier === 'vip') {
    finalDiscount += 10;
  }

  // Keep the combined discount within 0-100 so the final price cannot go negative
  finalDiscount = Math.min(Math.max(finalDiscount, 0), 100);

  return {
    original_price: basePrice,
    discount_applied: finalDiscount,
    final_price: basePrice * (1 - finalDiscount / 100)
  };
}

Chain Configuration

Simple Chain Setup

Configure a basic data chain between two collections.

{
  "chain_id": "customer_to_profile",
  "description": "Sync customer data to user profiles",
  "source": {
    "collection": "customers",
    "events": ["create", "update"],
    "fields": ["name", "email", "phone", "company"]
  },
  "target": {
    "collection": "user_profiles",
    "action": "upsert",
    "match_field": "customer_id"
  },
  "mapping": {
    "customer_id": "{{ source.id }}",
    "display_name": "{{ source.name }}",
    "contact_email": "{{ source.email }}",
    "phone_number": "{{ source.phone }}",
    "organization": "{{ source.company.name }}"
  },
  "options": {
    "create_if_missing": true,
    "update_existing": true,
    "delete_on_source_delete": false
  }
}
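
The upsert action and the create_if_missing and update_existing options in the configuration above can be pictured with a small in-memory sketch. The upsert function and the array standing in for the target collection are assumptions for illustration; a real target would be a database collection.

```javascript
// In-memory stand-in for a target collection (hypothetical, for illustration only).
// Returns which action was taken so callers can log or audit it.
function upsert(targetRows, matchField, record, options = {}) {
  const { create_if_missing = true, update_existing = true } = options;
  const index = targetRows.findIndex((row) => row[matchField] === record[matchField]);
  if (index === -1) {
    if (!create_if_missing) return 'skipped';
    targetRows.push({ ...record });
    return 'created';
  }
  if (!update_existing) return 'skipped';
  // Merge so that target fields outside the mapping are preserved.
  targetRows[index] = { ...targetRows[index], ...record };
  return 'updated';
}

const userProfiles = [];
const first = upsert(userProfiles, 'customer_id', { customer_id: 'c1', display_name: 'Ada' });
const second = upsert(userProfiles, 'customer_id', { customer_id: 'c1', display_name: 'Ada L.' });
console.log(first, second, userProfiles.length); // created updated 1
```

Because the record is matched on customer_id, replaying the same change leaves a single profile row, which is what makes the operation safe to re-execute.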

Multi-Target Chains

Propagate data to multiple target collections from a single source.

{
  "chain_id": "order_multi_sync",
  "description": "Sync order data to multiple systems",
  "source": {
    "collection": "orders",
    "events": ["create", "update"]
  },
  "targets": [
    {
      "collection": "accounting_entries",
      "mapping": {
        "order_id": "{{ source.id }}",
        "amount": "{{ source.total_amount }}",
        "transaction_date": "{{ source.created_at }}",
        "account_code": "4000"
      }
    },
    {
      "collection": "inventory_transactions",
      "mapping": {
        "order_id": "{{ source.id }}",
        "items": "{{ source.order_items }}",
        "transaction_type": "sale"
      }
    },
    {
      "collection": "customer_analytics",
      "mapping": {
        "customer_id": "{{ source.customer_id }}",
        "order_value": "{{ source.total_amount }}",
        "order_date": "{{ source.created_at }}"
      }
    }
  ]
}

Conditional Chains

Apply different rules based on data conditions.

{
  "chain_id": "product_tier_sync",
  "description": "Sync products to different collections based on category",
  "source": {
    "collection": "products",
    "events": ["create", "update"]
  },
  "conditional_targets": [
    {
      "condition": "{{ source.category === 'electronics' }}",
      "target": {
        "collection": "electronics_catalog",
        "mapping": {
          "product_id": "{{ source.id }}",
          "model_number": "{{ source.sku }}",
          "specifications": "{{ source.tech_specs }}",
          "warranty_period": "{{ source.warranty_months }}"
        }
      }
    },
    {
      "condition": "{{ source.category === 'clothing' }}",
      "target": {
        "collection": "fashion_catalog",
        "mapping": {
          "product_id": "{{ source.id }}",
          "style_code": "{{ source.sku }}",
          "sizes_available": "{{ source.size_options }}",
          "care_instructions": "{{ source.care_info }}"
        }
      }
    }
  ]
}
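
The routing behavior described by conditional_targets can be sketched as follows. Conditions are modeled here as plain predicate functions rather than template strings, and routeRecord is a hypothetical name; how the product evaluates the condition expressions is not shown in this document.

```javascript
// Conditions are plain predicates in this sketch; the configuration above
// expresses them as template strings, whose evaluation is not covered here.
const conditionalTargets = [
  {
    condition: (source) => source.category === 'electronics',
    collection: 'electronics_catalog',
    map: (source) => ({ product_id: source.id, model_number: source.sku }),
  },
  {
    condition: (source) => source.category === 'clothing',
    collection: 'fashion_catalog',
    map: (source) => ({ product_id: source.id, style_code: source.sku }),
  },
];

// Returns one write per target whose condition holds; an unmatched record yields no writes.
function routeRecord(source, targets) {
  return targets
    .filter((target) => target.condition(source))
    .map((target) => ({ collection: target.collection, record: target.map(source) }));
}

const writes = routeRecord({ id: 'p7', sku: 'TV-55', category: 'electronics' }, conditionalTargets);
console.log(writes);
```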

Advanced Features

Aggregation Chains

Automatically calculate and maintain aggregate values across collections.

Sum Aggregation

{
  "chain_id": "order_total_aggregation",
  "description": "Calculate customer total orders",
  "source": {
    "collection": "orders",
    "events": ["create", "update", "delete"]
  },
  "target": {
    "collection": "customers",
    "match_field": "id",
    "match_expression": "{{ source.customer_id }}"
  },
  "aggregation": {
    "type": "sum",
    "field": "total_amount",
    "target_field": "total_order_value",
    "filter": {
      "status": "completed"
    }
  }
}

Count Aggregation

{
  "chain_id": "product_review_count",
  "description": "Maintain product review counts",
  "source": {
    "collection": "product_reviews",
    "events": ["create", "delete"]
  },
  "target": {
    "collection": "products",
    "match_field": "id",
    "match_expression": "{{ source.product_id }}"
  },
  "aggregation": {
    "type": "count",
    "target_field": "review_count",
    "filter": {
      "status": "approved"
    }
  }
}

Average Aggregation

{
  "chain_id": "product_rating_average",
  "description": "Calculate average product ratings",
  "source": {
    "collection": "product_reviews",
    "events": ["create", "update", "delete"]
  },
  "target": {
    "collection": "products",
    "match_field": "id",
    "match_expression": "{{ source.product_id }}"
  },
  "aggregation": {
    "type": "average",
    "field": "rating",
    "target_field": "average_rating",
    "filter": {
      "status": "approved"
    }
  }
}
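
The three aggregation types above share the same shape: filter the source rows, then reduce them to one value for the matched target record. The sketch below recomputes the value from scratch, which is one simple way to stay correct on update and delete events. The aggregate function is hypothetical, and the product may maintain these values incrementally instead.

```javascript
// Recomputes an aggregate over the source rows that belong to one target record.
// The function name and argument shape are illustrative, not the product's API.
function aggregate(rows, { type, field, filter = {} }) {
  const matching = rows.filter((row) =>
    Object.entries(filter).every(([key, value]) => row[key] === value)
  );
  if (type === 'count') return matching.length;
  const total = matching.reduce((sum, row) => sum + Number(row[field] ?? 0), 0);
  if (type === 'sum') return total;
  // An average over zero rows is reported as null rather than NaN.
  if (type === 'average') return matching.length === 0 ? null : total / matching.length;
  throw new Error(`Unknown aggregation type: ${type}`);
}

const reviews = [
  { product_id: 'p1', rating: 5, status: 'approved' },
  { product_id: 'p1', rating: 3, status: 'approved' },
  { product_id: 'p1', rating: 1, status: 'pending' },
  { product_id: 'p2', rating: 4, status: 'approved' },
];
const forP1 = reviews.filter((review) => review.product_id === 'p1');
const spec = { field: 'rating', filter: { status: 'approved' } };
console.log(aggregate(forP1, { ...spec, type: 'count' }));   // 2
console.log(aggregate(forP1, { ...spec, type: 'sum' }));     // 8
console.log(aggregate(forP1, { ...spec, type: 'average' })); // 4
```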

Lookup Chains

Enrich data by looking up related information from other collections or external sources.

Internal Lookup

{
  "chain_id": "order_customer_enrichment",
  "description": "Enrich orders with customer information",
  "source": {
    "collection": "orders",
    "events": ["create"]
  },
  "target": {
    "collection": "order_summaries"
  },
  "lookups": [
    {
      "source_field": "customer_id",
      "lookup_collection": "customers",
      "lookup_field": "id",
      "return_fields": ["name", "tier", "discount_rate"],
      "target_prefix": "customer_"
    },
    {
      "source_field": "shipping_address_id",
      "lookup_collection": "addresses",
      "lookup_field": "id",
      "return_fields": ["city", "state", "country"],
      "target_prefix": "shipping_"
    }
  ]
}
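
To show what the lookups entries above are expected to produce, here is a minimal sketch that copies the return_fields of each related record onto the target and prefixes them with target_prefix. The applyLookups function and the in-memory collections object are assumptions for this example.

```javascript
// Copies selected fields from each related record onto the target, prefixing each name.
function applyLookups(source, lookups, collections) {
  const enriched = { ...source };
  for (const lookup of lookups) {
    const related = (collections[lookup.lookup_collection] || []).find(
      (row) => row[lookup.lookup_field] === source[lookup.source_field]
    );
    if (!related) continue; // missing dependency: leave the prefixed fields unset
    for (const field of lookup.return_fields) {
      enriched[`${lookup.target_prefix}${field}`] = related[field];
    }
  }
  return enriched;
}

const collections = {
  customers: [{ id: 'c1', name: 'Ada', tier: 'vip', discount_rate: 0.1 }],
  addresses: [{ id: 'a1', city: 'Leeds', state: 'ENG', country: 'GB' }],
};
const summary = applyLookups(
  { id: 'o1', customer_id: 'c1', shipping_address_id: 'a1' },
  [
    {
      source_field: 'customer_id',
      lookup_collection: 'customers',
      lookup_field: 'id',
      return_fields: ['name', 'tier', 'discount_rate'],
      target_prefix: 'customer_',
    },
    {
      source_field: 'shipping_address_id',
      lookup_collection: 'addresses',
      lookup_field: 'id',
      return_fields: ['city', 'state', 'country'],
      target_prefix: 'shipping_',
    },
  ],
  collections
);
console.log(summary.customer_name, summary.shipping_city); // Ada Leeds
```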

External Lookup

{
  "chain_id": "product_external_enrichment",
  "description": "Enrich products with external data",
  "source": {
    "collection": "products",
    "events": ["create", "update"]
  },
  "target": {
    "collection": "enriched_products"
  },
  "external_lookups": [
    {
      "api_endpoint": "https://api.supplier.com/products/{{ source.supplier_sku }}",
      "method": "GET",
      "headers": {
        "Authorization": "Bearer {{ env.SUPPLIER_API_KEY }}"
      },
      "mapping": {
        "supplier_price": "{{ response.wholesale_price }}",
        "availability": "{{ response.stock_status }}",
        "lead_time": "{{ response.delivery_days }}"
      }
    }
  ]
}

Cascade Chains

Create hierarchical data synchronization with parent-child relationships.

{
  "chain_id": "category_product_cascade",
  "description": "Cascade category changes to products",
  "source": {
    "collection": "categories",
    "events": ["update"]
  },
  "cascade_targets": [
    {
      "collection": "products",
      "relationship_field": "category_id",
      "mapping": {
        "category_name": "{{ source.name }}",
        "category_path": "{{ source.full_path }}",
        "tax_rate": "{{ source.default_tax_rate }}"
      },
      "propagate_to_children": true,
      "child_collections": [
        {
          "collection": "product_variants",
          "relationship_field": "product_id",
          "mapping": {
            "category_tax_rate": "{{ source.default_tax_rate }}"
          }
        }
      ]
    }
  ]
}
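
The parent-to-child propagation in the cascade configuration above can be sketched with in-memory arrays. The cascadeCategory function is hypothetical and hard-codes the two levels from the example (products, then product_variants); a general implementation would walk child_collections recursively.

```javascript
// Applies a category change to its products, then to each affected product's variants.
// Returns how many records were touched at each level.
function cascadeCategory(category, products, variants) {
  const touched = { products: 0, variants: 0 };
  for (const product of products) {
    if (product.category_id !== category.id) continue;
    product.category_name = category.name;
    product.category_path = category.full_path;
    product.tax_rate = category.default_tax_rate;
    touched.products += 1;
    for (const variant of variants) {
      if (variant.product_id !== product.id) continue;
      variant.category_tax_rate = category.default_tax_rate;
      touched.variants += 1;
    }
  }
  return touched;
}

const category = { id: 'cat1', name: 'Audio', full_path: 'Electronics/Audio', default_tax_rate: 0.2 };
const products = [
  { id: 'p1', category_id: 'cat1' },
  { id: 'p2', category_id: 'cat2' },
];
const variants = [
  { id: 'v1', product_id: 'p1' },
  { id: 'v2', product_id: 'p1' },
  { id: 'v3', product_id: 'p2' },
];
const touched = cascadeCategory(category, products, variants);
console.log(touched); // { products: 1, variants: 2 }
```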

Conflict Resolution

Define how a chain behaves when synchronization produces conflicting or invalid data.

Conflict Types

Data Conflicts

  • Concurrent Updates: Multiple sources updating the same target
  • Data Type Mismatches: Source and target field type incompatibilities
  • Validation Failures: Target data fails validation rules
  • Missing Dependencies: Required related records don't exist
  • Circular Dependencies: Chains create infinite loops
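
Circular dependencies in particular can be caught before deployment. The sketch below treats every chain as a directed edge from its source collection to its target collection and uses a depth-first search to report the first cycle it finds. The findCycle function and the simplified chain shape (source, target) are assumptions for illustration.

```javascript
// Each chain is treated as a directed edge: source collection -> target collection.
// Returns the collections forming a cycle, or null if the chains are acyclic.
function findCycle(chains) {
  const graph = new Map();
  for (const { source, target } of chains) {
    if (!graph.has(source)) graph.set(source, []);
    graph.get(source).push(target);
  }
  const state = new Map(); // node -> 'visiting' | 'done'
  const path = [];

  function visit(node) {
    if (state.get(node) === 'done') return null;
    if (state.get(node) === 'visiting') {
      // Reached a node that is still on the current path: that closes a cycle.
      return [...path.slice(path.indexOf(node)), node];
    }
    state.set(node, 'visiting');
    path.push(node);
    for (const next of graph.get(node) || []) {
      const cycle = visit(next);
      if (cycle) return cycle;
    }
    path.pop();
    state.set(node, 'done');
    return null;
  }

  for (const node of graph.keys()) {
    const cycle = visit(node);
    if (cycle) return cycle;
  }
  return null;
}

const looping = findCycle([
  { source: 'orders', target: 'customers' },
  { source: 'customers', target: 'customer_analytics' },
  { source: 'customer_analytics', target: 'orders' },
]);
console.log(looping); // [ 'orders', 'customers', 'customer_analytics', 'orders' ]
```

Running a check like this whenever a chain is added or changed is cheaper than discovering the loop at runtime.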

Resolution Strategies

Last Writer Wins

{
  "conflict_resolution": {
    "strategy": "last_writer_wins",
    "timestamp_field": "updated_at",
    "log_conflicts": true
  }
}

Field Priority

{
  "conflict_resolution": {
    "strategy": "field_priority",
    "priorities": {
      "name": ["manual_entry", "import", "api_sync"],
      "price": ["pricing_system", "manual_entry", "supplier_api"],
      "inventory": ["warehouse_system", "manual_adjustment"]
    }
  }
}

Custom Resolution

function resolveConflict(sourceData, targetData, conflictField) {
  const sourceValue = sourceData[conflictField];
  const targetValue = targetData[conflictField];

  // Custom business logic for resolution
  if (conflictField === 'price') {
    // Always use the higher price for safety
    return Math.max(sourceValue, targetValue);
  } else if (conflictField === 'status') {
    // Use priority order: active > pending > inactive; unknown statuses rank lowest
    const statusPriority = { active: 3, pending: 2, inactive: 1 };
    const rank = (status) => statusPriority[status] ?? 0;
    return rank(sourceValue) >= rank(targetValue)
      ? sourceValue
      : targetValue;
  }

  // Default to source value
  return sourceValue;
}
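
For comparison with the custom function, the field_priority strategy can be sketched in a few lines. This assumes that each list under priorities is ordered from most to least trusted origin and that every candidate value carries an origin label; neither assumption is confirmed by this document, and resolveByPriority is a hypothetical name.

```javascript
// For a given field, the candidate from the highest-ranked origin wins.
// Origins that are missing from the priority list rank last.
function resolveByPriority(field, candidates, priorities) {
  const order = priorities[field] || [];
  const rank = (origin) => {
    const index = order.indexOf(origin);
    return index === -1 ? Number.MAX_SAFE_INTEGER : index;
  };
  // Copy before sorting so the caller's array is left untouched.
  return [...candidates].sort((a, b) => rank(a.origin) - rank(b.origin))[0];
}

const priorities = {
  price: ['pricing_system', 'manual_entry', 'supplier_api'],
};
const winner = resolveByPriority(
  'price',
  [
    { origin: 'supplier_api', value: 9.5 },
    { origin: 'pricing_system', value: 10 },
  ],
  priorities
);
console.log(winner.value); // 10
```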

Error Handling

Manage errors that occur during data chain execution.

Retry Mechanisms

{
  "error_handling": {
    "retry_policy": {
      "max_attempts": 3,
      "delay_seconds": [1, 5, 15],
      "retry_conditions": [
        "network_timeout",
        "temporary_lock",
        "rate_limit_exceeded"
      ]
    },
    "fallback_actions": [
      {
        "condition": "validation_error",
        "action": "log_and_skip"
      },
      {
        "condition": "missing_dependency",
        "action": "create_placeholder"
      }
    ]
  }
}
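
The decision that this configuration encodes (retry, fall back, or give up) can be written as a pure function. The sketch assumes that max_attempts counts total attempts including the first one, and that the last delay is reused if the delay list is shorter than needed; nextStep and its return shape are hypothetical.

```javascript
const errorHandling = {
  retry_policy: {
    max_attempts: 3,
    delay_seconds: [1, 5, 15],
    retry_conditions: ['network_timeout', 'temporary_lock', 'rate_limit_exceeded'],
  },
  fallback_actions: [
    { condition: 'validation_error', action: 'log_and_skip' },
    { condition: 'missing_dependency', action: 'create_placeholder' },
  ],
};

// attempt is 1-based: the number of attempts already made when the error occurred.
function nextStep(errorCode, attempt, config) {
  const { max_attempts, delay_seconds, retry_conditions } = config.retry_policy;
  if (retry_conditions.includes(errorCode) && attempt < max_attempts) {
    // Reuse the last delay if the list is shorter than the attempt count.
    const delay = delay_seconds[Math.min(attempt - 1, delay_seconds.length - 1)];
    return { type: 'retry', delaySeconds: delay };
  }
  const fallback = config.fallback_actions.find((entry) => entry.condition === errorCode);
  return fallback ? { type: 'fallback', action: fallback.action } : { type: 'fail' };
}

console.log(nextStep('network_timeout', 1, errorHandling));
console.log(nextStep('network_timeout', 3, errorHandling));
console.log(nextStep('validation_error', 1, errorHandling));
```

Keeping the decision separate from the waiting and the actual retry call makes the policy easy to unit test.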

Dead Letter Queue

{
  "error_handling": {
    "dead_letter_queue": {
      "enabled": true,
      "collection": "failed_chain_operations",
      "retention_days": 30,
      "notification": {
        "email": "admin@company.com",
        "threshold": 10
      }
    }
  }
}

Performance Optimization

Batching and Queuing

Optimize performance for high-volume data synchronization.

Batch Processing

{
  "performance": {
    "batching": {
      "enabled": true,
      "batch_size": 100,
      "batch_timeout_seconds": 30,
      "parallel_batches": 5
    }
  }
}
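
The effect of batch_size is easy to picture with a small helper that splits pending records into fixed-size groups. This sketch covers only the grouping step; the timeout and parallelism settings above are not modeled, and toBatches is a hypothetical name.

```javascript
// Splits pending records into batches of at most batchSize records each.
function toBatches(records, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let start = 0; start < records.length; start += batchSize) {
    batches.push(records.slice(start, start + batchSize));
  }
  return batches;
}

const pending = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));
const batches = toBatches(pending, 100);
console.log(batches.map((batch) => batch.length)); // [ 100, 100, 50 ]
```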

Queue Management

{
  "performance": {
    "queue": {
      "priority_levels": 3,
      "max_queue_size": 10000,
      "processing_threads": 10,
      "priority_rules": [
        {
          "condition": "{{ source.priority === 'critical' }}",
          "priority": 1
        },
        {
          "condition": "{{ source.collection === 'orders' }}",
          "priority": 2
        }
      ]
    }
  }
}
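
One plausible reading of priority_rules is that rules are checked in order, the first match decides, and lower numbers are processed first. The sketch below works under those assumptions, with conditions written as predicate functions and a default of 3 (the lowest of the three configured levels). None of this is confirmed by the configuration itself.

```javascript
// Rules are checked in order; the first match decides.
// Lower numbers are assumed to be processed first.
const priorityRules = [
  { condition: (op) => op.record.priority === 'critical', priority: 1 },
  { condition: (op) => op.collection === 'orders', priority: 2 },
];

function assignPriority(operation, rules, defaultPriority = 3) {
  const rule = rules.find((candidate) => candidate.condition(operation));
  return rule ? rule.priority : defaultPriority;
}

const queue = [
  { collection: 'products', record: { id: 1 } },
  { collection: 'orders', record: { id: 2 } },
  { collection: 'products', record: { id: 3, priority: 'critical' } },
].map((operation) => ({ ...operation, priority: assignPriority(operation, priorityRules) }));

// Array.prototype.sort is stable, so operations with equal priority keep arrival order.
queue.sort((a, b) => a.priority - b.priority);
console.log(queue.map((operation) => operation.record.id)); // [ 3, 2, 1 ]
```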

Incremental Synchronization

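Reduce load by synchronizing only the records that changed since the last checkpoint. If full_sync_schedule follows standard five-field cron syntax, the value 0 2 * * 0 in the example below runs a full synchronization every Sunday at 02:00.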

{
  "incremental_sync": {
    "enabled": true,
    "change_tracking": {
      "method": "timestamp",
      "field": "updated_at"
    },
    "checkpoint": {
      "storage": "database",
      "table": "sync_checkpoints"
    },
    "full_sync_schedule": "0 2 * * 0"
  }
}
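
Timestamp-based change tracking with a checkpoint comes down to two steps: select the rows newer than the stored checkpoint, then advance the checkpoint to the newest timestamp seen. The following is a minimal in-memory sketch; selectChanges is a hypothetical name, and a real implementation would read and write the checkpoint table.

```javascript
// Returns the rows changed since the checkpoint plus the checkpoint to store next.
function selectChanges(rows, checkpoint, field = 'updated_at') {
  const since = Date.parse(checkpoint);
  const changed = rows.filter((row) => Date.parse(row[field]) > since);
  // If nothing changed, the checkpoint stays where it was.
  const newest = changed.reduce(
    (latest, row) => Math.max(latest, Date.parse(row[field])),
    since
  );
  return { changed, checkpoint: new Date(newest).toISOString() };
}

const rows = [
  { id: 1, updated_at: '2024-01-15T09:00:00Z' },
  { id: 2, updated_at: '2024-01-15T10:30:00Z' },
  { id: 3, updated_at: '2024-01-15T11:45:00Z' },
];
const result = selectChanges(rows, '2024-01-15T10:00:00Z');
console.log(result.changed.map((row) => row.id)); // [ 2, 3 ]
console.log(result.checkpoint); // 2024-01-15T11:45:00.000Z
```

A strict greater-than comparison can miss a row written with exactly the checkpoint timestamp, which is one reason to keep a periodic full synchronization as a safety net.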

Indexing Strategy

Optimize database performance for chain operations.

-- Indexes for chain source collections
CREATE INDEX idx_products_updated_at ON products(updated_at);
CREATE INDEX idx_orders_customer_id_status ON orders(customer_id, status);

-- Indexes for chain target collections
CREATE INDEX idx_inventory_product_id ON inventory(product_id);
CREATE INDEX idx_customer_analytics_customer_id ON customer_analytics(customer_id);

-- Composite indexes for complex queries
CREATE INDEX idx_chain_operations_status_created ON chain_operations(status, created_at);

Monitoring and Analytics

Chain Performance Metrics

Track the performance and health of data chains.

Key Metrics

  • Synchronization Latency: Time between source change and target update
  • Throughput: Number of records processed per unit time
  • Error Rate: Percentage of failed synchronization attempts
  • Queue Depth: Number of pending operations
  • Resource Utilization: CPU, memory, and database usage
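
Most of these metrics can be derived from a log of chain operations. The sketch below computes four of them from an in-memory list. The record shape (status, processing_time_ms) loosely mirrors the chain_operations queries used on this page, but the summarize function and the 'pending' status value are assumptions.

```javascript
// Summarizes a window of chain operations.
// Operations still pending count toward queue depth only.
function summarize(operations, windowSeconds) {
  const finished = operations.filter((op) => op.status !== 'pending');
  const failed = finished.filter((op) => op.status === 'failed').length;
  const totalLatency = finished.reduce((sum, op) => sum + op.processing_time_ms, 0);
  return {
    averageLatencyMs: finished.length ? totalLatency / finished.length : 0,
    throughputPerSecond: finished.length / windowSeconds,
    errorRatePercent: finished.length ? (failed / finished.length) * 100 : 0,
    queueDepth: operations.length - finished.length,
  };
}

const stats = summarize(
  [
    { status: 'completed', processing_time_ms: 100 },
    { status: 'completed', processing_time_ms: 150 },
    { status: 'failed', processing_time_ms: 200 },
    { status: 'completed', processing_time_ms: 150 },
    { status: 'pending' },
  ],
  60
);
console.log(stats);
```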

Performance Dashboard

{
  "dashboard": {
    "widgets": [
      {
        "type": "metric",
        "title": "Average Sync Latency",
        "query": "SELECT AVG(processing_time_ms) FROM chain_operations WHERE created_at >= NOW() - INTERVAL '1 hour'"
      },
      {
        "type": "chart",
        "title": "Sync Volume Over Time",
        "query": "SELECT DATE_TRUNC('minute', created_at) AS time, COUNT(*) AS operations FROM chain_operations GROUP BY time ORDER BY time"
      },
      {
        "type": "alert",
        "title": "Failed Operations",
        "query": "SELECT COUNT(*) FROM chain_operations WHERE status = 'failed' AND created_at >= NOW() - INTERVAL '15 minutes'",
        "threshold": 10
      }
    ]
  }
}

Data Lineage Tracking

Maintain visibility into data flow and transformations.

Lineage Records

{
  "lineage_record": {
    "operation_id": "chain_op_12345",
    "chain_id": "customer_to_profile",
    "source": {
      "collection": "customers",
      "record_id": "cust_789",
      "version": 5
    },
    "targets": [
      {
        "collection": "user_profiles",
        "record_id": "profile_456",
        "operation": "update"
      }
    ],
    "transformations": [
      {
        "field": "display_name",
        "source_value": "John Smith",
        "target_value": "John Smith",
        "transformation": "direct_copy"
      }
    ],
    "execution_time": "2024-01-15T10:30:00Z",
    "duration_ms": 125
  }
}

Audit and Compliance

Maintain audit trails for regulatory compliance. In the example configuration, a retention_days value of 2555 keeps audit records for roughly seven years.

Audit Configuration

{
  "audit": {
    "enabled": true,
    "retention_days": 2555,
    "fields_to_track": [
      "all_changes",
      "source_data",
      "target_data",
      "transformation_rules",
      "user_context"
    ],
    "compliance_tags": [
      "gdpr",
      "hipaa",
      "sox"
    ]
  }
}

Best Practices

Chain Design

Single Responsibility

  1. Focused Purpose: Each chain should serve one specific business need
  2. Clear Boundaries: Define clear source and target scope
  3. Minimal Dependencies: Reduce complex interdependencies between chains
  4. Modular Design: Create reusable chain components
  5. Documentation: Maintain clear documentation of chain purpose and logic

Data Consistency

  1. Idempotent Operations: Ensure chains can be safely re-executed
  2. Transaction Boundaries: Define appropriate transaction scope
  3. Rollback Procedures: Plan for handling failed operations
  4. Data Validation: Validate data at each transformation step
  5. Consistency Checks: Implement regular data consistency verification

Performance and Scalability

Optimization Strategies

  1. Batch Processing: Group operations for efficiency
  2. Async Execution: Use asynchronous processing for non-critical chains
  3. Resource Management: Monitor and optimize resource usage
  4. Index Optimization: Ensure proper database indexing
  5. Caching: Cache frequently accessed data and transformation rules

Scalability Planning

  1. Load Testing: Test chains with realistic data volumes
  2. Capacity Planning: Plan for data growth and increased load
  3. Horizontal Scaling: Design chains to scale across multiple servers
  4. Resource Monitoring: Monitor resource usage and performance trends
  5. Bottleneck Identification: Identify and address performance bottlenecks

Security and Compliance

Data Protection

  1. Access Controls: Implement proper user permissions for chains
  2. Data Encryption: Encrypt sensitive data during transmission
  3. Audit Logging: Maintain comprehensive audit trails
  4. Data Minimization: Only synchronize necessary data
  5. Retention Policies: Implement appropriate data retention policies

Compliance Requirements

  1. Regulatory Compliance: Follow relevant data protection regulations
  2. Data Governance: Implement data governance policies
  3. Change Management: Document and approve chain changes
  4. Security Reviews: Conduct regular security assessments of chains
  5. Incident Response: Plan for handling security incidents

Maintenance and Operations

Monitoring and Alerting

  1. Health Monitoring: Monitor chain health and performance
  2. Error Alerting: Set up alerts for chain failures
  3. Performance Tracking: Track key performance metrics
  4. Capacity Alerts: Alert on resource usage thresholds
  5. Business Impact Monitoring: Monitor business impact of chain operations

Change Management

  1. Version Control: Maintain versions of chain configurations
  2. Testing Procedures: Test chain changes in development environments
  3. Rollback Plans: Plan for reverting problematic changes
  4. Documentation Updates: Keep documentation current with changes
  5. Impact Assessment: Assess impact of changes before deployment