apache-spark, mapreduce, pyspark, apache-spark-sql, rdd

Filter RDD by values PySpark


I'm using PySpark and I'm looking for a way to check the following:

For a given check_number = "01"

if the third element of the record in my rdd1 does not contain check_number ==> get all the information about this check_number from rdd2.

Given:

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
                        u'serviceXXX',
                        u'testAB_02',
                        u'2016-07-03')])

Supposing that the first element is an ID, the second is a service name, the third is a test name (which ends with a test id), and the fourth element is a date.

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
                        u'01',
                        u'2016-11-02'),

                       (u'XXXX52547412558933nnBlmquhdyhM',
                        u'02',
                        u'2016-11-04')])

Supposing that the first element is an ID, the second is a test id, and the last element is a date.

So, here my rdd1 contains testAB_02, which does not match my check_number (the test name must end with check_number's value). My goal is to get all rows from rdd2 with 01 as test id. The expected output here must be:

[(u'9b023b8233c242c09b93506942002e0a',
  u'01',
  u'2016-11-02')]

This is my code:

import re

def update_typesdecohorte_table(rdd1, rdd2):

    # If no test name in rdd1 contains check_number, keep only the
    # rows of rdd2 whose test id equals check_number.
    if rdd1.filter(lambda x: re.match('.*?' + check_number, x[2])).isEmpty():

        new_rdd2 = rdd2.filter(lambda x: x[1] == check_number)

    else:

        new_rdd2 = None  # nothing to fetch when rdd1 already matches

    return new_rdd2

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)

Which gives:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]

This code works, but I don't like the method. What is the most efficient way to do this?


Solution

  • If you want to get all records from rdd2 that have no matching elements in rdd1 you can use cartesian:

    new_rdd2 = (rdd1.cartesian(rdd2)
                .filter(lambda r: not r[0][2].endswith(r[1][1]))
                .map(lambda r: r[1]))
    

    If your check_number is fixed, filter by this value at the end:

    new_rdd2.filter(lambda r: r[1] == check_number).collect()
    

    But if your check_number is fixed and both RDDs are large, this can be even slower than your solution, because the cartesian product needs to shuffle data across partitions (your code performs only non-shuffling transformations).
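
    For completeness, here is a minimal sketch of that shuffle-free variant, assuming check_number is a fixed string such as u'01' and that sc, rdd1 and rdd2 are defined as in the question; it runs the emptiness test on rdd1 and then filters rdd2 directly, so no data moves between partitions:

    # Sketch of the non-shuffling approach for a fixed check_number.
    # Assumes sc, rdd1 and rdd2 are defined as in the question.
    check_number = u'01'

    # True when no test name in rdd1 ends with check_number.
    no_match_in_rdd1 = rdd1.filter(lambda x: x[2].endswith(check_number)).isEmpty()

    if no_match_in_rdd1:
        # Keep only the rdd2 rows whose test id equals check_number.
        new_rdd2 = rdd2.filter(lambda x: x[1] == check_number)
    else:
        new_rdd2 = sc.parallelize([])  # nothing to fetch

    print(new_rdd2.collect())
    # [(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]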