sql pandas pyspark levenshtein-distance soundex

Fuzzy matching a string in PySpark or SQL using the Soundex function or Levenshtein distance


I had to apply the Levenshtein function to the 'last' column for rows where passport and country are the same.

from pyspark.sql import functions as f

matrix = passport_heck.select(
    f.col('name_id').alias('name_id_1'),
    f.col('last').alias('last_1'),
    f.col('country').alias('country_1'),
    f.col('passport').alias('passport_1')) \
    .crossJoin(passport_heck.select(
        f.col('name_id').alias('name_id_2'),
        f.col('last').alias('last_2'),
        f.col('country').alias('country_2'),
        f.col('passport').alias('passport_2'))) \
    .filter((f.col('passport_1') == f.col('passport_2')) & (f.col('country_1') == f.col('country_2')))



res = matrix.withColumn('distance', f.levenshtein(f.col('last_1'), f.col('last_2')))

This gives the following output, which is fine:

[Image: current output table]

Now I need to remove duplicate pairs (for example, ID 558635 paired with 1106562 and 1106562 paired with 558635 compare the same content).

Can anyone suggest some PySpark logic to produce the table below?

[Image: desired output table]


Solution

  • Getting this fully right can become quite complicated, but here is some sample PySpark code that should get you started.

    First, a tiny dataset:

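    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    # In the pyspark shell `spark` is already defined; this line is for standalone scripts.
    spark = SparkSession.builder.getOrCreate()

    tinydata = spark.createDataFrame(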
        [
            (3527524, 'aamir', 'al malik', 'aamir.almalik@gmail.com'),
            (4287983, 'aamir', 'al malik', 'aamir.almalik@company.com'),
            (200490, 'aamir', 'al malik', 'aamir.almalik@gmail.come'),
            (1906639, 'tahir', 'al malik', 'tahir.almalik@gmail.com')
        ],
        ['ID', 'first_NAME', 'last_NAME', 'EMAIL']
    )
    

    Then convert it into a matrix of pairwise comparisons through a cross-join. Note that with 5 million rows this becomes huge, so you need to avoid as many comparisons as possible, for example by following some of the comments on your question or other ideas you may come up with. The final filter keeps each pair of rows from being compared twice (and a row from being compared with itself).

    matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
        .crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
        .filter(F.col('ID1') > F.col('ID2'))
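
As a rough illustration in plain Python (no Spark involved, and the helper name pair_count is made up for this sketch), the following counts how many comparisons the cross-join produces for the four IDs above before and after the ID1 > ID2 filter, and what the same formula gives for 5 million rows:

```python
from itertools import product

# IDs from the tiny dataset above.
ids = [3527524, 4287983, 200490, 1906639]

# A cross-join pairs every row with every row, including itself.
all_pairs = list(product(ids, ids))

# Keeping only ID1 > ID2 drops self-pairs and one of each mirrored pair.
kept_pairs = [(a, b) for a, b in all_pairs if a > b]


def pair_count(n):
    """Number of unordered pairs among n distinct IDs: n * (n - 1) / 2."""
    return n * (n - 1) // 2


print(len(all_pairs))         # 16
print(len(kept_pairs))        # 6
print(pair_count(5_000_000))  # 12499997500000
```

Even after the filter, 5 million rows still leave on the order of 1.25e13 comparisons, which is why restricting the join to rows that share a key matters more than the filter itself.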
    

    After that, you can calculate distances.

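    import Levenshtein  # third-party package (python-Levenshtein)
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    def lev_dist(left, right):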
        return Levenshtein.distance(left, right)
    
    lev_dist_udf = udf(lev_dist, IntegerType())
    
    res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
    

    With the tiny example you get

    res.show()
    +-------+--------------------+-------+--------------------+---+
    |    ID1|              EMAIL1|    ID2|              EMAIL2|  d|
    +-------+--------------------+-------+--------------------+---+
    |3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...|  1|
    |3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...|  2|
    |4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...|  5|
    |4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...|  6|
    |4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...|  7|
    |1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...|  3|
    +-------+--------------------+-------+--------------------+---+
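
For readers unfamiliar with the metric, here is a minimal pure-Python sketch of the standard dynamic-programming Levenshtein distance. The function name levenshtein_distance is made up for this sketch; it is neither the Levenshtein package nor Spark's implementation. It can be used to check values in the table by hand:

```python
def levenshtein_distance(left, right):
    """Minimum number of single-character insertions, deletions and
    substitutions needed to turn `left` into `right`."""
    # previous[j] is the distance between the processed prefix of `left`
    # and the first j characters of `right`.
    previous = list(range(len(right) + 1))
    for i, lch in enumerate(left, start=1):
        current = [i]
        for j, rch in enumerate(right, start=1):
            cost = 0 if lch == rch else 1
            current.append(min(
                previous[j] + 1,         # delete lch
                current[j - 1] + 1,      # insert rch
                previous[j - 1] + cost,  # substitute (or match)
            ))
        previous = current
    return previous[-1]


print(levenshtein_distance('aamir.almalik@gmail.com', 'aamir.almalik@gmail.come'))  # 1
print(levenshtein_distance('aamir.almalik@gmail.com', 'tahir.almalik@gmail.com'))   # 2
```

The first call corresponds to the extra trailing 'e' in the address for ID 200490, the second to the two differing letters between 'aamir' and 'tahir'.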
    

    Thanks to @cronoik for pointing this out: there is no need for a UDF, because PySpark has a built-in levenshtein function. It should look something like this:

    from pyspark.sql.functions import levenshtein
    
    matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
        .crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
        .filter(F.col('ID1') > F.col('ID2'))
    
    res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))