Tags: python, apache-spark, pyspark, rdd

Problem with pyspark mapping - Index out of range after split


When trying to map our 6-column PySpark RDD into a 4-tuple, we get a list index out of range error for any list element other than 0; index 0 returns the normal result.

The dataset is structured like this:

X,Y,FID,DIVISION,LOCATION,PREC

-118.289241553,33.7576608970001,1,HARBOR,2175 JOHN S. GIBSON BLVD.,5
-118.275394206,33.9386273800001,2,SOUTHEAST,145 W. 108TH ST.,18
-118.277669655,33.9703073800001,3,77TH STREET,7600 S. BROADWAY,12
-118.419841576,33.9916553210001,4,PACIFIC,12312 CULVER BLVD.,14
-118.305141563,34.0105753400001,5,SOUTHWEST,1546 MARTIN LUTHER KING JR. BLVD.,3
-118.256118891,34.012355905,6,NEWTON,3400 S. CENTRAL AVE.,13
-118.247294123,34.0440195,7,CENTRAL,251 E. 6TH ST.,1
-118.450779541,34.0437774120001,8,WEST LOS ANGELES,1663 BUTLER AVE.,8
-118.213067956,34.045008769,9,HOLLENBECK,2111 E. 1ST ST.,4
-118.342829525,34.046747682,10,WILSHIRE,4861 VENICE BLVD.,7
-118.291175911,34.050208529,11,OLYMPIC,1130 S. VERMONT AVE.,20
-118.266979649,34.056690437,12,RAMPART,1401 W. 6TH ST.,2
-118.33066931,34.095833225,13,HOLLYWOOD,1358 N. WILCOX AVE.,6
-118.249414484,34.119200666,14,NORTHEAST,3353 SAN FERNANDO RD.,11
-118.385859348,34.1716939300001,15,NORTH HOLLYWOOD,11640 BURBANK BLVD.,15
-118.445225709,34.1837432730001,16,VAN NUYS, 6240 SYLMAR AVE.,9
-118.547454438,34.193397227,17,WEST VALLEY,19020 VANOWEN ST.,10
-118.599636542,34.221376654,18,TOPANGA,21501 SCHOENBORN ST.,21
-118.410417183,34.2530912220001,19,FOOTHILL,12760 OSBORNE ST.,16
-118.531373363,34.256969059,20,DEVONSHIRE,10250 ETIWANDA AVE.,17
-118.468197808,34.272979397,21,MISSION,11121 N. SEPULVEDA BLVD.,19

rdd3 = sc.textFile('hdfs://path/data.csv')

header3 = rdd3.first()
rdd3 = rdd3.filter(lambda line: line != header3) \
           .map(lambda row: row.split(",")) \
           .map(lambda row: (row[0], row[1], row[3], row[5])) \
           .collect()

For example, if we only keep row[0] there is no error, but if we also keep row[1] it throws a list index out of range exception. It's as if row.split(",") does not return the 6-element list it should.

This is the exception we get: IndexError: list index out of range.

Any ideas?


Solution

  • It looks like the problem you are trying to solve is better handled with the DataFrame interface:

    df = spark_session.read.csv('hdfs://path/data.csv', header=True)
    
    res = df.select('X', 'Y', 'DIVISION', 'PREC').collect()
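
    With header=True the first line is read as the column names, so the remaining columns can be selected by name instead of by position; in my experience Spark's CSV reader also skips completely blank lines, which (as the EDIT below explains) is exactly what breaks the manual split.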
    

    EDIT: If using RDDs is mandatory, you should filter out the blank line after the header line, as sketched below.
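
    The root cause is plain Python behavior: splitting an empty string still returns a one-element list, so only index 0 is valid:

    >>> "".split(",")
    ['']
    >>> "".split(",")[1]
    Traceback (most recent call last):
      ...
    IndexError: list index out of range

    A minimal sketch of that filter, reusing the variable names from the question; the added strip() check assumes the stray line is completely empty (whitespace at most):

    header3 = rdd3.first()
    # drop the header and any blank lines (assumption: the stray line is empty)
    rdd3 = rdd3.filter(lambda line: line != header3 and line.strip() != "") \
               .map(lambda row: row.split(",")) \
               .map(lambda row: (row[0], row[1], row[3], row[5])) \
               .collect()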