
Authentication Patterns

Skill: spark-python-data-source

Every custom Spark data source that talks to an external API needs authentication, and the authentication code runs on executors — not the driver. Your AI coding assistant can generate credential resolution with priority-based fallback, token caching at the executor level, and secure secret handling so credentials never leak into logs or Spark UI.

“Add OAuth2 client-credentials authentication to my custom Spark data source with automatic token refresh. The token should be cached per executor and refreshed before expiry. Use Python with requests.”

from datetime import datetime, timedelta


class OAuth2Reader:
    def __init__(self, options, schema):
        self.url = options.get("url")
        self.token_url = options["token_url"]
        self.client_id = options["client_id"]
        self.client_secret = options["client_secret"]
        self.scope = options.get("scope", "")
        self._token = None
        self._token_expiry = None

    def _get_valid_token(self):
        """Fetch or refresh the OAuth2 token."""
        import requests  # Executor-side import

        if self._token and datetime.now() < self._token_expiry:
            return self._token
        response = requests.post(self.token_url, data={
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope,
        })
        response.raise_for_status()
        token_data = response.json()
        self._token = token_data["access_token"]
        expires_in = token_data.get("expires_in", 3600)
        # Refresh 60s before expiry to avoid edge-case failures
        self._token_expiry = datetime.now() + timedelta(seconds=expires_in - 60)
        return self._token

    def read(self, partition):
        import requests

        headers = {"Authorization": f"Bearer {self._get_valid_token()}"}
        response = requests.get(
            self.url,
            headers=headers,
            params={"offset": partition.start, "limit": partition.end},
        )
        response.raise_for_status()
        for item in response.json()["results"]:
            yield tuple(item.values())

Key decisions your AI coding assistant made:

  • Token cached on the instance — _token and _token_expiry live on self, so repeated read() calls within the same executor reuse the token instead of hitting the token endpoint per partition.
  • Refresh buffer — subtracts 60 seconds from expires_in to avoid requests failing with a token that expires mid-flight.
  • Executor-side requests import — the _get_valid_token() method runs on executors, so requests must be imported there.
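The cache-and-buffer logic can be isolated and unit-tested without any network calls. A minimal sketch — `TokenCache` is a hypothetical helper name, not part of the reader above, and it assumes a `fetch` callable returning `(token, expires_in)`:

```python
from datetime import datetime, timedelta


class TokenCache:
    """Sketch of the expiry-buffer caching used by OAuth2Reader."""

    def __init__(self, refresh_buffer_seconds=60):
        self.buffer = refresh_buffer_seconds
        self._token = None
        self._expiry = None

    def get(self, fetch):
        # Reuse the cached token while it is still comfortably valid
        if self._token and datetime.now() < self._expiry:
            return self._token
        token, expires_in = fetch()
        self._token = token
        # Expire early so no in-flight request carries a stale token
        self._expiry = datetime.now() + timedelta(seconds=expires_in - self.buffer)
        return self._token
```

Separating the cache this way also makes it easy to verify that the token endpoint is called only once per executor lifetime rather than once per partition.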

“Generate an authentication factory for my data source that tries Unity Catalog credentials first, then managed identity, then service principal, then API key, then basic auth.”

def get_credential(options):
    """Resolve credential by priority. First match wins."""
    # Priority 1: Databricks Unity Catalog credential
    if options.get("databricks_credential"):
        import databricks.service_credentials
        return databricks.service_credentials.getServiceCredentialsProvider(
            options["databricks_credential"]
        )
    # Priority 2: Cloud default credential (managed identity)
    if options.get("default_credential", "false").lower() == "true":
        from azure.identity import DefaultAzureCredential
        return DefaultAzureCredential()
    # Priority 3: Service principal
    if all(k in options for k in ("tenant_id", "client_id", "client_secret")):
        from azure.identity import ClientSecretCredential
        return ClientSecretCredential(
            tenant_id=options["tenant_id"],
            client_id=options["client_id"],
            client_secret=options["client_secret"],
        )
    # Priority 4: API key
    if "api_key" in options:
        return {"Authorization": f"Bearer {options['api_key']}"}
    # Priority 5: Basic auth
    if "username" in options and "password" in options:
        from requests.auth import HTTPBasicAuth
        return HTTPBasicAuth(options["username"], options["password"])
    raise ValueError(
        "No valid auth configured. Provide one of: "
        "'databricks_credential', 'default_credential=true', "
        "'tenant_id/client_id/client_secret', 'api_key', or 'username/password'"
    )

The priority order matters: Unity Catalog credentials are preferred because they’re managed, rotated, and audited. API keys and passwords are fallback for systems that don’t support anything better.
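Note that the factory returns heterogeneous types: a header dict for API keys, a credential object for the Azure paths, an HTTPBasicAuth for basic auth. A sketch of how a reader might normalize these into request headers — `credential_to_headers` and the placeholder `token_scope` are illustrative assumptions, not part of the factory above:

```python
def credential_to_headers(credential, token_scope="https://example.api/.default"):
    """Sketch: turn get_credential's heterogeneous return types into headers.
    token_scope is a placeholder — substitute your API's real OAuth scope."""
    if isinstance(credential, dict):
        return credential  # API-key case: already a ready-made header dict
    if hasattr(credential, "get_token"):
        # Azure credential objects expose get_token(scope) -> AccessToken
        token = credential.get_token(token_scope)
        return {"Authorization": f"Bearer {token.token}"}
    # HTTPBasicAuth and similar are passed via requests' auth= parameter instead
    return {}
```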

“Add API key authentication to my data source reader using a requests.Session so connections are pooled across paginated reads.”

class ApiKeyReader:
    def __init__(self, options, schema):
        self.url = options.get("url")
        self.api_key = options["api_key"]

    def read(self, partition):
        import requests

        # Context manager closes the session even if a request raises
        with requests.Session() as session:
            session.headers.update({"Authorization": f"Bearer {self.api_key}"})
            for page in range(partition.start, partition.end + 1):
                response = session.get(self.url, params={"page": page})
                response.raise_for_status()
                for item in response.json()["data"]:
                    yield tuple(item.values())

The Session object pools TCP connections, which cuts latency on paginated reads where you’re hitting the same host repeatedly.
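If many partitions hammer the same host, the default pool of connections per host may be too small. A sketch of a session with an enlarged pool — `make_session` and `pool_size` are illustrative names, and the sizing is an assumption to tune against your workload:

```python
import requests
from requests.adapters import HTTPAdapter


def make_session(api_key, pool_size=10):
    """Sketch: pooled session for repeated paginated reads against one host."""
    session = requests.Session()
    session.headers.update({"Authorization": f"Bearer {api_key}"})
    # Mount an adapter with a larger per-host connection pool
    adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
```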

“Make sure my data source never leaks credentials in Spark UI, logs, or __repr__ output.”

class SecureDataSource:
    _SENSITIVE_KEYS = {"password", "api_key", "client_secret", "token", "access_token"}

    def __init__(self, options):
        self.options = options
        self._safe_options = {
            k: "***" if k.lower() in self._SENSITIVE_KEYS else v
            for k, v in options.items()
        }

    def __repr__(self):
        return f"SecureDataSource({self._safe_options})"

PySpark may call __repr__ when logging query plans. Without masking, your API keys show up in the Spark UI’s SQL tab and driver logs.
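A quick sanity check of the masking behavior (the class is repeated here so the snippet runs on its own):

```python
class SecureDataSource:
    _SENSITIVE_KEYS = {"password", "api_key", "client_secret", "token", "access_token"}

    def __init__(self, options):
        self.options = options  # real values stay available internally
        self._safe_options = {
            k: "***" if k.lower() in self._SENSITIVE_KEYS else v
            for k, v in options.items()
        }

    def __repr__(self):
        return f"SecureDataSource({self._safe_options})"


src = SecureDataSource({"url": "https://api.example.com", "api_key": "s3cret"})
print(repr(src))  # api_key rendered as '***'; url shown as-is
```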

Common pitfalls to avoid:

  • Authenticating on the driver, using on executors — if you fetch a short-lived token in __init__ (driver-side) and the token expires before executors start reading, every partition fails with 401. Fetch tokens inside read() or write() methods, or cache with a refresh buffer.
  • One token per partition — if your read() method calls the token endpoint on every invocation without caching, you’ll hit rate limits on the identity provider. Cache the token on self and check expiry before refreshing.
  • Storing secrets in .option() calls — passing raw secrets in notebook code means they’re visible in query plans. Use Databricks secrets (dbutils.secrets.get(scope, key)) to resolve credentials, and pass only the scope/key names as options.
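The scope/key-names-only pattern can be wired up with a small resolver. A sketch — the `_secret_scope`/`_secret_key` option naming and `resolve_secret_options` are assumptions for illustration; on Databricks, `get_secret` would be `dbutils.secrets.get`:

```python
def resolve_secret_options(options, get_secret):
    """Sketch: replace '<name>_secret_scope'/'<name>_secret_key' option pairs
    with the real secret values fetched via get_secret(scope, key)."""
    resolved = dict(options)
    for name in ("api_key", "client_secret", "password"):
        scope = options.get(f"{name}_secret_scope")
        key = options.get(f"{name}_secret_key")
        if scope and key:
            # Only scope/key names ever appear in notebook code or query plans
            resolved[name] = get_secret(scope, key)
    return resolved
```

Injecting `get_secret` as a callable also keeps the resolver testable outside Databricks, where `dbutils` does not exist.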