I am given an RDD. Example:

test = sc.parallelize([(1, 0), (2, 0), (3, 0)])
I need to get the Cartesian product and remove resulting tuple pairs that have duplicate entries. In this toy example these would be ((1, 0), (1, 0)), ((2, 0), (2, 0)), ((3, 0), (3, 0)).
I can get the Cartesian product as follows. NOTE: the collect and print statements are there ONLY for troubleshooting.

def compute_cartesian(rdd):
    # sorted(...collect()) pulls everything to the driver; it is here
    # purely so the output below prints in a readable order
    result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
    print(type(result1))
    print(result1.collect())
My type and output at this stage are correct:
<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]
But now I need to remove the three pairs of tuples with duplicate entries.
Tried so far:
Without an RDD this task is easy.
# Remove the pairs whose two entries are equal.
# (Removing from a list while iterating over it skips elements,
#  so build a filtered copy instead.)
result = [elem for elem in result if elem[0] != elem[1]]
print(result)
print("After: ", len(result))
This snippet removes the duplicate tuple pairs and then prints the resulting length so I could do a sanity check.
I am just not sure how to perform actions directly on the RDD, in this case remove any duplicate tuple pairs resulting from the Cartesian product, and return an RDD.
Yes, I could .collect() it, perform the operation, and then re-parallelize it into an RDD, but that defeats the purpose. Suppose this were billions of pairs: I need to perform the operations on the RDD and return an RDD.
You can use filter to remove the pairs that you don't want:

rdd.cartesian(rdd).filter(lambda x: x[0] != x[1])
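For instance, here is a minimal end-to-end sketch with the question's toy data (assuming a fresh local SparkContext; in the question's setup sc already exists):

from pyspark import SparkContext

sc = SparkContext("local", "cartesian-example")
test = sc.parallelize([(1, 0), (2, 0), (3, 0)])

# The product stays distributed the whole time; no collect() is
# needed to do the filtering itself.
no_diagonal = test.cartesian(test).filter(lambda x: x[0] != x[1])

print(no_diagonal.count())            # 6 = 9 pairs minus the 3 diagonal ones
print(sorted(no_diagonal.collect()))  # collect() here is only for display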
Note that I would not call those pairs "duplicate pairs", but rather "pairs of duplicates" or even better, "diagonal pairs": they correspond to the diagonal if you visualize the Cartesian product geometrically.
This is why distinct and dropDuplicates are not appropriate here: they remove duplicates, which is not what you want. For instance, [1, 1, 2].distinct() is [1, 2].
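To see the difference concretely, here is a quick sketch (reusing test from the question; distinct operates on an RDD, so the list is parallelized first):

# distinct removes repeated elements of an RDD:
sorted(sc.parallelize([1, 1, 2]).distinct().collect())  # [1, 2]

# The Cartesian product of test with itself has no repeated pairs,
# so distinct leaves all 9 of them in place:
pairs = test.cartesian(test)
pairs.distinct().count()                                # 9

# filter is what actually drops the 3 diagonal pairs:
pairs.filter(lambda x: x[0] != x[1]).count()            # 6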