It used to take us three weeks to ship a single data pipeline. Today, an analyst with zero Python experience does it in a day. Here’s how we got there.
I’m Kiril Kazlou, a data engineer at Mindbox. Our team regularly recalculates business metrics for clients — which means we’re constantly building data marts for billing and analytics, pulling from dozens of different sources.
For a long time, we relied on PySpark for all our data processing. The problem? You can’t really work with PySpark without Python experience. Every new pipeline required a developer. And that meant waiting — sometimes for weeks.
In this post, I’ll walk you through how we built an internal data platform where an analyst or product manager can spin up a regularly updated pipeline by writing just four YAML files.
Why PySpark Was Slowing Us Down
Let me illustrate the pain with a textbook example — calculating MAU (Monthly Active Users).
On the surface, this feels like a simple SQL job: COUNT(DISTINCT customerId) across a few tables over a time window. But because of all the infrastructure overhead — PySpark, Airflow DAG setup, Spark resource allocation, testing — we had to hand it off to developers. The result? A full week just to ship a MAU counter.
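Stripped of all that infrastructure, the metric itself really is trivial. Here is a toy Python sketch of the distinct-count logic, using made-up event data rather than our real schema:

```python
from datetime import date

# Toy events: (customer_id, event_date) — illustrative, not the real schema
events = [
    ("c1", date(2024, 5, 3)),
    ("c2", date(2024, 5, 10)),
    ("c1", date(2024, 5, 20)),  # same customer again, counted once
    ("c3", date(2024, 4, 28)),  # outside the target month
]

def mau(events, year, month):
    """COUNT(DISTINCT customer_id) for one calendar month."""
    return len({cid for cid, d in events if (d.year, d.month) == (year, month)})

print(mau(events, 2024, 5))  # → 2
```

The hard part was never this set comprehension — it was everything wrapped around it.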
Every new metric took one to three weeks to deliver. And every single time, the process looked the same:
- An analyst defined the business requirements, found an available developer, and handed over the context.
- The developer clarified details, wrote PySpark code, went through code review, configured the DAG, and deployed.
What we actually wanted was for analysts and product managers — the people who understand the business logic best and are fluent in SQL and YAML — to handle this themselves. No Python. No PySpark.

What We Replaced PySpark With: YAML and SQL Are All You Need
To take a declarative approach, we split our data layer into three parts and picked the right tool for each:
- dlt (data load tool) — ingests data from external APIs and databases into object storage. Configured entirely through a YAML file. No code required.
- dbt (data build tool) on Trino — transforms data using pure SQL. It links models via ref(), automatically builds a dependency graph, and handles incremental updates.
- Airflow + Cosmos — orchestrates the pipelines. The Airflow DAG is auto-generated from dag.yaml and the dbt project.
We were already using Trino as a query engine for ad-hoc queries and had it plugged into Superset for BI. It had already proven itself: for queries with standard logic, it processed massive datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.

How We Load Data: dlt.yaml
The first YAML file describes where and how to load data for downstream processing. Here’s a real-world example — loading billing data from an internal API:
product: sg-team
feature: billing
schema: billing_tarification
dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing
alerts:
  enabled: true
  severity: warning
source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map
    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
This config defines three resources from a single API. For each one, we specify the endpoint, request parameters, and a write strategy — in our case, replace means “overwrite every time.” You can also add processing steps, define column types, and configure alerts.
The entire config is 40 lines of YAML. Without dlt, each connector would be a Python script handling requests, pagination, retries, serialization to Delta Table format, and uploads to storage.
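For contrast, here is a minimal sketch of the pagination-and-retry boilerplate each hand-written connector had to carry. This is generic illustrative code, not our actual connector; the page-fetching function is injected so the loop itself stays testable:

```python
import time

def fetch_all_pages(fetch_page, max_retries=3, backoff_s=0.0):
    """Collect all pages from a paginated endpoint with simple retries.

    fetch_page(page) returns (items, has_more); raising IOError simulates
    a transient API failure. dlt handles this kind of loop for you.
    """
    items, page = [], 0
    while True:
        for attempt in range(max_retries):
            try:
                batch, has_more = fetch_page(page)
                break
            except IOError:
                if attempt == max_retries - 1:
                    raise
                time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
        items.extend(batch)
        if not has_more:
            return items
        page += 1

# Fake endpoint: two pages, with one transient failure on page 1
calls = {"fails_left": 1}

def fake_endpoint(page):
    if page == 1 and calls["fails_left"]:
        calls["fails_left"] -= 1
        raise IOError("transient")
    data = {0: ([1, 2], True), 1: ([3], False)}
    return data[page]

print(fetch_all_pages(fake_endpoint))  # → [1, 2, 3]
```

And this still omits auth refresh, serialization to Delta, and the upload to object storage — all of which the YAML config above gets for free.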
How We Transform Data With SQL: dbt_project.yaml and sources.yaml
The next step is configuring the dbt model. With Trino, that means SQL queries.
Here’s an example of how we set up the MAU calculation. This is what event preparation from a single source looks like:
-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}

WITH period AS (
    -- Rolling window: last 5 months to current
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),

events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
           ON e._tenant = mc._tenant
          AND e.unmergedCustomerId = mc.unmergedCustomerId
)

-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)
All 10 event sources follow the exact same pattern. The only differences are the source table and the filters. Then the models merge into a single stream:
-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources
And finally, the data mart where everything gets aggregated:
-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),

metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
                            THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
                            THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
                            THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)

SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p
For the data mart configuration, we use incremental_strategy='merge'. dbt automatically generates the merge query, substituting the unique_key for upsert. No need to manually implement incremental loading.
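The merge dbt generates behaves like an upsert keyed on unique_key: matching rows are updated, new rows are inserted. A small Python sketch of those semantics (illustrative only — not dbt’s actual generated SQL):

```python
def merge_rows(target, incoming, unique_key):
    """Upsert incoming rows into target, matching on the unique_key columns."""
    by_key = {tuple(r[k] for k in unique_key): r for r in target}
    for row in incoming:
        by_key[tuple(row[k] for k in unique_key)] = row  # update or insert
    return list(by_key.values())

target = [{"_tenant": "a", "start_month": 1, "MAU": 100}]
incoming = [
    {"_tenant": "a", "start_month": 1, "MAU": 120},  # updates the existing row
    {"_tenant": "b", "start_month": 1, "MAU": 50},   # inserts a new row
]
result = merge_rows(target, incoming, ["_tenant", "start_month"])
print(len(result))  # → 2
```

This is why re-running the same period is safe: the recalculated row simply replaces the old one instead of duplicating it.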
To tie the models into a single project, we set up dbt_project.yaml:
name: mau_period
version: '1.0.0'

models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns
And sources.yaml, which describes the input tables:
sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...
The result is the same business logic we had in PySpark, but in pure SQL: sources.yaml replaces typedspark schemas, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution order via the dependency graph replaces manual Spark resource tuning.
How We Configure Airflow: dag.yaml
The fourth configuration file defines when and how Airflow runs the pipeline:
product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # every day at 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning
Then our Python script parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python code in the entire setup. It’s written once and works for every dbt project. Here’s the key part:
def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    # Excerpt: dag_config_path, project_name, profile_database and
    # profile_schema are derived from project_path and the config (omitted here)
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create the target schema before running any models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )

        # Attach the schema-creation task ahead of every root dbt task
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task
Cosmos reads manifest.json from the dbt project, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically based on ref() calls in the SQL.
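The dependency wiring Cosmos derives can be approximated in a few lines: scan each model’s SQL for ref() calls and link models accordingly. This is a simplified sketch of the idea, not Cosmos internals:

```python
import re

# Matches dbt's {{ ref('model_name') }} calls in model SQL
REF_RE = re.compile(r"\{\{\s*ref\('([^']+)'\)\s*\}\}")

def build_dependency_graph(models: dict[str, str]) -> dict[str, set[str]]:
    """Map each model name to the set of models it depends on via ref()."""
    return {
        name: {dep for dep in REF_RE.findall(sql) if dep in models}
        for name, sql in models.items()
    }

models = {
    "int_mau_events_visits": "SELECT ... FROM {{ source('final', 'visits') }}",
    "int_mau_events": "SELECT * FROM {{ ref('int_mau_events_visits') }}",
    "mau_period_datamart": "SELECT ... FROM {{ ref('int_mau_events') }}",
}
graph = build_dependency_graph(models)
print(graph["mau_period_datamart"])  # → {'int_mau_events'}
```

A topological sort of this graph gives the execution order — which is exactly what used to require hand-wiring Airflow task dependencies.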
How Analysts Build Pipelines Without Developers
Now when an analyst needs a new recurring pipeline, they can put it together in a few steps:
Step 1. Create a folder in the repo: dbt-projects/my_new_pipeline/.
Step 2. If external data ingestion is needed, write a YAML config for dlt.
Step 3. Write SQL models in the models/ folder and describe the sources in sources.yaml.
Step 4. Create dbt_project.yaml and dag.yaml.
Step 5. Push to Git, go through review, merge.
CI/CD builds the dbt project and ships artifacts to S3. Airflow reads the DAG files from there, Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an updated data mart in the warehouse, accessible through Superset.
What Changed After the Migration

For analysts to build pipelines on their own, they need to understand ref() and source() concepts, the difference between table and incremental materialization, and the basics of Git. We ran a few internal workshops and put together step-by-step guides for each task type.
Why the New Stack Doesn’t Fully Replace PySpark
For about 10% of our pipelines, PySpark is still the only option — when a transformation simply doesn’t fit into SQL. dbt supports Jinja macros, but that’s no substitute for full-blown Python. And it would be dishonest to skip over the limitations of the new tools.
dlt + Delta: experimental upsert support. We use the Delta format in our storage layer. dlt’s Delta connector is marked as experimental, so the merge strategy didn’t work out of the box. We had to find workarounds — in some cases we used replace instead of merge (sacrificing incrementality), and in others we wrote custom processing_steps.
Trino’s limited fault tolerance. Trino does have a fault tolerance mechanism, but it works by writing intermediate results to S3. At our terabyte-scale data volumes, this is impractical — the sheer number of S3 operations makes it prohibitively expensive. Without fault tolerance enabled, if a Trino worker goes down, the entire query fails. Spark, by contrast, restarts just the failed task. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate ones.
UDFs and custom logic. In Spark, you can write custom logic in Python right inside the pipeline — super convenient. With the new architecture, this is much harder. dbt on top of Trino doesn’t help: Jinja only generates SQL, and dbt’s Python models only work with Snowflake, Databricks, and BigQuery. You can write UDFs in Trino, but only in Java — with all the overhead that entails: a separate repo, a build pipeline, deploying JARs across all workers. So when a transformation doesn’t fit into SQL, you either end up with an unmaintainable SQL monster or a standalone script that breaks the lineage.
What’s Next: Tests, Model Templates, and Training
Better testing. We had solid pipeline testing in PySpark, but the new architecture is still catching up. Recent dbt versions introduced unit testing — you can now validate SQL model logic against mock data without spinning up the full pipeline. We want to add dbt tests both at the model level and as a separate monitoring layer.
Reusable templates for common patterns. Many of our dbt models look alike. A single config could describe a dozen models with the same pattern — only the source table and filters differ. We plan to extract the shared logic into dbt macros.
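The intuition behind the macro plan: each per-source model is the same SQL with two or three slots filled in. A Python sketch of that kind of generation, using a hypothetical simplified template rather than our real model SQL:

```python
# Hypothetical template — a dbt macro would do the same substitution in Jinja
MODEL_TEMPLATE = """\
SELECT src._tenant, src.unmergedCustomerId,
       '{src_type}' AS src_type
FROM {{{{ source('final', '{table}') }}}} src
WHERE {filter}"""

SOURCES = [
    {"src_type": "visits", "table": "customerstracking_visits",
     "filter": "src.unmergedCustomerId IS NOT NULL"},
    {"src_type": "orders", "table": "processingorders_orders",
     "filter": "src.unmergedCustomerId IS NOT NULL"},
]

# One line of config per source, one generated model per line
models = {s["src_type"]: MODEL_TEMPLATE.format(**s) for s in SOURCES}
print(models["visits"])
```

Ten nearly identical model files would collapse into one template plus a short list of source descriptions.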
Expanding the platform’s user base. We want more engineers and analysts to work with data independently. We’re planning regular internal training sessions, documentation, and onboarding guides so new users can get up to speed quickly and start building their own models.
If your team is stuck in the same “analysts wait for developers” loop, I’d love to hear how you’re solving it. Connect with me on LinkedIn and let’s compare notes.
All images in this article are by the author unless otherwise noted.