scala, apache-spark, scala-cats, scala-implicits, apache-spark-encoders

How to implement Functor[Dataset]


I am struggling with how to create an instance of Functor[Dataset]. The problem is that when you map from A to B, an Encoder[B] must be in implicit scope, but I am not sure how to arrange that.

implicit val datasetFunctor: Functor[Dataset] = new Functor[Dataset] {
  // does not compile: Dataset.map needs an implicit Encoder[B]
  override def map[A, B](fa: Dataset[A])(f: A => B): Dataset[B] = fa.map(f)
}

Of course this code throws a compilation error, since Encoder[B] is not available, but I can't add Encoder[B] as an implicit parameter because that would change the signature of map. How can I solve this?
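For reference, this is roughly where the two signatures clash (paraphrased and simplified; SparkLikeDataset and CatsLikeFunctor are illustrative names, not real types):

import scala.language.higherKinds
import org.apache.spark.sql.Encoder

// Spark's Dataset.map demands an Encoder for the result type...
trait SparkLikeDataset[T] {
  def map[U](func: T => U)(implicit enc: Encoder[U]): SparkLikeDataset[U]
}

// ...while cats' Functor.map leaves no room for that extra constraint.
trait CatsLikeFunctor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}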


Solution

  • You cannot apply f right away, because you are missing the Encoder. The only obvious direct solution would be to take cats and re-implement all the interfaces, adding an implicit Encoder argument. I don't see any way to implement a Functor for Dataset directly.

    However, maybe the following substitute solution is good enough: create a wrapper for the dataset which has a map method without the implicit Encoder, but additionally has a method toDataset, which needs the Encoder only at the very end.

    For this wrapper, you could apply a construction which is very similar to the so-called Coyoneda construction. It is essentially a way to implement a "free functor" for an arbitrary type constructor.

    Here is a sketch (it compiles with cats 1.0.1; the Spark traits are replaced by dummies):

    import scala.language.higherKinds
    import cats.Functor
    
    /** Dummy for spark-Encoder */
    trait Encoder[X]
    
    /** Dummy for spark-Dataset */
    trait Dataset[X] {
      def map[Y](f: X => Y)(implicit enc: Encoder[Y]): Dataset[Y]
    }
    
    /** Coyoneda-esque wrapper for `Dataset` 
      * that simply stashes all arguments to `map` away
      * until a concrete `Encoder` is supplied during the
      * application of `toDataset`.
      *
      * Essentially: the wrapped original dataset + the composition
      * of all functions which have been passed to `map`.
      */
    abstract class MappedDataset[X] private () { self =>
      type B
      val base: Dataset[B]
      val path: B => X
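      /** Apply the accumulated function chain; the Encoder is needed only now. */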
      def toDataset(implicit enc: Encoder[X]): Dataset[X] = base map path
    
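      /** Stash `f` away by composing it onto `path`; no Encoder required yet. */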
      def map[Y](f: X => Y): MappedDataset[Y] = new MappedDataset[Y] {
        type B = self.B
        val base = self.base
        val path: B => Y = f compose self.path
      }
    }
    
    object MappedDataset {
      /** Constructor for MappedDatasets.
        * 
        * Wraps a `Dataset` into a `MappedDataset` 
        */
      def apply[X](ds: Dataset[X]): MappedDataset[X] = new MappedDataset[X] {
        type B = X
        val base = ds
        val path = identity
      }
    
    }
    
    object MappedDatasetFunctor extends Functor[MappedDataset] {
      /** Functorial `map` */
      def map[A, B](da: MappedDataset[A])(f: A => B): MappedDataset[B] = da map f
    }
    

    Now you can wrap a dataset ds into MappedDataset(ds), map it as often as you want using MappedDatasetFunctor (or the wrapper's own map), and call toDataset only at the very end, supplying a concrete Encoder for the final result at that point.
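    A minimal usage sketch against the dummy traits above (the names example and ints are illustrative; the dataset and the Encoder[String] are assumed to be supplied by the caller):

    def example(ints: Dataset[Int])(implicit enc: Encoder[String]): Dataset[String] =
      MappedDataset(ints)          // wrap: no Encoder needed
        .map(_ + 1)                // map as often as you like...
        .map(n => s"value: $n")    // ...still without any Encoder
        .toDataset                 // the Encoder[String] is required only here

    // For generic code written against cats' Functor, pass the instance
    // explicitly (or declare it implicit in an enclosing scope), e.g.
    //   MappedDatasetFunctor.map(MappedDataset(ints))(_ + 1)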

    Note that this composes all the functions passed to map into a single function, applied in one Spark map call: it cannot save any intermediate results, because the Encoders for the intermediate steps are never supplied.
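    If you do need an intermediate result, one option (a sketch, again against the dummy traits, with illustrative names) is to materialise it explicitly with toDataset, which of course requires the intermediate Encoder, and then re-wrap:

    def withIntermediate(ints: Dataset[Int])(
        implicit encInt: Encoder[Int], encStr: Encoder[String]): Dataset[String] = {
      // materialise the intermediate Dataset[Int] (needs Encoder[Int])...
      val halfway: Dataset[Int] = MappedDataset(ints).map(_ * 2).toDataset
      // ...then keep mapping lazily and encode only the final result
      MappedDataset(halfway).map(_.toString).toDataset
    }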


    I'm not quite there yet with studying cats, so I cannot guarantee that this is the most idiomatic solution. Probably there is already something Coyoneda-esque in the library.

    EDIT: There is Coyoneda in the cats library, but it requires a natural transformation F ~> G to a functor G. Unfortunately, we don't have a Functor for Dataset (that was the problem in the first place). What my implementation above does is: instead of a Functor[G], it requires a single component of the (non-existent) natural transformation at a fixed X (this is what the Encoder[X] provides).
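    A small compilable illustration of that point, assuming the cats-free module is on the classpath and reusing the dummy Dataset above:

    import cats.free.Coyoneda

    // Mapping over Coyoneda[Dataset, ?] is free: no Encoder, no Functor[Dataset].
    def liftedAndMapped(ds: Dataset[Int]): Coyoneda[Dataset, String] =
      Coyoneda.lift(ds).map(_ + 1).map(_.toString)

    // But the only way back to a plain Dataset is `run`, whose signature is
    // (roughly) `def run(implicit F: Functor[F]): F[A]`; that is, it demands
    // exactly the Functor[Dataset] that we do not have.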