
Document Processing Pipeline

Skill: databricks-ai-functions

A multi-stage document processing pipeline that parses PDFs and images, classifies document types, extracts structured fields, and matches entities against master data — all using SQL AI Functions inside a Spark Declarative Pipeline. Each stage uses the cheapest function that can handle the job: ai_parse_document for OCR, ai_classify for routing, ai_extract for flat fields, and ai_query only when you need nested JSON output.

“Write an SDP pipeline in Python that parses documents from a landing volume, classifies them, extracts invoice fields, and routes errors to a sidecar table.”

import dlt
import yaml
from pyspark.sql.functions import expr, col, from_json

CFG = yaml.safe_load(open("/Workspace/path/to/config.yml"))
ENDPOINT = CFG["models"]["default"]
PROMPT = CFG["prompts"]["extract_invoice"]

# Stage 1: Parse binary documents
@dlt.table(comment="Parsed document text from landing volume")
def raw_parsed():
    return (
        spark.read.format("binaryFile").load(CFG["volumes"]["input"])
        .withColumn("parsed", expr("ai_parse_document(content)"))
        .selectExpr(
            "path",
            "parsed:pages[*].elements[*].content AS text_blocks",
            "parsed:error AS parse_error",
        )
        .filter("parse_error IS NULL")
    )
# Stage 2: Classify document type
@dlt.table(comment="Document type classification")
def classified_docs():
    return (
        dlt.read("raw_parsed")
        .withColumn(
            "doc_type",
            expr("ai_classify(text_blocks, array('invoice', 'purchase_order', 'receipt', 'contract', 'other'))"),
        )
    )
# Stage 3: Extract structured fields
@dlt.table(comment="Invoice fields extracted with ai_query for nested JSON")
def extracted_docs():
    return (
        dlt.read("classified_docs")
        .filter("doc_type = 'invoice'")
        .withColumn(
            "ai_response",
            expr(f"""
                ai_query(
                    '{ENDPOINT}',
                    concat('{PROMPT.strip()}', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000)),
                    responseFormat => '{{"type":"json_object"}}',
                    failOnError => false
                )
            """),
        )
        .withColumn(
            "invoice",
            from_json(
                col("ai_response.response"),
                "STRUCT<invoice_number:STRING, vendor_name:STRING, issue_date:STRING, "
                "total_amount:DOUBLE, line_items:ARRAY<STRUCT<item_code:STRING, "
                "description:STRING, quantity:DOUBLE, unit_price:DOUBLE, total:DOUBLE>>>",
            ),
        )
        # With failOnError => false, ai_query returns STRUCT<response, errorMessage>
        .select(
            "path",
            "doc_type",
            "invoice",
            col("ai_response.errorMessage").alias("extraction_error"),
        )
    )
# Stage 4: Success output
@dlt.table(comment="Processed documents ready for downstream consumption")
def processed_docs():
    return dlt.read("extracted_docs").filter("extraction_error IS NULL")

# Stage 5: Error sidecar
@dlt.table(comment="Failed extractions for review and reprocessing")
def processing_errors():
    return (
        dlt.read("extracted_docs")
        .filter("extraction_error IS NOT NULL")
        .select("path", "doc_type", col("extraction_error").alias("error"))
    )

Key decisions:

  • ai_parse_document first, not ai_query — it handles OCR natively without requiring a model endpoint, and the output is structured for downstream extraction
  • ai_classify for routing, not ai_query — task-specific functions are faster and cheaper because they use optimized endpoints
  • ai_query only for nested JSON — ai_extract handles flat fields well but cannot produce nested arrays like line_items. Reach for ai_query with responseFormat only when you need structured nesting
  • failOnError => false everywhere — a single malformed document should not kill the entire batch. Route errors to a sidecar table for retry
  • LEFT(text_blocks, 6000) caps input length — long documents silently truncate or exceed context windows. Truncate explicitly
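The truncation rule in the last bullet is easy to check locally. This plain-Python helper (the function name and 6000-character cap are this pipeline's own choices, not a model limit) mirrors the `concat(prompt, '\n\nDocument text:\n', LEFT(text_blocks, 6000))` pattern from the extraction stage:

```python
def build_prompt(instruction: str, document_text: str, max_chars: int = 6000) -> str:
    """Assemble an extraction prompt with an explicit document-length cap.

    Mirrors concat('<prompt>', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000))
    from the SQL expression: truncation is explicit, never left to the model.
    """
    return f"{instruction.strip()}\n\nDocument text:\n{document_text[:max_chars]}"

prompt = build_prompt("Extract invoice fields as JSON.", "A" * 10_000)
```

Making the cap a named parameter also keeps it tunable per document type rather than buried in a SQL string.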

“Write SQL to parse documents from a volume, explode elements into individual chunks, and store them in a table for downstream processing.”

CREATE OR REPLACE TABLE catalog.schema.parsed_chunks AS
WITH parsed AS (
  SELECT
    path,
    ai_parse_document(content) AS doc
  FROM read_files('/Volumes/catalog/schema/volume/docs/', format => 'binaryFile')
),
elements AS (
  SELECT
    path,
    explode(variant_get(doc, '$.document.elements', 'ARRAY<VARIANT>')) AS element
  FROM parsed
)
SELECT
  md5(concat(path, variant_get(element, '$.content', 'STRING'))) AS chunk_id,
  path AS source_path,
  variant_get(element, '$.content', 'STRING') AS content,
  variant_get(element, '$.type', 'STRING') AS element_type,
  current_timestamp() AS parsed_at
FROM elements
WHERE variant_get(element, '$.content', 'STRING') IS NOT NULL
  AND length(trim(variant_get(element, '$.content', 'STRING'))) > 10;

This pure-SQL approach is useful when you want to chunk documents for RAG or vector search without a full SDP pipeline. The length(trim(content)) > 10 filter drops whitespace-only and trivially short elements that add noise.
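Because chunk_id hashes path plus content, re-running the CTAS produces stable ids for unchanged chunks. A local sketch of the same id rule and the WHERE-clause filter (hashlib stands in for SQL md5; both helper names are ours):

```python
import hashlib

def chunk_id(path, content):
    """Deterministic chunk id: md5 over path + content, as in the SQL above."""
    return hashlib.md5((path + content).encode("utf-8")).hexdigest()

def keep_chunk(content, min_len=10):
    """Mirror the WHERE clause: drop NULL and whitespace-only/short elements."""
    return content is not None and len(content.strip()) > min_len
```

Stable ids make downstream MERGEs and vector-index upserts idempotent; note that identical content at the same path always maps to the same id.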

Streaming ingestion with incremental parsing


“Write PySpark to set up a streaming job that parses new documents as they land in a volume.”

from pyspark.sql.functions import current_timestamp, expr

files_df = (
    spark.readStream.format("binaryFile")
    .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}")
    .option("recursiveFileLookup", "true")
    .load("/Volumes/catalog/schema/volume/docs/")
)

parsed_df = (
    files_df
    .repartition(8, expr("crc32(path) % 8"))
    .withColumn("parsed", expr("ai_parse_document(content)"))
    .withColumn("parsed_at", current_timestamp())
    .select("path", "parsed", "parsed_at")
)

(
    parsed_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/parse")
    .trigger(availableNow=True)
    .toTable("catalog.schema.parsed_documents_raw")
)

The repartition call distributes documents across partitions to parallelize ai_parse_document calls. Without it, a single partition processes all files sequentially. trigger(availableNow=True) processes all pending files then stops — useful for scheduled batch jobs that run on a cadence.
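You can sanity-check the bucketing expression locally: Python's zlib.crc32 computes the same CRC-32 checksum as Spark's crc32 (both follow java.util.zip.CRC32/zlib semantics), so this sketch shows how paths spread across the 8 buckets (the file names are made up):

```python
import zlib
from collections import Counter

def bucket(path, n=8):
    """crc32(path) % n, the same expression used in the repartition call."""
    return zlib.crc32(path.encode("utf-8")) % n

# Hypothetical landing-volume paths, just to visualize the spread
paths = [f"/Volumes/catalog/schema/volume/docs/doc_{i}.pdf" for i in range(200)]
spread = Counter(bucket(p) for p in paths)
```

A roughly even Counter confirms the hash distributes work; a heavily skewed one (e.g. paths differing only in a suffix the CRC happens to collapse) would leave some executors idle.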

“Write SQL to match extracted vendor names against a master data table using fuzzy similarity scoring.”

SELECT
  e.path,
  e.invoice.vendor_name AS extracted_vendor,
  m.vendor_id,
  m.vendor_name AS master_vendor,
  ai_similarity(e.invoice.vendor_name, m.vendor_name) AS match_score
FROM catalog.schema.processed_docs e
CROSS JOIN catalog.schema.vendor_master m
WHERE ai_similarity(e.invoice.vendor_name, m.vendor_name) > 0.80
ORDER BY match_score DESC;

ai_similarity returns a 0-1 score based on semantic similarity. A threshold of 0.80 catches common variations (“Acme Corp” vs “ACME Corporation”) while filtering noise. For large vendor tables, pre-filter with a LIKE clause before running similarity to reduce cross-join cardinality.
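For intuition about the threshold logic, here is a rough local stand-in using difflib's lexical ratio in place of ai_similarity. difflib measures character overlap, not semantics, so its scores will differ from ai_similarity's; the helper, threshold, and vendor names are all illustrative:

```python
from difflib import SequenceMatcher

def match_vendors(extracted, master, threshold=0.8):
    """Score every (extracted, master) pair and keep matches above threshold,
    best score first -- the same shape as the CROSS JOIN + WHERE + ORDER BY."""
    hits = []
    for e in extracted:
        for m in master:
            score = SequenceMatcher(None, e.lower(), m.lower()).ratio()
            if score >= threshold:
                hits.append((e, m, round(score, 2)))
    return sorted(hits, key=lambda h: h[2], reverse=True)

matches = match_vendors(["Acme Corp"], ["Acme Corp.", "Globex LLC"])
```

The nested loop makes the cross-join cost visible: it is O(extracted × master), which is exactly why the prose above recommends a cheap pre-filter before the expensive similarity call.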

  • Passing raw binary to ai_query produces garbage — always parse documents with ai_parse_document first, then feed the extracted text to ai_query or ai_extract. Binary content is not text.
  • ai_extract cannot handle nested arrays — it returns flat key-value pairs. For invoice line items, order details, or any array-of-structs, you must use ai_query with responseFormat.
  • ai_parse_document is the bottleneck — it is the slowest stage in the pipeline. Repartition before calling it to parallelize across executors, and use trigger(availableNow=True) for batch scheduling.
  • Prompt drift across config changes — externalizing prompts to a config.yml makes them versionable and testable. Hardcoded prompts scattered across pipeline stages are hard to audit and easy to break.
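The last point is straightforward to enforce in CI: check that every prompt a pipeline references actually exists in the loaded config before deploying. A minimal sketch (the helper name, stage list, and config shape are hypothetical):

```python
def validate_prompts(config, required):
    """Return the prompt keys a pipeline needs but the config does not define."""
    available = set(config.get("prompts", {}))
    return [key for key in required if key not in available]

# Hypothetical config and required-prompt list for the invoice pipeline
cfg = {"prompts": {"extract_invoice": "Extract invoice fields as JSON."}}
missing = validate_prompts(cfg, ["extract_invoice", "extract_receipt"])
```

Failing the build when `missing` is non-empty catches a renamed or deleted prompt key before it reaches a scheduled pipeline run.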