Skip to content

Testing Patterns

Skill: spark-python-data-source

A custom data source has two testing challenges: the reader/writer logic itself and the Spark integration layer that serializes it to executors. Your AI coding assistant can generate tests that validate both — unit tests with mocked HTTP responses for fast iteration, and integration tests with a real Spark session to catch serialization and registration issues.

“Generate unit tests for my custom data source that mock the external API and verify partition behavior. Use Python with pytest and unittest.mock. Include a shared Spark session fixture.”

import json
import pytest
from unittest.mock import patch, Mock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder \
.master("local[2]") \
.config("spark.sql.shuffle.partitions", "2") \
.getOrCreate()
@pytest.fixture
def sample_schema():
return StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
])
def test_data_source_name():
"""Registration name matches what .format() expects."""
assert RestApiDataSource.name() == "rest-api"
def test_missing_url_raises():
"""Fails fast when required option is missing."""
with pytest.raises(AssertionError, match="url is required"):
RestApiBatchReader({}, sample_schema)
def test_reader_creates_partitions(sample_schema):
"""Partition count matches expected data volume."""
with patch("requests.get") as mock_get:
mock_get.return_value = Mock(
json=Mock(return_value={"total": 400}),
status_code=200,
)
reader = RestApiBatchReader(
{"url": "http://api.test", "page_size": "100"}, sample_schema
)
partitions = reader.partitions()
assert len(partitions) == 5 # ceil(400/100) + 1
def test_writer_sends_batches(spark):
"""Writer batches rows and POSTs them."""
with patch("requests.post") as mock_post:
mock_post.return_value = Mock(status_code=200, raise_for_status=Mock())
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df.write.format("rest-api") \
.option("url", "http://api.test") \
.option("batch_size", "10") \
.save()
assert mock_post.called

Key decisions your AI coding assistant made:

  • Test the reader independently from Sparktest_reader_creates_partitions instantiates the reader directly and calls partitions(). This catches logic errors without the overhead of a Spark job.
  • Mock at the requests level — patching requests.get and requests.post isolates your tests from network dependencies. Tests run in milliseconds instead of seconds.
  • Integration test for the full registration pathtest_writer_sends_batches exercises df.write.format(...), which tests serialization, registration, and the Spark-to-executor pipeline end to end.
  • Session-scoped Spark fixture — creating a SparkSession is expensive (~3s). Sharing one across all tests in the session keeps the test suite fast.

“Generate tests that verify my stream reader’s offset management — initial offset, latest offset, and non-overlapping partitions.”

def test_initial_offset_is_valid_json():
reader = EventStreamReader(
{"url": "http://api.test", "start_time": "2025-01-01T00:00:00+00:00"},
sample_schema,
)
offset = reader.initialOffset()
data = json.loads(offset)
assert "ts" in data
def test_partitions_dont_overlap():
reader = EventStreamReader(
{"url": "http://api.test", "partition_duration": "3600",
"start_time": "2025-01-01T00:00:00+00:00"},
sample_schema,
)
start = reader.initialOffset()
end = json.dumps({"ts": "2025-01-01T06:00:00+00:00"})
partitions = reader.partitions(start, end)
for i in range(len(partitions) - 1):
assert partitions[i].end_time < partitions[i + 1].start_time, \
f"Partition {i} overlaps with {i+1}"
def test_latest_offset_advances():
reader = EventStreamReader(
{"url": "http://api.test", "start_time": "latest"}, sample_schema
)
offset1 = json.loads(reader.latestOffset())["ts"]
import time; time.sleep(0.01)
offset2 = json.loads(reader.latestOffset())["ts"]
assert offset2 >= offset1

Streaming bugs are notoriously hard to diagnose in production. Testing offset logic in isolation catches overlap and gap issues before they become duplicate or missing rows in your pipeline.

“Generate an integration test that writes data through my custom source to a real PostgreSQL instance using Testcontainers.”

from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope="session")
def postgres():
with PostgresContainer("postgres:15") as container:
yield container
def test_write_roundtrip(spark, postgres):
"""Data written through the source is readable from the database."""
import psycopg2
conn = psycopg2.connect(postgres.get_connection_url())
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE events (id INT PRIMARY KEY, name VARCHAR(100))"
)
conn.commit()
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df.write.format("rest-api") \
.option("url", postgres.get_connection_url()) \
.option("table", "events") \
.save()
cursor.execute("SELECT COUNT(*) FROM events")
assert cursor.fetchone()[0] == 2
conn.close()

Integration tests catch the problems unit tests miss: connection pooling issues, type coercion at the database boundary, and transaction semantics. Run them in CI but not on every save.

“Show me how to organize my test suite so I can run fast unit tests separately from slow integration tests.”

tests/
├── conftest.py # Shared Spark fixture
├── unit/
│ ├── test_reader.py # Reader logic with mocked HTTP
│ ├── test_writer.py # Writer logic with mocked HTTP
│ └── test_type_conversion.py # Conversion helpers
├── integration/
│ ├── test_read_roundtrip.py # Full read through Spark
│ └── test_streaming.py # Streaming offset lifecycle
└── performance/
└── test_throughput.py # Throughput benchmarks
Terminal window
uv run pytest tests/unit/ # Fast — seconds
uv run pytest tests/integration/ # Slow — needs containers
uv run pytest --cov=your_package # Coverage report

Unit tests run in seconds with mocked dependencies. Integration tests spin up real services. Keeping them separate means developers get fast feedback locally, and CI runs the full suite.

  • Testing against a shared Spark session with side effects — if one test registers a data source and another test assumes it’s not registered, you get order-dependent failures. Use scope="session" for the Spark fixture and register data sources in a conftest.py that runs once.
  • Mocking at the wrong level — patching requests.post at the module level works when the import is at the top of your test file. But your data source imports requests inside write(), so you need to patch it where it’s imported: patch("requests.post") at the top level, not patch("your_module.requests.post").
  • Skipping serialization tests — your reader works when you instantiate it directly, but fails when Spark ships it to an executor because it has an unpicklable attribute. Always include at least one test that goes through spark.read.format(...).load() to catch serialization issues.
  • Flaky time-based tests — tests that depend on datetime.now() can fail when the test runner is slow. Use fixed timestamps in offset tests and mock datetime.now() when testing latestOffset().