I'm doing a recommandation system for movies, using the MovieLens datasets available here : http://grouplens.org/datasets/movielens/
To compute this recommandation system, I use the ML library of Flink in scala, and particulalrly the ALS algorithm (org.apache.flink.ml.recommendation.ALS
).
I first map the ratings of the movie into a DataSet[(Int, Int, Double)]
and then create a trainingSet
and a testSet
(see the code below).
My problem is that there is no bug when I'm using the ALS.fit
function with the whole dataset (all the ratings), but if I just remove only one rating, the fit function doesn't work anymore, and I don't understand why.
Do You have any ideas? :)
Code used :
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
"But if I just remove only one rating"
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
The error :
06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) switched to FAILED
java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
at org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
...
The problem is the first
operator in combination with the setTemporaryPath
parameter of Flink's ALS
implementation. In order to understand the problem, let me quickly explain how the blocking ALS algorithm works.
The blocking implementation of alternating least squares first partitions the given ratings matrix user-wise and item-wise into blocks. For these blocks, routing information is calculated. This routing information says which user/item block receives which input from which item/user block, respectively. Afterwards, the ALS iteration is started.
Since Flink's underlying execution engine is a parallel streaming dataflow engine, it tries to execute as many parts of the dataflow as possible in a pipelined fashion. This requires to have all operators of the pipeline online at the same time. This has the advantage that Flink avoids to materialize intermediate results, which might be prohibitively large. The disadvantage is that the available memory has to be shared among all running operators. In the case of ALS where the size of the individual DataSet
elements (e.g. user/item blocks) is rather large, this is not desired.
In order to solve this problem, not all operators of the implementation are executed at the same time if you have set a temporaryPath
. The path defines where the intermediate results can be stored. Thus, if you've defined a temporary path, then ALS
first calculates the routing information for the user blocks and writes them to disk, then it calculates the routing information for the item blocks and writes them to disk and last but not least it starts ALS iteration for which it reads the routing information from the temporary path.
The calculation of the routing information for the user and item blocks both depend on the given ratings data set. In your case when you calculate the user routing information, then it will first read the ratings data set and apply the first
operator on it. The first
operator returns n
-arbitrary elements from the underlying data set. The problem right now is that Flink does not store the result of this first
operation for the calculation of the item routing information. Instead, when you start the calculation of the item routing information, Flink will re-execute the dataflow starting from its sources. This means that it reads the ratings data set from disk and applies the first
operator on it again. This will give you in many cases a different set of ratings compared to the result of the first first
operation. Therefore, the generated routing information are inconsistent and ALS
fails.
You can circumvent the problem by materializing the result of the first
operator and use this result as the input for the ALS
algorithm. The object FlinkMLTools
contains a method persist
which takes a DataSet
, writes it to the given path and then returns a new DataSet
which reads the just written DataSet
. This allows you to break up the resulting dataflow graph.
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
Alternatively, you can try to leave the temporaryPath
unset. Then all steps (routing information calculation and als iteration) are executed in a pipelined fashion. This means that both the user and item routing information calculation use the same input data set which results from the first
operator.
The Flink community is currently working on keeping intermediate results of operators in the memory. This will allow to pin the result of the first
operator so that it won't be calculated twice and, thus, not giving differing results due to its non-deterministic nature.