I am looking for ways to get type and index of each element in Java. For instance, suppose there is an RDD
['0,1,hi,1.0', '2,3,String,String2', '1.0,2.0,3,String']
Then I want
[(0, int),(1, int),(2, String),(3, Double),(0, int) ........]
So that I can reduce by keys and see the data types for each column. I achieved it in python, but not sure about how to do this in Java. Is there any way to do this? Here is how I did it in python
def infer_type(partition):
for row in partition:
value = ""
idx = 0
for i in range(len(row)):
if row[0] == self.prop.comment:
break
if row[i] == self.prop.delimiter or i == (len(row) - 1):
if i == len(row) - 1:
value += str(row[i])
if bool(value.strip()) == False:
yield (idx, 'None')
elif int_regex_match.match(value):
yield (idx, 'int')
elif float_regex_match.match(value):
yield (idx, 'float')
else:
if date_regex_match.match(value):
yield (idx, 'date')
else:
yield (idx, 'str')
idx += 1
value = ""
else:
value += str(row[i])
rdd = rdd.mapPartitions(infer_type).map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).map(
lambda x: (x[0][0], (x[0][1], x[1])))
EDIT : This is what I got upto now. However, I can't get iterator of tuple.
PairFlatMapFunction map = new PairFlatMapFunction<Iterator<String>, Integer, String>(){
@Override
public Iterator<Tuple2<Integer, String>> call(Iterator<String> iterator) throws Exception {
// TODO Auto-generated method stub
while(iterator.hasNext()) {
String[] row = iterator.next().split(",");
for(int j = 0; j<row.length;j++) {
if(row[j].matches(int_regex)) {
Tuple2<Integer, String> result =new Tuple2(j, "int");
// return iterator of result..?
}else if(row[j].matches(float_regex)) {
Tuple2<Integer, String> result =new Tuple2(j, "float");
// return iterator of result..?
}else if(row[j].matches(date_regex_match)) {
Tuple2<Integer, String> result =new Tuple2(j, "date");
// return iterator of result..?
}else {
Tuple2<Integer, String> result =new Tuple2(j, "str");
// return iterator of result..?
}
}
}
}
};
JavaPairRDD pair_rdd = rdd.mapPartitionsToPair(map, false);
According to the need you express, I cannot see why you use mapPartition
instead of simply map
. Also another mistake here is that you should be using flatMapToPair
instead of mapToPair
.
To achieve what you want, your flatmap
function needs to map a string (e.g. "0,1,hi,1.0"
) to an iterator of tuples. To do that, you can simply create an ArrayList
of the results you compute :
@Override
public Iterator<Tuple2<Integer, String>> call(String row) throws Exception {
String[] split_row = row.split(",");
//create list
List<Tuple2<Integer, String>> result = new ArrayList<>()
for(int j = 0; j<split_row.length;j++) {
if(split_row[j].matches(int_regex)) {
result.add(new Tuple2(j, "int"));
} //else ...
}
//return the iterator
return result.iterator();
}
In case you actually need to use mapPartition, you can apply the same logic to your function.