pythonsqlpysparkapache-spark-sql

Fuzzy Join on Venue Names Based on City


I am working with PySpark and need to join two datasets based on the city and a fuzzy matching condition on the venue names. The first dataset contains information about stadiums including a unique venue_id, while the second dataset, which I receive periodically, only includes venue names and cities without the venue_id.

I want to join these datasets to match the venue_name from the incoming data to the existing dataset using fuzzy logic (since the names are not always written identically), and then pull the corresponding venue_id.

Existing Dataset (df_stadium_information):

venue_name city venue_id
Sree Kanteerava Stadium Bengaluru 1
Sree Kanteerava Stadium Kochi 2
Eden Gardens Kolkata 3
Narendra Modi Stadium Ahmedabad 4

Incoming Data (df_new_stadium_data):

venue_name city
Sri Kanteerava Indoor Stadium Bengaluru
Eden Gardens Kolkata

Desired Output:

venue_name city venue_id
Sri Kanteerava Indoor Stadium Bengaluru 1
Eden Gardens Kolkata null

I want the output to show the venue_id from df_stadium_information if there is a fuzzy match on venue_name and an exact match on city. If there's no fuzzy match, the venue_id should be null.


Solution

  • Your desired example is incorrect as there is a match of Eden Gardens in Kolkata with a venue_id of 3.

    First we create our dummy tables.

    %sql
    CREATE OR REPLACE TABLE ref_tbl (
        venue_name STRING,
        city STRING,
        venue_id INT
      );
    
    INSERT INTO ref_tbl VALUES
    ('Sree Kanteerava Stadium','Bengaluru',1),
    ('Sree Kanteerava Stads','Bengaluru',2),
    ('Sree Kanteerava Stadium','Kochi',3),
    ('Eden Gardens','Kolkata',4),
    ('Narendra Modi Stadium','Ahmedabad',5),
    ('Sree Kanteerava Stadium','Bangalore',6);
    
    CREATE OR REPLACE TABLE IN_TABLE (
        venue_name STRING,
        city STRING
      );
    
    INSERT INTO IN_TABLE VALUES
    ('Sri Kanteerava Indoor Stadium','Kolkata'),
    ('Eden Garden','Kolkata'),
    ('Sri Kanteerava Stadium','Bengaluru');
    

    Install the python package called rapidfuzz

    pip install rapidfuzz
    

    Then register it as a spark udf function

    from rapidfuzz import fuzz
    from pyspark.sql.functions import udf
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType,IntegerType,DoubleType
        
    def fuzzyrapid(s1, s2):
        return (fuzz.token_sort_ratio(s1,s2))
    
    spark.udf.register("fuzztest",  fuzzyrapid,DoubleType())
    

    Select the distinct combinations of venue name and city.

    %sql
    -- Select distinct cities from the input file/table
    CREATE OR REPLACE TEMPORARY VIEW v_distinct_input_names 
    AS 
    SELECT DISTINCT venue_name as input_name,city FROM IN_TABLE;
    
    SELECT * FROM v_distinct_input_names;
    

    Do a crossjoin so you get all combinations with the same city

    %sql
    -- cross join the distinct cities with the lookup table 
    CREATE OR REPLACE TEMPORARY VIEW v_crossjoin 
    AS 
    SELECT A.venue_name as ref_name,  A.city as city, Input_name FROM ref_tbl A CROSS JOIN v_distinct_input_names B where A.city = B.city;
    

    Now execute the custom spark udf to get the similarity score and do a windowing function to get the best score for a single combination

    %sql
    CREATE OR REPLACE TEMPORARY VIEW vw_fuzzy1 AS
    SELECT ref_name, Input_name, city, fuzztest(ref_name,Input_name) AS similarity_score FROM v_crossjoin;
    
    CREATE OR REPLACE TEMPORARY VIEW v_fuzzy2 AS 
     SELECT ref_name, Input_name, city, similarity_score
     FROM
     (
         SELECT ROW_NUMBER() OVER (PARTITION BY input_name ORDER BY similarity_score DESC) as RowNum, *
         FROM vw_fuzzy1
     ) 
     WHERE RowNum = 1;
    

    Now you can do a join to get the venue id. You need to define a score threshold for your usecase. In my case I've used 70.

    %sql
    SELECT input_name, B.venue_name, A.city,
    case
    when similarity_score > 70
      then venue_id
    else
      null
    end as venue_id
    from v_fuzzy2 A left join ref_tbl B on A.ref_name = B.venue_name AND A.city=B.city;
    

    Your result will be enter image description here