v4_warehouse Change Detection Sensor¶
Overview¶
The v4_warehouse_change_detection_sensor is a polling sensor that automatically detects changes in the v4_warehouse.service_data source table and triggers rematerialization of v3_v4_warehouse_service_data and all downstream assets.
Key Features¶
- Polling Interval: 15 minutes (900 seconds)
- Change Detection: Python-based checksum on critical columns (service_status_id, customer_id) plus row count
- Partition-Aware: Operates independently per partner partition
- Resilient: Partial failures don't block other partitions
- Automatic Downstream Propagation: Uses AutoMaterializePolicy.eager() on downstream assets
Architecture¶
Data Flow¶
v4_warehouse.service_data (MySQL Source)
│
├─ [Sensor polls every 15 min]
│ └─ Computes row_count + SHA256 checksum
│ └─ Compares with cursor state
│
▼ (on change detected)
│
v3_v4_warehouse_service_data (Partitioned by partner)
│
├─ [AutoMaterializePolicy.eager()]
│
▼ (automatic propagation)
│
Downstream Assets (50+ per partition):
├─ Report Transform Assets (9 types × _raw)
│ └─ AutoMaterializePolicy.eager()
├─ PII Masking Assets
│ └─ AutoMaterializePolicy.eager()
├─ Backfill Assets
│ └─ AutoMaterializePolicy.eager()
├─ Load Assets (multi-env)
│ └─ AutoMaterializePolicy.eager()
├─ v3_service_products
│ └─ AutoMaterializePolicy.eager()
├─ Liability Assets (3 assets)
│ └─ AutoMaterializePolicy.eager()
└─ Customer Filter Assets (7 assets)
Change Detection Mechanism¶
Checksum Strategy¶
The sensor computes a SHA256 hash of critical columns to detect data changes:
Columns Monitored:
- service_data_FK_service_id (for ordering)
- service_data_FK_service_status_id (status changes)
- service_data_FK_customer_id (customer reassignments)
Trigger Conditions:
1. Row count change: new services added or services deleted
2. Checksum change: service status changed, or service moved to a different customer
Implementation: compute_partition_checksum() in src/vanguard_wholesale/defs/sensors.py:96
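The strategy above can be sketched in plain Python. This is a simplified illustration using `hashlib` on row tuples, not the actual `compute_partition_checksum()` from `sensors.py` (which operates on a Polars DataFrame); the column roles match the list above:

```python
import hashlib

def checksum_sketch(rows):
    """Sketch of the checksum strategy: sort rows deterministically by
    service_id, then SHA-256 the monitored column values.

    Each row is a (service_id, service_status_id, customer_id) tuple.
    """
    # Deterministic ordering: sort by service_id so that query-order
    # differences do not change the digest.
    sorted_rows = sorted(rows, key=lambda r: r[0])
    hasher = hashlib.sha256()
    for service_id, status_id, customer_id in sorted_rows:
        hasher.update(f"{service_id}|{status_id}|{customer_id}".encode())
    return hasher.hexdigest()
```

Reordering the input does not change the digest, while a status change or customer reassignment on any row does.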
Cursor State Management¶
The sensor maintains state using a JSON cursor:
{
  "partner-uuid-1": {
    "row_count": 12345,
    "checksum": "a1b2c3d4e5f6...",
    "last_check": "2026-01-19T15:30:00Z"
  },
  "partner-uuid-2": {...}
}
Cursor Location: Stored in Dagster's sensor evaluation context (MySQL-backed)
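The comparison between stored cursor state and freshly computed values might look like the following sketch (`detect_changed_partitions` is an illustrative helper, not the actual sensor code; field names match the JSON schema above):

```python
import json

def detect_changed_partitions(cursor_json, current_state):
    """Compare the stored cursor against freshly computed per-partition
    state; return partition keys whose row count or checksum differs.

    current_state maps partition key -> {"row_count": int, "checksum": str}.
    """
    previous = json.loads(cursor_json) if cursor_json else {}
    changed = []
    for partition, state in current_state.items():
        prev = previous.get(partition)
        if prev is None:
            # First evaluation for this partition: treat as changed.
            changed.append(partition)
        elif (state["row_count"] != prev["row_count"]
              or state["checksum"] != prev["checksum"]):
            changed.append(partition)
    return changed
```

An empty cursor (first run, or after a cursor reset) marks every partition as changed, which matches the warning in "How to Reset Sensor Cursor" below.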
Configuration¶
Sensor Configuration¶
File: src/vanguard_wholesale/defs/sensors.py:135
Key Parameters:
@dg.sensor(
    name="v4_warehouse_change_detection_sensor",
    minimum_interval_seconds=900,  # 15 minutes
    asset_selection=dg.AssetSelection.keys("v3_v4_warehouse_service_data"),
)
Concurrency Limits¶
File: dagster.yaml:10
run_coordinator:
  config:
    max_concurrent_runs: 50  # Handles 10 partitions × 5 stages
    tag_concurrency_limits:
      - key: "dagster/concurrency"
        value: "warehouse_polling"
        limit: 10  # Max 10 simultaneous warehouse checks
Monitoring¶
Dagster UI¶
- Navigate to: Sensors → v4_warehouse_change_detection_sensor
- View:
  - Sensor evaluation history
  - Success/failure rate
  - Cursor state (click "View cursor" on the latest evaluation)
  - Triggered runs per evaluation
Key Metrics to Monitor¶
| Metric | Location | Expected Value | Alert Threshold |
|---|---|---|---|
| Sensor evaluation success rate | Dagster UI → Sensors | >95% | <90% |
| RunRequests per evaluation | Sensor logs | 0-5 (varies by day) | >10 (investigate) |
| Sensor evaluation duration | Sensor logs | <5 minutes | >10 minutes |
| Change detection frequency | Sensor logs | Varies by workload | N/A |
Log Levels¶
- INFO: Normal change detection, partition summaries
- WARNING: Partition query failures (non-fatal)
- ERROR: Critical failures (entire sensor fails)
- DEBUG: No-change confirmations per partition
Example Log Messages¶
INFO: Checking 12 partner partitions for changes
INFO: Partition abc123: CHANGE DETECTED - rows 1234 → 1250 (Δ+16), checksum changed
INFO: Detected changes in 2 partition(s), triggering rematerialization
WARNING: Failed to check partition xyz789: ConnectionTimeout. Skipping this partition.
Troubleshooting¶
Symptom: Sensor not triggering despite data changes¶
Possible Causes:
1. Checksum not sensitive to the changed columns
2. Sensor is disabled in the Dagster UI
3. Changes filtered out by the WHERE clause (system_id or module_type_id)
Diagnosis:
# Check sensor status
dagster sensor list
# Check sensor cursor
dagster sensor cursor v4_warehouse_change_detection_sensor
# Manually query v4_warehouse
mysql> SELECT COUNT(*), MAX(service_data_FK_service_id)
FROM v4_warehouse.service_data
WHERE service_data_system_id = 3
AND service_data_FK_module_type_id IN (1, 8);
Solution:
- Verify checksum columns include the changed field (edit compute_partition_checksum())
- Enable sensor in Dagster UI
- Verify data matches filter criteria
Symptom: Too many false positive triggers¶
Possible Causes:
1. Non-deterministic query ordering causing checksum variations
2. Floating-point precision differences in the checksum calculation
Diagnosis:
# In a Dagster Python environment, test checksum stability:
from vanguard_wholesale.defs.sensors import compute_partition_checksum
import polars as pl
# `query` is the sensor's partition query and `conn` an open database
# connection — set both up for your environment before running this.
# Query the same data twice
df1 = pl.read_database(query, conn)
df2 = pl.read_database(query, conn)
checksum1 = compute_partition_checksum(df1)
checksum2 = compute_partition_checksum(df2)
assert checksum1 == checksum2  # Should not fail
Solution:
- Ensure deterministic sort in compute_partition_checksum() (line 115: sorted_df = df.sort("service_data_FK_service_id"))
- Add additional sort columns if service_id is not unique
Symptom: Sensor evaluation taking too long (>5 min)¶
Possible Causes:
1. Too many partitions to query (50+)
2. Slow database queries (missing indexes, high load)
3. Network latency to the V3 database
Diagnosis:
# Check partition count
dagster instance info | grep "Dynamic partitions"
# Check query performance
EXPLAIN SELECT service_data_FK_service_id, service_data_FK_service_status_id, service_data_FK_customer_id
FROM v4_warehouse.service_data sd
INNER JOIN dealer.dealer d ON d.dealer_id = sd.service_data_FK_dealer_id
WHERE service_data_system_id = 3
AND service_data_FK_module_type_id IN (1, 8)
AND d.dealer_uuid = 'abc123';
Solution:
- Add indexes: CREATE INDEX idx_service_data_system_module ON v4_warehouse.service_data (service_data_system_id, service_data_FK_module_type_id);
- Increase the polling interval if evaluations overlap (edit minimum_interval_seconds — note this sets how often the sensor evaluates, not a query timeout)
- Consider batching partition queries (code modification required)
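The batching idea in the last bullet could take the shape below: issue one grouped query that includes the partner UUID, then split the result rows per partition in Python. This is a sketch with a hypothetical helper name, not the actual sensor code:

```python
from collections import defaultdict

def batch_rows_by_partition(rows):
    """Group rows from a single grouped query by partner UUID, so one
    database round-trip serves every partition check.

    Each row is (dealer_uuid, service_id, status_id, customer_id); the
    checksum function can then be applied to each partition's row list.
    """
    by_partition = defaultdict(list)
    for dealer_uuid, *values in rows:
        by_partition[dealer_uuid].append(tuple(values))
    return dict(by_partition)
```

This trades N small queries for one larger scan; whether that wins depends on index coverage and partition count.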
Symptom: Downstream assets not auto-materializing¶
Possible Causes:
1. AutoMaterializePolicy not configured on the asset
2. Asset has errors preventing materialization
3. Dagster daemon not running
Diagnosis:
# Check asset configuration
dagster asset info v3_service_products | grep "auto_materialize"
# Check daemon status
dagster daemon liveness
# Check run queue
dagster run list --status QUEUED
Solution:
- Verify auto_materialize_policy=dg.AutoMaterializePolicy.eager() in asset decorator
- Fix asset errors (check Dagster UI → Runs → Failed)
- Start Dagster daemon: dagster-daemon run
Symptom: Partial partition failures blocking entire sensor¶
Expected Behavior: Sensor should continue processing other partitions even if one fails.
Diagnosis:
# Check sensor logs for try-except handling
# Line 287-295 in sensors.py should catch exceptions per partition
Solution:
- If the sensor is crashing entirely, verify the exception handling in the sensor code
- Check that failures are logged as WARNING, not ERROR
- Failed partitions preserve their previous cursor state
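The expected per-partition isolation can be sketched as a plain-Python loop. The `check_partition` callable stands in for the real query logic, and standard `logging` stands in for Dagster's context logger — both are illustrative assumptions:

```python
import logging

logger = logging.getLogger(__name__)

def evaluate_partitions(partition_keys, check_partition, previous_cursor):
    """Check each partition independently: a failure in one partition is
    logged as a WARNING and its previous cursor entry is preserved, so
    the remaining partitions are still evaluated."""
    new_cursor = dict(previous_cursor)
    changed = []
    for key in partition_keys:
        try:
            state = check_partition(key)  # may raise e.g. a connection timeout
        except Exception as exc:
            # Non-fatal: keep the old cursor entry and move on.
            logger.warning("Failed to check partition %s: %s. Skipping.", key, exc)
            continue
        if state != previous_cursor.get(key):
            changed.append(key)
        new_cursor[key] = state
    return changed, new_cursor
```

A crash of the whole evaluation usually means an exception escaped this loop, which is what the solution bullets above tell you to check for.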
Operational Procedures¶
How to Manually Trigger Warehouse Rematerialization¶
Option 1: Dagster UI
1. Navigate to Assets → v3_v4_warehouse_service_data
2. Click "Materialize"
3. Select partition(s) to materialize
4. Click "Launch run"
Option 2: CLI
dagster asset materialize \
--select v3_v4_warehouse_service_data \
--partition partner-uuid-123
How to Disable Sensor Temporarily¶
Dagster UI:
1. Navigate to Sensors → v4_warehouse_change_detection_sensor
2. Click "Stop sensor"
3. Confirm
Note: Sensor will not evaluate while stopped. No changes will be detected.
How to Reset Sensor Cursor¶
Use Case: Cursor corrupted or want to force re-check all partitions
# Clear cursor (triggers first-run behavior for all partitions)
dagster sensor cursor v4_warehouse_change_detection_sensor --delete
# Manually set cursor (advanced)
dagster sensor cursor v4_warehouse_change_detection_sensor --set '{"partner-uuid-1": {"row_count": 0, "checksum": "", "last_check": "2026-01-01T00:00:00Z"}}'
Warning: Clearing cursor may trigger RunRequests for ALL partitions on next evaluation if data has changed since last cursor state.
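When setting a cursor by hand, generating the JSON payload from Python avoids shell-quoting mistakes. A small sketch (`make_cursor` is an illustrative helper; the field names match the documented cursor schema):

```python
import json

def make_cursor(entries):
    """Build the sensor cursor JSON from (partition, row_count, checksum,
    last_check) tuples, matching the schema documented above."""
    return json.dumps({
        partition: {
            "row_count": row_count,
            "checksum": checksum,
            "last_check": last_check,
        }
        for partition, row_count, checksum, last_check in entries
    })
```

Pass the returned string as the `--set` argument shown above.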
How to Add New Columns to Change Detection¶
File: src/vanguard_wholesale/defs/sensors.py:118
Steps:
1. Edit checksum_columns list in compute_partition_checksum():
checksum_columns = [
"service_data_FK_service_id",
"service_data_FK_service_status_id",
"service_data_FK_customer_id",
"service_data_tariff", # NEW COLUMN
]
2. Update the sensor query (line 205) to SELECT the new column:
SELECT
    sd.service_data_FK_service_id,
    sd.service_data_FK_service_status_id,
    sd.service_data_FK_customer_id,
    sd.service_data_tariff  -- NEW COLUMN
FROM v4_warehouse.service_data sd
...
3. Test the checksum function:
pytest tests/test_v4_warehouse_sensor.py::test_compute_partition_checksum_deterministic -v
4. Deploy the changes and clear the cursor to force fresh checksums
Configuration Reference¶
Adjusting Polling Interval¶
File: src/vanguard_wholesale/defs/sensors.py:137
@dg.sensor(
    minimum_interval_seconds=1800,  # Change to 30 minutes
    ...
)
Trade-offs:
- Shorter interval: more responsive, higher database load
- Longer interval: less responsive, lower database load
Recommended Range: 5-60 minutes depending on data change frequency
Modifying Concurrency Limits¶
File: dagster.yaml:10
run_coordinator:
  config:
    max_concurrent_runs: 75  # Increase if the server has more capacity
Calculation:
max_concurrent_runs = (max_partitions_changing × stages_per_partition) × safety_factor
= (10 × 5) × 1.5
= 75
Monitor: Dagster UI → Runs → Queued (should be minimal)
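The sizing rule above can be written as a small helper (a sketch; parameter names are illustrative). Rounding up keeps the result a whole run slot when a fractional safety factor is used:

```python
import math

def recommended_max_concurrent_runs(max_partitions_changing,
                                    stages_per_partition,
                                    safety_factor=1.5):
    """Apply the sizing rule: partitions × stages, padded by a safety
    factor and rounded up to a whole run slot."""
    return math.ceil(max_partitions_changing * stages_per_partition * safety_factor)
```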
Testing¶
Unit Tests¶
File: tests/test_v4_warehouse_sensor.py
Run Tests:
# All sensor tests
pytest tests/test_v4_warehouse_sensor.py -v
# Specific test
pytest tests/test_v4_warehouse_sensor.py::test_sensor_detects_new_rows -v
# Coverage report
pytest tests/test_v4_warehouse_sensor.py --cov=vanguard_wholesale.defs.sensors --cov-report=term-missing
Test Coverage: 90%+ of sensor logic
Integration Testing (Dev Environment)¶
Prerequisites:
- Access to the dev V3 database
- At least 1 partner partition exists
- Dagster daemon running
Test Procedure:
1. Baseline Check:
# Enable sensor
dagster sensor start v4_warehouse_change_detection_sensor
# Wait for evaluation
dagster sensor list  # Check "Last tick"
2. Simulate Change:
-- Insert a new row into v4_warehouse.service_data
INSERT INTO v4_warehouse.service_data (
    service_data_FK_service_id,
    service_data_FK_customer_id,
    service_data_FK_dealer_id,
    service_data_FK_service_status_id,
    service_data_system_id,
    service_data_FK_module_type_id,
    ...
) VALUES (99999, 1, 1, 3, 3, 1, ...);
3. Wait 15 Minutes (or trigger the sensor manually via the Dagster UI)
4. Verify:
# Check the sensor created a RunRequest
dagster sensor list
# Check the warehouse asset materialized
dagster asset list --select v3_v4_warehouse_service_data
# Check downstream assets auto-materialized
dagster run list --limit 20  # Should show a cascade of runs
5. Cleanup:
DELETE FROM v4_warehouse.service_data WHERE service_data_FK_service_id = 99999;
Performance Characteristics¶
Sensor Evaluation Time¶
| Partitions | Avg Rows/Partition | Evaluation Time |
|---|---|---|
| 5 | 1,000 | ~15 seconds |
| 10 | 1,000 | ~30 seconds |
| 20 | 1,000 | ~60 seconds |
| 50 | 1,000 | ~2.5 minutes |
Bottleneck: Database query time (network + MySQL execution)
Checksum Calculation Performance¶
| Rows | Checksum Time |
|---|---|
| 100 | <0.1 seconds |
| 1,000 | ~0.3 seconds |
| 10,000 | ~2 seconds |
| 100,000 | ~15 seconds |
Note: In-memory Polars + SHA256, very efficient
Downstream Materialization Impact¶
Typical Cascade (per partition):
- 1x v3_v4_warehouse_service_data (2-5 minutes)
- 9x Report _raw assets (1-3 minutes each)
- 9x Report _masked assets (<1 minute each)
- 9x Report _backfilled assets (1-2 minutes each)
- 9x Report _load_prod assets (1-3 minutes each)
- 3x Liability assets (1-2 minutes each)
Total Time (sequential): 60-90 minutes per partition
Total Time (parallel, max_concurrent_runs=50): 10-15 minutes per partition
Rollback Procedure¶
If issues arise after deployment:
1. Disable Sensor:
# Dagster UI: Sensors → Stop
# OR CLI:
dagster sensor stop v4_warehouse_change_detection_sensor
2. Revert AutoMaterializePolicy (if needed):
git revert <commit-hash>
# Redeploy Dagster
3. Revert dagster.yaml (if concurrency issues):
git checkout HEAD~1 dagster.yaml
# Restart the Dagster daemon
dagster-daemon restart
4. Manual Triggering: use the Dagster UI to manually materialize assets as needed
FAQs¶
Q: How do I know if the sensor is working?
A: Check Dagster UI → Sensors → v4_warehouse_change_detection_sensor. "Last tick" should update every 15 minutes. If "Status" shows errors, investigate logs.
Q: Will the sensor detect changes made outside of Dagster?
A: Yes! The sensor polls the v4_warehouse.service_data table directly, detecting ANY changes (manual SQL, external ETL, etc.).
Q: What happens if multiple partitions change simultaneously?
A: The sensor creates separate RunRequests for each changed partition. Dagster's run coordinator queues them and executes up to max_concurrent_runs (50) in parallel.
Q: Can I backfill historical data without triggering the sensor?
A: Yes, manually materialize v3_v4_warehouse_service_data in Dagster UI. The sensor only reacts to its own polling schedule, not manual runs.
Q: How do I test the sensor locally?
A: Run unit tests with pytest tests/test_v4_warehouse_sensor.py. For integration testing, deploy to dev environment and simulate data changes.
Q: What if a downstream asset fails?
A: Other assets in the same partition continue materializing (thanks to can_subset=True). Failed asset logs a Failure but doesn't block siblings. You can retry the failed asset manually.
Q: Does the sensor use a lot of database resources?
A: Minimal impact. Every 15 minutes, it runs lightweight SELECT queries (3 columns, indexed filters) on ~10-20 partitions. Execution time <5 minutes total.
Support & Contact¶
Documentation: This file + IMPLEMENTATION_PLAN_V4_WAREHOUSE_POLLING.md
Code References:
- Sensor implementation: src/vanguard_wholesale/defs/sensors.py:135
- Checksum function: src/vanguard_wholesale/defs/sensors.py:96
- Tests: tests/test_v4_warehouse_sensor.py
Monitoring:
- Dagster UI: http://<dagster-host>:<port>/sensors/v4_warehouse_change_detection_sensor
- Logs: Dagster UI → Sensors → Evaluation logs
Last Updated: 2026-01-19
Version: 1.0.0