v4_warehouse Change Detection Sensor

Overview

The v4_warehouse_change_detection_sensor is a polling sensor that automatically detects changes in the v4_warehouse.service_data source table and triggers rematerialization of v3_v4_warehouse_service_data and all downstream assets.

Key Features

  • Polling Interval: 15 minutes (900 seconds)
  • Change Detection: Python-based SHA256 checksum over critical columns (service_id, service_status_id, customer_id) plus row count
  • Partition-Aware: Operates independently per partner partition
  • Resilient: Partial failures don't block other partitions
  • Automatic Downstream Propagation: Uses AutoMaterializePolicy.eager() on downstream assets

Architecture

Data Flow

v4_warehouse.service_data (MySQL Source)
    │
    ├─ [Sensor polls every 15 min]
    │   └─ Computes row_count + SHA256 checksum
    │   └─ Compares with cursor state
    │
    ▼ (on change detected)
    │
v3_v4_warehouse_service_data (Partitioned by partner)
    │
    ├─ [AutoMaterializePolicy.eager()]
    │
    ▼ (automatic propagation)
    │
Downstream Assets (~50+ per partition):
    ├─ Report Transform Assets (9 types × _raw)
    │   └─ AutoMaterializePolicy.eager()
    ├─ PII Masking Assets
    │   └─ AutoMaterializePolicy.eager()
    ├─ Backfill Assets
    │   └─ AutoMaterializePolicy.eager()
    ├─ Load Assets (multi-env)
    │   └─ AutoMaterializePolicy.eager()
    ├─ v3_service_products
    │   └─ AutoMaterializePolicy.eager()
    ├─ Liability Assets (3 assets)
    │   └─ AutoMaterializePolicy.eager()
    └─ Customer Filter Assets (7 assets)

Change Detection Mechanism

Checksum Strategy

The sensor computes a SHA256 hash of critical columns to detect data changes:

Columns Monitored:

  • service_data_FK_service_id (for ordering)
  • service_data_FK_service_status_id (status changes)
  • service_data_FK_customer_id (customer reassignments)

Trigger Conditions:

  1. Row count change: new services added or services deleted
  2. Checksum change: service status changed, or service moved to a different customer

Implementation: compute_partition_checksum() in src/vanguard_wholesale/defs/sensors.py:96
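As a sketch of the mechanism (plain Python tuples stand in for the Polars DataFrame the real compute_partition_checksum() receives; the column order, field separator, and helper names here are assumptions, not the actual implementation):

```python
import hashlib

def sketch_partition_checksum(rows):
    """Hypothetical sketch: SHA256 over the monitored columns, sorted by
    service_id so the hash is independent of query row order.
    rows: iterable of (service_id, service_status_id, customer_id) tuples."""
    digest = hashlib.sha256()
    for service_id, status_id, customer_id in sorted(rows):
        digest.update(f"{service_id}|{status_id}|{customer_id}\n".encode())
    return digest.hexdigest()

def change_detected(prev_state, row_count, checksum):
    """Trigger when there is no prior cursor state for the partition, or
    either the row count or the checksum differs from the stored value."""
    if prev_state is None:
        return True  # first run for this partition
    return (prev_state["row_count"] != row_count
            or prev_state["checksum"] != checksum)
```

Sorting before hashing is what makes the checksum stable across runs even when the database returns rows in a different order.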

Cursor State Management

The sensor maintains state using a JSON cursor:

{
  "partner-uuid-1": {
    "row_count": 12345,
    "checksum": "a1b2c3d4e5f6...",
    "last_check": "2026-01-19T15:30:00Z"
  },
  "partner-uuid-2": {...}
}

Cursor Location: Stored in Dagster's sensor evaluation context (MySQL-backed)
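A minimal sketch of how such a cursor could be read and updated (helper names are illustrative; the real sensor stores this string through Dagster's sensor evaluation context rather than returning it):

```python
import json
from datetime import datetime, timezone

def load_cursor(raw_cursor):
    """Deserialize the JSON cursor; an empty or missing cursor means first run."""
    return json.loads(raw_cursor) if raw_cursor else {}

def record_partition_state(cursor, partner_uuid, row_count, checksum):
    """Store the latest observed state for one partner partition and
    return the serialized cursor to hand back to the sensor context."""
    cursor[partner_uuid] = {
        "row_count": row_count,
        "checksum": checksum,
        "last_check": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(cursor)
```

Because each partition has its own key, updating one partner's state never disturbs the stored state of the others.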


Configuration

Sensor Configuration

File: src/vanguard_wholesale/defs/sensors.py:135

Key Parameters:

@dg.sensor(
    name="v4_warehouse_change_detection_sensor",
    minimum_interval_seconds=900,  # 15 minutes
    asset_selection=dg.AssetSelection.keys("v3_v4_warehouse_service_data"),
)

Concurrency Limits

File: dagster.yaml:10

run_coordinator:
  config:
    max_concurrent_runs: 50  # Handles 10 partitions × 5 stages
    tag_concurrency_limits:
      - key: "dagster/concurrency"
        value: "warehouse_polling"
        limit: 10  # Max 10 simultaneous warehouse checks

Monitoring

Dagster UI

  1. Navigate to: Sensors → v4_warehouse_change_detection_sensor
  2. View:
     • Sensor evaluation history
     • Success/failure rate
     • Cursor state (click "View cursor" on latest evaluation)
     • Triggered runs per evaluation

Key Metrics to Monitor

Metric                         | Location             | Expected Value      | Alert Threshold
-------------------------------|----------------------|---------------------|-------------------
Sensor evaluation success rate | Dagster UI → Sensors | >95%                | <90%
RunRequests per evaluation     | Sensor logs          | 0-5 (varies by day) | >10 (investigate)
Sensor evaluation duration     | Sensor logs          | <5 minutes          | >10 minutes
Change detection frequency     | Sensor logs          | Varies by workload  | N/A

Log Levels

  • INFO: Normal change detection, partition summaries
  • WARNING: Partition query failures (non-fatal)
  • ERROR: Critical failures (entire sensor fails)
  • DEBUG: No-change confirmations per partition

Example Log Messages

INFO: Checking 12 partner partitions for changes
INFO: Partition abc123: CHANGE DETECTED - rows 1234 → 1250 (Δ+16), checksum changed
INFO: Detected changes in 2 partition(s), triggering rematerialization
WARNING: Failed to check partition xyz789: ConnectionTimeout. Skipping this partition.

Troubleshooting

Symptom: Sensor not triggering despite data changes

Possible Causes:

  1. Checksum not sensitive to the changed columns
  2. Sensor disabled in Dagster UI
  3. Changes filtered out by the WHERE clause (system_id or module_type_id)

Diagnosis:

# Check sensor status
dagster sensor list

# Check sensor cursor
dagster sensor cursor v4_warehouse_change_detection_sensor

# Manually query v4_warehouse
mysql> SELECT COUNT(*), MAX(service_data_FK_service_id)
       FROM v4_warehouse.service_data
       WHERE service_data_system_id = 3
       AND service_data_FK_module_type_id IN (1, 8);

Solution:

  • Verify the checksum columns include the changed field (edit compute_partition_checksum())
  • Enable the sensor in Dagster UI
  • Verify the data matches the filter criteria


Symptom: Too many false positive triggers

Possible Causes:

  1. Non-deterministic query ordering causing checksum variations
  2. Floating-point precision differences in the checksum calculation

Diagnosis:

# In a Dagster Python environment, test checksum stability
# (query is your partition SELECT statement, conn your database connection):
from vanguard_wholesale.defs.sensors import compute_partition_checksum
import polars as pl

# Query the same data twice
df1 = pl.read_database(query, conn)
df2 = pl.read_database(query, conn)

checksum1 = compute_partition_checksum(df1)
checksum2 = compute_partition_checksum(df2)

assert checksum1 == checksum2  # should not fail

Solution:

  • Ensure a deterministic sort in compute_partition_checksum() (line 115: sorted_df = df.sort("service_data_FK_service_id"))
  • Add additional sort columns if service_id is not unique


Symptom: Sensor evaluation taking too long (>5 min)

Possible Causes:

  1. Too many partitions to query (50+)
  2. Slow database queries (missing indexes, high load)
  3. Network latency to the V3 database

Diagnosis:

# Check partition count
dagster instance info | grep "Dynamic partitions"

# Check query performance
EXPLAIN SELECT service_data_FK_service_id, service_data_FK_service_status_id, service_data_FK_customer_id
FROM v4_warehouse.service_data sd
INNER JOIN dealer.dealer d ON d.dealer_id = sd.service_data_FK_dealer_id
WHERE service_data_system_id = 3
AND service_data_FK_module_type_id IN (1, 8)
AND d.dealer_uuid = 'abc123';

Solution:

  • Add an index:

    CREATE INDEX idx_service_data_system_module
    ON v4_warehouse.service_data (service_data_system_id, service_data_FK_module_type_id);

  • Increase the polling interval if needed (edit minimum_interval_seconds)
  • Consider batching partition queries (code modification required)


Symptom: Downstream assets not auto-materializing

Possible Causes:

  1. AutoMaterializePolicy not configured on the asset
  2. Asset has errors preventing materialization
  3. Dagster daemon not running

Diagnosis:

# Check asset configuration
dagster asset info v3_service_products | grep "auto_materialize"

# Check daemon status
dagster daemon liveness

# Check run queue
dagster run list --status QUEUED

Solution:

  • Verify auto_materialize_policy=dg.AutoMaterializePolicy.eager() in the asset decorator
  • Fix asset errors (check Dagster UI → Runs → Failed)
  • Start the Dagster daemon: dagster-daemon run


Symptom: Partial partition failures blocking entire sensor

Expected Behavior: Sensor should continue processing other partitions even if one fails.

Diagnosis:

# Check sensor logs for try-except handling
# Line 287-295 in sensors.py should catch exceptions per partition

Solution:

  • If the sensor is crashing entirely, verify the per-partition exception handling in the sensor code
  • Check that failures are logged as WARNING, not ERROR
  • Failed partitions preserve their previous cursor state
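The intended pattern looks roughly like this (function and parameter names are illustrative, not the actual sensors.py code):

```python
def check_partitions_resiliently(partitions, check_one, warn):
    """Check each partition independently: a failure in one partition is
    logged as a warning and skipped, so the remaining partitions still get
    evaluated and the failed partition keeps its previous cursor state."""
    changed = []
    for partner_uuid in partitions:
        try:
            if check_one(partner_uuid):
                changed.append(partner_uuid)
        except Exception as exc:
            warn(f"Failed to check partition {partner_uuid}: {exc}. "
                 "Skipping this partition.")
    return changed
```

The key property is that the try/except sits inside the loop: one ConnectionTimeout produces a WARNING for that partition only, never an ERROR that aborts the whole evaluation.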


Operational Procedures

How to Manually Trigger Warehouse Rematerialization

Option 1: Dagster UI

  1. Navigate to Assets → v3_v4_warehouse_service_data
  2. Click "Materialize"
  3. Select partition(s) to materialize
  4. Click "Launch run"

Option 2: CLI

dagster asset materialize \
  --select v3_v4_warehouse_service_data \
  --partition partner-uuid-123


How to Disable Sensor Temporarily

Dagster UI:

  1. Navigate to Sensors → v4_warehouse_change_detection_sensor
  2. Click "Stop sensor"
  3. Confirm

Note: Sensor will not evaluate while stopped. No changes will be detected.


How to Reset Sensor Cursor

Use Case: The cursor is corrupted, or you want to force a re-check of all partitions

# Clear cursor (triggers first-run behavior for all partitions)
dagster sensor cursor v4_warehouse_change_detection_sensor --delete

# Manually set cursor (advanced)
dagster sensor cursor v4_warehouse_change_detection_sensor --set '{"partner-uuid-1": {"row_count": 0, "checksum": "", "last_check": "2026-01-01T00:00:00Z"}}'

Warning: Clearing cursor may trigger RunRequests for ALL partitions on next evaluation if data has changed since last cursor state.


How to Add New Columns to Change Detection

File: src/vanguard_wholesale/defs/sensors.py:118

Steps:

  1. Edit the checksum_columns list in compute_partition_checksum():

checksum_columns = [
    "service_data_FK_service_id",
    "service_data_FK_service_status_id",
    "service_data_FK_customer_id",
    "service_data_tariff",  # NEW COLUMN
]

  2. Update the sensor query (line 205) to SELECT the new column:

    SELECT
        sd.service_data_FK_service_id,
        sd.service_data_FK_service_status_id,
        sd.service_data_FK_customer_id,
        sd.service_data_tariff  -- NEW COLUMN
    FROM v4_warehouse.service_data sd
    ...
    

  3. Test the checksum function:

    pytest tests/test_v4_warehouse_sensor.py::test_compute_partition_checksum_deterministic -v
    

  4. Deploy the changes and clear the cursor to force fresh checksums


Configuration Reference

Adjusting Polling Interval

File: src/vanguard_wholesale/defs/sensors.py:137

@dg.sensor(
    minimum_interval_seconds=1800,  # Change to 30 minutes
    ...
)

Trade-offs:

  • Shorter interval: more responsive, higher database load
  • Longer interval: less responsive, lower database load

Recommended Range: 5-60 minutes depending on data change frequency


Modifying Concurrency Limits

File: dagster.yaml:10

run_coordinator:
  config:
    max_concurrent_runs: 75  # Increase if server has more capacity

Calculation:

max_concurrent_runs = (max_partitions_changing × stages_per_partition) × safety_factor
                    = (10 × 5) × 1.5
                    = 75
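The same sizing arithmetic as a quick sanity check (the factors are the ones used above; adjust them to your deployment):

```python
max_partitions_changing = 10   # partitions expected to change in one evaluation
stages_per_partition = 5       # warehouse → raw → masked → backfilled → load
safety_factor = 1.5            # headroom for overlapping evaluations

max_concurrent_runs = int(max_partitions_changing * stages_per_partition * safety_factor)
assert max_concurrent_runs == 75
```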

Monitor: Dagster UI → Runs → Queued (should be minimal)


Testing

Unit Tests

File: tests/test_v4_warehouse_sensor.py

Run Tests:

# All sensor tests
pytest tests/test_v4_warehouse_sensor.py -v

# Specific test
pytest tests/test_v4_warehouse_sensor.py::test_sensor_detects_new_rows -v

# Coverage report
pytest tests/test_v4_warehouse_sensor.py --cov=vanguard_wholesale.defs.sensors --cov-report=term-missing

Test Coverage: 90%+ of sensor logic


Integration Testing (Dev Environment)

Prerequisites:

  • Access to the dev V3 database
  • At least 1 partner partition exists
  • Dagster daemon running

Test Procedure:

  1. Baseline Check:

    # Enable sensor
    dagster sensor start v4_warehouse_change_detection_sensor
    
    # Wait for evaluation
    dagster sensor list  # Check "Last tick"
    

  2. Simulate Change:

    -- Insert new row into v4_warehouse.service_data
    INSERT INTO v4_warehouse.service_data (
        service_data_FK_service_id,
        service_data_FK_customer_id,
        service_data_FK_dealer_id,
        service_data_FK_service_status_id,
        service_data_system_id,
        service_data_FK_module_type_id,
        ...
    ) VALUES (99999, 1, 1, 3, 3, 1, ...);
    

  3. Wait 15 Minutes (or trigger sensor manually via Dagster UI)

  4. Verify:

    # Check sensor created RunRequest
    dagster sensor list
    
    # Check warehouse asset materialized
    dagster asset list --select v3_v4_warehouse_service_data
    
    # Check downstream auto-materialized
    dagster run list --limit 20  # Should see cascade of runs
    

  5. Cleanup:

    DELETE FROM v4_warehouse.service_data WHERE service_data_FK_service_id = 99999;
    


Performance Characteristics

Sensor Evaluation Time

Partitions | Avg Rows/Partition | Evaluation Time
-----------|--------------------|----------------
5          | 1,000              | ~15 seconds
10         | 1,000              | ~30 seconds
20         | 1,000              | ~60 seconds
50         | 1,000              | ~2.5 minutes
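The table suggests a roughly linear cost of about 3 seconds per 1,000-row partition; a back-of-the-envelope estimator (the constant is read off the table above, not independently measured):

```python
def estimate_evaluation_seconds(n_partitions, seconds_per_partition=3.0):
    """Rough linear estimate of total sensor evaluation time,
    assuming ~1,000 rows per partition as in the table."""
    return n_partitions * seconds_per_partition
```

For example, 50 partitions comes out at ~150 seconds, matching the ~2.5 minutes shown above.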

Bottleneck: Database query time (network + MySQL execution)


Checksum Calculation Performance

Rows    | Checksum Time
--------|--------------
100     | <0.1 seconds
1,000   | ~0.3 seconds
10,000  | ~2 seconds
100,000 | ~15 seconds

Note: In-memory Polars + SHA256, very efficient


Downstream Materialization Impact

Typical Cascade (per partition):

  • 1× v3_v4_warehouse_service_data (2-5 minutes)
  • 9× Report _raw assets (1-3 minutes each)
  • 9× Report _masked assets (<1 minute each)
  • 9× Report _backfilled assets (1-2 minutes each)
  • 9× Report _load_prod assets (1-3 minutes each)
  • 3× Liability assets (1-2 minutes each)

Total Time (sequential): 60-90 minutes per partition
Total Time (parallel, max_concurrent_runs=50): 10-15 minutes per partition


Rollback Procedure

If issues arise after deployment:

  1. Disable Sensor:

    # Dagster UI: Sensors → Stop
    # OR CLI:
    dagster sensor stop v4_warehouse_change_detection_sensor
    

  2. Revert AutoMaterializePolicy (if needed):

    git revert <commit-hash>
    # Redeploy Dagster
    

  3. Revert dagster.yaml (if concurrency issues):

    git checkout HEAD~1 dagster.yaml
    # Restart Dagster daemon
    dagster-daemon restart
    

  4. Manual Triggering: Use Dagster UI to manually materialize assets as needed


FAQs

Q: How do I know if the sensor is working?

A: Check Dagster UI → Sensors → v4_warehouse_change_detection_sensor. "Last tick" should update every 15 minutes. If "Status" shows errors, investigate logs.


Q: Will the sensor detect changes made outside of Dagster?

A: Yes! The sensor polls the v4_warehouse.service_data table directly, detecting ANY changes (manual SQL, external ETL, etc.).


Q: What happens if multiple partitions change simultaneously?

A: The sensor creates separate RunRequests for each changed partition. Dagster's run coordinator queues them and executes up to max_concurrent_runs (50) in parallel.
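Conceptually, the evaluation yields one request per changed partition; sketched here with plain dicts standing in for Dagster's RunRequest objects (the run_key naming is illustrative):

```python
def build_run_requests(changed_partitions):
    """One request per changed partner partition; the run coordinator
    queues them and runs up to max_concurrent_runs in parallel."""
    return [
        {"partition_key": partner_uuid,
         "run_key": f"v4_warehouse_change_{partner_uuid}"}
        for partner_uuid in changed_partitions
    ]
```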


Q: Can I backfill historical data without triggering the sensor?

A: Yes, manually materialize v3_v4_warehouse_service_data in Dagster UI. The sensor only reacts to its own polling schedule, not manual runs.


Q: How do I test the sensor locally?

A: Run unit tests with pytest tests/test_v4_warehouse_sensor.py. For integration testing, deploy to dev environment and simulate data changes.


Q: What if a downstream asset fails?

A: Other assets in the same partition continue materializing (thanks to can_subset=True). Failed asset logs a Failure but doesn't block siblings. You can retry the failed asset manually.


Q: Does the sensor use a lot of database resources?

A: Minimal impact. Every 15 minutes, it runs lightweight SELECT queries (3 columns, indexed filters) on ~10-20 partitions. Execution time <5 minutes total.


Support & Contact

Documentation: This file + IMPLEMENTATION_PLAN_V4_WAREHOUSE_POLLING.md

Code References:

  • Sensor implementation: src/vanguard_wholesale/defs/sensors.py:135
  • Checksum function: src/vanguard_wholesale/defs/sensors.py:96
  • Tests: tests/test_v4_warehouse_sensor.py

Monitoring:

  • Dagster UI: http://<dagster-host>:<port>/sensors/v4_warehouse_change_detection_sensor
  • Logs: Dagster UI → Sensors → Evaluation logs


Last Updated: 2026-01-19
Version: 1.0.0