I'm trying to build an item-based collaborative filtering model with columnSimilarities() in Spark. After calling columnSimilarities(), I want to assign the original column names back to the results in Spark Scala.
Here is runnable code that calculates columnSimilarities() on a DataFrame.
Data
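// Imports (sc and spark are predefined in spark-shell)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
// RDD: one row per user, one column per item
val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row(2.0, 7.0, 1.0),
    Row(3.5, 2.5, 0.0),
    Row(7.0, 5.9, 0.0)
  )
)
// Schema
val schema = new StructType()
  .add(StructField("item_1", DoubleType, true))
  .add(StructField("item_2", DoubleType, true))
  .add(StructField("item_3", DoubleType, true))
// DataFrame
val df = spark.createDataFrame(rowsRdd, schema)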
Calculate columnSimilarities() on that data frame:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}

// Assemble all item columns into one ml Vector per row, in df.columns order
val rows = new VectorAssembler()
  .setInputCols(df.columns)
  .setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd

// RowMatrix works with the older mllib vectors, so convert from ml to mllib
val items_mllib_vector = rows
  .map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
  .map(org.apache.spark.mllib.linalg.Vectors.fromML)

// Cosine similarities between all pairs of columns (items)
val mat = new RowMatrix(items_mllib_vector)
val simsPerfect = mat.columnSimilarities()
simsPerfect.entries.collect.mkString(", ")
Output:
res0: String = MatrixEntry(0,2,0.24759378423606918), MatrixEntry(1,2,0.7376189553526812), MatrixEntry(0,1,0.8355316482961213)
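This sketch checks that the matrix indices follow the order of df.columns. It is plain Scala with no Spark dependency. It recomputes the cosine similarity of every column pair of the same three rows and labels each pair with its column name. It assumes that columnSimilarities() with no threshold computes exact cosine similarities between columns, which is how the Spark API docs describe the brute-force mode. The object name CosineCheck is hypothetical.

```scala
// Spark-free sketch: recompute the cosine similarity of every pair of
// columns in the example data and label each pair with its column name.
object CosineCheck {
  // Assumed to be in the same order as df.columns
  val names: Array[String] = Array("item_1", "item_2", "item_3")

  // Same rows as rowsRdd: each inner array is one row, each position is one item
  val rows: Array[Array[Double]] = Array(
    Array(2.0, 7.0, 1.0),
    Array(3.5, 2.5, 0.0),
    Array(7.0, 5.9, 0.0)
  )

  // Extract column j as a vector
  def column(j: Int): Array[Double] = rows.map(_(j))

  // Cosine similarity: dot product divided by the product of the L2 norms
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

  // Only pairs with i < j, the same upper-triangular shape columnSimilarities() returns
  val sims: Seq[(String, String, Double)] =
    for {
      i <- names.indices
      j <- (i + 1) until names.length
    } yield (names(i), names(j), cosine(column(i), column(j)))

  def main(args: Array[String]): Unit =
    sims.foreach { case (a, b, s) => println(s"$a, $b: $s") }
}
```

Worked by hand for item_1 and item_2, the dot product is 2.0·7.0 + 3.5·2.5 + 7.0·5.9 = 64.05. The norms are √65.25 and √90.06, so the similarity is 64.05 / (√65.25 · √90.06) ≈ 0.8355, which matches MatrixEntry(0,1,...) in the output. So index 0 is item_1 and index 1 is item_2.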
I need the original column names instead of the positions in that vector.
I read the column names from df with:
val names = df.columns
My idea was to match the names to the positions in the vector, which should be in the same order. However, I don't know how to attach the names to the cosine similarities.
I'd be grateful for any advice!
Extract the column names on the driver first. This is the tricky part: df.columns cannot be evaluated inside the closure, so it has to be computed beforehand:
val names = df.columns
Then map the entries:
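// names is a plain local Array[String], so it is shipped with the closure;
// the Long indices i and j are converted to Int to index into it
simsPerfect.entries.map {
  case MatrixEntry(i, j, v) => (names(i.toInt), names(j.toInt), v)
}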
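For item-based recommendations you usually want a neighbour list per item. columnSimilarities() returns only the upper-triangular entries (i < j), so each pair must be emitted in both directions before grouping. The sketch below assumes the named (item, item, similarity) triples produced by the map above are small enough to collect to the driver. The object name ItemNeighbours and the parameter k are hypothetical, and the input values are copied from the output in the question.

```scala
// Sketch: build a top-k neighbour list per item from upper-triangular
// (name, name, similarity) triples, e.g. the collected result of the map above.
object ItemNeighbours {
  // Hypothetical collected input, values taken from the question's output
  val named: Seq[(String, String, Double)] = Seq(
    ("item_1", "item_3", 0.24759378423606918),
    ("item_2", "item_3", 0.7376189553526812),
    ("item_1", "item_2", 0.8355316482961213)
  )

  // Emit each pair in both directions, group by item, keep the k most similar
  def neighbours(
      triples: Seq[(String, String, Double)],
      k: Int
  ): Map[String, Seq[(String, Double)]] =
    triples
      .flatMap { case (a, b, s) => Seq(a -> (b, s), b -> (a, s)) }
      .groupBy(_._1)
      .map { case (item, pairs) => item -> pairs.map(_._2).sortBy(-_._2).take(k) }

  def main(args: Array[String]): Unit =
    neighbours(named, k = 2).toSeq.sortBy(_._1).foreach { case (item, ns) =>
      println(s"$item -> ${ns.mkString(", ")}")
    }
}
```

If the result is too large to collect, the same two-direction flatMap can be applied to the RDD itself, followed by groupByKey, so the grouping stays distributed.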