4 YAML Files Instead of PySpark: How We Let Analysts Build Data Pipelines Without Engineers

It used to take us three weeks to ship a single data pipeline. Today, an analyst with zero Python experience does it in a day. Here's how we got there.

I’m Kiril Kazlou, a data engineer at Mindbox. Our team regularly recalculates business metrics for clients — which means we’re constantly building data marts for billing and analytics, pulling from dozens of different sources.

For a long time, we relied on PySpark for all our data processing. The problem? You can’t really work with PySpark without Python experience. Every new pipeline required a developer. And that meant waiting — sometimes for weeks.

In this post, I’ll walk you through how we built an internal data platform where an analyst or product manager can spin up a regularly updated pipeline by writing just four YAML files.

Why PySpark Was Slowing Us Down

Let me illustrate the pain with a textbook example — calculating MAU (Monthly Active Users).

On the surface, this feels like a simple SQL job: COUNT(DISTINCT customerId) across a few tables over a time window. But because of all the infrastructure overhead — PySpark, Airflow DAG setup, Spark resource allocation, testing — we had to hand it off to developers. The result? A full week just to ship a MAU counter.
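In plain SQL, the heart of the metric is just a few lines. Here's a minimal sketch of that core query (the table and timestamp column names are illustrative, and the real version unions events from several sources):

-- The essence of a MAU counter, stripped of all infrastructure
SELECT
    _tenant,
    COUNT(DISTINCT customerId) AS mau
FROM customerstracking_visits          -- plus the other event tables, UNIONed in
WHERE visitTimestamp >= CURRENT_DATE - INTERVAL '1' MONTH
GROUP BY _tenant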

Every new metric took one to three weeks to deliver. And every single time, the process looked the same:

  1. An analyst defined the business requirements, found an available developer, and handed over the context.
  2. The developer clarified details, wrote PySpark code, went through code review, configured the DAG, and deployed.

What we actually wanted was for analysts and product managers — the people who understand the business logic best and are fluent in SQL and YAML — to handle this themselves. No Python. No PySpark.

Diagram of the old pipeline workflow: an analyst defines requirements and hands them off to a developer, who writes PySpark code, goes through code review, configures the Airflow DAG, and deploys. The full process takes one to three weeks.
How pipelines were built with PySpark

What We Replaced PySpark With: YAML and SQL Are All You Need

To take a declarative approach, we split our data layer into three parts and picked the right tool for each:

  • dlt (data load tool) — ingests data from external APIs and databases into object storage. Configured entirely through a YAML file. No code required.
  • dbt (data build tool) on Trino — transforms data using pure SQL. It links models via ref(), automatically builds a dependency graph, and handles incremental updates.
  • Airflow + Cosmos — orchestrates the pipelines. The Airflow DAG is auto-generated from dag.yaml and the dbt project.

We were already using Trino as a query engine for ad-hoc queries and had it plugged into Superset for BI. It had already proven itself: for queries with standard logic, it processed massive datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.
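To make the federation point concrete, here is a hypothetical Trino query that joins a table in object storage with one sitting in an operational database, simply by addressing two catalogs in the same statement (all catalog, schema, and table names below are made up for illustration):

-- One query, two catalogs: object storage (delta) and a relational DB (postgresql)
SELECT
    o._tenant,
    COUNT(*)      AS orders,
    SUM(o.amount) AS revenue
FROM delta.final.processingorders_orders o
JOIN postgresql.crm.tenants t
    ON o._tenant = t.tenant_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '1' MONTH
GROUP BY o._tenant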

Diagram of the new pipeline workflow: an analyst writes YAML configs and SQL models directly. dbt and Trino handle execution automatically through Airflow. No developer involvement required. The full process takes one day.
After: analyst-owned pipelines with dbt + Trino

How We Load Data: dlt.yaml

The first YAML file describes where and how to load data for downstream processing. Here’s a real-world example — loading billing data from an internal API:

product: sg-team
feature: billing
schema: billing_tarification

dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing

alerts:
  enabled: true
  severity: warning

source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map

    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

This config defines three resources from a single API. For each one, we specify the endpoint, request parameters, and a write strategy — in our case, replace means "overwrite every time." You can also add processing steps, define column types, and configure alerts.

The entire config is 40 lines of YAML. Without dlt, each connector would be a Python script handling requests, pagination, retries, serialization to Delta Table format, and uploads to storage.

How We Transform Data With SQL: dbt_project.yaml and sources.yaml

The next step is configuring the dbt model. With Trino, that means SQL queries.

Here’s an example of how we set up the MAU calculation. This is what event preparation from a single source looks like:

-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}

WITH period AS (
    -- Rolling window: last 5 months to current
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),

events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
      ON e._tenant = mc._tenant
      AND e.unmergedCustomerId = mc.unmergedCustomerId
)

-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)

All 10 event sources follow the exact same pattern. The only differences are the source table and the filters. Then the models merge into a single stream:

-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources

And finally, the data mart where everything gets aggregated:

-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p

For the data mart configuration, we use incremental_strategy='merge'. dbt automatically generates the merge query, substituting the unique_key for upsert. No need to manually implement incremental loading.
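For reference, what dbt compiles behind the scenes looks roughly like the statement below. This is a simplified sketch, not the literal generated SQL; the temp relation name and column list are abbreviated:

-- Approximate shape of the MERGE generated for incremental_strategy='merge'
MERGE INTO mau_period_datamart AS t
USING (
    -- the compiled SELECT from mau_period_datamart.sql
    SELECT * FROM mau_period_datamart__dbt_tmp
) AS s
ON  t._tenant     = s._tenant
AND t.start_year  = s.start_year  AND t.start_month = s.start_month
AND t.end_year    = s.end_year    AND t.end_month   = s.end_month
WHEN MATCHED THEN
    UPDATE SET MAU = s.MAU        -- and every other non-key column
WHEN NOT MATCHED THEN
    INSERT (_tenant, MAU, start_year, start_month, end_year, end_month)
    VALUES (s._tenant, s.MAU, s.start_year, s.start_month, s.end_year, s.end_month)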

To tie the models into a single project, we set up dbt_project.yaml:

name: mau_period
version: '1.0.0'

models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns

And sources.yaml, which describes the input tables:

sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...

The result is the same business logic we had in PySpark, but in pure SQL: sources.yaml replaces typedspark schemas, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution order via the dependency graph replaces manual Spark resource tuning.

How We Configure Airflow: dag.yaml

The fourth configuration file defines when and how Airflow runs the pipeline:

product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # every day at 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning

Then our Python script parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python code in the entire setup. It’s written once and works for every dbt project. Here’s the key part:

def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    # dag.yaml lives at the root of each dbt project folder
    dag_config_path = project_path / "dag.yaml"
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project.
    # project_name, profile_database and profile_schema come from config fields
    # parsed elsewhere in the script and are not shown in this excerpt.
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create schema before running models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach to root tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task

Cosmos reads manifest.json from the dbt project, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically based on ref() calls in the SQL.
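As a minimal illustration, two models connected by nothing more than a ref() call are enough for Cosmos to order the tasks (the dependency notation below is schematic, not the exact Airflow task IDs):

-- models/int_mau_events_visits.sql
SELECT src._tenant, src.unmergedCustomerId
FROM {{ source('final', 'customerstracking_visits') }} src

-- models/int_mau_events.sql
SELECT * FROM {{ ref('int_mau_events_visits') }}

-- Resulting order in the generated DAG (schematic):
--   int_mau_events_visits  >>  int_mau_events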

How Analysts Build Pipelines Without Developers

Now when an analyst needs a new recurring pipeline, they can put it together in a few steps:

Step 1. Create a folder in the repo: dbt-projects/my_new_pipeline/.

Step 2. If external data ingestion is needed, write a YAML config for dlt.

Step 3. Write SQL models in the models/ folder and describe the sources in sources.yaml.

Step 4. Create dbt_project.yaml and dag.yaml.

Step 5. Push to Git, go through review, merge.

CI/CD builds the dbt project and ships artifacts to S3. Airflow reads the DAG files from there, Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an updated data mart in the warehouse, accessible through Superset.

What Changed After the Migration

Before-and-after comparison showing pipeline delivery time dropping from one to three weeks under PySpark to one day with the YAML-based stack, and pipeline ownership shifting from developers to analysts.
What changed: from weeks to one day, from developers to analysts

For analysts to build pipelines on their own, they need to understand ref() and source() concepts, the difference between table and incremental materialization, and the basics of Git. We ran a few internal workshops and put together step-by-step guides for each task type.
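The table-versus-incremental distinction is the part that needs the most explanation, so the guides lead with a minimal contrast along these lines (the model bodies and the updated_at column are illustrative):

-- Model A: rebuilt from scratch on every run
{{ config(materialized='table') }}
SELECT _tenant, customerId FROM {{ ref('int_mau_events') }}

-- Model B: only new or changed rows are merged into the existing table
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'customerId']
) }}
SELECT _tenant, customerId, updated_at FROM {{ ref('some_event_source') }}
{% if is_incremental() %}
  -- on incremental runs, pick up only rows newer than what's already in the table
  WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}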

Why the New Stack Doesn’t Fully Replace PySpark

For about 10% of our pipelines, PySpark is still the only option — when a transformation simply doesn’t fit into SQL. dbt supports Jinja macros, but that’s no substitute for full-blown Python. And it would be dishonest to skip over the limitations of the new tools.

dlt + Delta: experimental upsert support. We use the Delta format in our storage layer. dlt’s Delta connector is marked as experimental, so the merge strategy didn’t work out of the box. We had to find workarounds — in some cases we used replace instead of merge (sacrificing incrementality), and in others we wrote custom processing_steps.

Trino’s limited fault tolerance. Trino does have a fault tolerance mechanism, but it works by writing intermediate results to S3. At our terabyte-scale data volumes, this is impractical — the sheer number of S3 operations makes it prohibitively expensive. Without fault tolerance enabled, if a Trino worker goes down, the entire query fails. Spark, by contrast, restarts just the failed task. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate ones.

UDFs and custom logic. In Spark, you can write custom logic in Python right inside the pipeline — super convenient. With the new architecture, this is much harder. dbt on top of Trino doesn’t help: Jinja only generates SQL, and dbt’s Python models only work with Snowflake, Databricks, and BigQuery. You can write UDFs in Trino, but only in Java — with all the overhead that entails: a separate repo, a build pipeline, deploying JARs across all workers. So when a transformation doesn’t fit into SQL, you either end up with an unmaintainable SQL monster or a standalone script that breaks the lineage.

What’s Next: Tests, Model Templates, and Training

Better testing. We had solid pipeline testing in PySpark, but the new architecture is still catching up. Recent dbt versions introduced unit testing — you can now validate SQL model logic against mock data without spinning up the full pipeline. We want to add dbt tests both at the model level and as a separate monitoring layer.
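Even a plain singular test gets us part of the way there as a monitoring layer: a SQL file in the dbt project's tests/ folder that returns the offending rows and fails the run if any come back. A minimal sketch (the test name and condition are illustrative):

-- tests/assert_mau_is_positive.sql
-- Fails if any tenant ends up with a zero or negative MAU in the data mart
SELECT _tenant, MAU
FROM {{ ref('mau_period_datamart') }}
WHERE MAU <= 0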

Reusable templates for common patterns. Many of our dbt models look alike. A single config could describe a dozen models with the same pattern — only the source table and filters differ. We plan to extract the shared logic into dbt macros.
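A rough sketch of the direction, assuming a hypothetical macro mau_event_source() that takes the source table and event type as arguments:

{% macro mau_event_source(source_table, src_type) %}
    -- Hypothetical macro: the shared per-source event-selection pattern, parameterized
    SELECT src._tenant,
           src.unmergedCustomerId,
           '{{ src_type }}' AS src_type,
           src.endpoint
    FROM {{ source('final', source_table) }} src
    WHERE src.unmergedCustomerId IS NOT NULL
{% endmacro %}

-- int_mau_events_visits.sql then collapses to a single call:
{{ mau_event_source('customerstracking_visits', 'visits') }}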

Expanding the platform’s user base. We want more engineers and analysts to work with data independently. We’re planning regular internal training sessions, documentation, and onboarding guides so new users can get up to speed quickly and start building their own models.

If your team is stuck in the same “analysts wait for developers” loop, I’d love to hear how you’re solving it. Connect with me on LinkedIn and let’s compare notes.


All images in this article are by the author unless otherwise noted.


