
Complete Examples

Skill: databricks-jobs

These are complete, production-ready job definitions you can adapt to your own pipelines. Each example combines task dependencies, scheduling, notifications, and compute configuration into a single deployable unit, either as a Databricks Asset Bundles (DABs) YAML file or a Python SDK script.

“Build a DABs job definition for a daily ETL pipeline with three parallel extract tasks, a transform task that waits for all extracts, a load step, and a data quality validation. Use a shared job cluster, schedule it at 6 AM UTC, and notify the team on failure.”

resources:
  jobs:
    daily_etl:
      name: "[${bundle.target}] Daily ETL Pipeline"
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: "UTC"
        # DABs interpolation has no if() function; keep PAUSED here and
        # override pause_status to UNPAUSED in the prod target mapping.
        pause_status: PAUSED
      parameters:
        - name: load_date
          default: "{{start_date}}"
        - name: env
          default: "${bundle.target}"
      job_clusters:
        - job_cluster_key: etl_cluster
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 4
      email_notifications:
        on_failure:
          - "data-team@example.com"
      tasks:
        - task_key: extract_orders
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_orders.py
            base_parameters:
              load_date: "{{job.parameters.load_date}}"
        - task_key: extract_customers
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_customers.py
        - task_key: extract_products
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_products.py
        - task_key: transform
          depends_on:
            - task_key: extract_orders
            - task_key: extract_customers
            - task_key: extract_products
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/transform.py
        - task_key: load
          depends_on:
            - task_key: transform
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/load.py
        - task_key: validate
          depends_on:
            - task_key: load
          run_if: ALL_SUCCESS
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/validate.py

Key decisions:

  • Shared job_clusters — All tasks use the same ephemeral cluster, which starts once and is reused across the DAG. This eliminates per-task cluster startup overhead.
  • Parallel extracts — The three extract tasks have no depends_on, so they run concurrently. The transform task fans in from all three.
  • run_if: ALL_SUCCESS on the validate task means it only runs when every upstream task succeeds. Use ALL_DONE instead if you want validation to run even after partial failures.
  • Job parameters with {{start_date}} resolve to the scheduled run time, giving each run a deterministic load date.
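The fan-out/fan-in behavior described above can be sketched in plain Python. This is a hypothetical helper, not part of the Databricks SDK; it just computes which tasks the scheduler could start concurrently, given the `depends_on` graph from the YAML:

```python
# depends_on graph copied from the YAML job definition above.
deps = {
    "extract_orders": [],
    "extract_customers": [],
    "extract_products": [],
    "transform": ["extract_orders", "extract_customers", "extract_products"],
    "load": ["transform"],
    "validate": ["load"],
}

def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave can run concurrently.

    A task becomes runnable once every task in its depends_on list is done,
    which is exactly the fan-in rule the Jobs scheduler applies.
    """
    done, waves = set(), []
    while len(done) < len(deps):
        wave = sorted(t for t, d in deps.items()
                      if t not in done and all(u in done for u in d))
        if not wave:
            raise ValueError("cycle in depends_on")
        waves.append(wave)
        done.update(wave)
    return waves

for i, wave in enumerate(execution_waves(deps)):
    print(f"wave {i}: {wave}")
```

The three extracts land in wave 0 together, which is why they run in parallel; `transform` cannot appear before wave 1 because it fans in from all three.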

“Create a job triggered by file arrivals in S3 that autoscales based on incoming volume.”

resources:
  jobs:
    process_uploads:
      name: "[${bundle.target}] Process Uploads"
      trigger:
        pause_status: UNPAUSED
        file_arrival:
          url: "s3://data-lake/incoming/orders/"
          min_time_between_triggers_seconds: 300
          wait_after_last_change_seconds: 60
      health:
        rules:
          - metric: RUN_DURATION_SECONDS
            op: GREATER_THAN
            value: 1800
      email_notifications:
        on_failure:
          - "data-alerts@example.com"
      tasks:
        - task_key: process
          notebook_task:
            notebook_path: ../src/process_uploads.py
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: "i3.xlarge"
            autoscale:
              min_workers: 2
              max_workers: 10

The 5-minute cooldown (min_time_between_triggers_seconds: 300) batches rapid file arrivals into a single run, and wait_after_last_change_seconds: 60 holds the trigger until uploads have settled. Autoscaling from 2 to 10 workers absorbs variable volume without manual intervention. The health rule flags any run longer than 30 minutes; to actually be emailed about it, also add an on_duration_warning_threshold_exceeded entry to the notification settings.
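How the two timers interact can be illustrated with a small simulation. This is a deliberately simplified model, not the actual trigger implementation: arrivals closer together than the quiet window form one burst, and a run whose candidate start falls inside the previous run's cooldown is treated as batched into it:

```python
def simulate_trigger(arrival_times, cooldown=300, quiet=60):
    """Simplified model of file-arrival triggering (seconds since epoch).

    A gap longer than `quiet` ends a burst; each burst would fire `quiet`
    seconds after its last file, but fires closer than `cooldown` to the
    previous run are suppressed (their files ride along with that run).
    """
    bursts, current = [], [arrival_times[0]]
    for t in arrival_times[1:]:
        if t - current[-1] <= quiet:
            current.append(t)
        else:
            bursts.append(current)
            current = [t]
    bursts.append(current)

    fires = []
    for burst in bursts:
        candidate = burst[-1] + quiet
        if fires and candidate - fires[-1] < cooldown:
            continue  # inside the cooldown window: batched, no new run
        fires.append(candidate)
    return fires

# Five uploads collapse into two runs: one at t=80, one at t=760.
print(simulate_trigger([0, 10, 20, 200, 700]))
```

The burst at t=200 never starts its own run because its candidate fire time (t=260) lands inside the 300-second cooldown that began at t=80.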

“Build the same multi-task ETL pipeline using the Python SDK instead of YAML.”

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSpec
from databricks.sdk.service.jobs import (
    Task, NotebookTask, TaskDependency, Source,
    CronSchedule, PauseStatus, JobEmailNotifications,
    JobCluster, JobParameterDefinition,
)

w = WorkspaceClient()

job = w.jobs.create(
    name="Daily ETL Pipeline",
    schedule=CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",
        timezone_id="UTC",
        pause_status=PauseStatus.UNPAUSED,
    ),
    parameters=[
        JobParameterDefinition(name="load_date", default="{{start_date}}"),
    ],
    job_clusters=[
        JobCluster(
            job_cluster_key="etl_cluster",
            new_cluster=ClusterSpec(
                spark_version="15.4.x-scala2.12",
                node_type_id="i3.xlarge",
                num_workers=4,
            ),
        ),
    ],
    email_notifications=JobEmailNotifications(
        on_failure=["data-team@example.com"],
    ),
    tasks=[
        Task(
            task_key="extract",
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/extract",
                source=Source.WORKSPACE,
            ),
        ),
        Task(
            task_key="transform",
            depends_on=[TaskDependency(task_key="extract")],
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/transform",
                source=Source.WORKSPACE,
            ),
        ),
        Task(
            task_key="load",
            depends_on=[TaskDependency(task_key="transform")],
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/load",
                source=Source.WORKSPACE,
            ),
        ),
    ],
)
print(f"Created job: {job.job_id}")

The SDK version (condensed here to a single extract → transform → load chain) is useful when you're generating job definitions programmatically, for example creating one job per client in a multi-tenant setup.
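A sketch of that multi-tenant pattern follows. The client names and the dict shape are illustrative only; in real code you would pass the equivalent fields to w.jobs.create() as in the SDK example above:

```python
def build_client_job(client: str, schedule_hour: int) -> dict:
    """Build one per-client job definition as a plain dict.

    The fields mirror what the SDK example passes to w.jobs.create();
    paths and names below are hypothetical.
    """
    return {
        "name": f"Daily ETL - {client}",
        "schedule": {
            "quartz_cron_expression": f"0 0 {schedule_hour} * * ?",
            "timezone_id": "UTC",
        },
        "parameters": [{"name": "client", "default": client}],
        "tasks": [
            {"task_key": "extract",
             "notebook_path": f"/Workspace/etl/{client}/extract"},
            {"task_key": "transform", "depends_on": ["extract"],
             "notebook_path": f"/Workspace/etl/{client}/transform"},
        ],
    }

# Stagger clients across hours so every tenant doesn't hit the same cron slot.
clients = ["acme", "globex", "initech"]
jobs = [build_client_job(c, hour) for hour, c in enumerate(clients, start=2)]
```

Staggering the cron hour per client avoids a thundering herd of simultaneous cluster launches at one scheduled time.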

“Create a master orchestrator job that chains an ingestion job, a transformation job, and a validation notebook in sequence.”

resources:
  jobs:
    ingestion:
      name: "[${bundle.target}] Data Ingestion"
      tasks:
        - task_key: ingest
          notebook_task:
            notebook_path: ../src/ingest.py

    transformation:
      name: "[${bundle.target}] Data Transformation"
      tasks:
        - task_key: transform
          notebook_task:
            notebook_path: ../src/transform.py

    orchestrator:
      name: "[${bundle.target}] Master Orchestrator"
      schedule:
        quartz_cron_expression: "0 0 1 * * ?"
        timezone_id: "UTC"
        pause_status: UNPAUSED
      tasks:
        - task_key: run_ingestion
          run_job_task:
            job_id: ${resources.jobs.ingestion.id}
        - task_key: run_transformation
          depends_on:
            - task_key: run_ingestion
          run_job_task:
            job_id: ${resources.jobs.transformation.id}
        - task_key: validate
          depends_on:
            - task_key: run_transformation
          notebook_task:
            notebook_path: ../src/validate.py

run_job_task triggers another job as a subtask, which lets you compose smaller, independently testable jobs into a larger workflow. Each child job retains its own retry policy and notifications.

“Create a job that discovers active regions, processes each one in parallel, then aggregates results.”

resources:
  jobs:
    region_processor:
      name: "[${bundle.target}] Region Processor"
      schedule:
        quartz_cron_expression: "0 0 8 * * ?"
        timezone_id: "UTC"
        pause_status: UNPAUSED
      tasks:
        - task_key: get_regions
          notebook_task:
            notebook_path: ../src/get_active_regions.py
        - task_key: process_regions
          depends_on:
            - task_key: get_regions
          for_each_task:
            inputs: "{{tasks.get_regions.values.regions}}"
            concurrency: 10
            task:
              task_key: process_region
              notebook_task:
                notebook_path: ../src/process_region.py
                base_parameters:
                  region: "{{input}}"
        - task_key: aggregate
          depends_on:
            - task_key: process_regions
          run_if: ALL_DONE
          notebook_task:
            notebook_path: ../src/aggregate_results.py

run_if: ALL_DONE on the aggregate task ensures it runs even when some regions fail — you still get partial results and a summary of which regions need attention.
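The same semantics can be mimicked locally with a plain-Python sketch, where process_region stands in for the per-region notebook and "emea" simulates one failing region:

```python
from concurrent.futures import ThreadPoolExecutor

def process_region(region: str) -> int:
    # Stand-in for the per-region notebook; "emea" simulates a failure.
    if region == "emea":
        raise RuntimeError(f"{region}: upstream source unavailable")
    return len(region) * 100  # fake row count

def run_with_all_done(regions, concurrency=10):
    """Fan out with bounded concurrency, then aggregate regardless of
    failures: the local analogue of for_each_task + run_if: ALL_DONE."""
    results, failed = {}, []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = {pool.submit(process_region, r): r for r in regions}
        for fut, region in futures.items():
            try:
                results[region] = fut.result()
            except Exception:
                failed.append(region)
    # The "aggregate" step still runs: partial totals plus a failure summary.
    return {"total_rows": sum(results.values()), "failed_regions": failed}

summary = run_with_all_done(["amer", "emea", "apac"])
print(summary)
```

With ALL_SUCCESS instead, the analogue would be re-raising on the first failed future, so no summary is produced at all.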

Common pitfalls

  • Using existing_cluster_id in production — Shared clusters work for dev, but production jobs should use job_clusters (ephemeral) or new_cluster (per-task). Shared clusters create contention and make cost attribution difficult.
  • Missing depends_on on fan-in tasks — If your transform task doesn’t list all extract tasks in depends_on, it starts before all sources are ready. List every upstream task explicitly.
  • Hardcoding workspace paths — Use relative paths (../src/notebook.py) in DABs and Source.WORKSPACE in the SDK. Hardcoded paths like /Users/me/notebook break when another user deploys the bundle.
  • Forgetting run_if on cleanup tasks — Tasks that send summary reports or clean up temp tables should use run_if: ALL_DONE so they execute regardless of upstream failures. The default (ALL_SUCCESS) skips them after any failure.
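The fan-in pitfall is easy to lint for before deploying. A hypothetical checker (the task dict shape follows the YAML examples on this page; expected_upstreams is something you'd maintain alongside the bundle):

```python
def missing_fan_in(tasks, expected_upstreams):
    """Return, per task, any expected upstream task_keys that the task's
    depends_on does not actually declare."""
    by_key = {t["task_key"]: t for t in tasks}
    problems = {}
    for task_key, upstreams in expected_upstreams.items():
        declared = {d["task_key"] for d in by_key[task_key].get("depends_on", [])}
        missing = sorted(set(upstreams) - declared)
        if missing:
            problems[task_key] = missing
    return problems

tasks = [
    {"task_key": "extract_orders"},
    {"task_key": "extract_customers"},
    {"task_key": "transform",
     "depends_on": [{"task_key": "extract_orders"}]},  # forgot customers
]
report = missing_fan_in(
    tasks, {"transform": ["extract_orders", "extract_customers"]}
)
print(report)
```

An empty result means every fan-in task declares all of its expected upstreams; anything else is a task that would start before its sources are ready.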