Complete Examples
Skill: databricks-jobs
What You Can Build
These are complete, production-ready job definitions you can adapt to your own pipelines. Each example combines task dependencies, scheduling, notifications, and compute configuration into a single deployable unit — either as a DABs YAML file or a Python SDK script.
In Action
“Build a DABs job definition for a daily ETL pipeline with three parallel extract tasks, a transform task that waits for all extracts, a load step, and a data quality validation. Use a shared job cluster, schedule it at 6 AM UTC, and notify the team on failure.”
```yaml
resources:
  jobs:
    daily_etl:
      name: "[${bundle.target}] Daily ETL Pipeline"

      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: "UTC"
        pause_status: ${if(bundle.target == "prod", "UNPAUSED", "PAUSED")}

      parameters:
        - name: load_date
          default: "{{start_date}}"
        - name: env
          default: "${bundle.target}"

      job_clusters:
        - job_cluster_key: etl_cluster
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 4

      email_notifications:
        on_failure:
          - "data-team@example.com"

      tasks:
        - task_key: extract_orders
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_orders.py
            base_parameters:
              load_date: "{{job.parameters.load_date}}"

        - task_key: extract_customers
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_customers.py

        - task_key: extract_products
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/extract_products.py

        - task_key: transform
          depends_on:
            - task_key: extract_orders
            - task_key: extract_customers
            - task_key: extract_products
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/transform.py

        - task_key: load
          depends_on:
            - task_key: transform
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/load.py

        - task_key: validate
          depends_on:
            - task_key: load
          run_if: ALL_SUCCESS
          job_cluster_key: etl_cluster
          notebook_task:
            notebook_path: ../src/validate.py
```

Key decisions:
- Shared `job_clusters` — All tasks use the same ephemeral cluster, which starts once and is reused across the DAG. This eliminates per-task cluster startup overhead.
- Parallel extracts — The three extract tasks have no `depends_on`, so they run concurrently. The transform task fans in from all three.
- `run_if: ALL_SUCCESS` on the validate task means it only runs when every upstream task succeeds. Use `ALL_DONE` instead if you want validation to run even after partial failures.
- Job parameters with `{{start_date}}` resolve to the scheduled run time, giving each run a deterministic load date.
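The fan-in structure described above can be sanity-checked locally. This is an illustrative sketch, not Databricks code: it models the task DAG as a plain mapping and uses the standard library's `graphlib` to confirm that all three extracts precede `transform`, which precedes `load` and `validate`.

```python
from graphlib import TopologicalSorter

# Task DAG mirroring the YAML above: task_key -> set of upstream task_keys.
deps = {
    "extract_orders": set(),
    "extract_customers": set(),
    "extract_products": set(),
    "transform": {"extract_orders", "extract_customers", "extract_products"},
    "load": {"transform"},
    "validate": {"load"},
}

# static_order yields tasks with every dependency before its dependents:
# the three extracts first (in some order), then transform, load, validate.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Any ordering the scheduler picks for the three extracts is valid, because they share no edges — that freedom is exactly what lets them run in parallel.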
More Patterns
Event-Driven Processing
“Create a job triggered by file arrivals in S3 that autoscales based on incoming volume.”
```yaml
resources:
  jobs:
    process_uploads:
      name: "[${bundle.target}] Process Uploads"

      trigger:
        pause_status: UNPAUSED
        file_arrival:
          url: "s3://data-lake/incoming/orders/"
          min_time_between_triggers_seconds: 300
          wait_after_last_change_seconds: 60

      health:
        rules:
          - metric: RUN_DURATION_SECONDS
            op: GREATER_THAN
            value: 1800

      email_notifications:
        on_failure:
          - "data-alerts@example.com"

      tasks:
        - task_key: process
          notebook_task:
            notebook_path: ../src/process_uploads.py
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: "i3.xlarge"
            autoscale:
              min_workers: 2
              max_workers: 10
```

The 5-minute cooldown (`min_time_between_triggers_seconds: 300`) batches rapid file arrivals into a single run. Autoscale from 2 to 10 workers handles variable volume without manual intervention.
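To build intuition for how the two timers interact, here is a hypothetical pure-Python simulation — a simplified model of the trigger semantics, not Databricks internals. A run fires only once the source has been quiet for `wait_after_last_change_seconds`, and no more often than the cooldown allows:

```python
def runs_triggered(arrival_times, cooldown=300, quiet=60):
    """Simplified model of file-arrival triggering (illustrative only):
    fire after `quiet` seconds of no new files, at most once per `cooldown`."""
    runs = []
    last_run = float("-inf")
    for i, t in enumerate(arrival_times):
        next_arrival = arrival_times[i + 1] if i + 1 < len(arrival_times) else float("inf")
        fire_at = t + quiet
        # Fire only if no newer file lands inside the quiet window
        # and the cooldown since the previous run has elapsed.
        if next_arrival >= fire_at and fire_at - last_run >= cooldown:
            runs.append(fire_at)
            last_run = fire_at
    return runs

# Ten files landing 10 seconds apart collapse into a single run, not ten.
print(len(runs_triggered([i * 10 for i in range(10)])))  # 1
```

The real trigger evaluates state server-side, but the shape is the same: bursty arrivals are debounced by the quiet period, and sustained trickles are rate-limited by the cooldown.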
Python SDK ETL Pipeline
“Build the same multi-task ETL pipeline using the Python SDK instead of YAML.”
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task,
    NotebookTask,
    TaskDependency,
    Source,
    CronSchedule,
    PauseStatus,
    JobEmailNotifications,
    JobCluster,
    ClusterSpec,
    JobParameterDefinition,
)

w = WorkspaceClient()

job = w.jobs.create(
    name="Daily ETL Pipeline",
    schedule=CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",
        timezone_id="UTC",
        pause_status=PauseStatus.UNPAUSED,
    ),
    parameters=[
        JobParameterDefinition(name="load_date", default="{{start_date}}"),
    ],
    job_clusters=[
        JobCluster(
            job_cluster_key="etl_cluster",
            new_cluster=ClusterSpec(
                spark_version="15.4.x-scala2.12",
                node_type_id="i3.xlarge",
                num_workers=4,
            ),
        ),
    ],
    email_notifications=JobEmailNotifications(
        on_failure=["data-team@example.com"],
    ),
    tasks=[
        Task(
            task_key="extract",
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/extract",
                source=Source.WORKSPACE,
            ),
        ),
        Task(
            task_key="transform",
            depends_on=[TaskDependency(task_key="extract")],
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/transform",
                source=Source.WORKSPACE,
            ),
        ),
        Task(
            task_key="load",
            depends_on=[TaskDependency(task_key="transform")],
            job_cluster_key="etl_cluster",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/load",
                source=Source.WORKSPACE,
            ),
        ),
    ],
)

print(f"Created job: {job.job_id}")
```

The SDK version is useful when you’re generating job definitions programmatically — for example, creating one job per client in a multi-tenant setup.
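That multi-tenant pattern can be sketched without a workspace connection. The helper below is hypothetical (names like `client_job_spec` and the notebook paths are illustrative): it builds one job spec per client as a plain dict, which you would then translate into the SDK objects passed to `w.jobs.create`.

```python
def client_job_spec(client: str, notebook_root: str = "/Workspace/etl") -> dict:
    """Build a per-client job spec (plain-dict sketch of the SDK arguments)."""
    return {
        "name": f"Daily ETL - {client}",
        "parameters": [{"name": "client", "default": client}],
        "tasks": [
            {
                "task_key": "extract",
                "notebook_task": {"notebook_path": f"{notebook_root}/extract"},
            },
            {
                "task_key": "transform",
                "depends_on": [{"task_key": "extract"}],
                "notebook_task": {"notebook_path": f"{notebook_root}/transform"},
            },
        ],
    }

# One spec per tenant; each run receives its client name as a job parameter.
specs = [client_job_spec(c) for c in ("acme", "globex")]
print([s["name"] for s in specs])
```

Keeping the spec as data also makes it easy to unit-test the generated definitions before any API call is made.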
Cross-Job Orchestration
“Create a master orchestrator job that chains an ingestion job, a transformation job, and a validation notebook in sequence.”
```yaml
resources:
  jobs:
    ingestion:
      name: "[${bundle.target}] Data Ingestion"
      tasks:
        - task_key: ingest
          notebook_task:
            notebook_path: ../src/ingest.py

    transformation:
      name: "[${bundle.target}] Data Transformation"
      tasks:
        - task_key: transform
          notebook_task:
            notebook_path: ../src/transform.py

    orchestrator:
      name: "[${bundle.target}] Master Orchestrator"
      schedule:
        quartz_cron_expression: "0 0 1 * * ?"
        timezone_id: "UTC"
        pause_status: UNPAUSED
      tasks:
        - task_key: run_ingestion
          run_job_task:
            job_id: ${resources.jobs.ingestion.id}
        - task_key: run_transformation
          depends_on:
            - task_key: run_ingestion
          run_job_task:
            job_id: ${resources.jobs.transformation.id}
        - task_key: validate
          depends_on:
            - task_key: run_transformation
          notebook_task:
            notebook_path: ../src/validate.py
```

`run_job_task` triggers another job as a subtask, which lets you compose smaller, independently testable jobs into a larger workflow. Each child job retains its own retry policy and notifications.
Parallel Region Processing
“Create a job that discovers active regions, processes each one in parallel, then aggregates results.”
```yaml
resources:
  jobs:
    region_processor:
      name: "[${bundle.target}] Region Processor"
      schedule:
        quartz_cron_expression: "0 0 8 * * ?"
        timezone_id: "UTC"
        pause_status: UNPAUSED
      tasks:
        - task_key: get_regions
          notebook_task:
            notebook_path: ../src/get_active_regions.py

        - task_key: process_regions
          depends_on:
            - task_key: get_regions
          for_each_task:
            inputs: "{{tasks.get_regions.values.regions}}"
            concurrency: 10
            task:
              task_key: process_region
              notebook_task:
                notebook_path: ../src/process_region.py
                base_parameters:
                  region: "{{input}}"

        - task_key: aggregate
          depends_on:
            - task_key: process_regions
          run_if: ALL_DONE
          notebook_task:
            notebook_path: ../src/aggregate_results.py
```

`run_if: ALL_DONE` on the aggregate task ensures it runs even when some regions fail — you still get partial results and a summary of which regions need attention.
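The `for_each_task` fan-out with `concurrency: 10` behaves much like a bounded worker pool. As a rough local analogy (hypothetical, not Databricks internals), the standard library's `ThreadPoolExecutor` gives the same shape: each input gets its own task, with at most ten in flight at once.

```python
from concurrent.futures import ThreadPoolExecutor

def process_region(region: str) -> str:
    # Stand-in for the per-region notebook; returns a status string.
    return f"{region}:done"

regions = ["us-east-1", "eu-west-1", "ap-south-1"]

# At most 10 regions run at once, mirroring `concurrency: 10`.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(process_region, regions))

print(results)
```

`pool.map` preserves input order in its results, just as the `for_each_task` iterations each report back against their own input value.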
Watch Out For
- Using `existing_cluster_id` in production — Shared clusters work for dev, but production jobs should use `job_clusters` (ephemeral) or `new_cluster` (per-task). Shared clusters create contention and make cost attribution impossible.
- Missing `depends_on` on fan-in tasks — If your transform task doesn’t list all extract tasks in `depends_on`, it starts before all sources are ready. List every upstream task explicitly.
- Hardcoding workspace paths — Use relative paths (`../src/notebook.py`) in DABs and `Source.WORKSPACE` in the SDK. Hardcoded paths like `/Users/me/notebook` break when another user deploys the bundle.
- Forgetting `run_if` on cleanup tasks — Tasks that send summary reports or clean up temp tables should use `run_if: ALL_DONE` so they execute regardless of upstream failures. The default (`ALL_SUCCESS`) skips them after any failure.