
Type Conversion

Skill: spark-python-data-source

Your custom data source sits between two type systems — the external API’s JSON/database types and Spark’s StructType schema. Type mismatches don’t throw errors; they silently produce null values. Your AI coding assistant can generate conversion helpers and schema mappings that catch these problems at the boundary instead of discovering them three tables downstream.

“Map my REST API’s JSON response types to a Spark schema for my custom data source. The API returns strings, integers, floats, booleans, ISO timestamps, and nested objects. Use Python with pyspark.sql.types.”

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType,
    FloatType, DoubleType, BooleanType, TimestampType, DateType,
)
from datetime import datetime


def convert_external_to_spark(value, spark_type):
    """Convert an API response value to the target Spark type."""
    if value is None:
        return None
    if isinstance(spark_type, StringType):
        return str(value)
    if isinstance(spark_type, BooleanType):
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ("true", "1", "yes")
        return bool(value)
    if isinstance(spark_type, (IntegerType, LongType)):
        if isinstance(value, bool):
            raise ValueError("Cannot safely convert boolean to integer")
        return int(value)
    if isinstance(spark_type, (FloatType, DoubleType)):
        if isinstance(value, bool):
            raise ValueError("Cannot safely convert boolean to float")
        return float(value)
    if isinstance(spark_type, TimestampType):
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        raise ValueError(f"Cannot convert {type(value).__name__} to timestamp")
    if isinstance(spark_type, DateType):
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00")).date()
        raise ValueError(f"Cannot convert {type(value).__name__} to date")
    return value

Key decisions your AI coding assistant made:

  • StructType is the contract — the schema you declare in your DataSource.schema() method is a promise. Every tuple your read() yields must match it positionally, or Spark fills mismatched fields with null without warning.
  • Explicit boolean guard — Python’s bool is a subclass of int, so int(True) returns 1 silently. The converter rejects this to prevent semantic data corruption where True becomes 1 in an integer column.
  • ISO 8601 with Z-suffix handling — many APIs return timestamps ending in Z instead of +00:00. The .replace("Z", "+00:00") normalization prevents fromisoformat from failing on otherwise valid UTC timestamps.
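The per-field converter pairs naturally with a row-level wrapper that walks the declared StructType fields in order, so the tuple always matches the schema positionally. A minimal sketch, where `convert_record` is a hypothetical helper rather than part of the API:

```python
def convert_record(record, schema, convert):
    """Build the positional tuple that read() yields: walk the declared
    schema fields in order and apply `convert` (e.g. the
    convert_external_to_spark helper) to each value. Missing keys become
    None instead of shifting later fields out of position."""
    return tuple(
        convert(record.get(field.name), field.dataType)
        for field in schema.fields
    )
```

Because the iteration follows `schema.fields` rather than the JSON record's key order, a reordered or partial API response still produces a tuple that lines up with the declared schema.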

Convert Spark rows to external system types for writes

“Generate a converter that maps Spark row values to my target API’s expected types — UUIDs, timestamps, decimals, and IP addresses.”

import uuid
from datetime import datetime
from decimal import Decimal


def convert_spark_to_external(value, external_type):
    """Convert a Spark value to the target system's expected type."""
    if value is None:
        return None
    type_lower = external_type.lower()
    if "uuid" in type_lower:
        return uuid.UUID(str(value))
    if "timestamp" in type_lower:
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value
    if "decimal" in type_lower:
        return Decimal(str(value))
    if "inet" in type_lower:
        import ipaddress  # local import keeps the dependency out of the hot path
        return ipaddress.ip_address(str(value))
    return value

This runs inside your write() method on executors. Each row comes in as Spark types; the external system expects its own types. The converter bridges the gap per-field.
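In practice the converter pairs with a per-column type map. A sketch, where the column names and the `EXTERNAL_TYPES` mapping are made up for illustration:

```python
# Hypothetical mapping from Spark column names to the target
# system's declared types; yours comes from the external schema.
EXTERNAL_TYPES = {"id": "uuid", "amount": "decimal(10,2)", "note": "text"}


def convert_row_for_write(row, external_types, convert):
    """Convert every field of a row dict with the per-field converter
    (e.g. convert_spark_to_external) before sending it out."""
    return {
        col: convert(value, external_types.get(col, ""))
        for col, value in row.items()
    }
```

Columns absent from the mapping fall through with an empty type string, so the converter's final `return value` passes them along unchanged.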

Infer a Spark schema from a sample API response

“Generate a schema inference function that samples one API response and builds a StructType from the JSON field types.”

from datetime import datetime
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType,
    LongType, DoubleType, TimestampType,
)


def infer_schema_from_sample(sample_record):
    """Infer a StructType from a single JSON record."""
    fields = []
    for key, value in sample_record.items():
        spark_type = _infer_spark_type(value)
        fields.append(StructField(key, spark_type, nullable=True))
    return StructType(fields)


def _infer_spark_type(value):
    """Map a Python value to a Spark type."""
    if isinstance(value, bool):  # must come before int: bool subclasses int
        return BooleanType()
    if isinstance(value, int):
        return LongType()
    if isinstance(value, float):
        return DoubleType()
    if isinstance(value, datetime):
        return TimestampType()
    # Default to string — safe fallback
    return StringType()

Schema inference is convenient for prototyping, but in production you should declare the schema explicitly. Inferred schemas change when the API adds a field, which breaks downstream consumers silently.
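If you do keep inference around during development, a drift check at startup turns that silent breakage into a loud one. A sketch, where `check_schema_drift` is a hypothetical helper:

```python
def check_schema_drift(declared_fields, sample_record):
    """Compare a declared schema's field names against one sampled API
    record and report the difference, so a new or dropped API field
    fails fast instead of silently reshaping the inferred schema."""
    declared = set(declared_fields)
    sampled = set(sample_record)
    return {
        "added": sorted(sampled - declared),    # in the API, not the schema
        "removed": sorted(declared - sampled),  # in the schema, not the API
    }
```

Call it once per read with the declared schema's field names and raise if either list is non-empty.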

“Generate a JSON encoder that handles datetime, date, and Decimal types for my data source’s write path.”

import json
from datetime import date, datetime
from decimal import Decimal


class SparkJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)


# Usage inside write()
def _send_batch(self, rows):
    import requests
    payload = json.dumps(rows, cls=SparkJsonEncoder)
    response = requests.post(
        self.url, data=payload,
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()

Python’s default json.dumps fails on datetime and Decimal objects. This encoder handles them transparently so your write path doesn’t need per-field serialization logic.
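The same pattern extends to other values the stdlib encoder rejects. For instance, if your rows also carry UUID or binary columns (an assumption; add branches only for types your rows actually contain), a sketch:

```python
import base64
import json
import uuid
from datetime import date, datetime
from decimal import Decimal


class ExtendedJsonEncoder(json.JSONEncoder):
    """Same pattern as SparkJsonEncoder, with branches for UUID
    and binary values (illustrative additions, not from the API)."""
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return float(o)
        if isinstance(o, uuid.UUID):
            return str(o)  # canonical hyphenated form
        if isinstance(o, (bytes, bytearray)):
            return base64.b64encode(bytes(o)).decode("ascii")
        return super().default(o)
```

The `default` hook is only consulted for objects `json.dumps` cannot serialize on its own, so adding branches costs nothing for plain strings and numbers.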

  • Silent nulls from type mismatches — if your schema says IntegerType but read() yields a string for that position, Spark doesn’t error. It writes null. You’ll only notice when downstream aggregations produce wrong results. Test your converter against real API responses.
  • StringType as a safe fallback is a trap — mapping everything to StringType avoids conversion errors, but it pushes type handling to every downstream consumer. Use strict types for numeric and temporal fields; reserve StringType for genuinely unstructured data.
  • Boolean-integer confusion — bool is a subclass of int in Python. isinstance(True, int) returns True. If your converter checks int before bool, booleans silently become integers. Always check bool first.
  • Timezone-naive timestamps — if your API returns timestamps without timezone info and Spark’s session timezone differs, the same timestamp value gets interpreted differently. Normalize to UTC at the conversion boundary.
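The last two pitfalls are easy to verify in isolation. A standalone sketch, where `classify` and `to_utc` are illustrative helpers and `to_utc` assumes the API's naive timestamps mean UTC:

```python
from datetime import datetime, timezone


def classify(value):
    """bool subclasses int, so the bool check must come first,
    or booleans fall into the integer branch."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    return "other"


def to_utc(ts):
    """Normalize at the conversion boundary, assuming naive
    timestamps from the API are UTC (an assumption to verify)."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
```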