java scala apache-spark record-linkage

Effective record linkage


I asked a somewhat similar question earlier today. Here it is. In short: I need to do record linkage on two large datasets (1.6M and 6M records). I was going to use Spark, thinking that the Cartesian product I was warned about would not be such a big problem. But it is: it hit performance so hard that the linkage process didn't finish in 7 hours.

Is there another library/framework/tool that can do this more efficiently? Or can the performance of the solution below be improved?

The code I ended up with:

    import org.apache.spark.sql.{Column, SparkSession}
    import org.apache.spark.sql.functions.{col, concat_ws, substring, to_date}
    
    import java.time.Duration
    
    object App {
    
      def left(col: Column, n: Int) = {
        assert(n > 0)
        substring(col, 1, n)
      }
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("MatchingApp")
          .getOrCreate()
    
        import spark.implicits._
    
        val a = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/a.csv")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
    
        val b = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/b.txt")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
    
        // @formatter:off
        val condition = a
          .col("FULL_NAME").contains(b.col("FIRST_NAME"))
          .and(a.col("FULL_NAME").contains(b.col("LAST_NAME")))
          .and(a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
            .or(a.col("STREET").startsWith(left(b.col("STR"), 3))))
        // @formatter:on
        val startMillis = System.currentTimeMillis()
        val res = a.join(b, condition, "left_outer")
        val count = res
          .filter(col("B_ID").isNotNull)
          .count()
        println(s"Count: $count")
        val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
        println(s"Execution time: ${executionTime.toMinutes}m")
      }
    }

The condition is probably too complicated, but it has to be that way.
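
One way to see why the join above is so slow: the condition contains no equality predicate, so Spark cannot use a hash or sort-merge join and typically falls back to a nested-loop strategy. You can confirm this by printing the physical plan (using the a, b and condition values defined above):

    // With no equi-join keys, the plan shows a BroadcastNestedLoopJoin
    // (or a CartesianProduct), i.e. every row of a is compared with every
    // row of b.
    a.join(b, condition, "left_outer").explain()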


Solution

  • You may improve the performance of your current solution by slightly changing the logic of how you perform the linkage:

    Your code could be rewritten as follows:

    import org.apache.spark.sql.functions.{col, substring, to_date}
    import org.apache.spark.sql.SparkSession
    
    import java.time.Duration
    
    object App {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("MatchingApp")
          .getOrCreate()
    
        val a = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/a.csv")
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
    
        val b = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/b.txt")
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
    
        val condition = a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
          .or(a.col("STREET").startsWith(substring(b.col("STR"), 1, 3)))
    
        val startMillis = System.currentTimeMillis()
        val res = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))
          .filter(condition)
          // the two following lines are optional: drop them if you only want records with a non-null B_ID
          .select("B_ID", "A_ID")
          .join(a, Seq("A_ID"), "right_outer") 
    
        val count = res
          .filter(col("B_ID").isNotNull)
          .count()
        println(s"Count: $count")
        val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
        println(s"Execution time: ${executionTime.toMinutes}m")
      }
    }
    

    This way you avoid the Cartesian product, at the price of two joins instead of one.
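
    Depending on your data sizes, you may also hint Spark to broadcast the smaller of the two dataframes, turning the equi-join into a BroadcastHashJoin and avoiding a shuffle. A sketch, assuming the broadcast side fits comfortably in memory (you can check which strategy Spark picked with res.explain()):

    import org.apache.spark.sql.functions.broadcast
    
    // Broadcast hint on b (swap the hint to a if a is the smaller side).
    // The join keys and filter are identical to the code above.
    val res2 = a.join(broadcast(b), Seq("LAST_NAME", "FIRST_NAME"))
      .filter(condition)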

    Example

    With file a.csv containing the following data:

    "A_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STREET"
    10;John;Doe;1965-10-21;Johnson Road
    11;Rebecca;Davis;1977-02-27;Lincoln Road
    12;Samantha;Johns;1954-03-31;Main Street
    13;Roger;Penrose;1987-12-25;Oxford Street
    14;Robert;Smith;1981-08-26;Canergie Road
    15;Britney;Stark;1983-09-27;Alshire Road
    

    And b.txt having the following data:

    "B_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STR"
    29;John;Doe;21.10.1965;Johnson Road
    28;Rebecca;Davis;28.03.1986;Lincoln Road
    27;Shirley;Iron;30.01.1956;Oak Street
    26;Roger;Penrose;25.12.1987;York Street
    25;Robert;Dayton;26.08.1956;Canergie Road
    24;Britney;Stark;22.06.1962;Algon Road
    

    The res dataframe will be:

    +----+----+----------+---------+----------+-------------+
    |A_ID|B_ID|FIRST_NAME|LAST_NAME|BIRTH_DATE|STREET       |
    +----+----+----------+---------+----------+-------------+
    |10  |29  |John      |Doe      |1965-10-21|Johnson Road |
    |11  |28  |Rebecca   |Davis    |1977-02-27|Lincoln Road |
    |12  |null|Samantha  |Johns    |1954-03-31|Main Street  |
    |13  |26  |Roger     |Penrose  |1987-12-25|Oxford Street|
    |14  |null|Robert    |Smith    |1981-08-26|Canergie Road|
    |15  |null|Britney   |Stark    |1983-09-27|Alshire Road |
    +----+----+----------+---------+----------+-------------+
    

    Note: if your FIRST_NAME and LAST_NAME columns do not match exactly, you can try to normalize them with Spark's built-in functions before joining, for instance (see the sketch after this list):

    • trim to remove spaces at start and end of string
    • lower to transform the column to lower case (and thus ignore case in comparison)

    What really matters is to join on as many columns as possible that match exactly, since those equality keys are what let Spark avoid the nested-loop join.
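
    As an illustration, here is a minimal normalization sketch (the column names are the ones from the example files above; apply the same transformation to both sides so the equi-join keys stay comparable):

    import org.apache.spark.sql.functions.{col, lower, trim}
    
    // Normalize the join keys on both sides so the equi-join is not defeated
    // by stray whitespace or case differences.
    val aNorm = a
      .withColumn("FIRST_NAME", lower(trim(col("FIRST_NAME"))))
      .withColumn("LAST_NAME", lower(trim(col("LAST_NAME"))))
    val bNorm = b
      .withColumn("FIRST_NAME", lower(trim(col("FIRST_NAME"))))
      .withColumn("LAST_NAME", lower(trim(col("LAST_NAME"))))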