I have a simple spark job which reads from a Mongo collection:
SparkSession spark = SparkSession
        .builder()
        .appName("LuckyBetsFP")
        .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/exercises.movie_logs")
        .getOrCreate();

Dataset<Row> items = spark
        .read()
        .format("mongodb")
        .option("aggregation.pipeline", MOVIES_PIPELINE)
        .load();
and I have my pipeline:
public static final String MOVIES_PIPELINE =
        """
        [
          {
            $group: {
              _id: "$user_id",
              movie_ids: {
                $addToSet: "$movie_id"
              }
            }
          },
          {
            $project: {
              _id: 0,
              user_id: "$_id",
              movie_ids: 1
            }
          },
          {
            $sort: {
              user_id: -1
            }
          }
        ]
        """;
What I am trying to do is get all users and, for every user, an array of all movies they watched, from my logs collection. Since my collection contains only 192 unique users, the output in Mongo Compass is 192 documents, and I deliberately checked the output in mongosh with the same aggregation pipeline as well; the result is the same. However, when I run the code in Spark, it reads a different number of records each time, mostly between 240 and 260, and I can't figure out what causes this.
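A quick way to see the mismatch is to count the dataset right after the load (a minimal check, not my full job):

long sparkCount = items.count(); // expect 192, but it fluctuates, mostly 240-260
System.out.println("Records read by Spark: " + sparkCount);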
For example, for user_id = 4387, Mongo Compass and mongosh return a document like this:
{
  "movie_ids": [
    56788,
    45421
  ],
  "user_id": 4387
}
but when I check the dataset that Spark produces after reading from Mongo, I see a duplicate for this specific id:
+--------------------+-------+
|           movie_ids|user_id|
+--------------------+-------+
|      [56788, 45421]|   4387|
|             [32233]|   4387|
|                 ...|    ...|
The second record is completely off: the user has not seen that movie, and Mongo does not return anything like it (I checked all 192 records).
What causes Spark to read the data so inconsistently? This happens across many jobs in which I read from Mongo collections.
It turned out that, by default, Spark uses a SamplePartitioner:
.config("spark.mongodb.read.partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
What happens with it is that, when reading from Mongo (and presumably other DB sources), Spark calculates how many partitions it needs for parallel reading from the collection, based on the total size of the collection divided by the size of a partition. The default partition size is 64 MB:
partitioner.options.partition.size=64
So, as per the documentation, if the collection has 640 documents of 0.5 MB each, the default SamplePartitioner will create 5 partitions, each responsible for reading 128 documents. This obviously lets Spark distribute the workload, but in cases like mine, where the collection has multiple records for the same user about interactions with different items, the next setting can cause problems:
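For reference, the partition size can be changed with the same kind of config (the value is in MB; 32 here is only an example), and the number of partitions Spark actually produced can be checked on the loaded dataset. A minimal sketch:

.config("spark.mongodb.read.partitioner.options.partition.size", "32")

// after items is loaded:
System.out.println("Partitions: " + items.rdd().getNumPartitions());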
partitioner.options.partition.field
By default it is _id, which means each partition reads a range of documents based on _id. However, that is exactly what bit me. Suppose there are only two partitions, each reading half of the documents in my collection, and 4 documents about movies watched by the same user. If the range of the first partition ends at the 2nd of those documents (inclusive) and the second partition starts from the 3rd, the documents about this user are split across the two partitions. The same thing happens if 3 of the documents fall within the first partition's range while the last one falls within the second partition's range.
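An easy way to confirm this is to look for user_ids that appear in more than one row of what Spark read; if a user's documents were split across partition ranges, that user shows up more than once. A small diagnostic sketch against the items dataset from above:

import static org.apache.spark.sql.functions.col;

// user_ids that appear in more than one output row, i.e. users whose
// documents were split across partition ranges
items.groupBy("user_id")
        .count()
        .filter(col("count").gt(1))
        .show();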
The splitting is a problem because, when multiple partitions read data and an aggregation pipeline is involved, the pipeline is applied per partition, after the data for that partition's range is read, and the partition results are then simply combined, as opposed to reading all the data first and applying the pipeline once. That is what created my problem: documents for the same user were split across partitions, so each partition produced its own $group result for that user and I got duplicates. It can easily be fixed by using another partition field, one that keeps a user's documents within a single range, e.g.:
.config("spark.mongodb.read.partitioner.options.partition.field", "user_id")
Another approach is to use a SinglePartitionPartitioner, but if the collection is too large, it may cause issues with temporary storage or the network, and it will probably be slower.
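For completeness, switching to it is again just a config line; the fully qualified class name follows the same pattern as the default partitioner shown above:

// everything is read by a single partition, so the pipeline runs over the
// whole collection at once - correct results, but no parallelism on the read
.config("spark.mongodb.read.partitioner",
        "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner")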