I'm using PySpark and I'm looking for a way to do the following check:
For a given check_number = 01, if the value of the third element in my rdd1 does not contain check_number, get all information about this check_number from rdd2.
Given:
rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
u'serviceXXX',
u'testAB_02',
u'2016-07-03')])
Supposing that the first element is an ID, the second is a service name, the third is a test name with an ID, and the fourth element is a date.
rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02'),
(u'XXXX52547412558933nnBlmquhdyhM',
u'02',
u'2016-11-04')])
Supposing that the first element is an ID, the second is a test id, and the last element is a date.
So here my rdd1 contains testAB_02, which does not match my check_number (the test name must end with check_number's value). My objective is to get all rows from rdd2 that have 01 as test id. The expected output here must be:
[(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02')]
This is my code:
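import re

check_number = u'01'

def update_typesdecohorte_table(rdd1, rdd2):
    new_rdd2 = None  # stays None when rdd1 already contains check_number
    # No test name in rdd1 contains check_number: fetch its rows from rdd2.
    if rdd1.filter(lambda x: re.match('.*?' + check_number, x[2])).isEmpty():
        new_rdd2 = rdd2.filter(lambda x: x[1] == check_number)
    return new_rdd2

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)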
Which gives:
[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]
This code works, but I don't like the method. What is the most efficient way to do this?
If you want to get all records from rdd2 that have no matching elements in rdd1, you can use cartesian:
# Parentheses let the chained calls span several lines.
new_rdd2 = (rdd1.cartesian(rdd2)
            .filter(lambda r: not r[0][2].endswith(r[1][1]))
            .map(lambda r: r[1]))
If your check_number is fixed, at the end filter by this value:
new_rdd2.filter(lambda r: r[1] == check_number).collect()
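To see what the cartesian pipeline above computes without starting Spark, here is a rough local sketch in plain Python. It is only an emulation: `itertools.product` stands in for `cartesian`, ordinary lists stand in for the RDDs, and the names `unmatched_rows`, `rdd1_rows`, `rdd2_rows` and the extra row in `more_rdd1_rows` are made up for illustration.

```python
from itertools import product

# Sample rows copied from the question.
rdd1_rows = [(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
              u'serviceXXX', u'testAB_02', u'2016-07-03')]
rdd2_rows = [(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02'),
             (u'XXXX52547412558933nnBlmquhdyhM', u'02', u'2016-11-04')]
check_number = u'01'


def unmatched_rows(left_rows, right_rows):
    """Local stand-in for rdd1.cartesian(rdd2).filter(...).map(...)."""
    pairs = product(left_rows, right_rows)  # plays the role of cartesian
    # Keep pairs whose test name does NOT end with the test id (the filter step).
    kept = (p for p in pairs if not p[0][2].endswith(p[1][1]))
    return [p[1] for p in kept]  # keep only the rdd2 side (the map step)


new_rows = unmatched_rows(rdd1_rows, rdd2_rows)
result = [r for r in new_rows if r[1] == check_number]
print(result)

# Hypothetical second rdd1 row, added only to show the behaviour with more data.
more_rdd1_rows = rdd1_rows + [(u'some_id', u'serviceYYY', u'testCD_03', u'2016-07-04')]
print(len(unmatched_rows(more_rdd1_rows, rdd2_rows)))
```

On the question's data, `result` holds only the row with test id 01. With the extra hypothetical row, the pipeline returns three rows: the 01 row twice (once per non-matching pair) and the 02 row once, even though testAB_02 matches it. So with more than one row in rdd1 the filter keeps an rdd2 row whenever at least one rdd1 row fails to match it, and a `distinct()` or a different formulation may be needed.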
But if your check_number is fixed and both RDDs are large, the cartesian approach can be even slower than your solution, as it needs to move data across partitions to build the pairs (your code performs only non-shuffling transformations).
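One detail to check when comparing the two approaches: the question says the test name must end with check_number, but its regex and the `endswith` call used here are not the same test. The small sketch below compares them on a few made-up test names. The `re.escape` call and the `$` anchor are additions for illustration, not part of the original code.

```python
import re

check_number = u'01'

# Pattern as in the question: matches if check_number appears anywhere.
contains = re.compile('.*?' + re.escape(check_number))
# Anchored variant: matches only if the string ends with check_number.
ends_with = re.compile('.*?' + re.escape(check_number) + '$')

# Hypothetical test names, chosen to show where the checks disagree.
for name in (u'testAB_01', u'test01_AB', u'testAB_02'):
    print(name,
          bool(contains.match(name)),
          bool(ends_with.match(name)),
          name.endswith(check_number))
```

For `test01_AB` the unanchored pattern matches while the anchored pattern and `endswith` do not, so if "ends with" is the real requirement, `x[2].endswith(check_number)` is probably the simpler filter to use in the question's code.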