
Usage Change Detection Sensor

Overview

The usage_change_detection_sensor is a polling sensor that automatically detects changes in the billing.usage source table and triggers rematerialization of affected partner partitions through the medallion architecture (bronze → silver → gold layers).

Key Features

  • Polling Interval: 5 minutes (300 seconds) - configurable
  • Change Detection: Timestamp comparison of MAX(usage_modified) per dealer
  • Partition-Aware: Triggers only affected partner partitions in silver layer
  • Medallion Architecture: Bronze layer is simple and partitionless, silver layer is partitioned
  • Resilient: Database failures don't crash the sensor; partial failures are logged as warnings
  • Automatic Downstream Propagation: Uses AutomationCondition.eager() on downstream assets

Architecture

Medallion Architecture Pattern

The usage sensor follows a medallion (bronze → silver → gold) architecture:

billing.usage (MySQL Source)
    │
    ├─ [Sensor polls every 5 min]
    │   └─ Queries MAX(usage_modified) per dealer_id
    │   └─ Maps dealer_ids to partner UUIDs
    │   └─ Compares with cursor state
    │
    ▼ (on change detected)
    │
billing_usage_raw (Bronze Layer - Partitionless)
    │   └─ Simple query pulling all usage data
    │   └─ Filters: system_id=3, service_status_id IN (-3,3,10,11)
    │
    ▼ (AutomationCondition.eager)
    │
billing_usage_aggregated (Silver Layer - Partitioned by Partner)
    │   └─ Maps dealer_id to partner UUID
    │   └─ Filters to partition's partner
    │   └─ Only affected partitions rematerialize
    │
    ▼ (AutomationCondition.eager)
    │
billing_monthly_usage_by_service (Silver Layer - Partitioned by Partner)
    │   └─ Monthly aggregation per service
    │
    ▼ (AutomationCondition.eager)
    │
Downstream Report Assets (~2-3 reports):
    ├─ usage_report (Partitioned by partner)
    │   └─ AutomationCondition.eager()
    └─ active_services (Partitioned by partner)
        └─ AutomationCondition.eager()

Change Detection Mechanism

Timestamp-Based Strategy

The sensor uses a simple, efficient timestamp comparison approach:

Strategy:
  1. Query MAX(usage_modified) grouped by dealer_id
  2. Compare with the previous cursor state per dealer
  3. Map changed dealers to partner UUIDs
  4. Trigger bronze (always) plus the affected silver partitions

Trigger Conditions:
  1. First run: no cursor exists → trigger bronze and all partner partitions
  2. Dealer timestamp changed: MAX(usage_modified) for a dealer changed → trigger that dealer's partner partition
  3. New dealer: a dealer appears with data → trigger that dealer's partner partition

Implementation: usage_change_detection_sensor() in src/vanguard_wholesale/defs/sensors.py:318
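The comparison described above can be sketched as two pure functions. This is a minimal sketch, not the code in sensors.py: it assumes the per-dealer query results and the cursor's dealer_last_modified map are both dicts of dealer ID string → ISO-8601 timestamp (or None), and the names detect_changed_dealers and affected_partners are hypothetical.

```python
from typing import Optional


def detect_changed_dealers(
    current: dict[str, Optional[str]],
    previous: Optional[dict[str, Optional[str]]],
) -> set[str]:
    """Return dealer IDs whose MAX(usage_modified) differs from the cursor.

    `current` maps dealer_id -> latest usage_modified from the per-dealer
    MAX() query. `previous` is the cursor's dealer_last_modified map, or
    None on the first run.
    """
    if previous is None:
        # First run: no cursor exists, so every dealer with data counts as changed
        return {dealer for dealer, ts in current.items() if ts is not None}
    changed = set()
    for dealer, ts in current.items():
        if ts is None:
            continue  # no timestamp to compare for this dealer
        if dealer not in previous:
            changed.add(dealer)  # new dealer appeared with data
        elif previous[dealer] != ts:
            changed.add(dealer)  # MAX(usage_modified) moved (or was previously null)
    return changed


def affected_partners(changed: set[str], dealer_to_partner: dict[str, str]) -> set[str]:
    """Map changed dealers to partner UUIDs; dealers without a mapping are skipped."""
    return {dealer_to_partner[d] for d in changed if d in dealer_to_partner}
```

With a cursor of {"1": 12:00, "2": 11:30, "3": null} and fresh maxima where dealer 2 moved, dealer 3 gained a timestamp, and dealer 4 is new, the sketch reports dealers 2, 3, and 4. Keeping the logic free of Dagster objects makes it easy to unit-test in isolation.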

Cursor State Management

The sensor maintains state using a JSON cursor:

{
  "global_last_modified": "2026-01-20T12:00:00Z",
  "dealer_last_modified": {
    "1": "2026-01-20T12:00:00Z",
    "2": "2026-01-20T11:30:00Z",
    "3": null
  }
}

Fields:
  • global_last_modified: Overall MAX(usage_modified) across all dealers (optimization check)
  • dealer_last_modified: Per-dealer timestamps for granular change detection

Cursor Location: Stored by Dagster as the sensor's cursor string in the instance storage (MySQL-backed) and exposed to each evaluation as context.cursor
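In a Dagster sensor the stored cursor arrives as a string on context.cursor and is persisted with context.update_cursor(...). A minimal sketch of parsing and writing the JSON shape above, assuming a corrupted cursor should fall back to an empty one (matching the "corrupted cursor recovery" warning listed under Log Levels); the helper names are hypothetical and the real recovery path in sensors.py may differ:

```python
import json
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def empty_cursor() -> dict[str, Any]:
    """A fresh cursor with no dealers recorded."""
    return {"global_last_modified": None, "dealer_last_modified": {}}


def load_cursor(raw: Optional[str]) -> Optional[dict[str, Any]]:
    """Parse the stored cursor string.

    Returns None when no cursor exists (first run). A corrupted or wrongly
    shaped cursor is logged as a warning and replaced by an empty cursor,
    so every dealer is treated as new on this evaluation.
    """
    if not raw:
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("Corrupted usage sensor cursor, resetting: %r", raw)
        return empty_cursor()
    if not isinstance(data, dict) or not isinstance(data.get("dealer_last_modified"), dict):
        logger.warning("Unexpected usage sensor cursor shape, resetting: %r", raw)
        return empty_cursor()
    return data


def dump_cursor(global_last_modified: Optional[str], dealer_last_modified: dict[str, Optional[str]]) -> str:
    """Serialize the cursor; sort_keys keeps the string stable between ticks."""
    return json.dumps(
        {"global_last_modified": global_last_modified, "dealer_last_modified": dealer_last_modified},
        sort_keys=True,
    )
```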


Configuration

Sensor Configuration

File: src/vanguard_wholesale/defs/sensors.py:314

Key Parameters:

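import dagster as dg

# Configurable polling interval (default 5 minutes)
USAGE_SENSOR_INTERVAL_SECONDS = 300

@dg.sensor(
    name="usage_change_detection_sensor",
    minimum_interval_seconds=USAGE_SENSOR_INTERVAL_SECONDS,
    # Must cover every asset the sensor requests: the bronze run and the silver partition runs
    asset_selection=dg.AssetSelection.keys("billing_usage_raw", "billing_usage_aggregated"),
)
def usage_change_detection_sensor(context: dg.SensorEvaluationContext):
    ...  # change-detection logic omitted; see sensors.py:318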

To adjust polling interval: Edit USAGE_SENSOR_INTERVAL_SECONDS constant

Query Filters

The sensor uses the same filters as the billing_usage_raw asset (query excerpt):

File: src/vanguard_wholesale/defs/sensors.py:389

FROM billing.usage AS usage_model
LEFT OUTER JOIN billing.v4_wh_service_data AS usageServiceData
    ON usageServiceData.service_data_FK_customer_id = usage_model.usage_FK_customer_id
        AND usageServiceData.service_data_FK_service_id = usage_model.usage_FK_service_id
        AND usageServiceData.service_data_FK_service_status_id IN (-3, 3, 10, 11)
        AND usageServiceData.service_data_csa_category_id IN (1, 2, 3)
WHERE usageServiceData.service_data_system_id = 3

Key Filters:
  • system_id = 3: Only Vanguard services
  • service_status_id IN (-3, 3, 10, 11): Active/relevant service statuses
  • csa_category_id IN (1, 2, 3): CSA category filtering
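A small Python model shows why a usage row with no matching service record never reaches the result: the LEFT OUTER JOIN fills the service columns with NULL, the WHERE test on service_data_system_id then rejects the row, so the join effectively behaves like an inner join. This is an illustration only (the real filtering happens in MySQL), it models a single candidate service record per usage row, and the helper names are hypothetical:

```python
from typing import Optional, TypedDict


class ServiceData(TypedDict):
    """Subset of billing.v4_wh_service_data columns used by the filters."""
    service_data_system_id: int
    service_data_FK_service_status_id: int
    service_data_csa_category_id: int


ALLOWED_STATUS_IDS = {-3, 3, 10, 11}
ALLOWED_CSA_CATEGORY_IDS = {1, 2, 3}


def join_match(service: ServiceData) -> bool:
    """ON-clause conditions: status and CSA category must be allowed."""
    return (
        service["service_data_FK_service_status_id"] in ALLOWED_STATUS_IDS
        and service["service_data_csa_category_id"] in ALLOWED_CSA_CATEGORY_IDS
    )


def row_survives(service: Optional[ServiceData]) -> bool:
    """Model one usage row after LEFT OUTER JOIN ... WHERE system_id = 3.

    `service` is the candidate service record, or None when none exists.
    """
    matched = service if service is not None and join_match(service) else None
    if matched is None:
        # LEFT JOIN emits NULL service columns; NULL = 3 is not true, so WHERE drops the row
        return False
    return matched["service_data_system_id"] == 3
```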


Asset Configuration

Bronze Layer (Partitionless)

Asset: billing_usage_raw
File: src/vanguard_wholesale/defs/assets/billing_cdrs_bronze.py:42

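import dagster as dg

@dg.asset(
    key="billing_usage_raw",
    io_manager_key="polars_io_manager",
    kinds=["mysql", "polars"],
    group_name="billing_bronze",
    # No partitions_def - simple bronze extract
)
def billing_usage_raw(context: dg.AssetExecutionContext):
    ...  # extract query omitted; see billing_cdrs_bronze.py:42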

Design Philosophy:
  • Keep bronze layer simple - just extract raw data
  • No partitioning at bronze - reduces query complexity
  • Let silver layer handle partitioning and dealer mapping

Silver Layer (Partitioned)

Asset: billing_usage_aggregated
File: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py:29

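import dagster as dg

@dg.asset(
    key="billing_usage_aggregated",
    partitions_def=partner_partition_def,  # defined elsewhere in the project
    automation_condition=dg.AutomationCondition.eager(),
    # Consumes billing_usage_raw (bronze)
)
def billing_usage_aggregated(context: dg.AssetExecutionContext):
    ...  # body omitted; see billing_cdrs_silver.py:29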

Asset: billing_monthly_usage_by_service
File: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py:255

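import dagster as dg

@dg.asset(
    key="billing_monthly_usage_by_service",
    partitions_def=partner_partition_def,  # defined elsewhere in the project
    automation_condition=dg.AutomationCondition.eager(),
    # Consumes billing_usage_aggregated
)
def billing_monthly_usage_by_service(context: dg.AssetExecutionContext):
    ...  # body omitted; see billing_cdrs_silver.py:255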

Design Philosophy:
  • Silver layer filters bronze data by partner partition
  • Uses dealer_partner_mapping asset to map dealer_id → partner UUID
  • Only rematerializes affected partitions when triggered by the sensor
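The partition filter can be sketched in plain Python. The actual asset works on polars DataFrames, where the same step would typically be a join against the mapping followed by a filter; the list-of-dicts row shape, the integer dealer IDs, and the filter_to_partition name are assumptions for illustration:

```python
from typing import Any, Iterable


def filter_to_partition(
    rows: Iterable[dict[str, Any]],
    dealer_to_partner: dict[int, str],
    partition_key: str,
) -> list[dict[str, Any]]:
    """Keep bronze rows whose dealer maps to this partition's partner UUID.

    Kept rows gain a `partner_uuid` column; rows for unmapped dealers are dropped.
    """
    out = []
    for row in rows:
        partner = dealer_to_partner.get(row["dealer_id"])
        if partner == partition_key:
            out.append({**row, "partner_uuid": partner})
    return out
```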


Run Request Flow

Example: Two Dealers Updated

Scenario: Dealers 123 and 456 have new usage records

Sensor Logic:
  1. Query detects MAX(usage_modified) changed for dealers 123 and 456
  2. Query maps: dealer 123 → partner-A, dealer 456 → partner-B
  3. Create run requests:
     • billing_usage_raw (bronze, no partition)
     • billing_usage_aggregated[partner-A] (silver partition)
     • billing_usage_aggregated[partner-B] (silver partition)

Asset Execution:
  1. billing_usage_raw materializes (pulls ALL usage data)
  2. billing_usage_aggregated[partner-A] materializes (filters to partner-A)
  3. billing_usage_aggregated[partner-B] materializes (filters to partner-B)
  4. billing_monthly_usage_by_service[partner-A] auto-materializes (eager)
  5. billing_monthly_usage_by_service[partner-B] auto-materializes (eager)
  6. usage_report[partner-A] auto-materializes (eager)
  7. usage_report[partner-B] auto-materializes (eager)
  8. active_services[partner-A] auto-materializes (eager)
  9. active_services[partner-B] auto-materializes (eager)

Efficiency: Only the 2 affected partner partitions rematerialize, not all 10-20 partners (bronze still pulls all usage data once)
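The run plan from this example can be sketched as follows. PlannedRun is a hypothetical stand-in for dg.RunRequest (whose asset_selection, partition_key, and run_key parameters play the same roles), and deriving run_key from a tick identifier is an assumption, not necessarily what sensors.py does:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PlannedRun:
    """Stand-in for dg.RunRequest(asset_selection=..., partition_key=..., run_key=...)."""
    asset_key: str
    partition_key: Optional[str]
    run_key: str


def plan_runs(changed_dealers: set[int], dealer_to_partner: dict[int, str], tick_id: str) -> list[PlannedRun]:
    """One unpartitioned bronze run plus one run per affected partner partition."""
    if not changed_dealers:
        return []
    # Deduplicate partners: several changed dealers can share one partner
    partners = sorted({dealer_to_partner[d] for d in changed_dealers if d in dealer_to_partner})
    runs = [PlannedRun("billing_usage_raw", None, f"bronze-{tick_id}")]
    for partner in partners:
        runs.append(PlannedRun("billing_usage_aggregated", partner, f"silver-{partner}-{tick_id}"))
    return runs
```

Deduplicating partners before building requests covers the "multiple dealers, same partner" case: two changed dealers under one partner still yield a single silver request.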


Monitoring

Dagster UI

  1. Navigate to: Sensors → usage_change_detection_sensor
  2. View:
     • Sensor evaluation history
     • Success/failure rate
     • Cursor state (click "View cursor" on latest evaluation)
     • Triggered runs per evaluation

Key Metrics to Monitor

| Metric | Location | Expected Value | Alert Threshold |
| --- | --- | --- | --- |
| Sensor evaluation success rate | Dagster UI → Sensors | >95% | <90% |
| RunRequests per evaluation | Sensor logs | 0-5 (varies) | >15 (investigate) |
| Sensor evaluation duration | Sensor logs | <30 seconds | >2 minutes |
| Change detection frequency | Sensor logs | Varies by workload | N/A |
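The alert thresholds from the table can be encoded as a simple check. This sketch assumes the three values are gathered elsewhere (for example from sensor logs); TickStats and alerts_for are hypothetical names:

```python
from dataclasses import dataclass


@dataclass
class TickStats:
    success_rate: float      # fraction of successful evaluations, 0.0-1.0
    run_requests: int        # RunRequests emitted by the latest evaluation
    duration_seconds: float  # wall time of the latest evaluation


def alerts_for(stats: TickStats) -> list[str]:
    """Apply the alert thresholds from the metrics table."""
    alerts = []
    if stats.success_rate < 0.90:
        alerts.append("evaluation success rate below 90%")
    if stats.run_requests > 15:
        alerts.append("more than 15 RunRequests in one evaluation")
    if stats.duration_seconds > 120:
        alerts.append("evaluation took longer than 2 minutes")
    return alerts
```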

Log Levels

  • INFO: Normal change detection, run request creation
  • WARNING: Query failures (non-fatal), corrupted cursor recovery
  • ERROR: Critical failures (entire sensor fails)
  • DEBUG: No-change confirmations

Example Log Messages

INFO: Checking dealer timestamps for usage changes
INFO: Found usage data for 12 dealer(s)
INFO: Detected changes for 2 dealer(s): [123, 456]
INFO: Mapped to 2 affected partner partition(s)
INFO: Creating 3 run requests: 1 bronze (billing_usage_raw) + 2 silver partitions
WARNING: Failed to query usage timestamps: ConnectionTimeout

Troubleshooting

Symptom: Sensor not triggering despite data changes

Possible Causes:
  1. usage_modified column not updating in source table
  2. Sensor is disabled in Dagster UI
  3. Data filtered out by the query filters (system_id, service_status_id, csa_category_id)
  4. Dealer not mapped to any partner UUID

Diagnosis:

# Check sensor status
dagster sensor list

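# Check sensor cursor: Dagster UI → Sensors → usage_change_detection_sensor → "View cursor"
# (the dagster sensor cursor command only changes the cursor, via --set or --delete)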

# Manually query billing.usage
mysql> SELECT dealer_id, MAX(usage_modified)
       FROM billing.usage AS usage_model
       JOIN billing.v4_wh_service_data AS sd ...
       WHERE sd.service_data_system_id = 3
       GROUP BY dealer_id;

Solution:
  • Verify usage_modified timestamps are updating in source
  • Enable sensor in Dagster UI
  • Verify data matches filter criteria
  • Check dealer_partner_mapping asset has correct mappings


Symptom: Sensor triggering too frequently (false positives)

Possible Causes:
  1. Timestamp precision issues causing minor fluctuations
  2. usage_modified updating on every row touch (not just data changes)

Diagnosis:

# In Dagster Python environment, check timestamp stability:
from vanguard_wholesale.defs.sensors import usage_change_detection_sensor

# Query usage table multiple times and compare timestamps

Solution:
  • Consider adding a settle threshold (only trigger once the latest change is more than X minutes old)
  • Review the source table's usage_modified update logic
  • May need to add checksum validation in addition to the timestamp
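A settle threshold could look like the sketch below: a dealer only counts as changed once its newest usage_modified is at least `settle` old. The 10-minute default, the function names, and the rule that unsettled dealers keep their old cursor value are all assumptions, not current sensor behavior:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_ts(value: str) -> datetime:
    """Parse the cursor's ISO-8601 format, e.g. '2026-01-20T12:00:00Z'."""
    # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def settled_changes(
    current: dict[str, str],
    previous: dict[str, Optional[str]],
    settle: timedelta = timedelta(minutes=10),
    now: Optional[datetime] = None,
) -> set[str]:
    """Dealers whose timestamp changed and has then been quiet for at least `settle`.

    Dealers that changed more recently are excluded. If the caller keeps their
    old cursor value, they are reconsidered on a later tick.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - settle
    return {
        dealer
        for dealer, ts in current.items()
        if previous.get(dealer) != ts and parse_ts(ts) <= cutoff
    }
```

For a check at 12:30 UTC, a dealer last modified at 12:25 is held back while one modified at 12:10 is reported.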


Symptom: Bronze rematerializing but silver partitions not auto-materializing

Possible Causes:
  1. AutomationCondition.eager() not configured on silver assets
  2. Asset has errors preventing materialization
  3. Dagster daemon not running

Diagnosis:

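# Check asset configuration
grep -n "automation_condition" src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py

# Check daemon status (also visible in Dagster UI → Deployment → Daemons)
dagster-daemon liveness-check

# Check run queue (Dagster UI → Runs → Queued), or list recent runs
dagster run list --limit 20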

Solution:
  • Verify automation_condition=dg.AutomationCondition.eager() in the asset decorator
  • Fix asset errors (check Dagster UI → Runs → Failed)
  • Start the Dagster daemon: dagster-daemon run


Symptom: Sensor evaluation taking too long (>1 min)

Possible Causes:
  1. Slow query on billing.usage table (missing indexes)
  2. Large number of dealers (50+)
  3. Network latency to billing database

Diagnosis:

# Check query performance
EXPLAIN SELECT dealer_id, MAX(usage_modified)
FROM billing.usage AS usage_model
JOIN billing.v4_wh_service_data ...
GROUP BY dealer_id;

# Check index on usage_modified
SHOW INDEX FROM billing.usage;

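Solution:
  • Add index: CREATE INDEX idx_usage_modified ON billing.usage (usage_modified);
  • Add composite index: CREATE INDEX idx_usage_service ON billing.usage (usage_FK_customer_id, usage_FK_service_id);
  • If evaluations still approach the sensor timeout, raise it with the DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS environment variable (default 60 seconds); minimum_interval_seconds only sets the polling interval, not the timeout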


Symptom: All partitions triggering instead of just changed ones

Possible Causes:
  1. Bronze layer triggering downstream incorrectly
  2. Sensor creating RunRequests without partition_key
  3. AutomationCondition misconfigured

Diagnosis:
  • Check sensor logs for RunRequest details
  • Verify partition_key is set in silver layer RunRequests
  • Check asset_selection in each RunRequest

Solution:
  • Ensure the sensor creates separate RunRequests: one for bronze (no partition) and one for each affected silver partition
  • Verify silver assets have partitions_def=partner_partition_def


Operational Procedures

How to Manually Trigger Usage Rematerialization

Option 1: Dagster UI (Recommended)
  1. Navigate to Assets → billing_usage_raw
  2. Click "Materialize"
  3. Click "Launch run"
  4. Downstream partitions will auto-materialize via AutomationCondition

Option 2: CLI

# Materialize bronze (triggers cascade)
dagster asset materialize --select billing_usage_raw

# Or materialize specific silver partition
dagster asset materialize \
  --select billing_usage_aggregated \
  --partition partner-uuid-123


How to Disable Sensor Temporarily

Dagster UI:
  1. Navigate to Sensors → usage_change_detection_sensor
  2. Click "Stop sensor"
  3. Confirm

Note: Sensor will not evaluate while stopped. No changes will be detected.


How to Reset Sensor Cursor

Use Case: Cursor corrupted or want to force re-check all dealers

# Clear cursor (triggers first-run behavior for all dealers)
dagster sensor cursor usage_change_detection_sensor --delete

# Manually set cursor (advanced)
dagster sensor cursor usage_change_detection_sensor --set '{
  "global_last_modified": null,
  "dealer_last_modified": {}
}'

Warning: Clearing cursor will trigger bronze + ALL partner partitions on next evaluation if any data exists.
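To re-check only a few dealers instead of all of them, one option is to edit the cursor rather than delete it. This sketch assumes the sensor treats a dealer missing from dealer_last_modified as new (trigger condition 3) and that an unchanged global_last_modified might short-circuit the per-dealer comparison, so it clears that field too; cursor_without_dealers is a hypothetical helper:

```python
import json
from typing import Iterable, Union


def cursor_without_dealers(raw_cursor: str, dealer_ids: Iterable[Union[str, int]]) -> str:
    """Return a copy of the cursor JSON with the given dealers forgotten."""
    cursor = json.loads(raw_cursor)
    dealers = cursor.get("dealer_last_modified") or {}
    for dealer_id in dealer_ids:
        # Cursor keys are strings ("1", "2", ...), so normalize ints
        dealers.pop(str(dealer_id), None)
    cursor["dealer_last_modified"] = dealers
    # Assumption: clearing the global value keeps an unchanged-global pre-check
    # from skipping the per-dealer comparison on the next tick
    cursor["global_last_modified"] = None
    return json.dumps(cursor)
```

The returned string can be passed to the --set form of the cursor command above.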


How to Adjust Polling Interval

File: src/vanguard_wholesale/defs/sensors.py:314

# Change from 5 minutes to 10 minutes
USAGE_SENSOR_INTERVAL_SECONDS = 600

# Change to 1 minute (aggressive polling)
USAGE_SENSOR_INTERVAL_SECONDS = 60

Trade-offs:
  • Shorter interval: More responsive, higher database load, more Dagster evaluations
  • Longer interval: Less responsive, lower database load, fewer evaluations

Recommended Range: 2-15 minutes depending on data change frequency

After changing: Redeploy Dagster code
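The trade-off is easy to quantify. A sketch of the arithmetic, assuming ticks run back to back (minimum_interval_seconds is a floor, so real tick counts can be lower); the function names are illustrative:

```python
SECONDS_PER_DAY = 24 * 60 * 60


def evaluations_per_day(interval_seconds: int) -> int:
    """Upper bound on sensor ticks (and change-detection queries) per day."""
    return SECONDS_PER_DAY // interval_seconds


def worst_case_detection_delay(interval_seconds: int, evaluation_seconds: float) -> float:
    """A change landing just after a tick's query waits about one interval plus the next query."""
    return interval_seconds + evaluation_seconds
```

At 300 seconds this gives at most 288 evaluations per day; 600 seconds gives 144 and 60 seconds gives 1,440.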


Performance Characteristics

Sensor Evaluation Time

| Dealers | Avg Rows/Dealer | Evaluation Time |
| --- | --- | --- |
| 5 | 1,000 | ~5 seconds |
| 10 | 1,000 | ~10 seconds |
| 20 | 1,000 | ~20 seconds |
| 50 | 1,000 | ~45 seconds |

Bottleneck: Database query time for MAX(usage_modified) grouped by dealer

Optimization: Index on usage_modified column


Bronze Materialization Impact

Typical Run (for billing_usage_raw):
  • Query time: 30-120 seconds (depends on usage table size)
  • Data transfer: 10-100MB
  • Processing time: 5-15 seconds

Note: Bronze pulls ALL usage data, not just changed dealers (by design - keeps query simple)


Silver Partition Materialization Impact

Typical Cascade (per affected partition):
  • billing_usage_raw (bronze - once): 1-2 minutes
  • billing_usage_aggregated (silver partition): 10-30 seconds
  • billing_monthly_usage_by_service (silver partition): 5-10 seconds
  • usage_report (gold): 30-60 seconds
  • active_services (gold): 30-60 seconds

Total Time (sequential per partition): 2-4 minutes per partition
Total Time (parallel, max_concurrent_runs=50): Same as the longest partition

Efficiency: Only affected partitions rematerialize, not all 10-20 partners
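The sequential and parallel totals above follow from a simple scheduling model. A rough sketch, assuming bronze runs once before any partition work and each partition's silver and gold steps run as one unit on whichever run slot frees up first (the function name and the model are illustrative, not how Dagster's run queue is implemented):

```python
import heapq


def cascade_wall_time(bronze_seconds: float, partition_seconds: list[float], max_concurrent_runs: int) -> float:
    """Estimate wall time: bronze once, then partitions on at most N parallel slots.

    Greedy list scheduling: each partition starts on the slot that frees up first.
    With max_concurrent_runs >= len(partition_seconds) this is bronze + the longest partition.
    """
    if max_concurrent_runs < 1:
        raise ValueError("max_concurrent_runs must be >= 1")
    slots = [0.0] * min(max_concurrent_runs, max(len(partition_seconds), 1))
    heapq.heapify(slots)
    for duration in partition_seconds:
        start = heapq.heappop(slots)
        heapq.heappush(slots, start + duration)
    return bronze_seconds + max(slots)
```

For example, with a 90-second bronze run and partitions taking 120 and 150 seconds, 50 slots give 240 seconds in total while a single slot gives 360.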


Comparison with v4_warehouse Sensor

| Feature | v4_warehouse Sensor | usage Sensor |
| --- | --- | --- |
| Polling Interval | 15 minutes | 5 minutes (configurable) |
| Change Detection | Checksum + row count | Timestamp-based |
| Bronze Layer | Partitioned | Partitionless (medallion) |
| Query Complexity | Medium (per-partition WHERE) | Low (simple GROUP BY) |
| Partition Trigger | Direct bronze partition | Bronze + mapped silver partitions |
| Data Volume | High (v4_warehouse large) | Medium (usage monthly wipe) |
| Downstream Assets | ~50 per partition | ~2-3 per partition |
| Use Case | Infrequent warehouse changes | Frequent usage updates |

Testing

Unit Tests

File: tests/test_usage_sensor.py

Run Tests:

# All sensor tests
pytest tests/test_usage_sensor.py -v

# Specific test
pytest tests/test_usage_sensor.py::test_sensor_detects_changes_for_one_dealer -v

# Coverage report
pytest tests/test_usage_sensor.py --cov=vanguard_wholesale.defs.sensors --cov-report=term-missing

Test Coverage: 90%+ of sensor logic

Key Test Cases:
  • First run with no cursor
  • No changes detected
  • Changes for specific dealers
  • Multiple dealers same partner
  • Empty usage table
  • Database error handling
  • Null timestamp handling
  • Corrupted cursor recovery


Integration Testing (Dev Environment)

Prerequisites:
  • Access to dev billing_cdrs database
  • At least 1 partner partition exists
  • Dagster daemon running

Test Procedure:

  1. Baseline Check:

    # Enable sensor
    dagster sensor start usage_change_detection_sensor
    
    # Wait for evaluation (5 minutes)
    dagster sensor list
    

  2. Simulate Change:

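    -- Update usage_modified for some records
    -- (MySQL rejects LIMIT directly inside IN (...), so wrap it in a derived table)
    UPDATE billing.usage
    SET usage_modified = NOW()
    WHERE usage_FK_service_id IN (
        SELECT service_id FROM (SELECT service_id FROM ... LIMIT 10) AS sample_services
    );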
    

  3. Wait 5 Minutes (or trigger sensor manually via Dagster UI)

  4. Verify:

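    # Check the latest sensor tick and the RunRequests it created
    # (Dagster UI: Sensors → usage_change_detection_sensor → tick history)
    
    # Check the bronze asset materialized
    # (Dagster UI: Assets → billing_usage_raw → latest materialization)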
    
    # Check silver partitions auto-materialized
    dagster run list --limit 20
    

  5. Cleanup: Revert test data if needed


Configuration Reference

Environment Variables

The sensor's database resources (configured in definitions.py) read these environment variables:

Billing CDRs Database:

BILLING_CDRS_DB_HOST=billing-db.example.com
BILLING_CDRS_DB_PORT=3306
BILLING_CDRS_DB_USER=dagster
BILLING_CDRS_DB_PASSWORD=***
BILLING_CDRS_DB_DATABASE=billing

V3 Database (for dealer mapping):

V3_DB_HOST=v3-db.example.com
V3_DB_PORT=3306
V3_DB_USER=dagster
V3_DB_PASSWORD=***
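A sketch of reading one of these variable groups into a typed settings object. The DbSettings and load_db_settings names are hypothetical; a Dagster project would more commonly pass dg.EnvVar("...") values into its resource definitions in definitions.py. The database name is optional here because no V3_DB_DATABASE variable is listed:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DbSettings:
    host: str
    port: int
    user: str
    password: str
    database: Optional[str]


def load_db_settings(prefix: str, env: Mapping[str, str] = os.environ) -> DbSettings:
    """Read <PREFIX>_DB_* variables; raises KeyError naming the first missing one."""
    def get(name: str) -> str:
        key = f"{prefix}_DB_{name}"
        if key not in env:
            raise KeyError(f"missing environment variable {key}")
        return env[key]

    return DbSettings(
        host=get("HOST"),
        port=int(get("PORT")),
        user=get("USER"),
        password=get("PASSWORD"),
        # Optional: the V3 connection lists no *_DB_DATABASE variable
        database=env.get(f"{prefix}_DB_DATABASE"),
    )
```

For example, load_db_settings("BILLING_CDRS") reads the first group above and load_db_settings("V3") the second.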


FAQs

Q: How do I know if the sensor is working?

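A: Check Dagster UI → Sensors → usage_change_detection_sensor. "Last tick" should update roughly every 5 minutes (minimum_interval_seconds sets the minimum gap between evaluations, not an exact schedule). If "Status" shows errors, investigate the logs.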


Q: Will the sensor detect changes made outside of Dagster?

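A: Yes. The sensor polls the billing.usage table directly, so it detects changes from any source (manual SQL, external ETL, etc.) as long as they change a dealer's MAX(usage_modified). Updates that do not touch usage_modified, and deletions of rows that are not the dealer's newest, leave that maximum unchanged and are not detected.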
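The blind spot for deletions follows from comparing only the maximum. A minimal Python model of the per-dealer MAX(usage_modified) query; the rows are made up, and ISO-8601 strings in one consistent format compare correctly as text:

```python
def max_modified_by_dealer(rows: list[tuple[int, str]]) -> dict[int, str]:
    """Python equivalent of SELECT dealer_id, MAX(usage_modified) ... GROUP BY dealer_id."""
    result: dict[int, str] = {}
    for dealer_id, modified in rows:
        if dealer_id not in result or modified > result[dealer_id]:
            result[dealer_id] = modified
    return result


rows = [
    (123, "2026-01-20T10:00:00Z"),
    (123, "2026-01-20T12:00:00Z"),
    (456, "2026-01-20T11:00:00Z"),
]
before = max_modified_by_dealer(rows)
# Delete dealer 123's older row: its maximum stays at 12:00
after_delete = max_modified_by_dealer([r for r in rows if r != (123, "2026-01-20T10:00:00Z")])
# Add a newer row for dealer 456: its maximum moves
after_insert = max_modified_by_dealer(rows + [(456, "2026-01-20T12:30:00Z")])
```

Here before and after_delete are identical, so the sensor would see no change, while after_insert moves dealer 456's maximum and would trigger its partner partition.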


Q: What happens if multiple dealers change simultaneously?

A: The sensor maps all changed dealers to partner UUIDs and creates one bronze RunRequest + one RunRequest per affected partner partition. Dagster executes up to max_concurrent_runs in parallel.


Q: Why is bronze partitionless but silver partitioned?

A: Medallion architecture principle - keep bronze simple (just extract raw data), handle complexity in silver (partitioning, filtering, mapping). This reduces query complexity and follows best practices.


Q: Can I backfill historical usage data without triggering the sensor?

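A: Yes, manually materialize billing_usage_raw in the Dagster UI. The sensor only reacts to its own polling of billing.usage, not to manual runs. The silver assets still use AutomationCondition.eager(), so they will react to the new bronze materialization.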


Q: How do I test the sensor locally?

A: Run unit tests with pytest tests/test_usage_sensor.py. For integration testing, deploy to dev environment and simulate data changes in billing.usage table.


Q: What if a downstream asset fails?

A: Failed assets log errors but don't block siblings. You can retry the failed asset manually in the Dagster UI. AutomationCondition.eager() does not retry failures by itself; it requests the asset again the next time an upstream dependency updates.


Q: Does the sensor use a lot of database resources?

A: Minimal impact. Every 5 minutes, it runs a lightweight SELECT MAX(usage_modified) GROUP BY dealer_id query. Execution time <30 seconds with proper indexes.


Support & Contact

Documentation: This file + src/vanguard_wholesale/defs/sensors.py

Code References:
  • Sensor implementation: src/vanguard_wholesale/defs/sensors.py:318
  • Bronze asset: src/vanguard_wholesale/defs/assets/billing_cdrs_bronze.py:42
  • Silver assets: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py
  • Tests: tests/test_usage_sensor.py

Monitoring:
  • Dagster UI: http://<dagster-host>:<port>/sensors/usage_change_detection_sensor
  • Logs: Dagster UI → Sensors → Evaluation logs


Last Updated: 2026-01-20
Version: 1.0.0