Usage Change Detection Sensor¶
Overview¶
The usage_change_detection_sensor is a polling sensor that automatically detects changes in the billing.usage source table and triggers rematerialization of affected partner partitions through the medallion architecture (bronze → silver → gold layers).
Key Features¶
- Polling Interval: 5 minutes (300 seconds) - configurable
- Change Detection: Timestamp-based on the usage_modified column, per dealer
- Partition-Aware: Triggers only affected partner partitions in silver layer
- Medallion Architecture: Bronze layer is simple and partitionless, silver layer is partitioned
- Resilient: Database failures do not block the sensor; partial failures are logged as warnings
- Automatic Downstream Propagation: Uses AutomationCondition.eager() on downstream assets
Architecture¶
Medallion Architecture Pattern¶
The usage sensor follows a medallion (bronze → silver → gold) architecture:
billing.usage (MySQL Source)
│
├─ [Sensor polls every 5 min]
│ └─ Queries MAX(usage_modified) per dealer_id
│ └─ Maps dealer_ids to partner UUIDs
│ └─ Compares with cursor state
│
▼ (on change detected)
│
billing_usage_raw (Bronze Layer - Partitionless)
│ └─ Simple query pulling all usage data
│ └─ Filters: system_id=3, service_status_id IN (-3,3,10,11)
│
▼ (AutomationCondition.eager)
│
billing_usage_aggregated (Silver Layer - Partitioned by Partner)
│ └─ Maps dealer_id to partner UUID
│ └─ Filters to partition's partner
│ └─ Only affected partitions rematerialize
│
▼ (AutomationCondition.eager)
│
billing_monthly_usage_by_service (Silver Layer - Partitioned by Partner)
│ └─ Monthly aggregation per service
│
▼ (AutomationCondition.eager)
│
Downstream Report Assets (~2-3 reports):
├─ usage_report (Partitioned by partner)
│ └─ AutomationCondition.eager()
├─ active_services (Partitioned by partner)
│ └─ AutomationCondition.eager()
Change Detection Mechanism¶
Timestamp-Based Strategy¶
The sensor uses a simple, efficient timestamp comparison approach:
Strategy:
1. Query MAX(usage_modified) grouped by dealer_id
2. Compare with previous cursor state per dealer
3. Map changed dealers to partner UUIDs
4. Trigger bronze (always) + affected silver partitions
Trigger Conditions:
1. First run: No cursor exists → trigger all
2. Dealer timestamp changed: MAX(usage_modified) for dealer changed → trigger that dealer's partner partition
3. New dealer: Dealer appears with data → trigger that partner partition
Implementation: usage_change_detection_sensor() in src/vanguard_wholesale/defs/sensors.py:318
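The trigger conditions above can be sketched as a pure comparison function. This is a minimal illustration, not the code in sensors.py: the function name detect_changed_dealers and the dictionary shapes are assumptions, chosen to match the cursor layout described in this document.

```python
from typing import Dict, Optional, Set


def detect_changed_dealers(
    current: Dict[str, Optional[str]],
    previous: Optional[Dict[str, Optional[str]]],
) -> Set[str]:
    """Return dealer ids whose MAX(usage_modified) differs from the cursor.

    `current` holds the latest timestamp per dealer from the source query;
    `previous` is the per-dealer map from the cursor, or None on first run.
    """
    # First run: no cursor exists, so every dealer with data counts as changed.
    if previous is None:
        return set(current)

    changed: Set[str] = set()
    for dealer_id, timestamp in current.items():
        # A dealer missing from the cursor is new; a differing value is a change.
        if dealer_id not in previous or previous[dealer_id] != timestamp:
            changed.add(dealer_id)
    return changed


current = {
    "1": "2026-01-20T12:05:00Z",  # timestamp moved forward
    "2": "2026-01-20T11:30:00Z",  # unchanged
    "4": "2026-01-20T12:01:00Z",  # new dealer
}
previous = {"1": "2026-01-20T12:00:00Z", "2": "2026-01-20T11:30:00Z", "3": None}

print(sorted(detect_changed_dealers(current, previous)))  # ['1', '4']
```

In this sketch a dealer that disappears from the query result (dealer 3 above) does not trigger anything; whether the real sensor treats that case differently is not stated here.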
Cursor State Management¶
The sensor maintains state using a JSON cursor:
{
"global_last_modified": "2026-01-20T12:00:00Z",
"dealer_last_modified": {
"1": "2026-01-20T12:00:00Z",
"2": "2026-01-20T11:30:00Z",
"3": null
}
}
Fields:
- global_last_modified: Overall MAX(usage_modified) across all dealers (optimization check)
- dealer_last_modified: Per-dealer timestamps for granular change detection
Cursor Location: Persisted in Dagster's instance storage (MySQL-backed) and exposed to the sensor through its evaluation context
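A sketch of how a cursor in this shape could be read and written defensively, including the "corrupted cursor recovery" case mentioned under Log Levels. The helper names load_cursor and dump_cursor are hypothetical; the actual sensor may structure this differently.

```python
import json
from typing import Any, Dict, Optional


def _empty_cursor() -> Dict[str, Any]:
    # A fresh dict each time, so callers never share mutable state.
    return {"global_last_modified": None, "dealer_last_modified": {}}


def load_cursor(raw: Optional[str]) -> Dict[str, Any]:
    """Parse the JSON cursor, falling back to an empty cursor if it is unusable."""
    if not raw:
        return _empty_cursor()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # Corrupted cursor: behave like a first run instead of failing.
        return _empty_cursor()
    if not isinstance(data, dict) or not isinstance(data.get("dealer_last_modified"), dict):
        return _empty_cursor()
    return {
        "global_last_modified": data.get("global_last_modified"),
        "dealer_last_modified": dict(data["dealer_last_modified"]),
    }


def dump_cursor(global_ts: Optional[str], dealer_ts: Dict[str, Optional[str]]) -> str:
    """Serialize the cursor; sort_keys keeps the stored string stable across runs."""
    return json.dumps(
        {"global_last_modified": global_ts, "dealer_last_modified": dealer_ts},
        sort_keys=True,
    )


raw = dump_cursor("2026-01-20T12:00:00Z", {"1": "2026-01-20T12:00:00Z", "3": None})
print(load_cursor(raw)["dealer_last_modified"])
print(load_cursor("{not valid json"))
```

Falling back to an empty cursor means a corrupted value leads to first-run behavior (all partitions triggered), which is the safe direction for a change detector.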
Configuration¶
Sensor Configuration¶
File: src/vanguard_wholesale/defs/sensors.py:314
Key Parameters:
# Configurable polling interval (default 5 minutes)
USAGE_SENSOR_INTERVAL_SECONDS = 300
@dg.sensor(
name="usage_change_detection_sensor",
minimum_interval_seconds=USAGE_SENSOR_INTERVAL_SECONDS,
asset_selection=dg.AssetSelection.keys("billing_usage_aggregated"),
)
To adjust polling interval: Edit USAGE_SENSOR_INTERVAL_SECONDS constant
Query Filters¶
The sensor uses the same filters as billing_usage_raw asset:
File: src/vanguard_wholesale/defs/sensors.py:389
FROM billing.usage AS usage_model
LEFT OUTER JOIN billing.v4_wh_service_data AS usageServiceData
ON usageServiceData.service_data_FK_customer_id = usage_model.usage_FK_customer_id
AND usageServiceData.service_data_FK_service_id = usage_model.usage_FK_service_id
AND usageServiceData.service_data_FK_service_status_id IN (-3, 3, 10, 11)
AND usageServiceData.service_data_csa_category_id IN (1, 2, 3)
WHERE usageServiceData.service_data_system_id = 3
Key Filters:
- system_id = 3: Only Vanguard services
- service_status_id IN (-3, 3, 10, 11): Active/relevant service statuses
- csa_category_id IN (1, 2, 3): CSA category filtering
Asset Configuration¶
Bronze Layer (Partitionless)¶
Asset: billing_usage_raw
File: src/vanguard_wholesale/defs/assets/billing_cdrs_bronze.py:42
@dg.asset(
key="billing_usage_raw",
io_manager_key="polars_io_manager",
kinds=["mysql", "polars"],
group_name="billing_bronze",
# No partitions_def - simple bronze extract
)
Design Philosophy:
- Keep bronze layer simple - just extract raw data
- No partitioning at bronze - reduces query complexity
- Let silver layer handle partitioning and dealer mapping
Silver Layer (Partitioned)¶
Asset: billing_usage_aggregated
File: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py:29
@dg.asset(
key="billing_usage_aggregated",
partitions_def=partner_partition_def,
automation_condition=dg.AutomationCondition.eager(),
# Consumes billing_usage_raw (bronze)
)
Asset: billing_monthly_usage_by_service
File: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py:255
@dg.asset(
key="billing_monthly_usage_by_service",
partitions_def=partner_partition_def,
automation_condition=dg.AutomationCondition.eager(),
# Consumes billing_usage_aggregated
)
Design Philosophy:
- Silver layer filters bronze data by partner partition
- Uses dealer_partner_mapping asset to map dealer_id → partner UUID
- Only rematerializes affected partitions when triggered by sensor
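The silver-layer filtering step can be illustrated with plain Python data. This is a simplified sketch: the real assets use Polars through polars_io_manager, and the function name filter_rows_for_partition, the row layout, and the sample values are assumptions made for the example.

```python
from typing import Any, Dict, List, Mapping


def filter_rows_for_partition(
    rows: List[Dict[str, Any]],
    dealer_to_partner: Mapping[int, str],
    partition_key: str,
) -> List[Dict[str, Any]]:
    """Keep only bronze rows whose dealer maps to the given partner partition."""
    return [
        row
        for row in rows
        # Dealers without a mapping resolve to None and are dropped.
        if dealer_to_partner.get(row["dealer_id"]) == partition_key
    ]


bronze_rows = [
    {"dealer_id": 123, "service_id": 1, "quantity": 10},
    {"dealer_id": 456, "service_id": 2, "quantity": 4},
    {"dealer_id": 123, "service_id": 3, "quantity": 7},
    {"dealer_id": 999, "service_id": 1, "quantity": 1},  # unmapped dealer
]
dealer_to_partner = {123: "partner-A", 456: "partner-B"}

partner_a_rows = filter_rows_for_partition(bronze_rows, dealer_to_partner, "partner-A")
print(len(partner_a_rows))
```

Because the bronze extract is partitionless, each silver partition reads the same bronze output and narrows it to its own partner.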
Run Request Flow¶
Example: Two Dealers Updated¶
Scenario: Dealers 123 and 456 have new usage records
Sensor Logic:
1. Query detects MAX(usage_modified) changed for dealers 123, 456
2. Query maps: dealer 123 → partner-A, dealer 456 → partner-B
3. Create run requests:
- billing_usage_raw (bronze, no partition)
- billing_usage_aggregated[partner-A] (silver partition)
- billing_usage_aggregated[partner-B] (silver partition)
Asset Execution:
1. billing_usage_raw materializes (pulls ALL usage data)
2. billing_usage_aggregated[partner-A] materializes (filters to partner-A)
3. billing_usage_aggregated[partner-B] materializes (filters to partner-B)
4. billing_monthly_usage_by_service[partner-A] auto-materializes (eager)
5. billing_monthly_usage_by_service[partner-B] auto-materializes (eager)
6. usage_report[partner-A] auto-materializes (eager)
7. usage_report[partner-B] auto-materializes (eager)
8. active_services[partner-A] auto-materializes (eager)
9. active_services[partner-B] auto-materializes (eager)
Efficiency: Only 2 partner partitions rematerialize, not all 10-20 partners
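The run request planning in this example can be sketched without Dagster. PlannedRun is a hypothetical stand-in for a run request (asset key plus optional partition key), and plan_runs is an illustrative name. The sketch assumes that no runs are planned when none of the changed dealers map to a partner; the real sensor may choose to refresh bronze anyway.

```python
from dataclasses import dataclass
from typing import Iterable, List, Mapping, Optional


@dataclass(frozen=True)
class PlannedRun:
    """Stand-in for a run request: one asset, optionally one partition."""
    asset_key: str
    partition_key: Optional[str] = None


def plan_runs(
    changed_dealers: Iterable[int],
    dealer_to_partner: Mapping[int, str],
) -> List[PlannedRun]:
    # Several dealers can belong to one partner, so deduplicate via a set.
    partners = sorted(
        {dealer_to_partner[d] for d in changed_dealers if d in dealer_to_partner}
    )
    if not partners:
        return []  # assumption: nothing to do without an affected partition

    # One partitionless bronze run, then one silver run per affected partner.
    runs = [PlannedRun("billing_usage_raw")]
    runs.extend(PlannedRun("billing_usage_aggregated", p) for p in partners)
    return runs


mapping = {123: "partner-A", 456: "partner-B", 789: "partner-A"}
for run in plan_runs([123, 456, 789], mapping):
    print(run)
```

Setting a partition key on every silver request is what keeps the cascade limited to affected partners; a silver request without one is a likely cause of the "all partitions triggering" symptom described under Troubleshooting.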
Monitoring¶
Dagster UI¶
- Navigate to: Sensors → usage_change_detection_sensor
- View:
- Sensor evaluation history
- Success/failure rate
- Cursor state (click "View cursor" on latest evaluation)
- Triggered runs per evaluation
Key Metrics to Monitor¶
| Metric | Location | Expected Value | Alert Threshold |
|---|---|---|---|
| Sensor evaluation success rate | Dagster UI → Sensors | >95% | <90% |
| RunRequests per evaluation | Sensor logs | 0-5 (varies) | >15 (investigate) |
| Sensor evaluation duration | Sensor logs | <30 seconds | >2 minutes |
| Change detection frequency | Sensor logs | Varies by workload | N/A |
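The alert thresholds in the table can be expressed as a small check. This is an illustrative sketch only: check_sensor_health is a hypothetical helper, and how the three inputs are collected (UI, logs, or an external monitor) is left open.

```python
from typing import List


def check_sensor_health(
    success_rate: float,      # fraction of successful evaluations, 0.0 to 1.0
    run_requests: int,        # RunRequests created in one evaluation
    duration_seconds: float,  # wall-clock time of one evaluation
) -> List[str]:
    """Return one message per alert threshold from the table that is breached."""
    alerts: List[str] = []
    if success_rate < 0.90:
        alerts.append(f"evaluation success rate {success_rate:.0%} is below 90%")
    if run_requests > 15:
        alerts.append(f"{run_requests} RunRequests in one evaluation (investigate)")
    if duration_seconds > 120:
        alerts.append(f"evaluation took {duration_seconds:.0f}s, over 2 minutes")
    return alerts


print(check_sensor_health(0.97, 3, 12.0))
print(check_sensor_health(0.85, 18, 150.0))
```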
Log Levels¶
- INFO: Normal change detection, run request creation
- WARNING: Query failures (non-fatal), corrupted cursor recovery
- ERROR: Critical failures (entire sensor fails)
- DEBUG: No-change confirmations
Example Log Messages¶
INFO: Checking dealer timestamps for usage changes
INFO: Found usage data for 12 dealer(s)
INFO: Detected changes for 2 dealer(s): [123, 456]
INFO: Mapped to 2 affected partner partition(s)
INFO: Creating 3 run requests: 1 bronze (billing_usage_raw) + 2 silver partitions
WARNING: Failed to query usage timestamps: ConnectionTimeout
Troubleshooting¶
Symptom: Sensor not triggering despite data changes¶
Possible Causes:
1. usage_modified column not updating in source table
2. Sensor is disabled in Dagster UI
3. Data filtered out by WHERE clause (system_id, service_status_id)
4. Dealer not mapped to any partner UUID
Diagnosis:
# Check sensor status
dagster sensor list
# Check sensor cursor
dagster sensor cursor usage_change_detection_sensor
# Manually query billing.usage
mysql> SELECT dealer_id, MAX(usage_modified)
FROM billing.usage AS usage_model
JOIN billing.v4_wh_service_data AS sd ...
WHERE sd.service_data_system_id = 3
GROUP BY dealer_id;
Solution:
- Verify usage_modified timestamps are updating in source
- Enable sensor in Dagster UI
- Verify data matches filter criteria
- Check dealer_partner_mapping asset has correct mappings
Symptom: Sensor triggering too frequently (false positives)¶
Possible Causes:
1. Timestamp precision issues causing minor fluctuations
2. usage_modified updating on every row touch (not just data changes)
Diagnosis:
# In Dagster Python environment, check timestamp stability:
from vanguard_wholesale.defs.sensors import usage_change_detection_sensor
# Query usage table multiple times and compare timestamps
Solution:
- Consider adding threshold (only trigger if changed > X minutes ago)
- Review source table's usage_modified update logic
- May need to add checksum validation in addition to timestamp
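The checksum idea could look roughly like the sketch below. It is not part of the current sensor: rows_checksum and is_real_change are hypothetical helpers, and storing a per-dealer checksum would require extending the cursor format. Computing the checksum also needs a heavier query than MAX(usage_modified), so it trades database load for fewer false positives.

```python
import hashlib
from typing import Iterable, Optional, Tuple


def rows_checksum(rows: Iterable[Tuple[int, int, int]]) -> str:
    """Order-independent SHA-256 over a dealer's usage rows."""
    digest = hashlib.sha256()
    for row in sorted(rows):  # sort so row order from the database is irrelevant
        digest.update(repr(row).encode("utf-8"))
    return digest.hexdigest()


def is_real_change(
    prev_ts: Optional[str],
    cur_ts: Optional[str],
    prev_checksum: Optional[str],
    cur_checksum: str,
) -> bool:
    """Trigger only when the timestamp moved AND the row content differs."""
    if prev_ts == cur_ts:
        return False  # cheap path: timestamp unchanged, nothing to do
    return prev_checksum != cur_checksum


# Rows are (customer_id, service_id, quantity); values are made up.
before = [(1, 10, 5), (2, 11, 3)]
touched = [(2, 11, 3), (1, 10, 5)]  # same data, rows merely re-saved
edited = [(1, 10, 6), (2, 11, 3)]   # quantity actually changed

print(is_real_change("t1", "t2", rows_checksum(before), rows_checksum(touched)))
print(is_real_change("t1", "t2", rows_checksum(before), rows_checksum(edited)))
```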
Symptom: Bronze rematerializing but silver partitions not auto-materializing¶
Possible Causes:
1. AutomationCondition.eager() not configured on silver assets
2. Asset has errors preventing materialization
3. Dagster daemon not running
Diagnosis:
# Check asset configuration in source
grep -n "automation_condition" src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py
# Check daemon status
dagster-daemon liveness-check
# Check recent runs (look for runs stuck in QUEUED)
dagster run list --limit 20
Solution:
- Verify automation_condition=dg.AutomationCondition.eager() in asset decorator
- Fix asset errors (check Dagster UI → Runs → Failed)
- Start Dagster daemon: dagster-daemon run
Symptom: Sensor evaluation taking too long (>1 min)¶
Possible Causes:
1. Slow query on billing.usage table (missing indexes)
2. Large number of dealers (50+)
3. Network latency to billing database
Diagnosis:
# Check query performance
EXPLAIN SELECT dealer_id, MAX(usage_modified)
FROM billing.usage AS usage_model
JOIN billing.v4_wh_service_data ...
GROUP BY dealer_id;
# Check index on usage_modified
SHOW INDEX FROM billing.usage;
Solution:
- Add index: CREATE INDEX idx_usage_modified ON billing.usage (usage_modified);
- Add composite index: CREATE INDEX idx_usage_service ON billing.usage (usage_FK_customer_id, usage_FK_service_id);
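- If evaluations time out, raise the sensor evaluation timeout (in recent Dagster releases this is typically the DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS environment variable; verify against your version). Note that minimum_interval_seconds only controls how often the sensor polls; increasing it lowers database load but does not give a slow evaluation more time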
Symptom: All partitions triggering instead of just changed ones¶
Possible Causes:
1. Bronze layer triggering downstream incorrectly
2. Sensor creating RunRequests without partition_key
3. AutomationCondition misconfigured
Diagnosis:
- Check sensor logs for RunRequest details
- Verify partition_key is set in silver layer RunRequests
- Check asset_selection in RunRequest
Solution:
- Ensure sensor creates separate RunRequests: one for bronze (no partition), individual for each affected silver partition
- Verify silver assets have partitions_def=partner_partition_def
Operational Procedures¶
How to Manually Trigger Usage Rematerialization¶
Option 1: Dagster UI (Recommended)
1. Navigate to Assets → billing_usage_raw
2. Click "Materialize"
3. Click "Launch run"
4. Downstream partitions will auto-materialize via AutomationCondition
Option 2: CLI
# Materialize bronze (triggers cascade)
dagster asset materialize --select billing_usage_raw
# Or materialize specific silver partition
dagster asset materialize \
--select billing_usage_aggregated \
--partition partner-uuid-123
How to Disable Sensor Temporarily¶
Dagster UI:
1. Navigate to Sensors → usage_change_detection_sensor
2. Click "Stop sensor"
3. Confirm
Note: Sensor will not evaluate while stopped. No changes will be detected.
How to Reset Sensor Cursor¶
Use Case: Cursor corrupted or want to force re-check all dealers
# Clear cursor (triggers first-run behavior for all dealers)
dagster sensor cursor usage_change_detection_sensor --delete
# Manually set cursor (advanced)
dagster sensor cursor usage_change_detection_sensor --set '{
"global_last_modified": null,
"dealer_last_modified": {}
}'
Warning: Clearing cursor will trigger bronze + ALL partner partitions on next evaluation if any data exists.
How to Adjust Polling Interval¶
File: src/vanguard_wholesale/defs/sensors.py:314
# Change from 5 minutes to 10 minutes
USAGE_SENSOR_INTERVAL_SECONDS = 600
# Change to 1 minute (aggressive polling)
USAGE_SENSOR_INTERVAL_SECONDS = 60
Trade-offs:
- Shorter interval: More responsive, higher database load, more Dagster evaluations
- Longer interval: Less responsive, lower database load, fewer evaluations
Recommended Range: 2-15 minutes depending on data change frequency
After changing: Redeploy Dagster code
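If editing the constant for each environment becomes inconvenient, one option is to read the interval from an environment variable with a safe fallback. This is only a sketch of that idea: the environment variable name and the resolve_interval_seconds helper are assumptions, not existing configuration, and the 60-900 second clamp simply covers the values discussed in this section.

```python
import os
from typing import Optional

DEFAULT_INTERVAL_SECONDS = 300  # 5 minutes, the documented default


def resolve_interval_seconds(
    raw: Optional[str],
    default: int = DEFAULT_INTERVAL_SECONDS,
    minimum: int = 60,
    maximum: int = 900,
) -> int:
    """Parse an interval override, falling back to the default on bad input."""
    if raw is None or not raw.strip():
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    # Clamp so a typo cannot hammer the database or stall change detection.
    return max(minimum, min(maximum, value))


# Hypothetical variable name; unset in most environments, so the default applies.
USAGE_SENSOR_INTERVAL_SECONDS = resolve_interval_seconds(
    os.environ.get("USAGE_SENSOR_INTERVAL_SECONDS")
)
print(USAGE_SENSOR_INTERVAL_SECONDS)
```

The value is still read once at import time, so a change takes effect only after the code location is reloaded or redeployed.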
Performance Characteristics¶
Sensor Evaluation Time¶
| Dealers | Avg Rows/Dealer | Evaluation Time |
|---|---|---|
| 5 | 1,000 | ~5 seconds |
| 10 | 1,000 | ~10 seconds |
| 20 | 1,000 | ~20 seconds |
| 50 | 1,000 | ~45 seconds |
Bottleneck: Database query time for MAX(usage_modified) grouped by dealer
Optimization: Index on usage_modified column
Bronze Materialization Impact¶
Typical Run (for billing_usage_raw):
- Query time: 30-120 seconds (depends on usage table size)
- Data transfer: 10-100MB
- Processing time: 5-15 seconds
Note: Bronze pulls ALL usage data, not just changed dealers (by design - keeps query simple)
Silver Partition Materialization Impact¶
Typical Cascade (per affected partition):
- billing_usage_raw (bronze - once): 1-2 minutes
- billing_usage_aggregated (silver partition): 10-30 seconds
- billing_monthly_usage_by_service (silver partition): 5-10 seconds
- usage_report (gold): 30-60 seconds
- active_services (gold): 30-60 seconds
Total Time (sequential per partition): 2-4 minutes per partition
Total Time (parallel, max_concurrent_runs=50): Same as longest partition
Efficiency: Only affected partitions rematerialize, not all 10-20 partners
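As a rough check on the totals, the stage estimates above can be added up directly. The sketch below uses only the ranges listed in this section; whether the two report assets run one after the other or side by side depends on run concurrency, so both cases are computed.

```python
from typing import Dict, Tuple

# (low, high) estimates in seconds, taken from the list above.
STAGE_SECONDS: Dict[str, Tuple[int, int]] = {
    "billing_usage_raw": (60, 120),
    "billing_usage_aggregated": (10, 30),
    "billing_monthly_usage_by_service": (5, 10),
    "usage_report": (30, 60),
    "active_services": (30, 60),
}
CHAIN = ["billing_usage_raw", "billing_usage_aggregated", "billing_monthly_usage_by_service"]
REPORTS = ["usage_report", "active_services"]


def cascade_seconds(reports_in_parallel: bool) -> Tuple[int, int]:
    """Return (low, high) end-to-end seconds for one affected partition."""
    low = sum(STAGE_SECONDS[name][0] for name in CHAIN)
    high = sum(STAGE_SECONDS[name][1] for name in CHAIN)
    report_lows = [STAGE_SECONDS[name][0] for name in REPORTS]
    report_highs = [STAGE_SECONDS[name][1] for name in REPORTS]
    if reports_in_parallel:
        # Parallel reports cost only as much as the slowest one.
        return low + max(report_lows), high + max(report_highs)
    return low + sum(report_lows), high + sum(report_highs)


print(cascade_seconds(reports_in_parallel=False))  # (135, 280)
print(cascade_seconds(reports_in_parallel=True))   # (105, 220)
```

That is about 2.3 to 4.7 minutes fully sequential and about 1.8 to 3.7 minutes with the reports in parallel, which brackets the 2-4 minute figure quoted above.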
Comparison with v4_warehouse Sensor¶
| Feature | v4_warehouse Sensor | usage Sensor |
|---|---|---|
| Polling Interval | 15 minutes | 5 minutes (configurable) |
| Change Detection | Checksum + row count | Timestamp-based |
| Bronze Layer | Partitioned | Partitionless (medallion) |
| Query Complexity | Medium (per-partition WHERE) | Low (simple GROUP BY) |
| Partition Trigger | Direct bronze partition | Bronze + mapped silver partitions |
| Data Volume | High (v4_warehouse large) | Medium (usage monthly wipe) |
| Downstream Assets | ~50 per partition | ~2-3 per partition |
| Use Case | Infrequent warehouse changes | Frequent usage updates |
Testing¶
Unit Tests¶
File: tests/test_usage_sensor.py
Run Tests:
# All sensor tests
pytest tests/test_usage_sensor.py -v
# Specific test
pytest tests/test_usage_sensor.py::test_sensor_detects_changes_for_one_dealer -v
# Coverage report
pytest tests/test_usage_sensor.py --cov=vanguard_wholesale.defs.sensors --cov-report=term-missing
Test Coverage: 90%+ of sensor logic
Key Test Cases:
- First run with no cursor
- No changes detected
- Changes for specific dealers
- Multiple dealers same partner
- Empty usage table
- Database error handling
- Null timestamp handling
- Corrupted cursor recovery
Integration Testing (Dev Environment)¶
Prerequisites:
- Access to dev billing_cdrs database
- At least 1 partner partition exists
- Dagster daemon running
Test Procedure:
1. Baseline Check:
# Enable sensor
dagster sensor start usage_change_detection_sensor
# Wait for evaluation (5 minutes)
dagster sensor list
2. Simulate Change:
-- Update usage_modified for some records
UPDATE billing.usage SET usage_modified = NOW()
WHERE usage_FK_service_id IN (SELECT service_id FROM ... LIMIT 10);
3. Wait 5 Minutes (or trigger sensor manually via Dagster UI)
4. Verify:
# Check sensor created RunRequests
dagster sensor list
# Check bronze asset materialized
dagster asset list --select billing_usage_raw
# Check silver partitions auto-materialized
dagster run list --limit 20
5. Cleanup: Revert test data if needed
Configuration Reference¶
Environment Variables¶
The sensor requires these resources (configured in definitions.py):
Billing CDRs Database:
BILLING_CDRS_DB_HOST=billing-db.example.com
BILLING_CDRS_DB_PORT=3306
BILLING_CDRS_DB_USER=dagster
BILLING_CDRS_DB_PASSWORD=***
BILLING_CDRS_DB_DATABASE=billing
V3 Database (for dealer mapping):
V3_DB_HOST=v3-db.example.com
V3_DB_PORT=3306
V3_DB_USER=dagster
V3_DB_PASSWORD=***
FAQs¶
Q: How do I know if the sensor is working?
A: Check Dagster UI → Sensors → usage_change_detection_sensor. "Last tick" should update every 5 minutes. If "Status" shows errors, investigate logs.
Q: Will the sensor detect changes made outside of Dagster?
A: Yes! The sensor polls the billing.usage table directly, detecting ANY changes (manual SQL, external ETL, etc.) as long as usage_modified timestamp updates.
Q: What happens if multiple dealers change simultaneously?
A: The sensor maps all changed dealers to partner UUIDs and creates one bronze RunRequest + one RunRequest per affected partner partition. Dagster executes up to max_concurrent_runs in parallel.
Q: Why is bronze partitionless but silver partitioned?
A: Medallion architecture principle - keep bronze simple (just extract raw data), handle complexity in silver (partitioning, filtering, mapping). This reduces query complexity and follows best practices.
Q: Can I backfill historical usage data without triggering the sensor?
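A: Yes. Manually materializing billing_usage_raw in Dagster UI does not involve the sensor, which reacts only to usage_modified changes it observes while polling, not to manual runs. If the backfill itself writes to billing.usage and updates usage_modified, the sensor will detect those changes on its next evaluation.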
Q: How do I test the sensor locally?
A: Run unit tests with pytest tests/test_usage_sensor.py. For integration testing, deploy to dev environment and simulate data changes in billing.usage table.
Q: What if a downstream asset fails?
A: Failed assets log errors but don't block siblings. You can retry the failed asset manually in Dagster UI. AutomationCondition will retry automatically on next upstream change.
Q: Does the sensor use a lot of database resources?
A: Minimal impact. Every 5 minutes, it runs a lightweight SELECT MAX(usage_modified) GROUP BY dealer_id query. Execution time <30 seconds with proper indexes.
Support & Contact¶
Documentation: This file + src/vanguard_wholesale/defs/sensors.py
Code References:
- Sensor implementation: src/vanguard_wholesale/defs/sensors.py:318
- Bronze asset: src/vanguard_wholesale/defs/assets/billing_cdrs_bronze.py:42
- Silver assets: src/vanguard_wholesale/defs/assets/billing_cdrs_silver.py
- Tests: tests/test_usage_sensor.py
Monitoring:
- Dagster UI: http://<dagster-host>:<port>/sensors/usage_change_detection_sensor
- Logs: Dagster UI → Sensors → Evaluation logs
Last Updated: 2026-01-20
Version: 1.0.0