MCP Approach
Skill: databricks-spark-declarative-pipelines
What You Can Build
The MCP (Model Context Protocol) approach lets you create and run pipelines directly from your AI coding assistant without Asset Bundles. Ask it to create a pipeline and it will upload your files, call `create_or_update_pipeline`, run the pipeline, and verify the output tables — all in one conversation. This is the fastest path from idea to running pipeline for prototyping and experimentation.
In Action
“Create a serverless pipeline called `my_orders_pipeline` from local SQL and Python files, run it, and verify the output”
```python
# Step 1: Upload pipeline files to the workspace
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)

# Step 2: Create the pipeline and start a run
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True
)
```

Key decisions:
- MCP for prototyping, Asset Bundles for production — the MCP approach skips bundle configuration entirely. This is faster for experiments but lacks multi-environment deployment, CI/CD integration, and version control of pipeline settings.
- `workspace_file_paths` lists every file — unlike Asset Bundles, which use glob patterns, MCP requires explicit file paths. Add or remove files by updating this list.
- `start_run=True` for immediate execution — the pipeline is created and run in one call. Set it to `False` if you want to inspect the config before running.
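Since `workspace_file_paths` must enumerate every file, one way to keep the list in sync is to derive it from the local folder before uploading. A minimal sketch — the path mapping mirrors what `upload_folder` is described as doing, and the folder names are the example's:

```python
from pathlib import Path

def build_workspace_file_paths(local_folder: str, workspace_folder: str) -> list[str]:
    """Map every .sql/.py file under local_folder to its workspace path,
    mirroring the layout upload_folder would create."""
    root = Path(local_folder)
    return sorted(
        f"{workspace_folder}/{p.relative_to(root).as_posix()}"
        for p in root.rglob("*")
        if p.is_file() and p.suffix in {".sql", ".py"}
    )
```

Regenerating the list after adding or removing a file keeps the pipeline definition and the uploaded folder from drifting apart.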
More Patterns
Write pipeline files locally first
“Set up a local folder structure with SQL and Python files before uploading to the workspace”
```text
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql
│   └── ingest_events.py
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
```

SQL file (`bronze/ingest_orders.sql`):
```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Python file (`bronze/ingest_events.py`):
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```

Write and test your SQL and Python files locally, then upload the entire folder in one call. The folder structure does not matter to the pipeline — it processes all files listed in `workspace_file_paths`.
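If you start many prototypes from the same bronze/silver/gold shape, the tree above can be scaffolded with a few lines before you fill in the transformations. A sketch; the file names come from the example tree:

```python
from pathlib import Path

# Folder layout from the example above (names are illustrative)
LAYOUT = {
    "bronze": ["ingest_orders.sql", "ingest_events.py"],
    "silver": ["clean_orders.sql"],
    "gold": ["daily_summary.sql"],
}

def scaffold(root: str) -> list[Path]:
    """Create the empty pipeline folder layout, returning the created files."""
    created = []
    for folder, files in LAYOUT.items():
        d = Path(root) / folder
        d.mkdir(parents=True, exist_ok=True)
        for name in files:
            f = d / name
            f.touch()
            created.append(f)
    return created
```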
Handle run results and errors
“Check if the pipeline run succeeded and get detailed error information if it failed”
```python
# Check the result
if result["success"]:
    # Verify output tables exist and have data
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )
else:
    # result["message"] includes suggested next steps
    print(result["message"])

    # Get detailed pipeline state and recent events
    details = get_pipeline(pipeline_id=result["pipeline_id"])
    print(details.get("recent_events"))
```

The `create_or_update_pipeline` response includes `success`, `state`, `errors`, and a human-readable `message` with suggested fixes. The `get_pipeline` tool provides deeper diagnostics, including recent events and error stack traces. Always verify output tables after a successful run — a pipeline can complete with zero rows if the source path is wrong.
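When you script many runs, the branching above can be collapsed into a small helper. A sketch that assumes only the response keys described here (`success`, `pipeline_id`, `state`, `message`):

```python
def summarize_result(result: dict) -> str:
    """Turn a create_or_update_pipeline-style response dict into a
    one-line status string for logs or chat output."""
    if result.get("success"):
        pipeline_id = result.get("pipeline_id", "?")
        state = result.get("state", "?")
        return f"OK: pipeline {pipeline_id} in state {state}"
    return f"FAILED: {result.get('message', 'no message')}"
```

This keeps the success path and the failure path visible in one place; the real diagnostics still come from `get_pipeline`.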
Add advanced settings
“Create a pipeline with development mode, continuous execution, and custom Spark configuration using `extra_settings`”
```python
result = create_or_update_pipeline(
    name="my_streaming_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[...],
    extra_settings={
        "development": True,
        "continuous": True,
        "configuration": {
            "spark.sql.shuffle.partitions": "auto",
            "schema_location_base": "/Volumes/my_catalog/metadata/schemas"
        },
        "tags": {"environment": "development", "owner": "data-team"}
    }
)
```

The `extra_settings` dict accepts any pipeline configuration parameter. Use it for development mode, continuous execution, pipeline-level Spark configs, custom tags, notifications, and cluster overrides. See the Advanced Configuration page for the full parameter reference.
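If you keep a base settings dict per project, experiment-specific overrides can be layered onto it before passing the result as `extra_settings`. A hypothetical helper — the deep-merge of nested dicts such as `configuration` and `tags` is my assumption, not documented MCP behavior:

```python
def merge_settings(base: dict, extra: dict) -> dict:
    """Overlay extra on base; nested dicts (e.g. 'configuration', 'tags')
    are merged key-by-key instead of being replaced wholesale."""
    merged = dict(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = value
    return merged
```

This lets a shared base (say, project tags and schema locations) survive while individual experiments flip `development` or `continuous` on and off.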
Pipeline lifecycle management
“Show the other MCP tools available for managing pipelines after creation”
```python
# Get pipeline status and recent events
details = get_pipeline(pipeline_id="abc-123")

# Run with options
run_pipeline(
    pipeline_id="abc-123",
    full_refresh=True,
    wait_for_completion=True,
    timeout=1800
)

# Stop a running pipeline
run_pipeline(pipeline_id="abc-123", stop=True)

# Validate without running
run_pipeline(pipeline_id="abc-123", validate_only=True)

# Delete when done prototyping
delete_pipeline(pipeline_id="abc-123")
```

The `run_pipeline` tool handles start, stop, validate, and full refresh. Use `full_refresh=True` when you change schema or need to reprocess all data. Use `validate_only=True` to check for configuration errors without consuming compute.
Watch Out For
- No version control for pipeline settings — MCP creates pipelines imperatively. If you lose the conversation, you lose the configuration. Export to Asset Bundles before going to production.
- `workspace_file_paths` must be absolute — relative paths or local paths do not work. Always use the full `/Workspace/Users/...` path that matches where `upload_folder` placed the files.
- Pipeline ID changes on recreate — if you delete and recreate a pipeline, the ID changes. Any external references (job triggers, monitoring dashboards) break. Update the pipeline in place with `create_or_update_pipeline` using the same name instead.
- Timeout on large initial loads — the default timeout may not be enough for first-run full refreshes on large datasets. Set `timeout=3600` (1 hour) or use `wait_for_completion=False` and poll with `get_pipeline`.
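The `wait_for_completion=False`-and-poll pattern from the last bullet can be sketched as a generic loop. The terminal state names (`IDLE`, `FAILED`, `CANCELED`) are illustrative assumptions, and `get_pipeline` is passed in as a callable so the loop stays testable:

```python
import time

def wait_for_pipeline(get_pipeline, pipeline_id: str,
                      timeout: float = 3600.0, poll_interval: float = 10.0) -> str:
    """Poll get_pipeline until the run reaches a terminal state or the
    timeout elapses. Terminal state names here are assumptions."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_pipeline(pipeline_id=pipeline_id).get("state", "")
        if state in {"IDLE", "FAILED", "CANCELED"}:
            return state
        time.sleep(poll_interval)
    raise TimeoutError(f"pipeline {pipeline_id} still running after {timeout}s")
```

Because the loop owns its own deadline, a long initial full refresh only needs a larger `timeout` argument rather than a change to the MCP call itself.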