
Apache Spark performance and correctness often depend on understanding when work is planned vs when it is executed. Lazy evaluation is the mechanism that lets Spark build an execution plan before running it.
Explain the concept of lazy evaluation in Apache Spark (RDD/DataFrame/Dataset APIs).
In your answer, cover a clear mental model (the DAG and what triggers its execution) and the practical implications (debugging, performance, caching/checkpointing). Assume the interviewer does not expect deep internals of Catalyst or Tungsten.
Transformations (e.g., map, filter, select) are lazy: they define a new dataset but do not execute immediately. Actions (e.g., count, collect, write) force Spark to materialize results, triggering a job.
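rdd = sc.parallelize([-2, -1, 0, 1, 2])  # assumes an existing SparkContext `sc`
rdd2 = rdd.filter(lambda x: x > 0)  # transformation (lazy): nothing runs yet
result = rdd2.count()  # action: triggers a job; result == 2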
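The split between recording work and running it can be imitated in plain Python. This is a minimal toy sketch, not Spark's API: `LazyDataset` and `execution_log` are hypothetical names. Transformations return a new object with one more recorded step, and only an action walks the data.

```python
from typing import Callable, List, Tuple

execution_log: List[str] = []  # records each time real work happens


class LazyDataset:
    """Toy stand-in for an RDD: a source list plus recorded, not-yet-run steps."""

    def __init__(self, source: List[int], steps: Tuple[Tuple[str, Callable], ...] = ()):
        self._source = source
        self._steps = steps

    # Transformations: return a NEW dataset with one more recorded step.
    def filter(self, pred: Callable[[int], bool]) -> "LazyDataset":
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def map(self, fn: Callable[[int], int]) -> "LazyDataset":
        return LazyDataset(self._source, self._steps + (("map", fn),))

    # Actions: replay every recorded step over the source data.
    def collect(self) -> List[int]:
        execution_log.append(f"job with {len(self._steps)} step(s)")
        out: List[int] = []
        for x in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == "filter" and not fn(x):
                    keep = False
                    break
                if kind == "map":
                    x = fn(x)
            if keep:
                out.append(x)
        return out

    def count(self) -> int:
        return len(self.collect())


rdd = LazyDataset([-2, -1, 0, 1, 2])
rdd2 = rdd.filter(lambda x: x > 0)   # transformation: only recorded
assert execution_log == []           # no work has happened yet
result = rdd2.count()                # action: runs the recorded pipeline
print(result, execution_log)         # 2 ['job with 1 step(s)']
```

Because each transformation returns a new immutable object, `rdd` keeps its own (empty) step list and can still be reused in other pipelines.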
Spark records each transformation as a node in a lineage graph (DAG) describing how to compute the final result from source data. This DAG is only executed when an action occurs, enabling Spark to plan the full computation end-to-end.
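df2 = (df
       .filter("country = 'US'")
       .groupBy("user_id")
       .count())  # DAG is built, not executed yet
# Note: count() after groupBy() returns a new DataFrame (a transformation);
# only DataFrame.count() is an action.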
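The lineage itself is just a chain of parent pointers from the final dataset back to the source. The toy sketch below illustrates the idea and is not Spark's internals. `Node` and `lineage` are hypothetical names, and "scan events" is a made-up source. Every node here has a single parent. A join would give a node two parents, which is why the general structure is a DAG rather than a simple chain. In Spark itself, `rdd.toDebugString()` or `df.explain()` is the usual way to inspect lineage.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Node:
    """One step in a toy lineage graph; `parent` links back toward the source."""
    op: str
    parent: Optional["Node"] = None


def lineage(node: Node) -> List[str]:
    """Walk parent links from the final node back to the source, then reverse."""
    chain: List[str] = []
    current: Optional[Node] = node
    while current is not None:
        chain.append(current.op)
        current = current.parent
    return list(reversed(chain))


# Mirrors df.filter(...).groupBy(...).count(): building it only creates nodes.
source = Node("scan events")
filtered = Node("filter country = 'US'", source)
grouped = Node("groupBy user_id", filtered)
counted = Node("count", grouped)

print(" -> ".join(lineage(counted)))
# scan events -> filter country = 'US' -> groupBy user_id -> count
```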
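Because Spark sees the entire chain of transformations before running anything, it can optimize the plan as a whole. For DataFrames and Datasets, the optimizer can reorder filters and projections, collapse adjacent operations, and choose join strategies. RDD code is not rewritten this way, although Spark still pipelines consecutive narrow transformations within a stage. The result can be much less I/O and shuffle, and a significantly faster job.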
df.select("a", "b").filter("a > 10").explain()
# explain() prints the plan without running the query; in the optimized plan Spark
# can push the filter toward the source and read only columns a and b
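To see why whole-plan visibility pays off, compare executing the operators in the order they were written with executing them after a plan-wide rewrite. This is a toy sketch on a hypothetical in-memory table, not Spark's optimizer. `TABLE`, `NEEDED`, `run_as_written`, and `run_optimized` are made-up names, and "cells read" stands in for I/O. Whether a real filter reaches the file scan depends on the data source.

```python
from typing import Dict, List, Set, Tuple

Row = Dict[str, int]

# Hypothetical 4-column table; the query below only references "a" and "b".
TABLE: List[Row] = [{"a": i, "b": i * 2, "c": 0, "d": 0} for i in range(20)]
NEEDED: Set[str] = {"a", "b"}  # columns used by select("a", "b") and filter("a > 10")


def run_as_written(rows: List[Row]) -> Tuple[List[Row], int]:
    """Run operators in the written order: scan all columns, project, then filter."""
    cells_read = sum(len(r) for r in rows)  # every column of every row is read
    projected = [{"a": r["a"], "b": r["b"]} for r in rows]
    return [r for r in projected if r["a"] > 10], cells_read


def run_optimized(rows: List[Row], needed: Set[str]) -> Tuple[List[Row], int]:
    """Same query after a whole-plan rewrite: read only needed columns (pruning)
    and evaluate the predicate during the scan (filter pushdown)."""
    out: List[Row] = []
    cells_read = 0
    for r in rows:
        cells_read += len(needed)  # only the pruned column set is read
        if r["a"] > 10:            # rows failing the filter are dropped immediately
            out.append({c: r[c] for c in needed})
    return out, cells_read


naive_rows, naive_cells = run_as_written(TABLE)
opt_rows, opt_cells = run_optimized(TABLE, NEEDED)
assert naive_rows == opt_rows  # same answer, less work
print(len(opt_rows), naive_cells, opt_cells)  # 9 80 40
```

The rewrite is only safe because the optimizer knows, before executing anything, that no later step needs columns `c` or `d`. That knowledge comes from lazy evaluation.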
When an action triggers execution, Spark splits the DAG into stages separated by shuffle boundaries (wide dependencies like groupByKey/join). Each stage runs as parallel tasks over partitions; understanding where shuffles occur is key for performance.
# mapValues is narrow (no data movement); reduceByKey is wide and shuffles by key,
# so this chain typically runs as two stages once an action is called
rdd.mapValues(f).reduceByKey(g)  # f, g: placeholder functions
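Stage splitting can be approximated by cutting the operator chain at every wide dependency. This is a simplified sketch, not Spark's DAGScheduler. The plan format and `split_into_stages` are hypothetical, and the sketch ignores details such as the map-side shuffle write belonging to the earlier stage.

```python
from typing import List, Tuple

# Hypothetical plan: (operator name, is_wide). Wide operators need a shuffle.
Plan = List[Tuple[str, bool]]


def split_into_stages(plan: Plan) -> List[List[str]]:
    """Cut the operator chain at each wide dependency. Narrow operators are
    pipelined into the current stage; a wide operator's post-shuffle side
    starts a new stage."""
    stages: List[List[str]] = [[]]
    for name, is_wide in plan:
        if is_wide and stages[-1]:
            stages.append([])  # shuffle boundary
        stages[-1].append(name)
    return stages


plan: Plan = [
    ("textFile", False),
    ("map", False),
    ("mapValues", False),    # narrow: stays in the same stage
    ("reduceByKey", True),   # wide: shuffle boundary
    ("filter", False),
]
for i, ops in enumerate(split_into_stages(plan)):
    print(f"stage {i}: {ops}")
# stage 0: ['textFile', 'map', 'mapValues']
# stage 1: ['reduceByKey', 'filter']
```

Each additional wide operator, such as a join that needs a shuffle, adds another stage. That is why counting shuffles is a quick first estimate of a job's cost.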
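cache() and persist() are also lazy: they only mark a dataset for storage. The data is materialized by the first action that computes it, and later actions reuse it instead of recomputing the lineage. checkpoint() goes further. It writes the data to reliable storage (the directory set with SparkContext.setCheckpointDir) and truncates the lineage, which lowers recovery cost after failures and avoids replaying long DAGs. For RDDs the checkpoint is written when the next action runs, whereas DataFrame/Dataset checkpoint() is eager by default.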
df_cached = df.filter("x > 0").cache() # not cached yet
_ = df_cached.count() # first action materializes cache
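The difference between lazy caching and lineage-truncating checkpoints can be modeled with a counter on the source. This is a toy sketch, not Spark's storage layer. `ToyFrame`, `read_source`, `source_reads`, and `lineage_depth` are hypothetical names. The toy `checkpoint` is eager and stores data in memory, whereas Spark writes checkpoints to the checkpoint directory.

```python
from typing import Callable, List, Optional

source_reads = 0  # how many times the (expensive) source has been scanned


def read_source() -> List[int]:
    global source_reads
    source_reads += 1
    return [-3, -1, 0, 2, 5]


class ToyFrame:
    """Toy dataset: recomputes from its lineage unless a cached copy exists."""

    def __init__(self, compute: Callable[[], List[int]], lineage_depth: int = 0):
        self._compute = compute
        self.lineage_depth = lineage_depth  # steps between this frame and its source
        self._cache_requested = False
        self._cached: Optional[List[int]] = None

    def filter(self, pred: Callable[[int], bool]) -> "ToyFrame":
        return ToyFrame(lambda: [x for x in self._data() if pred(x)],
                        self.lineage_depth + 1)

    def cache(self) -> "ToyFrame":
        self._cache_requested = True  # lazy: just a flag, nothing stored yet
        return self

    def _data(self) -> List[int]:
        if self._cached is not None:
            return self._cached
        data = self._compute()  # replay lineage back to the source
        if self._cache_requested:
            self._cached = data  # first action materializes the cache
        return data

    def count(self) -> int:  # action
        return len(self._data())

    def checkpoint(self) -> "ToyFrame":
        """Eagerly save the data; the returned frame's lineage starts here."""
        saved = list(self._data())
        return ToyFrame(lambda: saved, lineage_depth=0)


df = ToyFrame(read_source)
df_cached = df.filter(lambda x: x > 0).cache()
assert source_reads == 0    # cache() alone did no work
df_cached.count()           # first action: scans the source, fills the cache
df_cached.count()           # second action: served from the cache
print(source_reads)         # 1

deep = df.filter(lambda x: x != 0).filter(lambda x: x < 5)
cp = deep.checkpoint()      # one more scan to save the data
print(deep.lineage_depth, cp.lineage_depth, source_reads)  # 2 0 2
```

After the checkpoint, `cp` no longer refers to `deep` or the source at all. That independence is what makes recovery cheap when the original lineage is long.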