scala, apache-spark, apache-spark-sql, implicit

How to call a class inside another Scala object?


I have a class DFHelpers that helps get the DataFrame keys. I want to keep it as generic code and call it from another main Scala object. The first code section below defines the generic class, and the second code section is my main object. Inside the main object I want to call the DFHelpers class.

package integration.utils
import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
import org.apache.spark.sql.DataFrame


implicit class DFHelpers(df: DataFrame) {

  // All leaf field paths of the DataFrame's schema, backtick-quoted and dot-separated.
  def fields: Seq[String] =
    this.fields(df.schema)

  def fields(
              schema: StructType = df.schema,
              root: String = "",
              sep: String = "."
            ): Seq[String] = {
    schema.fields.flatMap { column =>
      val path = s"${root}${sep}`${column.name}`".stripPrefix(sep)
      column.dataType match {
        case struct: StructType               => fields(struct, path) // recurse into nested structs
        case ArrayType(struct: StructType, _) => fields(struct, path) // recurse into arrays of structs
        case _: ArrayType                     => Seq.empty            // arrays of non-struct elements are skipped
        case _                                => Seq(path)            // leaf column
      }
    }.toList
  }
}
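
For reference, this is the behaviour I expect from fields once I can call it from the main object: nested struct and array-of-struct columns flattened into backtick-quoted, dot-separated paths. A small self-contained sketch (the sample JSON, the object name, and the local SparkSession are only for illustration):

import org.apache.spark.sql.SparkSession
import integration.utils.DFHelpers

object FieldsIllustration {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("fields-illustration").getOrCreate()
    import spark.implicits._

    // One nested record: a struct column and an array-of-struct column.
    val json = Seq("""{"id":1,"name":{"first":"Ada","last":"Lovelace"},"orders":[{"sku":"A1","qty":2}]}""")
    val df = spark.read.json(json.toDS())

    // Expected output: flattened, backtick-quoted paths such as
    //   `id`, `name`.`first`, `name`.`last`, `orders`.`qty`, `orders`.`sku`
    df.fields.foreach(println)

    spark.stop()
  }
}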

Main Scala object:

package integration.scipts

import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct, explode_outer, arrays_zip, current_timestamp, expr, typedLit, array_union}
import integration.utils.DFHelpers

object mainDataProcessing {
  /**
   * This is the method that Cradle will call when your job starts.
   */
  def execute(spark: SparkSession,
              input: Iterable[Dataset[Row]]): Dataset[Row] = {
    val inputDF = spark.read.json("path")
    val inputFieldMap = typedLit(inputDF.fields.map(f => f -> f).toMap)

    // ... rest of the processing goes here; return the resulting Dataset[Row]
    inputDF
  }
}

Solution

  • I guess that by "calling the class" you mean this code: inputDF.fields.

    You can import DFHelpers with

    import integration.utils.DFHelpers
    

    or just

    import integration.utils._
    

    Then inputDF.fields should compile.

    The compiler should desugar inputDF.fields into new DFHelpers(inputDF).fields automatically.

    https://docs.scala-lang.org/overviews/core/implicit-classes.html
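
    For illustration, here is a minimal sketch of the two equivalent call forms (the SparkSession and the "path" argument are just placeholders carried over from the question, and DesugarExample is a made-up name):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import integration.utils.DFHelpers

    object DesugarExample {
      def show(spark: SparkSession): Unit = {
        val inputDF: DataFrame = spark.read.json("path")

        val viaExtension = inputDF.fields                    // what you write
        val viaExplicitWrap = new DFHelpers(inputDF).fields  // what the compiler generates

        assert(viaExtension == viaExplicitWrap)
      }
    }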