I am having some issues computing cosine similarities for a product recommender. I have an article database containing 40k articles, each with its own description. I am trying to calculate the cosine similarity matrix for these elements so that, for any given article, the top N most similar articles according to the description can be retrieved.
I am developing this in Python and the code should run in a Microsoft Azure Function. As you can imagine, given the size of the initial df, this cannot run on that platform (it runs out of memory, as the cosine similarity matrix that is produced is more than 8 GB in size on its own).
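As a rough back-of-the-envelope check (my own estimate, assuming the similarity matrix is stored dense):

n = 40_000
# Full dense similarity matrix: n x n values, 8 bytes each as float64
print(n * n * 8 / 1e9)  # ~12.8 GB (about 6.4 GB if it stays float32)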
With this in mind I have decided to take another approach: using Spark, more precisely a Spark Job Definition from Microsoft Fabric. I have been reviewing similar questions (as my PySpark knowledge is extremely limited), like this one: Calculating cosine similarity in Pyspark Dataframe, but I'm struggling to make the code work in my situation.
Trying to tackle the problems one by one, I have realized my code doesn't even get to the cosine similarity calculation (or at least not all of it), as my Spark job freezes when attempting the cross join of the matrices, and I am unable to work out the exact issue from the stderr logs.
I know my starting df of 40k rows is quite big and that it implies a 40k x 40k cosine similarity matrix, but I tried this operation in plain Python on a Google Colab notebook and it was able to calculate it in around 10 minutes (admittedly I had to use the TPU runtime, as the standard CPU one also crashes after consuming its ~12 GB of memory). At this point I'm thinking either:
- MS Fabric Spark Jobs are also not a valid tool for my needs here, as they offer a "weak/capped" version of Spark.
- My PySpark code has something incorrect in it that is causing the Spark executor to be decommissioned.
I need some help figuring out which scenario I am in (and, if it is the second, how to improve the code).
Let me share some code. This is the Python code that actually works for the same task on a TPU Colab notebook (using sklearn):
import json
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# 'articulos' is the articles DataFrame (pandas) loaded earlier
articulos = articulos[['article_id', 'product_code', 'detail_desc']]
articulos_unicos = articulos.drop_duplicates(subset=['product_code'])
articulos_final = articulos_unicos.dropna(subset=['detail_desc'])
articulos_final = articulos_final.reset_index(drop=True)

count = CountVectorizer(stop_words='english')
count_matrix = count.fit_transform(articulos_final['detail_desc'])
count_matrix = count_matrix.astype(np.float32)

similitud_coseno = cosine_similarity(count_matrix, count_matrix)
np.fill_diagonal(similitud_coseno, -1)  # exclude each article from its own top 5

top_5_simart = []
for i in range(similitud_coseno.shape[0]):
    top_indices = np.argpartition(similitud_coseno[i], -5)[-5:]
    sorted_top_indices = top_indices[np.argsort(-similitud_coseno[i, top_indices])]
    top_5_simart.append(sorted_top_indices.tolist())

with open('top_5_simart.json', 'w') as f:
    json.dump(top_5_simart, f)
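Note that top_5_simart stores positional row indices into articulos_final rather than article IDs; if the IDs themselves are needed, something along these lines maps them back:

# Map positional indices back to article_id values; this relies on
# articulos_final having been reset_index(drop=True) as above.
article_ids = articulos_final['article_id'].to_numpy()
top_5_ids = [[article_ids[idx] for idx in row] for row in top_5_simart]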
On the other hand, this is the PySpark code I am trying to implement:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 'articulos' is the articles Spark DataFrame loaded earlier
# Selecting the necessary columns
articulos = articulos.select("article_id", "product_code", "detail_desc")
# Removing duplicates
articulos = articulos.dropDuplicates(["product_code"]).dropna(subset=["detail_desc"])
# Tokenizing the description text
tokenizer = Tokenizer(inputCol="detail_desc", outputCol="words")
articulos_tokenized = tokenizer.transform(articulos)
# Removing stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
articulos_clean = remover.transform(articulos_tokenized)
# Generating the feature column with CountVectorizer
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")
vectorizer_model = vectorizer.fit(articulos_clean)
articulos_final = vectorizer_model.transform(articulos_clean)
articulos_final = articulos_final.select("article_id","features")
print(articulos_final.dtypes)
print(articulos_final.schema)
# Using crossJoin to obtain all the pairwise values for the cosine similarity
articulos_final2 = articulos_final.withColumnRenamed("article_id","article_id2").withColumnRenamed("features","features2")
articulos_final_cos_sim = articulos_final.crossJoin(articulos_final2)
articulos_final.unpersist()
articulos_final2.unpersist()
articulos_final_cos_sim.write.mode('overwrite').format('delta').save(cspath)
'''
# Perform the crossJoin to get all pairwise combinations of rows
articulos_final_cos_sim = (
    articulos_final.alias("a")
    .crossJoin(articulos_final.alias("b"))
    .withColumn(
        "dot_product",
        F.col("a.features").dot(F.col("b.features"))  # Dot product of the vectors
    )
    .withColumn(
        "norm_a",
        F.expr("a.features.norm(2)")  # L2 norm of vector 'a'
    )
    .withColumn(
        "norm_b",
        F.expr("b.features.norm(2)")  # L2 norm of vector 'b'
    )
    .withColumn(
        "cosine_similarity",
        F.col("dot_product") / (F.col("norm_a") * F.col("norm_b"))  # Cosine similarity
    )
)

# Drop the columns that are no longer needed
articulos_final_cos_sim = articulos_final_cos_sim.drop("dot_product", "norm_a", "norm_b")

# Group to build the similarity matrix
articulos_final_cos_sim = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")

# Save the results
articulos_final_cos_sim.write.mode('overwrite').format('delta').save(testpath2)

# Filter to get the 5 most similar articles for each one
windowSpec = Window.partitionBy("article_id").orderBy(F.col("cosine_similarity").desc())
top_5_simart = articulos_final_cos_sim.withColumn("rank", F.row_number().over(windowSpec)).filter(F.col("rank") <= 5)

# Save the results in JSON format
top_5_simart.write.mode('overwrite').json(Top5SimartPath)
'''
As you can see, the last part of the code is commented out since (after a lot of attempts) it didn't give me any errors that made the execution fail; it simply took forever to execute. The active part of the code only does the cross join and doesn't perform any of the calculations yet, and as I mentioned it gets stuck forever.
If needed I can also provide the stderr logs from Spark.
The cross join gives you 1,600,000,000 rows (40,000 x 40,000), and you are doing a groupBy on it, which causes a lot of shuffling and takes more time and memory.
So my idea is to repartition before the join and use a larger number of nodes/workers and cores.
Here is the sample code I used in my Spark environment with 40k random data rows:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Define UDF to compute the dot product of two vectors
dot_udf = F.udf(lambda v1, v2: float(v1.dot(v2)), DoubleType())
# Define UDF to compute the L2 norm of a vector
norm_udf = F.udf(lambda v: float(v.norm(2)), DoubleType())

# Repartition both sides before the cross join
a = articulos_final.repartition(40000, "article_id")
b = articulos_final.repartition(40000, "article_id")

articulos_final_cos_sim = (
    a.alias("a")
    .crossJoin(b.alias("b"))
    .withColumn("dot_product", dot_udf(F.col("a.features"), F.col("b.features")))  # Dot product
    .withColumn("norm_a", norm_udf(F.col("a.features")))  # L2 norm of vector 'a'
    .withColumn("norm_b", norm_udf(F.col("b.features")))  # L2 norm of vector 'b'
    .withColumn(
        "cosine_similarity",
        F.col("dot_product") / (F.col("norm_a") * F.col("norm_b"))  # Cosine similarity
    )
)

final1 = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")
final1.select("article_id", "39848").sort("39848", ascending=False).show(5)
Here, the top 5 similar documents are calculated for article 39848. It took around 29 minutes to run with 10 workers of 16 cores each, i.e. 160 cores in total.
Output:
+----------+------------------+
|article_id| 39848|
+----------+------------------+
| 39848| 1.0|
| 20088|0.3333333333333333|
| 6257| 0.3125|
| 21005|0.2886751345948129|
| 29086|0.2886751345948129|
+----------+------------------+
only showing top 5 rows
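If you need the top 5 for every article at once rather than selecting one pivoted column, you can skip the pivot and rank the pair rows with a window, along the lines of the commented-out part of your question. A sketch, reusing articulos_final_cos_sim from the code above:

from pyspark.sql.window import Window

# Rank every candidate article per source article by similarity,
# drop the trivial self-match, and keep the 5 best per article.
windowSpec = Window.partitionBy(F.col("a.article_id")).orderBy(F.col("cosine_similarity").desc())
top_5_simart = (
    articulos_final_cos_sim
    .filter(F.col("a.article_id") != F.col("b.article_id"))
    .withColumn("rank", F.row_number().over(windowSpec))
    .filter(F.col("rank") <= 5)
    .select(
        F.col("a.article_id").alias("article_id"),
        F.col("b.article_id").alias("similar_article_id"),
        "cosine_similarity",
    )
)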
In your Fabric environment, increase the compute size and run the above code, which repartitions into 40,000 partitions. Adjust this number according to your compute size and number of cores; it can also be 160, i.e. one partition task per core.
Also, persist the DataFrame if you run operations on it frequently.
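For example, something along these lines (a sketch; assuming spark is your SparkSession, and the exact partition count and storage level depend on your compute):

from pyspark import StorageLevel

# One partition task per core is a reasonable starting point
num_partitions = spark.sparkContext.defaultParallelism  # e.g. 160 with 10 workers of 16 cores

a = articulos_final.repartition(num_partitions, "article_id").persist(StorageLevel.MEMORY_AND_DISK)
a.count()  # materialize the cache before reusing it in the cross join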