Document Processing Pipeline
Skill: databricks-ai-functions
What You Can Build
A multi-stage document processing pipeline that parses PDFs and images, classifies document types, extracts structured fields, and matches entities against master data — all using SQL AI Functions inside a Spark Declarative Pipeline. Each stage uses the cheapest function that can handle the job: ai_parse_document for OCR, ai_classify for routing, ai_extract for flat fields, and ai_query only when you need nested JSON output.
In Action
“Write an SDP pipeline in Python that parses documents from a landing volume, classifies them, extracts invoice fields, and routes errors to a sidecar table.”
```python
import dlt
import yaml
from pyspark.sql.functions import expr, col, from_json

CFG = yaml.safe_load(open("/Workspace/path/to/config.yml"))
ENDPOINT = CFG["models"]["default"]
PROMPT = CFG["prompts"]["extract_invoice"]

# Stage 1: Parse binary documents
@dlt.table(comment="Parsed document text from landing volume")
def raw_parsed():
    return (
        spark.read.format("binaryFile").load(CFG["volumes"]["input"])
        .withColumn("parsed", expr("ai_parse_document(content)"))
        .selectExpr(
            "path",
            "parsed:pages[*].elements[*].content AS text_blocks",
            "parsed:error AS parse_error",
        )
        .filter("parse_error IS NULL")
    )

# Stage 2: Classify document type
@dlt.table(comment="Document type classification")
def classified_docs():
    return (
        dlt.read("raw_parsed")
        .withColumn(
            "doc_type",
            expr("ai_classify(text_blocks, array('invoice', 'purchase_order', 'receipt', 'contract', 'other'))"),
        )
    )

# Stage 3: Extract structured fields
@dlt.table(comment="Invoice fields extracted with ai_query for nested JSON")
def extracted_docs():
    return (
        dlt.read("classified_docs")
        .filter("doc_type = 'invoice'")
        .withColumn(
            "ai_response",
            expr(f"""
                ai_query(
                  '{ENDPOINT}',
                  concat('{PROMPT.strip()}', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000)),
                  responseFormat => '{{"type":"json_object"}}',
                  failOnError => false
                )
            """),
        )
        .withColumn(
            "invoice",
            from_json(
                col("ai_response.response"),
                "STRUCT<invoice_number:STRING, vendor_name:STRING, issue_date:STRING, "
                "total_amount:DOUBLE, line_items:ARRAY<STRUCT<item_code:STRING, "
                "description:STRING, quantity:DOUBLE, unit_price:DOUBLE, total:DOUBLE>>>",
            ),
        )
        .select("path", "doc_type", "invoice", col("ai_response.error").alias("extraction_error"))
    )

# Stage 4: Success output
@dlt.table(comment="Processed documents ready for downstream consumption")
def processed_docs():
    return dlt.read("extracted_docs").filter("extraction_error IS NULL")

# Stage 5: Error sidecar
@dlt.table(comment="Failed extractions for review and reprocess")
def processing_errors():
    return (
        dlt.read("extracted_docs")
        .filter("extraction_error IS NOT NULL")
        .select("path", "doc_type", col("extraction_error").alias("error"))
    )
```

Key decisions:
- `ai_parse_document` first, not `ai_query` — it handles OCR natively without requiring a model endpoint, and the output is structured for downstream extraction
- `ai_classify` for routing, not `ai_query` — task-specific functions are faster and cheaper because they use optimized endpoints
- `ai_query` only for nested JSON — `ai_extract` handles flat fields well, but cannot produce nested arrays like `line_items`. Reach for `ai_query` with `responseFormat` only when you need structured nesting
- `failOnError => false` everywhere — a single malformed document should not kill the entire batch. Route errors to a sidecar table for retry
- `LEFT(text_blocks, 6000)` caps input length — long documents silently truncate or exceed context windows. Truncate explicitly
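The sidecar decision above is independent of any Databricks API, so it can be sketched locally. The following is a minimal, hypothetical illustration of the routing idea: each record either yields extracted fields or an error, and errors go to a separate list instead of aborting the batch (`route_batch` and `toy_parse` are illustrative names, with `toy_parse` standing in for `ai_query` with `failOnError => false`).

```python
def route_batch(records, parse):
    """Split a batch into successes and an error sidecar; never abort the batch."""
    processed, errors = [], []
    for rec in records:
        try:
            processed.append({"path": rec["path"], "fields": parse(rec["text"])})
        except ValueError as exc:
            errors.append({"path": rec["path"], "error": str(exc)})
    return processed, errors

def toy_parse(text):
    # stand-in for the model call: fails on malformed (here: empty) input
    if not text.strip():
        raise ValueError("empty document")
    return {"chars": len(text)}

processed, errors = route_batch(
    [{"path": "a.pdf", "text": "INV-001 total 42.00"}, {"path": "b.pdf", "text": "  "}],
    toy_parse,
)
```

The shape mirrors stages 4 and 5: the success table and the error table are two filters over the same upstream result, so a retry job only needs to re-read the sidecar.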
More Patterns
Parse and chunk documents in pure SQL
“Write SQL to parse documents from a volume, explode elements into individual chunks, and store them in a table for downstream processing.”
```sql
CREATE OR REPLACE TABLE catalog.schema.parsed_chunks AS
WITH parsed AS (
  SELECT path, ai_parse_document(content) AS doc
  FROM read_files('/Volumes/catalog/schema/volume/docs/', format => 'binaryFile')
),
elements AS (
  SELECT
    path,
    explode(variant_get(doc, '$.document.elements', 'ARRAY<VARIANT>')) AS element
  FROM parsed
)
SELECT
  md5(concat(path, variant_get(element, '$.content', 'STRING'))) AS chunk_id,
  path AS source_path,
  variant_get(element, '$.content', 'STRING') AS content,
  variant_get(element, '$.type', 'STRING') AS element_type,
  current_timestamp() AS parsed_at
FROM elements
WHERE variant_get(element, '$.content', 'STRING') IS NOT NULL
  AND length(trim(variant_get(element, '$.content', 'STRING'))) > 10;
```

This pure-SQL approach is useful when you want to chunk documents for RAG or vector search without a full SDP pipeline. The `length > 10` filter drops whitespace-only elements that add noise.
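The chunk-row logic in that query (stable `md5` id over path plus content, drop near-empty elements) can be checked in isolation. A minimal Python sketch, with the hypothetical helper `chunk_rows` mirroring the SELECT and WHERE clauses:

```python
import hashlib

def chunk_rows(path, elements, min_len=10):
    """Mimic the SQL chunking: stable md5 chunk_id, drop near-empty elements."""
    rows = []
    for el in elements:
        content = el.get("content") or ""
        if len(content.strip()) <= min_len:
            continue  # same role as the length(trim(...)) > 10 filter
        rows.append({
            "chunk_id": hashlib.md5((path + content).encode("utf-8")).hexdigest(),
            "source_path": path,
            "content": content,
            "element_type": el.get("type"),
        })
    return rows

rows = chunk_rows("/Volumes/catalog/schema/volume/docs/a.pdf", [
    {"content": "Invoice 001 issued to Acme Corporation.", "type": "text"},
    {"content": "   ", "type": "text"},    # whitespace-only: dropped
    {"content": "short", "type": "text"},  # 10 chars or fewer: dropped
])
```

Hashing path plus content means re-parsing the same document yields the same `chunk_id`, so downstream MERGEs stay idempotent.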
Streaming ingestion with incremental parsing
“Write PySpark to set up a streaming job that parses new documents as they land in a volume.”
```python
from pyspark.sql.functions import col, current_timestamp, expr

files_df = (
    spark.readStream.format("binaryFile")
    .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}")
    .option("recursiveFileLookup", "true")
    .load("/Volumes/catalog/schema/volume/docs/")
)

parsed_df = (
    files_df
    .repartition(8, expr("crc32(path) % 8"))
    .withColumn("parsed", expr("ai_parse_document(content)"))
    .withColumn("parsed_at", current_timestamp())
    .select("path", "parsed", "parsed_at")
)

(
    parsed_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/parse")
    .trigger(availableNow=True)
    .toTable("catalog.schema.parsed_documents_raw")
)
```

The `repartition` call distributes documents across partitions to parallelize `ai_parse_document` calls. Without it, a single partition processes all files sequentially. `trigger(availableNow=True)` processes all pending files then stops — useful for scheduled batch jobs that run on a cadence.
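To see why `crc32(path) % 8` spreads work evenly, the same expression can be evaluated locally with the standard library — `zlib.crc32` computes the same checksum family Spark's `crc32` does. A small sketch (`partition_for` is an illustrative name):

```python
import zlib

def partition_for(path, num_partitions=8):
    """Mirror the crc32(path) % 8 expression used in the repartition call."""
    return zlib.crc32(path.encode("utf-8")) % num_partitions

# Hash-bucket 100 sample paths and inspect the resulting distribution.
paths = [f"/Volumes/catalog/schema/volume/docs/doc_{i}.pdf" for i in range(100)]
buckets = {}
for p in paths:
    buckets.setdefault(partition_for(p), []).append(p)
```

Because the bucket depends only on the path, the assignment is deterministic across runs — the same document always lands on the same partition, which keeps retries stable.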
Fuzzy vendor matching with ai_similarity
“Write SQL to match extracted vendor names against a master data table using fuzzy similarity scoring.”
```sql
SELECT
  e.path,
  e.invoice.vendor_name AS extracted_vendor,
  m.vendor_id,
  m.vendor_name AS master_vendor,
  ai_similarity(e.invoice.vendor_name, m.vendor_name) AS match_score
FROM catalog.schema.processed_docs e
CROSS JOIN catalog.schema.vendor_master m
WHERE ai_similarity(e.invoice.vendor_name, m.vendor_name) > 0.80
ORDER BY match_score DESC;
```

`ai_similarity` returns a 0-1 score based on semantic similarity. A threshold of 0.80 catches common variations (“Acme Corp” vs “ACME Corporation”) while filtering noise. For large vendor tables, pre-filter with a LIKE clause before running similarity to reduce cross-join cardinality.
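The pre-filter-then-score shape can be sketched locally. This hypothetical example uses `difflib.SequenceMatcher` as a purely *lexical* stand-in for `ai_similarity` (which is semantic, so its scores and a sensible threshold will differ), and a cheap prefix check as the analogue of the LIKE prefilter:

```python
from difflib import SequenceMatcher

def lexical_similarity(a, b):
    # Lexical stand-in for ai_similarity; tune the threshold per scorer.
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def match_vendors(extracted, master, threshold=0.6, prefix_len=3):
    """Cross join with a cheap prefix prefilter, analogous to a LIKE clause."""
    matches = []
    for ev in extracted:
        for vendor_id, mv in master:
            if ev[:prefix_len].lower() != mv[:prefix_len].lower():
                continue  # skip the expensive scorer for obvious non-matches
            score = lexical_similarity(ev, mv)
            if score >= threshold:
                matches.append((ev, vendor_id, mv, round(score, 2)))
    return matches

matches = match_vendors(
    ["Acme Corp"],
    [("V001", "ACME Corporation"), ("V002", "Globex LLC")],
)
```

The prefilter is the part that matters at scale: it turns an O(extracted × master) scoring pass into one over only the candidate pairs that share a prefix.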
Watch Out For
- Passing raw binary to `ai_query` produces garbage — always parse documents with `ai_parse_document` first, then feed the extracted text to `ai_query` or `ai_extract`. Binary content is not text.
- `ai_extract` cannot handle nested arrays — it returns flat key-value pairs. For invoice line items, order details, or any array-of-structs, you must use `ai_query` with `responseFormat`.
- `ai_parse_document` is the bottleneck — it is the slowest stage in the pipeline. Repartition before calling it to parallelize across executors, and use `trigger(availableNow=True)` for batch scheduling.
- Prompt drift across config changes — externalizing prompts to a `config.yml` makes them versionable and testable. Hardcoded prompts scattered across pipeline stages are hard to audit and easy to break.
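One way to make externalized prompts fail loudly rather than drift: validate the config at pipeline start. A minimal, hypothetical sketch — JSON is used here only to keep the example dependency-free (the pipeline itself loads YAML), and the endpoint name and prompt text are placeholders:

```python
import json

# Placeholder config; the real pipeline reads config.yml via yaml.safe_load.
CONFIG_TEXT = """
{
  "models": {"default": "my-serving-endpoint"},
  "prompts": {"extract_invoice": "Extract invoice_number, vendor_name, total_amount as JSON."}
}
"""

def load_config(text, required_prompts=("extract_invoice",)):
    """Fail fast if a pipeline stage references a prompt missing from config."""
    cfg = json.loads(text)
    missing = [p for p in required_prompts if p not in cfg.get("prompts", {})]
    if missing:
        raise KeyError(f"missing prompts in config: {missing}")
    return cfg

CFG = load_config(CONFIG_TEXT)
```

Validating up front means a renamed or deleted prompt key breaks the pipeline at load time, not halfway through a batch of model calls.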