IO Managers: Why They Matter for the Merged Stack

IO managers are the architectural seam between "what an asset computes" and "where the data lives." The Rev-Sci stack uses them consistently. The proposed datalake does not appear to use them at all. This is the single most impactful pattern difference between the two systems, and getting it right determines how maintainable the merged stack will be.


What IO managers do

In Dagster, an IO manager handles three operations for every asset:

  1. Store — When asset A produces output, the IO manager serialises it (DataFrame → Parquet) and writes it to a location determined by the asset key, partition, and config.
  2. Load — When asset B declares a dependency on asset A, the IO manager reads A's output (Parquet → DataFrame) and passes it as a function argument to B.
  3. Partition resolution — For partitioned assets, the IO manager maps the run's partition key to the correct file path (e.g., {base_dir}/{asset_key}/{partition_key}.parquet).
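The path convention in step 3 can be sketched as a plain function — an illustration of the {base_dir}/{asset_key}/{partition_key}.parquet convention above, not dagster-polars's actual code:

```python
from pathlib import PurePosixPath
from typing import Optional

def resolve_partition_path(base_dir: str, asset_key: str,
                           partition_key: Optional[str] = None) -> str:
    # Mirrors the {base_dir}/{asset_key}/{partition_key}.parquet convention;
    # unpartitioned assets collapse to a single {asset_key}.parquet file.
    root = PurePosixPath(base_dir) / asset_key
    if partition_key is None:
        return f"{root}.parquet"
    return str(root / f"{partition_key}.parquet")
```

Because the convention lives in one function, every asset's output lands in a predictable place without any asset ever constructing a path.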

Rev-Sci example

# definitions.py — storage location configured once
"polars_io_manager": PolarsParquetIOManager(
    base_dir=f"{data_dir}/vanguard_wholesale"
)

# asset code — no file paths, no storage logic
@dg.asset(
    io_manager_key="polars_io_manager",
    partitions_def=partner_partition_def,
)
def billing_usage_aggregated(
    billing_usage_raw: pl.DataFrame,       # IO manager loads this
    dealer_partner_mapping: pl.DataFrame,  # IO manager loads this
) -> pl.DataFrame:
    # Pure transform logic — no idea where data comes from or goes
    partner_usage = billing_usage_raw.filter(...)
    return partner_usage  # IO manager stores this

The asset doesn't know whether data lives on NFS, local disk, or ADLS. It receives DataFrames and returns DataFrames. To move from local development (tmp/dagster_data/) to production (/opt/dagster/shared-data/), nothing in the asset code changes — get_data_dir() returns the right path based on environment.
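The environment switch can be as simple as the following sketch of get_data_dir() — the DAGSTER_ENV variable name is an assumption here; the real implementation may key off something else:

```python
import os

def get_data_dir() -> str:
    # Hypothetical: choose the storage root from the environment,
    # mirroring the local-vs-production split described above.
    if os.environ.get("DAGSTER_ENV") == "production":
        return "/opt/dagster/shared-data"
    return "tmp/dagster_data"
```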

What the datalake likely does instead

Based on the briefing's description of explicit ADLS paths, partition layouts (ingestion_date=YYYY-MM-DD/), and manual idempotent-write logic (clear_today_partition), the datalake assets probably look like:

@dg.asset
def bronze_v3_customer(context):
    # Storage logic embedded in asset code
    azure_path = f"azure://datalake/bronze/v3_customer/ingestion_date={today}/"
    clear_today_partition(azure_path, except_run_id=context.run_id)

    df = extract_from_mysql("v3_customer", watermark=load_watermark())
    write_parquet_to_adls(df, f"{azure_path}v3_customer_{context.run_id}.parquet")

    persist_watermark(df["updated_at"].max())

Every asset contains:

  • The ADLS connection string or path convention
  • Partition path construction (ingestion_date=YYYY-MM-DD/)
  • Idempotent-write logic (clear_today_partition)
  • Watermark persistence
  • Direct file I/O calls

This works, but it means:

  • Changing storage requires editing every asset. Moving from ADLS to NFS (or the reverse) touches 35+ files.
  • Partition logic is duplicated. Each asset builds its own ingestion_date= path string.
  • No Dagster materialisation metadata. If you write Parquet directly, Dagster's asset materialisation event doesn't capture the storage path, file size, or schema — just that the run succeeded.
  • Testing requires mocking storage. An asset that calls write_parquet_to_adls() can't be tested without ADLS credentials or a mock.
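The testing point is worth making concrete. When storage is externalised, the transform is a pure function over in-memory data, so a unit test needs no credentials and no mocks — a simplified illustration with plain dicts, not the actual asset logic:

```python
def aggregate_partner_usage(rows: list, partner: str) -> list:
    # Pure transform: in-memory data in, in-memory data out.
    # No storage call anywhere, so no ADLS mock is needed to test it.
    return [r for r in rows if r["partner"] == partner]

# A unit test runs anywhere:
rows = [{"partner": "acme", "gb": 3}, {"partner": "other", "gb": 5}]
assert aggregate_partner_usage(rows, "acme") == [{"partner": "acme", "gb": 3}]
```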

How IO managers solve these problems

Problem 1: Storage location is a code change

Without IO manager:

# Changing storage = editing 35 assets
write_parquet_to_adls(df, f"azure://datalake/bronze/{table}/...")
# becomes
df.write_parquet(f"/opt/dagster/shared-data/datalake/bronze/{table}/...")

With IO manager:

# Changing storage = editing 1 line in definitions.py
"polars_io_manager": PolarsParquetIOManager(
    base_dir="/opt/dagster/shared-data/datalake"  # ← this is the only change
)

Problem 2: Partition path logic is duplicated

Without IO manager, each asset constructs ingestion_date=YYYY-MM-DD/{table}_{run_id}.parquet. With IO manager, the partition key is resolved automatically:

@dg.asset(
    io_manager_key="polars_io_manager",
    partitions_def=dg.DailyPartitionsDefinition(start_date="2026-01-01"),
)
def bronze_v3_customer(context, v3: V3Resource) -> pl.DataFrame:
    # IO manager writes to {base_dir}/bronze_v3_customer/{partition_key}.parquet
    query = "SELECT * FROM v3_customer.customer"
    with v3.get_connection() as conn:
        return pl.read_database(query, conn)

The IO manager maps the Dagster partition key (2026-04-21) to the file path. No path construction in asset code.

Problem 3: Dagster doesn't know about the data

When an asset writes Parquet directly, Dagster records that the run succeeded but doesn't know the output's location, size, or schema. With an IO manager, Dagster records materialisation metadata — the UI shows storage paths, the IO manager's type information, and any row counts the asset attaches to its output metadata.

Problem 4: Idempotent writes need manual implementation

The datalake's clear_today_partition(except_run_id=...) is a hand-built idempotency mechanism. An IO manager gets the same guarantee structurally: each partition maps to one deterministic file path, and re-running the partition overwrites that file. Retries produce the same file, not duplicates.
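A common implementation of such an idempotent overwrite is write-to-temp-then-rename — a general illustration of the technique in plain Python, not necessarily dagster-polars's internals:

```python
import os
import tempfile

def atomic_overwrite(path: str, payload: bytes) -> None:
    # Write to a temp file in the target directory, then rename over the
    # destination. os.replace is atomic on POSIX, so a retry replaces the
    # partition file instead of leaving a duplicate beside it.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
        os.replace(tmp, path)
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
```

Writing the temp file in the same directory as the target matters: rename is only atomic within a filesystem.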


The IO manager gap: DuckDB direct scanning

There is one case where IO managers work against you. When DuckDB scans a large Parquet file directly, it applies predicates at the file level — reading only the columns and row groups it needs. If the IO manager loads the full file into a Polars DataFrame first, then passes it to DuckDB, you lose this optimisation.

For the datalake's bronze-to-silver transforms (e.g., filtering 75 GB of billing CDRs down to a partner's slice), this matters.

Options for bridging the gap

Option A: Accept the IO manager overhead for now. At current scale (even the 75 GB billing CDR table is split into daily partitions — each partition is 200–500 MB), loading a full partition into memory is fast. Optimise later when it becomes a bottleneck.

Option B: Use InputManager for large bronze inputs. Dagster supports custom input managers that control how upstream assets are loaded. A DuckDB-aware input manager could pass a file path instead of a DataFrame, letting the asset scan Parquet directly:

class DuckDBPathInputManager(dg.InputManager):
    def __init__(self, base_dir: str):
        self.base_dir = base_dir

    def load_input(self, context):
        asset_key = context.asset_key.path[-1]
        partition = context.partition_key
        return f"{self.base_dir}/{asset_key}/{partition}/*.parquet"

@dg.asset(
    ins={
        # Route each upstream through the path manager instead of the default loader
        "bronze_invoice": dg.AssetIn(input_manager_key="duckdb_path_manager"),
        "bronze_invoice_status": dg.AssetIn(input_manager_key="duckdb_path_manager"),
        "bronze_invoice_data": dg.AssetIn(input_manager_key="duckdb_path_manager"),
    },
)
def silver_fact_invoice(
    bronze_invoice: str,        # receives a path, not a DataFrame
    bronze_invoice_status: str,
    bronze_invoice_data: str,
) -> pl.DataFrame:
    return duckdb.sql(f"""
        SELECT i.*, s.status_label, d.payment_method
        FROM '{bronze_invoice}' i
        LEFT JOIN '{bronze_invoice_status}' s ON i.status_id = s.id
        LEFT JOIN '{bronze_invoice_data}' d ON i.invoice_id = d.invoice_id
    """).pl()

This keeps storage location externalised (the input manager owns the path convention) while giving DuckDB direct file access. The asset still doesn't contain a storage path — it receives one from the manager.

Option C: Hybrid — IO manager for small inputs, direct scan for large ones. Use PolarsParquetIOManager for dimension tables and small facts (millions of rows). Use a DuckDB path manager for large bronze tables (billions of rows). Different io_manager_key per asset group.
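The hybrid split can be expressed as a small routing helper consulted when assets are defined — the asset names and manager keys below are hypothetical:

```python
# Hypothetical set of bronze tables large enough to warrant direct scanning.
LARGE_BRONZE = {"bronze_billing_cdr"}

def io_manager_key_for(asset_name: str) -> str:
    # Large tables get the path-based manager so DuckDB can scan Parquet
    # directly; everything else is loaded eagerly by the Polars IO manager.
    if asset_name in LARGE_BRONZE:
        return "duckdb_path_manager"
    return "polars_io_manager"
```

Centralising the routing rule keeps the large/small distinction in one place rather than scattered across asset decorators.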


Recommendation for the merged stack

  1. Adopt PolarsParquetIOManager for all datalake assets. This is the immediate win — decouple storage from logic, get Dagster materialisation metadata, eliminate duplicated path/partition code.

  2. Use Dagster's DailyPartitionsDefinition instead of filesystem ingestion_date= conventions. Dagster manages the partition set, provides backfill UI, and resolves partition keys to file paths via the IO manager.

  3. Keep the datalake's watermark persistence as a separate mechanism (resource or asset metadata), not embedded in IO logic. Watermarks are extraction state, not storage state.

  4. Defer DuckDB direct-scan optimisation (Option B above) until a specific bronze table is too large for in-memory loading. At current volumes, Option A is sufficient.

  5. Preserve the datalake's idempotency contract by configuring the IO manager's write behaviour, not by hand-coding clear_today_partition. For append-only bronze, a custom IO manager subclass that writes {partition_key}/{run_id}.parquet (instead of overwriting) would preserve the immutability guarantee.
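Keeping watermarks out of IO logic (recommendation 3) could look like a small resource of its own — a sketch in which the JSON-file backing and method names are assumptions:

```python
import json
import os

class WatermarkStore:
    """Extraction state kept beside, not inside, the storage layer."""

    def __init__(self, path: str):
        self.path = path

    def _read(self) -> dict:
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def load(self, table: str, default=None):
        # Last high-water mark for a table, e.g. a max(updated_at) timestamp.
        return self._read().get(table, default)

    def persist(self, table: str, value) -> None:
        state = self._read()
        state[table] = value
        with open(self.path, "w") as f:
            json.dump(state, f)
```

Because the store is independent of the IO manager, swapping PolarsParquetIOManager in or out leaves extraction state untouched.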