I need to apply the Levenshtein function to the `last` column when `passport` and `country` are the same.
```python
from pyspark.sql import functions as f
from pyspark.sql.functions import levenshtein

# Pair every row with every row (including itself), then keep pairs with the same passport and country
matrix = passport_heck.select(
    f.col('name_id').alias('name_id_1'),
    f.col('last').alias('last_1'),
    f.col('country').alias('country_1'),
    f.col('passport').alias('passport_1')) \
    .crossJoin(passport_heck.select(
        f.col('name_id').alias('name_id_2'),
        f.col('last').alias('last_2'),
        f.col('country').alias('country_2'),
        f.col('passport').alias('passport_2'))) \
    .filter((f.col('passport_1') == f.col('passport_2')) & (f.col('country_1') == f.col('country_2')))

# Edit distance between the two last names
res = matrix.withColumn('distance', levenshtein(f.col('last_1'), f.col('last_2')))
```
Now I get the following output, which is fine.
However, every pair appears twice (for example, ID 558635 compared with 1106562 and then 1106562 compared with 558635, which compares the same content), so I need to delete these duplicate pairs.
Can anyone please suggest some PySpark logic to get the table below?
Your problem can become quite complicated if you want to get it right, but here is some sample PySpark code that will hopefully get you started.
First, a tiny dataset:
```python
tinydata = sqlContext.createDataFrame(
    [
        (3527524, 'aamir', 'al malik', 'aamir.almalik@gmail.com'),
        (4287983, 'aamir', 'al malik', 'aamir.almalik@company.com'),
        (200490, 'aamir', 'al malik', 'aamir.almalik@gmail.come'),
        (1906639, 'tahir', 'al malik', 'tahir.almalik@gmail.com')
    ],
    ['ID', 'first_NAME', 'last_NAME', 'EMAIL']
)
```
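Then you convert it to a matrix of pairs through a cross join. Note that if you have 5 million rows, this becomes huge: the cross join produces 5,000,000² = 25 trillion row pairs before filtering. You need to avoid comparisons as much as possible, for example by following some of the comments on your question, or with other ideas you may come up with. Note that the final filter (`ID1 > ID2`) keeps only one ordering of each pair, so no two rows are compared twice and no row is compared with itself.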
```python
from pyspark.sql import functions as F

matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
    .crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
    .filter(F.col('ID1') > F.col('ID2'))  # keep each unordered pair exactly once
```
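The effect of the `ID1 > ID2` filter can be checked without Spark. The plain-Python sketch below is an illustration, not PySpark: it mimics the cross join with `itertools.product` over the same four `(ID, EMAIL)` rows and then applies the same comparison. Of the 16 ordered pairs, only 6 survive (4 · 3 / 2), one per unordered pair, and the self-pairs are gone.

```python
from itertools import product

# Plain-Python stand-in for the (ID, EMAIL) rows of `tinydata`
rows = [
    (3527524, 'aamir.almalik@gmail.com'),
    (4287983, 'aamir.almalik@company.com'),
    (200490, 'aamir.almalik@gmail.come'),
    (1906639, 'tahir.almalik@gmail.com'),
]

# Equivalent of crossJoin: every ordered pair, including (row, row)
cross = list(product(rows, rows))

# Equivalent of .filter(F.col('ID1') > F.col('ID2')): one ordering per pair survives
pairs = [(left, right) for left, right in cross if left[0] > right[0]]

print(len(cross))  # 16
print(len(pairs))  # 6
```

Using `>` rather than `!=` is what removes the mirrored duplicates; `!=` would only drop the self-pairs and still keep both orderings.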
After that, you can calculate the distances, here with a UDF:
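```python
import Levenshtein  # python-Levenshtein package
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def lev_dist(left, right):
    return Levenshtein.distance(left, right)
lev_dist_udf = udf(lev_dist, IntegerType())
res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
```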
With the tiny example, you get:
```python
res.show()
```
```
+-------+--------------------+-------+--------------------+---+
|    ID1|              EMAIL1|    ID2|              EMAIL2|  d|
+-------+--------------------+-------+--------------------+---+
|3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...|  1|
|3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...|  2|
|4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...|  5|
|4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...|  6|
|4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...|  7|
|1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...|  3|
+-------+--------------------+-------+--------------------+---+
```
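To see what the `d` column measures, here is a plain-Python sketch of the classic dynamic-programming (Wagner-Fischer) edit distance: the minimum number of single-character insertions, deletions and substitutions. It is not the `Levenshtein` package's code, just the same metric written out; run on the tiny emails, it gives the same values as the table above.

```python
def levenshtein_distance(left: str, right: str) -> int:
    """Minimum number of single-character insertions, deletions and
    substitutions needed to turn `left` into `right`."""
    # previous[j] holds the distance between the prefix of `left` processed
    # so far and right[:j]; start with the empty prefix of `left`.
    previous = list(range(len(right) + 1))
    for i, lc in enumerate(left, start=1):
        current = [i]  # distance from left[:i] to the empty string
        for j, rc in enumerate(right, start=1):
            current.append(min(
                previous[j] + 1,               # delete lc
                current[j - 1] + 1,            # insert rc
                previous[j - 1] + (lc != rc),  # substitute (free if equal)
            ))
        previous = current
    return previous[-1]

print(levenshtein_distance('aamir.almalik@gmail.com', 'aamir.almalik@gmail.come'))  # 1
print(levenshtein_distance('aamir.almalik@company.com', 'aamir.almalik@gmail.com'))  # 5
```

Only two rows of the DP table are kept at a time, so memory is linear in the length of `right`, but each comparison still costs time proportional to the product of the two string lengths, which is another reason to cut down the number of pairs.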
Thanks to @cronoik for pointing out that no UDF is needed: Spark has a built-in `levenshtein` function, so it should be something like this:
```python
from pyspark.sql import functions as F
from pyspark.sql.functions import levenshtein
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
    .crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
    .filter(F.col('ID1') > F.col('ID2'))
res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))
```
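The same two ideas carry over to the question's data. The duplicate pairs disappear if only pairs with `name_id_1 < name_id_2` are kept, and since `passport` and `country` must match anyway, they can serve as a blocking key: in PySpark that would roughly correspond to joining the two aliased selections on `passport` and `country` plus a `name_id_1 < name_id_2` condition instead of using a full `crossJoin`. The plain-Python sketch below is not Spark code, and its rows are made up for illustration; it only shows which pairs survive.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical rows shaped like the question's data: (name_id, last, country, passport)
rows = [
    (558635, 'khan', 'PK', 'AB123'),
    (1106562, 'kahn', 'PK', 'AB123'),
    (777001, 'smith', 'UK', 'ZZ999'),
    (777002, 'smyth', 'UK', 'ZZ999'),
    (777003, 'smith', 'US', 'ZZ999'),
]

# Group ("block") rows by the columns that must match exactly
blocks = defaultdict(list)
for name_id, last, country, passport in rows:
    blocks[(passport, country)].append((name_id, last))

# Within each block, emit every unordered pair once, with name_id_1 < name_id_2
pairs = []
for members in blocks.values():
    for (id_1, last_1), (id_2, last_2) in combinations(sorted(members), 2):
        pairs.append((id_1, last_1, id_2, last_2))

for pair in pairs:
    print(pair)
```

Only rows inside the same block are ever paired, so the 777003 row (same passport, different country) is never compared, and 558635 appears with 1106562 exactly once.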