python, pyspark, rdd

PySpark RDD reduceByKey()


Hi, I am trying to understand where the problem is in my code:

rdd_avg=sc.parallelize([("a",10),("b",15),("c",20),("a",8)])
rdd_sp1=rdd_avg.map(lambda x: (x,1))
rdd_sp1.collect()
rdd_sp2=rdd_sp1.reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1]))
rdd_sp2.collect()

The output I am getting is:

[(('c', 20), 1), (('a', 10), 1), (('b', 15), 1), (('a', 8), 1)]

But I am trying to get:

[(('c', 20), 1), (('a', 18), 2), (('b', 15), 1)]


Solution

  • The problem is that reduceByKey treats the first element of each tuple as the key and the second as the value. In your first map (rdd_avg.map(lambda x: (x,1))), you are using the whole tuple as the key, so your keys are ('c', 20), ('a', 10), ('b', 15), and ('a', 8). Note that the key ('a', 10) is different from the key ('a', 8), so no two records share a key and nothing is merged. Your keys should be 'a', 'b' and 'c'.

    This is the fixed code:

    >>> rdd = sc.parallelize([("a",10), ("b",15), ("c",20),("a",8)])
    >>> # the first element of the tuple is x[0], not x
    >>> rdd_sp1 = rdd.map(lambda x: (x[0], (x[1], 1)))
    >>> rdd_sp1.collect()
    [('a', (10, 1)), ('b', (15, 1)), ('c', (20, 1)), ('a', (8, 1))]
    

    Note how the first element of each tuple is now the key itself ('a', 'b' or 'c'), not a tuple, and the value is a (sum, count) pair:

    >>> rdd_sp2 = rdd_sp1.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    >>> rdd_sp2.collect()
    [('b', (15, 1)), ('c', (20, 1)), ('a', (18, 2))]
    

    With the keys fixed, reduceByKey merges the two 'a' records into (18, 2). Dividing each sum by its count then gives the average per key:

    >>> rdd_avg = rdd_sp2.map(lambda x: (x[0], x[1][0] / x[1][1]))
    >>> rdd_avg.collect()
    [('b', 15.0), ('c', 20.0), ('a', 9.0)]
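
To see why the original code returned its input unchanged, here is a minimal pure-Python sketch that runs without Spark. The helper `reduce_by_key` is hypothetical and is not part of PySpark. It only imitates the merge-by-key behaviour on a plain list, assuming (as in reduceByKey) that the function is called only when two values share a key.

```python
def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey on a plain list of (key, value) pairs."""
    merged = {}
    for key, value in pairs:
        # func is only called when this key has been seen before
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


def add_pairs(a, b):
    # Same reducer as in the question: element-wise sum of two 2-tuples
    return (a[0] + b[0], a[1] + b[1])


data = [("a", 10), ("b", 15), ("c", 20), ("a", 8)]

# Original mapping: the whole tuple is the key, so all four keys are distinct
wrong = reduce_by_key([(x, 1) for x in data], add_pairs)

# Fixed mapping: the letter is the key, the value is a (sum, count) pair
fixed = reduce_by_key([(k, (v, 1)) for k, v in data], add_pairs)
averages = [(k, total / count) for k, (total, count) in fixed]

print(wrong)
print(fixed)
print(averages)
```

In the `wrong` case `add_pairs` is never called, because no key occurs twice. That is also why the original code raised no error even though its values were plain integers that cannot be indexed with `a[0]`.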