Hi, I am trying to understand where the problem is in my code:
rdd_avg=sc.parallelize([("a",10),("b",15),("c",20),("a",8)])
rdd_sp1=rdd_avg.map(lambda x: (x,1))
rdd_sp1.collect()
rdd_sp2=rdd_sp1.reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1]))
rdd_sp2.collect()
The output I am getting is:
[(('c', 20), 1), (('a', 10), 1), (('b', 15), 1), (('a', 8), 1)]
But I am trying to get:
[(('c', 20), 1), (('a', 18), 2), (('b', 15), 1)]
The problem is that reduceByKey treats the first element of each tuple as the key and the second as the value. In your first map (rdd_avg.map(lambda x: (x, 1))), you are using the whole tuple as the key, so your keys are ('c', 20), ('a', 10), ('b', 15), and ('a', 8). Note that the key ('a', 10) is different from the key ('a', 8), so those two records are never combined. Your keys should be 'a', 'b' and 'c'.
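To see why the tuple keys never merge, here is a plain-Python sketch of what a reduce-by-key does conceptually. It does not use Spark: reduce_by_key is a hypothetical helper that only mimics the grouping behaviour on a local list, and its output order follows insertion order rather than Spark's partition order.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: merge values that share the same key."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            # func is only called when a key is seen a second time
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return list(merged.items())


data = [("a", 10), ("b", 15), ("c", 20), ("a", 8)]


def add_pairs(a, b):
    # Element-wise sum of two (sum, count) pairs
    return (a[0] + b[0], a[1] + b[1])


# Whole tuple as key: all four keys are distinct, so nothing is merged.
wrong = reduce_by_key([(x, 1) for x in data], add_pairs)

# Letter as key, (value, 1) as value: the two 'a' records are merged.
right = reduce_by_key([(x[0], (x[1], 1)) for x in data], add_pairs)

print(wrong)
print(right)
```

In the first case add_pairs is never called, because no key repeats. That is why indexing a[0] on the plain integer 1 never raises an error here: the records are simply returned unchanged.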
Here is the fixed code:
>>> rdd = sc.parallelize([("a",10), ("b",15), ("c",20),("a",8)])
>>> # the first element of the tuple is x[0], not x
>>> rdd_sp1 = rdd.map(lambda x: (x[0], (x[1], 1)))
>>> rdd_sp1.collect()
[('a', (10, 1)), ('b', (15, 1)), ('c', (20, 1)), ('a', (8, 1))]
Note how the first element of each tuple is now the key itself (for example 'a'), not a whole tuple.
>>> rdd_sp2 = rdd_sp1.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
>>> rdd_sp2.collect()
[('b', (15, 1)), ('c', (20, 1)), ('a', (18, 2))]
With the key issue fixed, reduceByKey now combines the records that share a key. The average is then the sum divided by the count:
>>> rdd_avg = rdd_sp2.map(lambda x: (x[0], x[1][0] / x[1][1]))
>>> rdd_avg.collect()
[('b', 15.0), ('c', 20.0), ('a', 9.0)]