Error Handling
Skill: spark-python-data-source
What You Can Build
External APIs fail — network timeouts, rate limits, transient 503s. Without error handling, a single flaky API response crashes your entire Spark job. Your AI coding assistant can generate retry logic with exponential backoff, circuit breakers that prevent cascading failures, and dead letter queues that capture failed records instead of losing them.
In Action
“Add retry logic and error handling to my custom data source so it recovers from transient API failures. Use exponential backoff with jitter, respect Retry-After headers, and classify errors as retryable or fatal. Python with `requests`.”
```python
import time

from requests.exceptions import Timeout, ConnectionError


class ResilientWriter:
    def __init__(self, options, schema=None):
        self.url = options.get("url")
        self.max_retries = int(options.get("max_retries", "5"))
        self.initial_backoff = float(options.get("initial_backoff", "1.0"))
        self.max_backoff = float(options.get("max_backoff", "60.0"))

    def write(self, iterator):
        import requests  # Executor-side import

        for row in iterator:
            self._send_with_retry(requests, row.asDict())

    def _send_with_retry(self, requests, data):
        for attempt in range(self.max_retries + 1):
            try:
                response = requests.post(self.url, json=data)
                response.raise_for_status()
                return
            except Exception as e:
                if not self._is_retryable(e):
                    raise  # Client errors, auth failures — no point retrying
                if attempt >= self.max_retries:
                    raise RuntimeError(
                        f"Failed after {self.max_retries} retries: {e}"
                    ) from e
                # Respect Retry-After header if present
                backoff = self._get_backoff(e, attempt)
                time.sleep(backoff)

    def _is_retryable(self, error):
        """Only retry transient errors. Never retry 4xx (except 429)."""
        if isinstance(error, (Timeout, ConnectionError)):
            return True
        if hasattr(error, "response") and error.response is not None:
            code = error.response.status_code
            return code == 429 or 500 <= code < 600
        return False

    def _get_backoff(self, error, attempt):
        """Calculate backoff, preferring Retry-After header."""
        if hasattr(error, "response") and error.response is not None:
            retry_after = error.response.headers.get("Retry-After")
            if retry_after:
                try:
                    return int(retry_after)
                except ValueError:
                    pass
        return min(self.initial_backoff * (2 ** attempt), self.max_backoff)
```

Key decisions your AI coding assistant made:
- Retries at the executor level, not the driver — `write()` runs on executors. Retry logic must live there because the driver can’t intercept per-row failures happening on remote processes.
- Exponential backoff with a cap — doubling from 1s gives you 1, 2, 4, 8, 16, 32, 60, 60… seconds. The cap prevents absurd waits while still giving the target system time to recover.
- `Retry-After` header takes priority — when an API tells you exactly how long to wait (429 responses), respect it. Ignoring `Retry-After` gets your IP blocked faster.
- Raise on non-retryable errors immediately — a 400 (bad request) or 401 (auth failure) won’t succeed on retry. Failing fast surfaces the real problem instead of wasting 5 retry cycles.
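One gap worth noting: the prompt asked for jitter, but the backoff above is deterministic, so executors that fail together retry together. A minimal sketch of a jittered variant (hypothetical `backoff_with_jitter` helper, not part of the generated class):

```python
import random


def backoff_with_jitter(attempt, initial=1.0, cap=60.0):
    # Full jitter: sleep a random amount in [0, capped exponential delay],
    # so many executors retrying at once don't hammer the API in lockstep.
    return random.uniform(0, min(initial * (2 ** attempt), cap))


# Attempt 3 draws from [0, 8] seconds; attempt 10 is capped at [0, 60].
print(backoff_with_jitter(3), backoff_with_jitter(10))
```

Swapping this into `_get_backoff` keeps the cap and the `Retry-After` override while spreading retries out in time.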
More Patterns
Circuit breaker to prevent cascading failures
“Add a circuit breaker to my data source writer so it stops hammering a failing API after 10 consecutive failures, waits 5 minutes, then tries again.”
```python
from datetime import datetime, timedelta


class CircuitBreaker:
    def __init__(self, failure_threshold=10, recovery_timeout=300):
        self.threshold = failure_threshold
        self.timeout = recovery_timeout
        self.failures = 0
        self.open_until = None

    def check(self):
        if self.open_until is None:
            return  # Circuit closed
        if datetime.now() >= self.open_until:
            self.failures = 0
            self.open_until = None
            return  # Half-open — allow one attempt
        raise RuntimeError(
            f"Circuit open — {self.failures} consecutive failures. "
            f"Retry after {self.open_until.isoformat()}"
        )

    def record_success(self):
        self.failures = 0
        self.open_until = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.open_until = datetime.now() + timedelta(seconds=self.timeout)


class CircuitBreakerWriter:
    def __init__(self, options, schema=None):
        self.url = options.get("url")
        self.breaker = CircuitBreaker(
            failure_threshold=int(options.get("cb_threshold", "10")),
            recovery_timeout=int(options.get("cb_timeout", "300")),
        )

    def write(self, iterator):
        import requests  # Executor-side import

        for row in iterator:
            self.breaker.check()
            try:
                requests.post(self.url, json=row.asDict()).raise_for_status()
                self.breaker.record_success()
            except Exception:
                self.breaker.record_failure()
                raise
```

Without a circuit breaker, a down API causes every executor to burn through retries on every row. The circuit breaker fails fast after a threshold, giving the target system breathing room to recover.
Dead letter queue for unrecoverable failures
“Add a dead letter queue to my writer so rows that fail after all retries get captured to a file instead of crashing the job.”
```python
import json
import os
from datetime import datetime


class DLQWriter:
    def __init__(self, options, schema=None):
        self.url = options.get("url")
        # A file path, e.g. /dbfs/dlq/my-source/failed.jsonl
        self.dlq_path = options.get("dlq_path")

    def write(self, iterator):
        import requests  # Executor-side import

        for row in iterator:
            try:
                requests.post(self.url, json=row.asDict()).raise_for_status()
            except Exception as e:
                if self.dlq_path:
                    self._write_to_dlq(row, e)
                else:
                    raise  # No DLQ configured — fail the task

    def _write_to_dlq(self, row, error):
        record = {
            "timestamp": datetime.now().isoformat(),
            "error_type": type(error).__name__,
            "error": str(error),
            "row": row.asDict(),
        }
        os.makedirs(os.path.dirname(self.dlq_path), exist_ok=True)
        with open(self.dlq_path, "a") as f:
            f.write(json.dumps(record) + "\n")
```

The dead letter queue captures enough context (timestamp, error type, original row) to diagnose and replay failures later. The trade-off: your job completes successfully even when some rows fail, so you need monitoring on the DLQ to catch data gaps.
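Because the DLQ is newline-delimited JSON, reading it back for inspection or replay is a few lines. A hypothetical `load_dlq` helper (not part of the writer above), demonstrated against a throwaway file in the same record layout:

```python
import json
import os
import tempfile


def load_dlq(path):
    # Parse each non-empty line of a newline-delimited JSON DLQ file.
    return [json.loads(line) for line in open(path) if line.strip()]


# Demo: write one record the way _write_to_dlq would, then load it back.
path = os.path.join(tempfile.mkdtemp(), "failed.jsonl")
with open(path, "a") as f:
    f.write(json.dumps({"error_type": "HTTPError", "row": {"id": 1}}) + "\n")
print(load_dlq(path)[0]["row"])  # → {'id': 1}
```

A replay job can feed `record["row"]` back through the writer once the downstream API recovers.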
Read with fallback to a secondary endpoint
“Add fallback logic to my reader so it tries a secondary API endpoint when the primary is unreachable.”
```python
class FallbackReader:
    def __init__(self, options, schema):
        self.primary_url = options.get("url")
        self.secondary_url = options.get("secondary_url")

    def read(self, partition):
        import requests  # Executor-side import
        from requests.exceptions import ConnectionError, Timeout

        params = {"start": partition.start, "end": partition.end}
        try:
            response = requests.get(self.primary_url, params=params, timeout=10)
            response.raise_for_status()
        except (ConnectionError, Timeout):
            if not self.secondary_url:
                raise
            response = requests.get(self.secondary_url, params=params, timeout=10)
            response.raise_for_status()

        for item in response.json()["results"]:
            yield tuple(item.values())
```

Fallback endpoints work for read-only data sources with replicated backends. For writes, fallback is riskier because you may end up with data split across two targets.
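The fallback shape is easy to unit-test without a network by isolating it behind a stub. A hypothetical `get_with_fallback` helper (the reader above inlines the same logic; `fetch` stands in for a `requests.get` wrapper):

```python
def get_with_fallback(fetch, primary, secondary=None):
    # Try the primary endpoint; fall back only on connection-level errors,
    # which the stub signals by raising OSError.
    try:
        return fetch(primary)
    except OSError:
        if secondary is None:
            raise
        return fetch(secondary)


calls = []

def fake_fetch(url):
    calls.append(url)
    if url == "https://primary.example":
        raise OSError("unreachable")
    return {"results": [1, 2]}


data = get_with_fallback(fake_fetch, "https://primary.example", "https://backup.example")
print(calls)  # → ['https://primary.example', 'https://backup.example']
```

The stub confirms the secondary is hit only after the primary raises, and not at all when the primary succeeds.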
Watch Out For
Section titled “Watch Out For”- Retrying non-retryable errors — a 400 (bad request) or 403 (forbidden) will never succeed on retry. Burning through 5 attempts with exponential backoff delays the inevitable failure by minutes. Classify errors first, retry only transient ones.
- Retry logic on the driver instead of executors — wrapping
df.write.format(...).save()in a try/except on the driver only catches driver-side errors. Per-row API failures happen on executors insidewrite(). That’s where your retry logic must live. - Silent data loss with
continue_on_error— swallowing exceptions to keep the job running means you lose data without knowing it. If you skip failed rows, always write them to a dead letter queue and alert on DLQ growth. - No timeout on HTTP requests — a hung API connection keeps the executor blocked indefinitely. Always pass
timeout=torequests.get()andrequests.post(). 30 seconds is a reasonable default for most APIs.
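The classification rule from the first bullet reduces to a few lines. A status-code-only sketch mirroring `_is_retryable` above (timeouts and connection errors are retryable before any status code exists, so they are handled separately):

```python
def is_retryable_status(code):
    # Retry 429 (rate limited) and 5xx (server-side); fail fast on other 4xx.
    return code == 429 or 500 <= code < 600


print([c for c in (400, 403, 429, 500, 503) if is_retryable_status(c)])  # → [429, 500, 503]
```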