Type Conversion
Skill: spark-python-data-source
What You Can Build
Your custom data source sits between two type systems — the external API’s JSON/database types and Spark’s StructType schema. Type mismatches don’t throw errors; they silently produce null values. Your AI coding assistant can generate conversion helpers and schema mappings that catch these problems at the boundary instead of discovering them three tables downstream.
In Action
“Map my REST API’s JSON response types to a Spark schema for my custom data source. The API returns strings, integers, floats, booleans, ISO timestamps, and nested objects. Use Python with `pyspark.sql.types`.”
```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType,
    FloatType, DoubleType, BooleanType, TimestampType, DateType,
)
from datetime import datetime


def convert_external_to_spark(value, spark_type):
    """Convert an API response value to the target Spark type."""
    if value is None:
        return None

    if isinstance(spark_type, StringType):
        return str(value)

    if isinstance(spark_type, BooleanType):
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ("true", "1", "yes")
        return bool(value)

    if isinstance(spark_type, (IntegerType, LongType)):
        if isinstance(value, bool):
            raise ValueError("Cannot safely convert boolean to integer")
        return int(value)

    if isinstance(spark_type, (FloatType, DoubleType)):
        if isinstance(value, bool):
            raise ValueError("Cannot safely convert boolean to float")
        return float(value)

    if isinstance(spark_type, TimestampType):
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        raise ValueError(f"Cannot convert {type(value).__name__} to timestamp")

    if isinstance(spark_type, DateType):
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00")).date()
        raise ValueError(f"Cannot convert {type(value).__name__} to date")

    return value
```

Key decisions your AI coding assistant made:
- `StructType` is the contract — the schema you declare in your `DataSource.schema()` method is a promise. Every tuple your `read()` yields must match it positionally, or Spark fills mismatched fields with `null` without warning.
- Explicit boolean guard — Python’s `bool` is a subclass of `int`, so `int(True)` returns `1` silently. The converter rejects this to prevent semantic data corruption where `True` becomes `1` in an integer column.
- ISO 8601 with Z-suffix handling — many APIs return timestamps ending in `Z` instead of `+00:00`. The `.replace("Z", "+00:00")` normalization prevents `fromisoformat` from failing on otherwise valid UTC timestamps.
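Two of the decisions above are easy to verify in plain Python, with no Spark involved (the timestamp string below is just an illustrative value):

```python
from datetime import datetime, timezone

# bool is a subclass of int, so a naive int() cast silently corrupts booleans.
assert int(True) == 1  # this is why the converter rejects bool -> int

# Before Python 3.11, fromisoformat rejects the trailing "Z"; the
# replace() normalization makes the same string parse on all versions.
ts = datetime.fromisoformat("2024-01-15T09:30:00Z".replace("Z", "+00:00"))
assert ts == datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc)
```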
More Patterns
Convert Spark rows to external system types for writes

“Generate a converter that maps Spark row values to my target API’s expected types — UUIDs, timestamps, decimals, and IP addresses.”
```python
import uuid
from datetime import datetime
from decimal import Decimal


def convert_spark_to_external(value, external_type):
    """Convert a Spark value to the target system's expected type."""
    if value is None:
        return None

    type_lower = external_type.lower()

    if "uuid" in type_lower:
        return uuid.UUID(str(value))

    if "timestamp" in type_lower:
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    if "decimal" in type_lower:
        return Decimal(str(value))

    if "inet" in type_lower:
        import ipaddress
        return ipaddress.ip_address(str(value))

    return value
```

This runs inside your `write()` method on executors. Each row comes in as Spark types; the external system expects its own types. The converter bridges the gap per-field.
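The `Decimal(str(value))` round-trip in the converter is deliberate: constructing a `Decimal` directly from a float carries the float’s binary expansion into the result. A quick stdlib check:

```python
from decimal import Decimal

# str() round-trip first, so binary-float artifacts never reach the Decimal.
clean = Decimal(str(19.99))

# Direct construction from the float keeps a long tail of binary-float digits.
noisy = Decimal(19.99)

print(clean)  # 19.99
assert clean == Decimal("19.99")
assert noisy != Decimal("19.99")
```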
Infer a Spark schema from a sample API response
“Generate a schema inference function that samples one API response and builds a `StructType` from the JSON field types.”
```python
from datetime import datetime

from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType,
    LongType, DoubleType, TimestampType,
)


def infer_schema_from_sample(sample_record):
    """Infer a StructType from a single JSON record."""
    fields = []
    for key, value in sample_record.items():
        spark_type = _infer_spark_type(value)
        fields.append(StructField(key, spark_type, nullable=True))

    return StructType(fields)


def _infer_spark_type(value):
    """Map a Python value to a Spark type."""
    if isinstance(value, bool):
        return BooleanType()
    if isinstance(value, int):
        return LongType()
    if isinstance(value, float):
        return DoubleType()
    if isinstance(value, datetime):
        return TimestampType()
    # Default to string — safe fallback
    return StringType()
```

Schema inference is convenient for prototyping, but in production you should declare the schema explicitly. Inferred schemas change when the API adds a field, which breaks downstream consumers silently.
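One caveat, assuming the sample record comes straight from `json.loads`: JSON has no timestamp type, so the `datetime` branch above never fires, and ISO timestamp strings infer as `StringType` (the sample payload here is hypothetical):

```python
import json

# json.loads only ever yields str, int, float, bool, None, list, and dict.
record = json.loads('{"created_at": "2024-01-15T09:30:00Z", "count": 3}')

print(type(record["created_at"]).__name__)  # str -> would infer as StringType
print(type(record["count"]).__name__)       # int -> would infer as LongType
```

Declare timestamp columns explicitly, or post-process known temporal fields after inference.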
Handle JSON serialization for writes
“Generate a JSON encoder that handles datetime, date, and Decimal types for my data source’s write path.”
```python
import json
from datetime import date, datetime
from decimal import Decimal


class SparkJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)


# Usage inside write()
def _send_batch(self, rows):
    import requests
    payload = json.dumps(rows, cls=SparkJsonEncoder)
    requests.post(
        self.url,
        data=payload,
        headers={"Content-Type": "application/json"},
    ).raise_for_status()
```

Python’s default `json.dumps` fails on `datetime` and `Decimal` objects. This encoder handles them transparently so your write path doesn’t need per-field serialization logic.
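The encoder can be exercised without Spark or a live endpoint; the class is repeated here, with a hypothetical row, so the snippet runs standalone:

```python
import json
from datetime import date, datetime
from decimal import Decimal


class SparkJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)


# Hypothetical row values a write path might see.
row = {"when": date(2024, 1, 15), "amount": Decimal("19.99")}
encoded = json.dumps(row, cls=SparkJsonEncoder)
print(encoded)  # {"when": "2024-01-15", "amount": 19.99}
```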
Watch Out For
- Silent nulls from type mismatches — if your schema says `IntegerType` but `read()` yields a string for that position, Spark doesn’t error. It writes `null`. You’ll only notice when downstream aggregations produce wrong results. Test your converter against real API responses.
- `StringType` as a safe fallback is a trap — mapping everything to `StringType` avoids conversion errors, but it pushes type handling to every downstream consumer. Use strict types for numeric and temporal fields; reserve `StringType` for genuinely unstructured data.
- Boolean-integer confusion — `bool` is a subclass of `int` in Python. `isinstance(True, int)` returns `True`. If your converter checks `int` before `bool`, booleans silently become integers. Always check `bool` first.
- Timezone-naive timestamps — if your API returns timestamps without timezone info and Spark’s session timezone differs, the same timestamp value gets interpreted differently. Normalize to UTC at the conversion boundary.
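The check-order pitfall can be demonstrated with two tiny classifiers, sketched here in plain Python:

```python
def classify_wrong(v):
    # int is checked first, so True matches here and the bool branch is dead.
    if isinstance(v, int):
        return "int"
    if isinstance(v, bool):
        return "bool"
    return "other"


def classify_right(v):
    # bool first, because bool is a subclass of int.
    if isinstance(v, bool):
        return "bool"
    if isinstance(v, int):
        return "int"
    return "other"


print(classify_wrong(True))   # int  <- True silently treated as an integer
print(classify_right(True))   # bool
```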