In my Scalding Hadoop job, I have some grouping logic on a pipe, and then I need to process each group:
val georecs: TypedPipe[GeoRecord] = getRecords
georecs.map(r => (getRegion(r), r))
  .groupBy(_._1)
  .mapValueStream(xs => clusterRecords(xs))
  .values
  .write(out)
Inside clusterRecords I need to convert the iterator that's passed in into a TypedPipe so that I can 1) sample it and 2) take the cross product:
// turn the iterator into a pipe so we can sample it
val sample = TypedPipe.from(xs.map(x => Centroid(x._2.coreActivity)).toIterable)
  .sample(0.11)
  .distinct
// turn the iterator into a pipe so we can take its cross product
val records: TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)
records
  .cross(sample) // Cartesian product of records and centroids
  .groupBy(_._1) // group by the user record so we get a list of pairs (user, centroid)
  .minBy(x => score(x._1.coreActivity, x._2.core)) // find the centroid with the lowest score for each record
  .values
  .groupBy(x => x._2) // now group by centroid to get the clusters
  .values
The problem is that mapValueStream expects the mapping function to return an iterator, but what I have is a TypedPipe. I know how to turn an iterator into a pipe, but not the other way around. Do I need to execute it, write it to disk, and then read it back in?
And if so, what's the best way to accomplish that?
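It looks like you can convert a pipe to an iterator by running it. Note that this launches a job and blocks until it finishes, so it is meant to be called from the job-submitting code and will most likely not work from inside the function passed to mapValueStream, which runs on the reducers. It can be done like this: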
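val georecs: TypedPipe[GeoRecord] = getRecords
val i: Iterator[GeoRecord] = georecs
  .toIterableExecution                     // Execution[Iterable[GeoRecord]]
  .waitFor(this.scaldingConfig, this.mode) // runs the job and blocks; returns a Try
  .get                                     // throws if the execution failed
  .toIterator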
(This typechecks, but I have not tested it yet.)
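If the records for a single region fit in reducer memory, another option is to skip the pipe conversion entirely and do the sampling and nearest-centroid assignment with plain Scala collections inside clusterRecords, returning an Iterator as mapValueStream expects. The following is only a minimal sketch under that assumption. The GeoRecord and Centroid fields, the String region key, the score function, and the fallback to the first record when the sample comes back empty are all hypothetical stand-ins, since the question does not show them.

```scala
import scala.util.Random

// Hypothetical stand-ins for the types used in the question.
final case class GeoRecord(id: String, coreActivity: Double)
final case class Centroid(core: Double)

object InMemoryClustering {
  // Hypothetical distance function; the question's `score` is not shown.
  def score(activity: Double, core: Double): Double = math.abs(activity - core)

  // Clusters one group in memory and returns an Iterator of clusters.
  // The region key type (String) is an assumption.
  def clusterRecords(
      xs: Iterator[(String, GeoRecord)],
      fraction: Double = 0.11,
      rng: Random = new Random(42L)
  ): Iterator[List[GeoRecord]] = {
    // Materialize once: an Iterator can only be traversed a single time.
    val records = xs.map(_._2).toVector
    if (records.isEmpty) Iterator.empty
    else {
      // Keep each record with probability `fraction`, then drop duplicate centroids.
      val sampled = records
        .filter(_ => rng.nextDouble() < fraction)
        .map(r => Centroid(r.coreActivity))
        .distinct
      // Assumption: fall back to the first record so there is always a centroid.
      val centroids =
        if (sampled.nonEmpty) sampled else Vector(Centroid(records.head.coreActivity))
      // Assign each record to its lowest-scoring centroid; each group is a cluster.
      records
        .groupBy(r => centroids.minBy(c => score(r.coreActivity, c.core)))
        .valuesIterator
        .map(_.toList)
    }
  }
}
```

With this shape, the call in the job stays `.mapValueStream(xs => InMemoryClustering.clusterRecords(xs))`, and no nested job has to be run. The trade-off is that each group is held in memory, so this only suits regions of modest size.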