Code Examples

Seq to Dataset

//don't need to import ss.implicits anymore
import zio.spark.sql.implicits._

//implicits on sequence
val createPersonDs: URIO[SparkSession, Dataset[Person]] = Seq(Person("toto", 13)).toDS

Manage Error on Loads

//use of ZIO + effects composition to load a file or another if it doesn't work
val input: ZIO[SparkSession, Throwable, DataFrame] ="path1") orElse"path2")

Manage Analysis Errors

Analysis erros, are error returned by Spark (org.apache.spark.sql.AnalysisException) when something wrong happen building the data transformation. For example, if you select a field that do not exist, of it you try to cast as a type, for the Dataframe schema doesn't match.

In zio-spark, those potential errors are isolated, you can either :

  • recover from them
  • throws directly (as would apache spark do)
import zio.spark.sql.implicits._ //if not done already that the beginning of the file

//explicit error management + encoder on Person
def process1(input: DataFrame): Task[Long] =[Person] match {
case TryAnalysis.Failure(_) => input.filter("age >= 18").getOrThrow.count
case TryAnalysis.Success(ds) => ds.count

//less error management
def process2(input: DataFrame) = {
import zio.spark.sql.syntax.throwsAnalysisException._

val selectPerson:Dataset[Person] =[Person]


Cancellable Effects + Caching

Caching mechanism is from Spark, it's effecful, you need to call it before doing a computation to mark the dataset as something to be cached. CancellableEffect is an experimental feature from zio-spark, that allow to propagate an Interrupt to stop the job.

//cache, cancellable
def improveProcess(input: DataFrame): ZIO[SparkSession, Throwable, Long] =
for {
cached <- input.cache
p = CancellableEffect.makeItCancellable(process(cached))
//if the first job is stale, it launches a second job. When on of the jobs finish, it stops the remaining job
count <- p.race(p.delay(10.seconds))
} yield count