java, apache-spark, rdd, mapper

RDD get type and index of each element


I am looking for a way to get the type and index of each element in Java. For instance, suppose there is an RDD

['0,1,hi,1.0', '2,3,String,String2', '1.0,2.0,3,String']

Then I want

[(0, int),(1, int),(2, String),(3, Double),(0, int) ........]

so that I can reduce by key and see the data types for each column. I achieved this in Python, but I'm not sure how to do it in Java. Is there any way to do this? Here is how I did it in Python:

from operator import add

# int_regex_match, float_regex_match, date_regex_match and self.prop come from the surrounding class
def infer_type(partition):
    for row in partition:
        value = ""
        idx = 0
        for i in range(len(row)):
            if row[0] == self.prop.comment:
                break
            if row[i] == self.prop.delimiter or i == (len(row) - 1):
                if i == len(row) - 1:
                    value += str(row[i])
                # a full field has been accumulated: classify it
                if bool(value.strip()) == False:
                    yield (idx, 'None')
                elif int_regex_match.match(value):
                    yield (idx, 'int')
                elif float_regex_match.match(value):
                    yield (idx, 'float')
                else:
                    if date_regex_match.match(value):
                        yield (idx, 'date')
                    else:
                        yield (idx, 'str')
                # move on to the next column
                idx += 1
                value = ""
            else:
                value += str(row[i])

rdd = rdd.mapPartitions(infer_type).map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).map(
    lambda x: (x[0][0], (x[0][1], x[1])))

EDIT: This is what I have got so far. However, I can't figure out how to return an iterator of tuples.

PairFlatMapFunction map = new PairFlatMapFunction<Iterator<String>, Integer, String>(){

        @Override
        public Iterator<Tuple2<Integer, String>> call(Iterator<String> iterator) throws Exception {
            // TODO Auto-generated method stub
            while(iterator.hasNext()) {
                String[] row = iterator.next().split(",");
                for(int j = 0; j<row.length;j++) {
                    if(row[j].matches(int_regex)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "int");
                        // return iterator of result..?
                    }else if(row[j].matches(float_regex)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "float");
                        // return iterator of result..?
                    }else if(row[j].matches(date_regex_match)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "date");
                        // return iterator of result..?
                    }else {
                        Tuple2<Integer, String> result =new Tuple2(j, "str");
                        // return iterator of result..?
                    }
                }
            }
        }
 };
JavaPairRDD pair_rdd = rdd.mapPartitionsToPair(map, false);

Solution

  • Given the need you describe, I cannot see why you use mapPartitions instead of simply map. Another mistake here is that you should be using flatMapToPair instead of mapPartitionsToPair.

    To achieve what you want, your flatMap function needs to map a string (e.g. "0,1,hi,1.0") to an iterator of tuples. To do that, you can simply create an ArrayList of the results you compute:

    @Override
    public Iterator<Tuple2<Integer, String>> call(String row) throws Exception {
        String[] split_row = row.split(",");
        // create a list to collect one (column index, type) tuple per field
        List<Tuple2<Integer, String>> result = new ArrayList<>();
        for (int j = 0; j < split_row.length; j++) {
            if (split_row[j].matches(int_regex)) {
                result.add(new Tuple2<>(j, "int"));
            } // else ...
        }
        // return the iterator over the collected tuples
        return result.iterator();
    }
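
    For completeness, here is a rough sketch of how that flatMapToPair version can be wired up and then reduced by key, mirroring your Python pipeline. It assumes rdd is a JavaRDD<String> and uses placeholder regexes (int_regex, float_regex) that you would replace with your own patterns:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    String int_regex = "-?\\d+";           // placeholder pattern
    String float_regex = "-?\\d*\\.\\d+";  // placeholder pattern

    // one (column index, inferred type) tuple per field of every row
    JavaPairRDD<Integer, String> types = rdd.flatMapToPair(row -> {
        String[] split = row.split(",");
        List<Tuple2<Integer, String>> result = new ArrayList<>();
        for (int j = 0; j < split.length; j++) {
            String type = split[j].matches(int_regex) ? "int"
                        : split[j].matches(float_regex) ? "float"
                        : "str";
            result.add(new Tuple2<>(j, type));
        }
        return result.iterator();
    });

    // key by (index, type) and count, as in the Python version
    JavaPairRDD<Tuple2<Integer, String>, Integer> counts = types
            .mapToPair(t -> new Tuple2<>(t, 1))
            .reduceByKey((a, b) -> a + b);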
    

    In case you actually need to use mapPartitions, you can apply the same logic inside your function; see the sketch below.
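
    The missing piece in your attempt is to collect the tuples for every row of the partition into one list and return a single iterator once the loop is done. A rough sketch, under the same assumptions as above (placeholder regexes, rdd as a JavaRDD<String>):

    JavaPairRDD<Integer, String> types = rdd.mapPartitionsToPair(partition -> {
        // partition is an Iterator<String> over the rows of this partition
        List<Tuple2<Integer, String>> result = new ArrayList<>();
        while (partition.hasNext()) {
            String[] split = partition.next().split(",");
            for (int j = 0; j < split.length; j++) {
                String type = split[j].matches(int_regex) ? "int"
                            : split[j].matches(float_regex) ? "float"
                            : "str";
                result.add(new Tuple2<>(j, type));
            }
        }
        // one iterator for the whole partition, built after the loop
        return result.iterator();
    });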