Testing Patterns
Skill: spark-python-data-source
What You Can Build
A custom data source has two testing challenges: the reader/writer logic itself and the Spark integration layer that serializes it to executors. Your AI coding assistant can generate tests that validate both — unit tests with mocked HTTP responses for fast iteration, and integration tests with a real Spark session to catch serialization and registration issues.
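The serialization layer can be smoke-tested without a cluster: anything Spark ships to an executor must survive `pickle`. A minimal stdlib sketch (the `state` dict is a stand-in for a reader's attributes):

```python
import pickle
import threading

# A reader that stashes a live resource (lock, HTTP session, file
# handle) on itself fails at pickling time, just as it would when
# Spark ships it to an executor:
state = {"url": "http://api.test", "lock": threading.Lock()}
try:
    pickle.dumps(state)
    picklable = True
except TypeError:
    picklable = False
assert not picklable  # locks cannot be pickled

# Keep only plain data on the reader and the round-trip succeeds
state.pop("lock")
assert pickle.loads(pickle.dumps(state)) == {"url": "http://api.test"}
```

The usual fix is to open connections lazily inside `read()` rather than storing them in `__init__`.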
In Action
“Generate unit tests for my custom data source that mock the external API and verify partition behavior. Use Python with `pytest` and `unittest.mock`. Include a shared Spark session fixture.”
```python
import json
import pytest
from unittest.mock import patch, Mock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# RestApiDataSource / RestApiBatchReader are the classes under test,
# imported from your data source package.

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder \
        .master("local[2]") \
        .config("spark.sql.shuffle.partitions", "2") \
        .getOrCreate()

@pytest.fixture
def sample_schema():
    return StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
    ])

def test_data_source_name():
    """Registration name matches what .format() expects."""
    assert RestApiDataSource.name() == "rest-api"

def test_missing_url_raises(sample_schema):
    """Fails fast when required option is missing."""
    with pytest.raises(AssertionError, match="url is required"):
        RestApiBatchReader({}, sample_schema)

def test_reader_creates_partitions(sample_schema):
    """Partition count matches expected data volume."""
    with patch("requests.get") as mock_get:
        mock_get.return_value = Mock(
            json=Mock(return_value={"total": 400}),
            status_code=200,
        )
        reader = RestApiBatchReader(
            {"url": "http://api.test", "page_size": "100"}, sample_schema
        )
        partitions = reader.partitions()
        assert len(partitions) == 5  # ceil(400/100) + 1

def test_writer_sends_batches(spark):
    """Writer batches rows and POSTs them."""
    with patch("requests.post") as mock_post:
        mock_post.return_value = Mock(status_code=200, raise_for_status=Mock())
        df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
        df.write.format("rest-api") \
            .option("url", "http://api.test") \
            .option("batch_size", "10") \
            .save()
        assert mock_post.called
```

Key decisions your AI coding assistant made:
- Test the reader independently from Spark — `test_reader_creates_partitions` instantiates the reader directly and calls `partitions()`. This catches logic errors without the overhead of a Spark job.
- Mock at the `requests` level — patching `requests.get` and `requests.post` isolates your tests from network dependencies. Tests run in milliseconds instead of seconds.
- Integration test for the full registration path — `test_writer_sends_batches` exercises `df.write.format(...)`, which tests serialization, registration, and the Spark-to-executor pipeline end to end.
- Session-scoped Spark fixture — creating a `SparkSession` is expensive (~3s). Sharing one across all tests in the session keeps the test suite fast.
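The first two decisions can be illustrated without PySpark at all. A hypothetical sketch (`PagedReader` and its injected `http_get` are illustrative stand-ins, not the real `RestApiBatchReader`; injecting the HTTP callable makes the mock boundary explicit):

```python
from unittest.mock import Mock

# Hypothetical reader mirroring the partition-planning logic:
# one partition per page of results.
class PagedReader:
    def __init__(self, options, http_get):
        self.url = options["url"]
        self.page_size = int(options.get("page_size", "100"))
        self._http_get = http_get  # injected, so tests need no network

    def partitions(self):
        total = self._http_get(self.url).json()["total"]
        pages = -(-total // self.page_size)  # ceiling division
        return list(range(pages))

# Stub the HTTP layer and verify the plan in milliseconds
fake_get = Mock(return_value=Mock(json=Mock(return_value={"total": 400})))
reader = PagedReader({"url": "http://api.test", "page_size": "100"}, fake_get)
assert len(reader.partitions()) == 4
fake_get.assert_called_once_with("http://api.test")
```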
More Patterns
Test streaming offset logic
“Generate tests that verify my stream reader’s offset management — initial offset, latest offset, and non-overlapping partitions.”
```python
def test_initial_offset_is_valid_json(sample_schema):
    reader = EventStreamReader(
        {"url": "http://api.test", "start_time": "2025-01-01T00:00:00+00:00"},
        sample_schema,
    )
    offset = reader.initialOffset()
    data = json.loads(offset)
    assert "ts" in data

def test_partitions_dont_overlap(sample_schema):
    reader = EventStreamReader(
        {
            "url": "http://api.test",
            "partition_duration": "3600",
            "start_time": "2025-01-01T00:00:00+00:00",
        },
        sample_schema,
    )
    start = reader.initialOffset()
    end = json.dumps({"ts": "2025-01-01T06:00:00+00:00"})

    partitions = reader.partitions(start, end)

    for i in range(len(partitions) - 1):
        assert partitions[i].end_time < partitions[i + 1].start_time, \
            f"Partition {i} overlaps with {i + 1}"

def test_latest_offset_advances(sample_schema):
    reader = EventStreamReader(
        {"url": "http://api.test", "start_time": "latest"}, sample_schema
    )
    offset1 = json.loads(reader.latestOffset())["ts"]
    import time
    time.sleep(0.01)
    offset2 = json.loads(reader.latestOffset())["ts"]
    assert offset2 >= offset1
```

Streaming bugs are notoriously hard to diagnose in production. Testing offset logic in isolation catches overlap and gap issues before they become duplicate or missing rows in your pipeline.
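The overlap check reduces to interval arithmetic that is easy to verify in isolation. A hypothetical `plan_windows` helper sketching what a `partitions(start, end)` implementation must guarantee (the function and offset format are illustrative assumptions):

```python
import json
from datetime import datetime, timedelta

# Slice [start, end) into fixed-duration windows: adjacent windows
# share a boundary, so there are no gaps and no overlaps.
def plan_windows(start_offset, end_offset, duration_s):
    start = datetime.fromisoformat(json.loads(start_offset)["ts"])
    end = datetime.fromisoformat(json.loads(end_offset)["ts"])
    step = timedelta(seconds=duration_s)
    windows = []
    while start < end:
        windows.append((start, min(start + step, end)))
        start += step
    return windows

start = json.dumps({"ts": "2025-01-01T00:00:00+00:00"})
end = json.dumps({"ts": "2025-01-01T06:00:00+00:00"})
windows = plan_windows(start, end, 3600)

assert len(windows) == 6  # six one-hour windows
# each window ends exactly where the next begins
assert all(windows[i][1] == windows[i + 1][0] for i in range(len(windows) - 1))
```

Clamping the final window to `end` (the `min(...)`) is what prevents the last partition from reading past the batch boundary.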
Integration test with Testcontainers
“Generate an integration test that writes data through my custom source to a real PostgreSQL instance using Testcontainers.”
```python
import psycopg2
import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def postgres():
    with PostgresContainer("postgres:15") as container:
        yield container

def test_write_roundtrip(spark, postgres):
    """Data written through the source is readable from the database."""
    # get_connection_url() returns a SQLAlchemy-style URL
    # (postgresql+psycopg2://...) that psycopg2.connect() cannot
    # parse, so connect with discrete parameters instead.
    conn = psycopg2.connect(
        host=postgres.get_container_host_ip(),
        port=postgres.get_exposed_port(5432),
        user=postgres.username,
        password=postgres.password,
        dbname=postgres.dbname,
    )
    cursor = conn.cursor()
    cursor.execute(
        "CREATE TABLE events (id INT PRIMARY KEY, name VARCHAR(100))"
    )
    conn.commit()

    df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
    df.write.format("rest-api") \
        .option("url", postgres.get_connection_url()) \
        .option("table", "events") \
        .save()

    cursor.execute("SELECT COUNT(*) FROM events")
    assert cursor.fetchone()[0] == 2
    conn.close()
```

Integration tests catch the problems unit tests miss: connection pooling issues, type coercion at the database boundary, and transaction semantics. Run them in CI but not on every save.
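Testcontainers tests fail with confusing connection errors on machines without Docker. A small guard can skip them cleanly instead (a sketch; the probe and test name are assumptions, adjust for your CI):

```python
import shutil

import pytest

# Hypothetical guard: skip Testcontainers tests when no docker
# binary is on PATH, rather than failing mid-fixture.
requires_docker = pytest.mark.skipif(
    shutil.which("docker") is None,
    reason="Docker is required for Testcontainers integration tests",
)

@requires_docker
def test_write_roundtrip_guarded(spark, postgres):
    ...  # the round-trip test body would live here
```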
Organize tests by speed
“Show me how to organize my test suite so I can run fast unit tests separately from slow integration tests.”

```
tests/
├── conftest.py                  # Shared Spark fixture
├── unit/
│   ├── test_reader.py           # Reader logic with mocked HTTP
│   ├── test_writer.py           # Writer logic with mocked HTTP
│   └── test_type_conversion.py  # Conversion helpers
├── integration/
│   ├── test_read_roundtrip.py   # Full read through Spark
│   └── test_streaming.py        # Streaming offset lifecycle
└── performance/
    └── test_throughput.py       # Throughput benchmarks
```

```bash
uv run pytest tests/unit/         # Fast — seconds
uv run pytest tests/integration/  # Slow — needs containers
uv run pytest --cov=your_package  # Coverage report
```

Unit tests run in seconds with mocked dependencies. Integration tests spin up real services. Keeping them separate means developers get fast feedback locally, and CI runs the full suite.
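One way to enforce the split without decorating every test is a collection hook in `conftest.py` (a sketch; the marker names are assumptions and should also be registered under `markers =` in your pytest config so `--strict-markers` passes):

```python
# conftest.py — auto-mark tests by the directory they live in, so
# `pytest -m "not integration"` needs no per-test decorators.
def pytest_collection_modifyitems(config, items):
    for item in items:
        if "integration" in str(item.path):
            item.add_marker("integration")
        elif "performance" in str(item.path):
            item.add_marker("performance")
```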
Watch Out For
- Testing against a shared Spark session with side effects — if one test registers a data source and another test assumes it’s not registered, you get order-dependent failures. Use `scope="session"` for the Spark fixture and register data sources in a `conftest.py` that runs once.
- Mocking at the wrong level — patch where a name is looked up, not where it’s defined. Because your data source imports `requests` inside `write()`, the lookup happens on the `requests` module itself at call time, so `patch("requests.post")` works and `patch("your_module.requests.post")` fails (the attribute doesn’t exist at module level).
- Skipping serialization tests — your reader works when you instantiate it directly, but fails when Spark ships it to an executor because it has an unpicklable attribute. Always include at least one test that goes through `spark.read.format(...).load()` to catch serialization issues.
- Flaky time-based tests — tests that depend on `datetime.now()` can fail when the test runner is slow. Use fixed timestamps in offset tests and mock `datetime.now()` when testing `latestOffset()`.
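The patch-where-it's-looked-up rule can be demonstrated with a fake module (stdlib only; `fake_http` and `write_rows` are illustrative stand-ins for `requests` and your `write()` method):

```python
import sys
import types
from unittest.mock import patch

# Stand-in for the requests module, installed into sys.modules
fake_http = types.ModuleType("fake_http")
fake_http.post = lambda url: "real response"
sys.modules["fake_http"] = fake_http

def write_rows(url):
    # Like a data source that does `import requests` inside write():
    # the attribute is looked up on the module object at call time.
    import fake_http
    return fake_http.post(url)

# Patching the module attribute intercepts the late-bound lookup...
with patch("fake_http.post", return_value="mocked"):
    assert write_rows("http://api.test") == "mocked"

# ...and the real function is restored once the patch exits.
assert write_rows("http://api.test") == "real response"
```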