Tags: dataframe, scala, apache-spark, apache-spark-dataset, bloom-filter

Join a Dataset with a case class in Spark/Scala


I am converting a DataFrame into a Dataset using a case class that contains a sequence of another case class:

case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
    ip: String,
    ipVersion: Byte,
    ipType: String,
    city: String,
    state: String,
    country: String)

Now I have another dataset of strings that contains just IPs. My requirement is to keep an IpLocation record if its ipType == "home" or if its ip appears in the IP dataset. I am trying to use a bloom filter built from the IP dataset to search through it, but it is inefficient and not working well in general. I want to join the IP dataset with IpLocation, but I'm having trouble since the locations are inside a Seq. I'm very new to Spark and Scala, so I'm probably missing something. Right now my code looks like this:


import breeze.util.BloomFilter // assuming Breeze's bloom filter, which provides optimallySized and | (union)

def buildBloomFilter(ips: Dataset[String]): BloomFilter[String] = {
  val count = ips.count
  ips.rdd
    .mapPartitions { iter =>
      // build one filter per partition, each sized for the full dataset
      val b = BloomFilter.optimallySized[String](count, FP_PROBABILITY)
      iter.foreach(b += _)
      Iterator(b)
    }
    .treeReduce(_ | _) // union the per-partition filters
}
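
(For reference, Spark also ships a built-in bloom-filter builder in org.apache.spark.util.sketch, which can replace the manual mapPartitions/treeReduce. A minimal sketch, assuming the same Ips: Dataset[String] and FP_PROBABILITY constant as above; the resulting filter is serializable and can be broadcast the same way:)

import org.apache.spark.util.sketch.BloomFilter

// sketch only: Spark's built-in helper instead of Breeze
val sparkBf: BloomFilter =
  Ips.toDF("ip").stat.bloomFilter("ip", Ips.count, FP_PROBABILITY)

sparkBf.mightContain("123.123.123.125") // probabilistic membership test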

val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)

idMonitor.map { monitor =>
  monitor.ipLocation.filter(loc =>
    // keep "home" locations, or locations whose IP is (probably) in the IP set
    loc.ipType == "home" || ipBfBroadcast.value.contains(loc.ip)
  )
}

I just want to figure out how to join IpLocation and Ips.


Solution

  • Sample:

    Starting from your case classes,

    case class IpLocation(
        ip: String,
        ipVersion: Byte,
        ipType: String,
        city: String,
        state: String,
        country: String
    )
    case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
    

    I have defined the sample data as follows:

    import spark.implicits._ // needed for toDF and .as[...]

    val ip_locations1 = Seq(
      IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"),
      IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test")
    )
    val ip_locations2 = Seq(
      IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"),
      IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test")
    )
    
    val id_monitor = Seq(IdMonitor("1", ip_locations1), IdMonitor("2", ip_locations2))
    val df = id_monitor.toDF()
    df.show(false)
    
    +---+------------------------------------------------------------------------------------------------------+
    |id |ipLocation                                                                                            |
    +---+------------------------------------------------------------------------------------------------------+
    |1  |[{123.123.123.123, 12, home, test, test, test}, {123.123.123.124, 12, otherwise, test, test, test}]   |
    |2  |[{123.123.123.125, 13, company, test, test, test}, {123.123.123.124, 13, otherwise, test, test, test}]|
    +---+------------------------------------------------------------------------------------------------------+
    

    and the IPs:

    val ips = Seq("123.123.123.125")
    val df_ips = ips.toDF("ips")
    df_ips.show()
    
    +---------------+
    |            ips|
    +---------------+
    |123.123.123.125|
    +---------------+
    

    Join:

    From the sample data above, explode the ipLocation array of each IdMonitor row and join with the IPs, keeping a row when its ipType is "home" or its IP appears in the IP dataset.

    import org.apache.spark.sql.functions.{col, explode, lit}

    df.withColumn("ipLocation", explode('ipLocation)).alias("a")
      .join(df_ips.alias("b"),
        col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
        "inner")
      .select("ipLocation.*")
      .as[IpLocation]
      .collect()
    

    Finally, the collected result is given as follows:

    res32: Array[IpLocation] = Array(IpLocation(123.123.123.123,12,home,test,test,test), IpLocation(123.123.123.125,13,company,test,test,test))
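
    Note that the OR in the join condition is not an equi-join, so Spark falls back to a broadcast nested loop join; that is fine as long as df_ips stays small. Also, the explode/select drops the id. If you need to keep it and rebuild the nested IdMonitor structure, here is a minimal sketch that re-aggregates the exploded rows, assuming the same df and df_ips as above (the val name rebuilt is mine):

    import org.apache.spark.sql.functions.{col, collect_list, explode, lit}

    // re-group the filtered, exploded rows back into one IdMonitor per id
    val rebuilt = df
      .withColumn("ipLocation", explode(col("ipLocation"))).alias("a")
      .join(df_ips.alias("b"),
        col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
        "inner")
      .groupBy(col("a.id"))
      .agg(collect_list(col("a.ipLocation")).as("ipLocation"))
      .as[IdMonitor]
    // caveat: with several rows in df_ips, a "home" location can match more
    // than once; deduplicate the exploded rows before grouping if that matters

    rebuilt.show(false)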