I am working with PySpark and need to join two datasets based on the city and a fuzzy matching condition on the venue names. The first dataset contains information about stadiums including a unique venue_id, while the second dataset, which I receive periodically, only includes venue names and cities without the venue_id.
I want to join these datasets to match the venue_name from the incoming data to the existing dataset using fuzzy logic (since the names are not always written identically), and then pull the corresponding venue_id.
Existing Dataset (df_stadium_information):
| venue_name | city | venue_id |
|---|---|---|
| Sree Kanteerava Stadium | Bengaluru | 1 |
| Sree Kanteerava Stadium | Kochi | 2 |
| Eden Gardens | Kolkata | 3 |
| Narendra Modi Stadium | Ahmedabad | 4 |
Incoming Data (df_new_stadium_data):
| venue_name | city |
|---|---|
| Sri Kanteerava Indoor Stadium | Bengaluru |
| Eden Gardens | Kolkata |
Desired Output:
| venue_name | city | venue_id |
|---|---|---|
| Sri Kanteerava Indoor Stadium | Bengaluru | 1 |
| Eden Gardens | Kolkata | null |
I want the output to show the venue_id from df_stadium_information if there is a fuzzy match on venue_name and an exact match on city. If there's no fuzzy match, the venue_id should be null.
Note that your desired output is incorrect: "Eden Gardens" in Kolkata is an exact match in df_stadium_information, so it should get venue_id 3, not null.
First, create the dummy tables.
```sql
%sql
CREATE OR REPLACE TABLE ref_tbl (
  venue_name STRING,
  city STRING,
  venue_id INT
);

INSERT INTO ref_tbl VALUES
('Sree Kanteerava Stadium','Bengaluru',1),
('Sree Kanteerava Stads','Bengaluru',2),
('Sree Kanteerava Stadium','Kochi',3),
('Eden Gardens','Kolkata',4),
('Narendra Modi Stadium','Ahmedabad',5),
('Sree Kanteerava Stadium','Bangalore',6);

CREATE OR REPLACE TABLE IN_TABLE (
  venue_name STRING,
  city STRING
);

INSERT INTO IN_TABLE VALUES
('Sri Kanteerava Indoor Stadium','Kolkata'),
('Eden Garden','Kolkata'),
('Sri Kanteerava Stadium','Bengaluru');
```
Install the Python package rapidfuzz:

```shell
pip install rapidfuzz
```
Then register it as a Spark UDF:

```python
from rapidfuzz import fuzz
from pyspark.sql.types import DoubleType

def fuzzyrapid(s1, s2):
    # Token-sort ratio: sorts the words of each string before comparing,
    # so word order does not affect the 0-100 similarity score.
    return fuzz.token_sort_ratio(s1, s2)

# Register the UDF so it can be called from SQL as fuzztest(...)
spark.udf.register("fuzztest", fuzzyrapid, DoubleType())
```
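To see what the UDF computes without a Spark cluster, here is a rough stand-in for rapidfuzz's token_sort_ratio built on the standard library's difflib. It is an approximation for illustration only; rapidfuzz's implementation is faster and its scores are not identical.

```python
from difflib import SequenceMatcher

def token_sort_ratio(s1: str, s2: str) -> float:
    # Sort the whitespace tokens of each string, then compare the
    # normalized strings, returning a 0-100 similarity score.
    a = " ".join(sorted(s1.lower().split()))
    b = " ".join(sorted(s2.lower().split()))
    return 100.0 * SequenceMatcher(None, a, b).ratio()

# Word order does not matter, only spelling differences do.
print(token_sort_ratio("Kanteerava Stadium", "Stadium Kanteerava"))
print(token_sort_ratio("Eden Gardens", "Eden Garden"))
```

The token-sort step is what makes "Kanteerava Stadium" and "Stadium Kanteerava" score a perfect 100, which is useful when venue names are written with words in different orders.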
Select the distinct combinations of venue name and city.
```sql
%sql
-- Select distinct cities from the input file/table
CREATE OR REPLACE TEMPORARY VIEW v_distinct_input_names AS
SELECT DISTINCT venue_name AS input_name, city FROM IN_TABLE;

SELECT * FROM v_distinct_input_names;
```
Do a cross join restricted by city, so you get every (reference name, input name) combination within the same city:
```sql
%sql
-- cross join the distinct input names with the lookup table,
-- keeping only pairs in the same city
CREATE OR REPLACE TEMPORARY VIEW v_crossjoin AS
SELECT A.venue_name AS ref_name,
       A.city AS city,
       B.input_name
FROM ref_tbl A
CROSS JOIN v_distinct_input_names B
WHERE A.city = B.city;
```
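The candidate generation can be sketched in plain Python as well: pair every reference row with every input row that shares a city (the sample rows below are a hypothetical subset of the tables above).

```python
# Plain-Python sketch of the city-restricted cross join.
ref_rows = [
    ("Sree Kanteerava Stadium", "Bengaluru", 1),
    ("Eden Gardens", "Kolkata", 4),
]
input_rows = [
    ("Sri Kanteerava Stadium", "Bengaluru"),
    ("Eden Garden", "Kolkata"),
]

# Every (reference name, input name) pair that shares a city.
candidates = [
    (ref_name, in_name, city)
    for ref_name, city, _ in ref_rows
    for in_name, in_city in input_rows
    if city == in_city
]
for row in candidates:
    print(row)
```

Restricting by city keeps the pair count manageable; a full cross join of two large tables would explode quadratically.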
Now call the registered UDF to compute a similarity score for each pair, then use a window function to keep only the best-scoring reference name for each input combination.
```sql
%sql
CREATE OR REPLACE TEMPORARY VIEW vw_fuzzy1 AS
SELECT ref_name,
       input_name,
       city,
       fuzztest(ref_name, input_name) AS similarity_score
FROM v_crossjoin;

CREATE OR REPLACE TEMPORARY VIEW v_fuzzy2 AS
SELECT ref_name, input_name, city, similarity_score
FROM
(
  -- partition by city as well, so the same input name appearing in
  -- two cities keeps a best match per city
  SELECT ROW_NUMBER() OVER (PARTITION BY input_name, city
                            ORDER BY similarity_score DESC) AS RowNum, *
  FROM vw_fuzzy1
)
WHERE RowNum = 1;
```
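The window step above boils down to "keep the highest-scoring row per group", which can be sketched in plain Python like this (the scores below are made up for the demo, not real rapidfuzz output):

```python
# Sketch of ROW_NUMBER() OVER (PARTITION BY input_name, city
#                              ORDER BY similarity_score DESC) ... RowNum = 1
scored = [
    # (ref_name, input_name, city, similarity_score)
    ("Sree Kanteerava Stadium", "Sri Kanteerava Stadium", "Bengaluru", 96.0),
    ("Sree Kanteerava Stads", "Sri Kanteerava Stadium", "Bengaluru", 88.0),
    ("Eden Gardens", "Eden Garden", "Kolkata", 95.0),
]

best = {}
for ref_name, in_name, city, score in scored:
    key = (in_name, city)  # the PARTITION BY columns
    # Keep only the row with the highest score per partition.
    if key not in best or score > best[key][3]:
        best[key] = (ref_name, in_name, city, score)

for row in best.values():
    print(row)
```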
Now you can join back to the reference table to pull the venue_id. You need to define a score threshold for your use case; here I've used 70.
```sql
%sql
SELECT A.input_name,
       B.venue_name,
       A.city,
       CASE
         WHEN A.similarity_score > 70 THEN B.venue_id
         ELSE NULL
       END AS venue_id
FROM v_fuzzy2 A
LEFT JOIN ref_tbl B
  ON A.ref_name = B.venue_name
 AND A.city = B.city;
```
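Putting all the steps together, here is an end-to-end plain-Python sketch of the pipeline: score city-matched candidates, keep the best match, and null out venue_id below the threshold. It uses the stdlib difflib as a stand-in for the rapidfuzz UDF, and the "Wankhede Stadium" row is hypothetical data added to show a below-threshold result.

```python
from difflib import SequenceMatcher

def score(s1, s2):
    # Stand-in for the fuzztest UDF (token-sort ratio, 0-100).
    a = " ".join(sorted(s1.lower().split()))
    b = " ".join(sorted(s2.lower().split()))
    return 100.0 * SequenceMatcher(None, a, b).ratio()

ref = [("Sree Kanteerava Stadium", "Bengaluru", 1),
       ("Eden Gardens", "Kolkata", 4),
       ("Narendra Modi Stadium", "Ahmedabad", 5)]
incoming = [("Sri Kanteerava Stadium", "Bengaluru"),
            ("Eden Garden", "Kolkata"),
            ("Wankhede Stadium", "Kolkata")]

THRESHOLD = 70
results = []
for in_name, in_city in incoming:
    # Score every reference name in the same city, keep the best.
    matches = [(score(r_name, in_name), r_id)
               for r_name, r_city, r_id in ref if r_city == in_city]
    best_score, best_id = max(matches, default=(0.0, None))
    # Null out the id when the best match is below the threshold.
    results.append((in_name, in_city, best_id if best_score > THRESHOLD else None))

for row in results:
    print(row)
```

The threshold is the knob to tune: too low and unrelated venues in the same city get matched, too high and legitimate spelling variants fall through as null.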