I have a MapReduce program that writes data to HDFS under different file names. In my reducer I use MultipleOutputs to write to those files (full reducer code below).
I would like to test my code using MRUnit. Below is my test method.
@Test
public void reducerMRUnit() throws IOException {
    String output = "";
    ArrayList<Text> list = new ArrayList<Text>(0);
    list.add(new Text(""));
    reduceDriver.withInput(new Text(""), list);
    reduceDriver.withPathOutput(new Text(output), NullWritable.get(), "");
    reduceDriver.runTest();
}
But when I run this test, it throws a NullPointerException:
java.lang.NullPointerException
at org.apache.hadoop.fs.Path.<init>(Path.java:104)
at org.apache.hadoop.fs.Path.<init>(Path.java:93)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getDefaultWorkFile(FileOutputFormat.java:286)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:129)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.getRecordWriter(MultipleOutputs.java:476)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:456)
at org.clinical3PO.learn.fasta.ArffToFastAReducer.reduce(ArffToFastAReducer.java:127)
at org.clinical3PO.learn.fasta.ArffToFastAReducer.reduce(ArffToFastAReducer.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mrunit.mapreduce.ReduceDriver.run(ReduceDriver.java:265)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:640)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:627)
at org.clinical3PO.learn.fasta.MRUnitTest.ArffToFastAReducerMRUnitTest.reducerMRUnit(ArffToFastAReducerMRUnitTest.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Reducer code:
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class AReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    public void setup(Context context) throws IOException {
        // Create MultipleOutputs once per task, not once per reduce() call.
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Write the key under the base output path "filename".
        mos.write(key, NullWritable.get(), "filename");
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
Any suggestions?
MRUnit currently has a known issue, which is not well documented: testing MultipleOutputs requires running the test with PowerMockRunner and a PrepareForTest annotation applied to mock the reducer class. JIRA issues MRUNIT-13 and MRUNIT-213 contain detailed discussion of this. MRUNIT-213 is still unresolved/unfixed.
Adding PowerMock to the project then triggers some further challenges in lining up the right compatible versions of Mockito and PowerMock. The documentation on Using PowerMock with Mockito covers which versions are compatible.
I tried making these changes to your sample. That got past the NullPointerException, but then I ran into one final problem. The expected path output declared in the test did not match the "filename" path used by the reducer code. I changed the expected path output to get the test completely passing.
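To illustrate why the path argument matters, here is a minimal sketch in plain Java. It is a simplified model and not MRUnit's actual implementation: the class PathOutputSketch and the method mismatchedPaths are hypothetical names made up for this illustration, and records are reduced to plain strings. It only assumes that expected records are looked up by the same path string the reducer passed to mos.write.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for path-keyed output checking; NOT MRUnit's real code.
public class PathOutputSketch {

    // Returns every expected path whose records are missing from,
    // or different to, what was actually written under that path.
    public static List<String> mismatchedPaths(Map<String, List<String>> expected,
                                               Map<String, List<String>> actual) {
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : expected.entrySet()) {
            List<String> written = actual.get(entry.getKey()); // null if nothing was written there
            if (!entry.getValue().equals(written)) {
                mismatches.add(entry.getKey());
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // The reducer writes one record (an empty key) under the path "filename".
        Map<String, List<String>> actual = new HashMap<>();
        actual.put("filename", Collections.singletonList(""));

        // Original test: the record is expected under the empty path "".
        Map<String, List<String>> original = new HashMap<>();
        original.put("", Collections.singletonList(""));

        // Corrected test: the record is expected under "filename".
        Map<String, List<String>> corrected = new HashMap<>();
        corrected.put("filename", Collections.singletonList(""));

        System.out.println("original expectation matches: "
                + mismatchedPaths(original, actual).isEmpty());   // false
        System.out.println("corrected expectation matches: "
                + mismatchedPaths(corrected, actual).isEmpty());  // true
    }
}
```

In this model the record itself is identical in both cases. Only the path key differs, which is why changing the third argument of withPathOutput from "" to "filename" was enough.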
Here is my final result: a fully working project with your sample test. Enjoy!
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>test-mrunit</artifactId>
    <packaging>jar</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>Test MRUnit</name>
    <description>Test MRUnit</description>
    <properties>
        <hadoop.version>2.7.1</hadoop.version>
        <powermock.version>1.6.4</powermock.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-core</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class AReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    public void setup(Context context) throws IOException {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        mos.write(key, NullWritable.get(), "filename");
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest(AReducer.class)
public class TestAReducer {

    @Test
    public void reducerMRUnit() throws IOException {
        ReduceDriver reduceDriver = new ReduceDriver(new AReducer());
        String output = "";
        ArrayList<Text> list = new ArrayList<Text>(0);
        list.add(new Text(""));
        reduceDriver.withInput(new Text(""), list);
        reduceDriver.withPathOutput(new Text(output), NullWritable.get(), "filename");
        reduceDriver.runTest();
    }
}