MCP Approach

Skill: databricks-spark-declarative-pipelines

The MCP (Model Context Protocol) approach lets you create and run pipelines directly from your AI coding assistant without Asset Bundles. Ask it to create a pipeline and it will upload your files, call create_or_update_pipeline, run the pipeline, and verify the output tables — all in one conversation. This is the fastest path from idea to running pipeline for prototyping and experimentation.

“Create a serverless pipeline called my_orders_pipeline from local SQL and Python files, run it, and verify the output”

# Step 1: Upload pipeline files to the workspace
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)

# Step 2: Create the pipeline and start a run
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True
)

Key decisions:

  • MCP for prototyping, Asset Bundles for production — the MCP approach skips bundle configuration entirely. This is faster for experiments but lacks multi-environment deployment, CI/CD integration, and version control of pipeline settings.
  • workspace_file_paths lists every file — unlike Asset Bundles, which use glob patterns, MCP requires explicit file paths. Add or remove files by updating this list.
  • start_run=True for immediate execution — the pipeline is created and started in one call. Set start_run=False if you want to inspect the config before running.
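When you opt for start_run=False, a second call is needed to actually start the run. A minimal sketch of that two-step pattern, with the MCP tools passed in as callables; the success, message, and pipeline_id response keys are assumptions about the tool output:

```python
def create_then_run(create_or_update_pipeline, run_pipeline, **pipeline_args):
    """Create or update the pipeline without starting it, then run explicitly.

    Assumes the create response exposes "success", "message", and
    "pipeline_id" keys; adapt to the actual tool output if it differs.
    """
    result = create_or_update_pipeline(start_run=False, **pipeline_args)
    if not result["success"]:
        raise RuntimeError(result.get("message", "pipeline creation failed"))
    # Config looked fine -- start the run and wait for it to finish
    return run_pipeline(pipeline_id=result["pipeline_id"], wait_for_completion=True)
```

This keeps the inspection window between creation and execution explicit instead of relying on start_run=True.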

“Set up a local folder structure with SQL and Python files before uploading to the workspace”

my_pipeline/
├── bronze/
│   ├── ingest_orders.sql
│   └── ingest_events.py
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql

SQL file (bronze/ingest_orders.sql):

CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Python file (bronze/ingest_events.py):

from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

Write and test your SQL and Python files locally, then upload the entire folder in one call. The folder structure does not matter to the pipeline — it processes all files listed in workspace_file_paths.
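Since workspace_file_paths must list every file explicitly, it can help to generate the list from the local folder before uploading. A small sketch that mirrors upload_folder's local-to-workspace layout; the helper itself is ordinary Python, not part of the MCP API:

```python
from pathlib import Path

def workspace_file_paths(local_folder, workspace_folder, exts=(".sql", ".py")):
    """Map each local pipeline source file to its post-upload workspace path."""
    root = Path(local_folder)
    return sorted(
        f"{workspace_folder}/{path.relative_to(root).as_posix()}"
        for path in root.rglob("*")
        if path.suffix in exts
    )
```

Feed the result straight into create_or_update_pipeline so the list stays in sync with the folder you just uploaded.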

“Check if the pipeline run succeeded and get detailed error information if it failed”

# Check the result
if result["success"]:
    # Verify output tables exist and have data
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )
else:
    # result["message"] includes suggested next steps
    print(result["message"])
    # Get detailed pipeline state and recent events
    details = get_pipeline(pipeline_id=result["pipeline_id"])
    print(details.get("recent_events"))

The create_or_update_pipeline response includes success, state, errors, and a human-readable message with suggested fixes. The get_pipeline tool provides deeper diagnostics including recent events and error stack traces. Always verify output tables after a successful run — a pipeline can complete with zero rows if the source path is wrong.
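One way to automate the zero-row check is a small helper over the table stats. This assumes get_table_details returns a per-table mapping with a row_count field, which may not match the real response shape:

```python
def empty_tables(stats):
    """Return the names of output tables that completed with zero rows.

    `stats` is assumed to map table name -> {"row_count": int}; adapt the
    key lookups to the actual get_table_details response.
    """
    return [name for name, info in stats.items() if info.get("row_count", 0) == 0]
```

If the returned list is non-empty after a "successful" run, double-check the source path in your read_files or cloudFiles load before trusting the result.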

“Create a pipeline with development mode, continuous execution, and custom Spark configuration using extra_settings”

result = create_or_update_pipeline(
    name="my_streaming_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[...],
    extra_settings={
        "development": True,
        "continuous": True,
        "configuration": {
            "spark.sql.shuffle.partitions": "auto",
            "schema_location_base": "/Volumes/my_catalog/metadata/schemas"
        },
        "tags": {"environment": "development", "owner": "data-team"}
    }
)

The extra_settings dict accepts any pipeline configuration parameter. Use it for development mode, continuous execution, pipeline-level Spark configs, custom tags, notifications, and cluster overrides. See the Advanced Configuration page for the full parameter reference.
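If you spin up several prototype pipelines, layering per-pipeline overrides onto shared development defaults keeps each call short. A sketch using the keys from the example above; the merge helper is plain Python, not part of the MCP API:

```python
def dev_settings(overrides=None):
    """Layer per-pipeline overrides onto baseline development extra_settings."""
    defaults = {
        "development": True,
        "configuration": {"spark.sql.shuffle.partitions": "auto"},
        "tags": {"environment": "development"},
    }
    overrides = overrides or {}
    merged = {**defaults, **overrides}
    # Nested dicts need a per-key merge, not a top-level replacement
    for key in ("configuration", "tags"):
        if key in overrides:
            merged[key] = {**defaults[key], **overrides[key]}
    return merged
```

Pass the result as extra_settings, e.g. `extra_settings=dev_settings({"continuous": True, "tags": {"owner": "data-team"}})`.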

“Show the other MCP tools available for managing pipelines after creation”

# Get pipeline status and recent events
details = get_pipeline(pipeline_id="abc-123")

# Run with options
run_pipeline(
    pipeline_id="abc-123",
    full_refresh=True,
    wait_for_completion=True,
    timeout=1800
)

# Stop a running pipeline
run_pipeline(pipeline_id="abc-123", stop=True)

# Validate without running
run_pipeline(pipeline_id="abc-123", validate_only=True)

# Delete when done prototyping
delete_pipeline(pipeline_id="abc-123")

The run_pipeline tool handles start, stop, validate, and full refresh. Use full_refresh=True when you change schema or need to reprocess all data. Use validate_only=True to check for configuration errors without consuming compute.

  • No version control for pipeline settings — MCP creates pipelines imperatively. If you lose the conversation, you lose the configuration. Export to Asset Bundles before going to production.
  • workspace_file_paths must be absolute — relative paths or local paths do not work. Always use the full /Workspace/Users/... path that matches where upload_folder placed the files.
  • Pipeline ID changes on recreate — if you delete and recreate a pipeline, the ID changes. Any external references (job triggers, monitoring dashboards) break. Update the pipeline in place with create_or_update_pipeline using the same name instead.
  • Timeout on large initial loads — the default timeout may not be enough for first-run full refreshes on large datasets. Set timeout=3600 (1 hour) or use wait_for_completion=False and poll with get_pipeline.
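The poll-instead-of-wait pattern from the last bullet might look like this. get_pipeline is passed in as a callable, and the IDLE/FAILED terminal states and the "state" response key are assumptions about the tool's output:

```python
import time

def wait_for_pipeline(get_pipeline, pipeline_id, timeout=3600, poll_interval=30):
    """Poll get_pipeline until the run reaches a terminal state or times out.

    IDLE and FAILED are assumed to be the terminal states; adjust to the
    states the tool actually reports.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_pipeline(pipeline_id=pipeline_id).get("state")
        if state in ("IDLE", "FAILED"):
            return state
        time.sleep(poll_interval)
    raise TimeoutError(f"pipeline {pipeline_id} still running after {timeout}s")
```

Kick off the run with wait_for_completion=False, then call this with a generous timeout for first-run full refreshes.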