While evaluating Spark as a way to reuse our existing custom input formats from the MapReduce era, I have run into a Java generics problem.
public abstract class AbstractInputFormat<K extends Message, V> extends FileInputFormat<ProtobufWritable<K>, V>
import com.example.MyProto; // this extends Message
public class MyInputFormat extends AbstractInputFormat<MyProto, Text>
SparkConf conf = new SparkConf().setAppName("Test");
SparkContext sc = new SparkContext(conf);
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
JavaPairRDD myRdd = jsc.newAPIHadoopFile(logFile, MyInputFormat.class, ProtobufWritable.class, Text.class,
The above leads to the following error at myRdd:
Bound mismatch: The generic method newAPIHadoopFile(String, Class<F>, Class<K>, Class<V>, Configuration) of type JavaSparkContext is not applicable for the arguments (String, Class<MyInputFormat>, Class<ProtobufWritable>, Class<Text>, Configuration). The inferred type MyInputFormat is not a valid substitute for the bounded parameter <F extends InputFormat<K,V>>
I am not sure what is happening. As far as I can tell, I am satisfying the bounds, so I cannot spot the problem.
This is the Scala code that is being invoked.
The following changes worked for me:
public class MyInputFormat<K extends Message> extends AbstractInputFormat<MyProto, Text>
public abstract class AbstractInputFormat<K extends Message, V> extends FileInputFormat<ProtobufWritable<K>, V>
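For reference, the bound mismatch can be reproduced without Spark or Hadoop. The sketch below uses hypothetical stand-ins (Wrapper for ProtobufWritable, Format for InputFormat, MyFormat for MyInputFormat, and a load method that only copies the bound shape from the error message); none of these names come from the real libraries. It suggests the cause is that ProtobufWritable.class has the raw type Class<ProtobufWritable>, so K is inferred as the raw type, and because generics are invariant an InputFormat<ProtobufWritable<MyProto>, Text> is not an InputFormat<ProtobufWritable, Text>.

```java
// Stand-ins for the real types (all names here are hypothetical):
//   Wrapper<T>   ~ ProtobufWritable<K>
//   Format<K, V> ~ InputFormat<K, V>
//   MyFormat     ~ MyInputFormat
final class BoundMismatchDemo {
    static class Wrapper<T> {}
    static abstract class Format<K, V> {}
    static class MyFormat extends Format<Wrapper<String>, Integer> {}

    // Same bound shape as in the error message: F extends Format<K, V>.
    static <K, V, F extends Format<K, V>> String load(
            Class<F> formatClass, Class<K> keyClass, Class<V> valueClass) {
        return formatClass.getSimpleName()
                + "<" + keyClass.getSimpleName()
                + "," + valueClass.getSimpleName() + ">";
    }

    static String run() {
        // Rejected by the compiler: Wrapper.class is Class<Wrapper> (raw), so K = Wrapper,
        // and MyFormat is a Format<Wrapper<String>, Integer>, not a Format<Wrapper, Integer>.
        // load(MyFormat.class, Wrapper.class, Integer.class);

        // An unchecked double cast gives the class token the parameterized type,
        // so K is inferred as Wrapper<String> and the bound on F is satisfied.
        @SuppressWarnings("unchecked")
        Class<Wrapper<String>> keyClass =
                (Class<Wrapper<String>>) (Class<?>) Wrapper.class;
        return load(MyFormat.class, keyClass, Integer.class);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "MyFormat<Wrapper,Integer>"
    }
}
```

If this reading is right, the same double cast on ProtobufWritable.class at the original call site would be an alternative to changing the class declarations, though that is an assumption I have not verified against Spark itself.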