I have almost finished my Scalding project, which uses the Type Safe API instead of the Fields API. The last remaining issue in the overall project setup is integration testing of the entire Scalding job (I have finished the unit tests for the Type Safe External Operations pattern, yay!). That means running the complete job and testing the output of its various sinks.
However, something very peculiar is happening in my typedSink callbacks, which have this shape:
typedSink { scala.collection.mutable.Buffer[T] => Unit }
It seems that my program never sees the buffer or does anything with it, so the integration test always passes even when it should fail. Below are both the job and the test, to help illuminate what is going on:
import cascading.stats.CascadingStats
import com.twitter.scalding._
import org.apache.hadoop.io.{LongWritable, Text}
// OperationWrappers and Event are project-specific and not shown here.

object MyJob {
  // Names of the command-line arguments the job expects.
  val inputArgPath = "input"
  val validOutputArgPath = "validOutput"
  val invalidOutputArgPath = "invalidOutput"
}

class MyJob(args: Args) extends Job(args) {
  import OperationWrappers._

  implicit lazy val uId: Some[UniqueID] = Some(UniqueID.getIDFor(flowDef))

  val inputPath: String = args(MyJob.inputArgPath)
  val validOutputPath: String = args(MyJob.validOutputArgPath)
  val invalidOutputPath: String = args(MyJob.invalidOutputArgPath)

  // Read sequence files on Hadoop; fall back to TSV in local and test modes.
  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](inputPath))
    case _ => TypedPipe.from(TypedTsv[(LongWritable, Text)](inputPath))
  }

  // Pick the sink type with the same mode-based rule as the input.
  def returnOutputPipe(outputString: String): FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] =
    this.mode match {
      case m: HadoopMode => WritableSequenceFile[LongWritable, Text](outputString)
      case _ => TypedTsv[(LongWritable, Text)](outputString)
    }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.convertJsonToEither.forceToDisk

  validatedEvents.removeInvalidTuples.removeEitherWrapper.write(returnOutputPipe(invalidOutputPath))
  validatedEvents.keepValidTuples.removeEitherWrapper.write(returnOutputPipe(validOutputPath))

  override protected def handleStats(statsData: CascadingStats) = {
    //This is code to handle counters.
  }
}
Below is the integration test:
import com.twitter.scalding.{JobTest, TypedTsv}
import org.apache.hadoop.io.{LongWritable, Text}
import org.scalatest.{FlatSpec, Matchers}
import org.slf4j.LoggerFactory // assuming the logger is SLF4J
import scala.collection.mutable
// Constants is project-specific and not shown here.

class MyJobTest extends FlatSpec with Matchers {
  private val LOG = LoggerFactory.getLogger(classOf[MyJobTest])

  // Parse each tab-separated line of the fixture into a (key, event) pair.
  val validEvents: List[(LongWritable, Text)] = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/validEvents.txt")).getLines().toList.map(s => {
    val eventText = new Text
    val typedFields = s.split(Constants.TAB)
    eventText.set(typedFields(1))
    (new LongWritable(typedFields(0).toLong), eventText)
  })

  "Integrate-Test: My Job" should "run test" in {
    LOG.info("Before Job Test starts.")
    JobTest(classOf[MyJob].getName)
      .arg(MyJob.inputArgPath, "input")
      .arg(MyJob.invalidOutputArgPath, "invalidOutput")
      .arg(MyJob.validOutputArgPath, "validOutput")
      .source(TypedTsv[(LongWritable, Text)]("input"), validEvents)
      // The expected sizes below are deliberately wrong, so the test should fail.
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("invalidOutput")) {
        (buffer: mutable.Buffer[(LongWritable, Text)]) => {
          LOG.info("This is inside the buffer1.")
          buffer.size should equal(1000000)
        }
      }
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("validOutput")) {
        (buffer: mutable.Buffer[(LongWritable, Text)]) => {
          LOG.info("This is inside the buffer2.")
          buffer.size should equal(1000000000)
        }
      }
      .run
      .finish
  }
}
And finally, the output:
[INFO] --- maven-surefire-plugin:2.7:test (default-test) @ MyJob ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- scalatest-maven-plugin:1.0:test (test) @ MyJob ---
Discovery starting.
16/01/28 10:06:42 INFO jobs.MyJobTest: Before Job Test starts.
16/01/28 10:06:42 INFO property.AppProps: using app.id: A98C9B84C79348F8A7784D8247410C13
16/01/28 10:06:42 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] source: MemoryTap["NullScheme"]["0.2996348736498404"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] sink: MemoryTap["NullScheme"]["0.8393418014297485"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] sink: MemoryTap["NullScheme"]["0.20643450953780684"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] parallel execution is enabled: true
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting jobs: 1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] allocating threads: 1
16/01/28 10:06:42 INFO flow.FlowStep: [com.myCompany.myProject.c...] starting step: local
16/01/28 10:06:42 INFO util.Version: HV000001: Hibernate Validator 5.0.3.Final
Dumping custom counters:
rawEvent 6
validEvent 6
16/01/28 10:06:42 INFO jobs.MyJob: RawEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: ValidEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: InvalidEvents: 0
16/01/28 10:06:42 INFO jobs.MyJob: Job has valid counters and is exiting successfully.
As you can see, the logger prints "Before Job Test starts." but nothing happens inside the typedSink blocks. This is frustrating because my code looks like every other example I have seen, yet it does not work. The test should fail, but everything runs successfully, and the log statements inside the typedSink blocks never appear. Finally, the output shows that the counters were handled correctly, so the job is running to completion. I have spent many hours trying different things, but nothing seems to work. Hopefully the community will be able to help me. Thanks!
So, while I don't have a great answer to this post, I have what worked for me. Basically, my problem was that I was running my Scalding tests with the ScalaTest Maven plugin, set up as described in "Using the ScalaTest Maven plugin". This worked fine for my unit tests on operations, but it caused weirdness when combined with JobTest. After talking to the Scalding devs and finally acknowledging my team's own success with JUnitRunner, I decided to go with that. I changed my POM to support JUnitRunner and added @RunWith(classOf[JUnitRunner]) annotations to my tests. Everything then worked and behaved the way I wanted it to.
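For anyone who wants to see the shape of the change, here is a minimal sketch of an annotated test class. It assumes ScalaTest 2.x or 3.0 (where JUnitRunner lives in org.scalatest.junit) and JUnit 4 on the test classpath. The test body is only a placeholder standing in for the JobTest chain from the question, not the real integration test:

```scala
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner

import scala.collection.mutable

// Assumption: ScalaTest 2.x/3.0 and JUnit 4 are test dependencies.
// In ScalaTest 3.1+ the runner lives in org.scalatestplus.junit instead.
@RunWith(classOf[JUnitRunner])
class MyJobTest extends FlatSpec with Matchers {

  "Integrate-Test: My Job" should "run test" in {
    // Placeholder: the real test would build the JobTest(...) chain here
    // and end it with .run.finish, exactly as in the question.
    val buffer = mutable.Buffer(1, 2, 3)
    buffer.size should equal(3)
  }
}
```

With the annotation in place, a JUnit-aware runner such as Surefire can discover and run the class. The build log in the question shows Surefire skipping tests, so that setting would presumably need to change as part of the POM update.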