
How to convert a Scalding TypedPipe to an Iterator


In my Scalding Hadoop job, I have some grouping logic on a pipe, and then I need to process each group:

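val georecs : TypedPipe[GeoRecord] = getRecords

georecs.map( r => (getRegion(r), r) )         // key each record by its region
  .groupBy(_._1)                              // group the (region, record) pairs by region
  .mapValueStream( xs => clusterRecords(xs) ) // xs iterates over one region's (region, record) pairs
  .values
  .write(out)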
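To make the contract concrete, here is a small stdlib-only model of what the grouping step above does, run on local collections rather than on Hadoop. It is not the Scalding API: `MapValueStreamModel` and `groupAndMapValueStream` are hypothetical names, and the sketch only assumes that each key's values reach the function as one `Iterator` and that the function must hand back an `Iterator`.

```scala
// Hypothetical local model of groupBy + mapValueStream; not part of Scalding.
object MapValueStreamModel {
  // Groups `values` by `key`, then passes each group's values to `f`
  // as a single-pass Iterator, once per key, and collects f's output.
  def groupAndMapValueStream[K, V, U](values: Seq[V])(key: V => K)(
      f: Iterator[V] => Iterator[U]): Map[K, List[U]] =
    values
      .groupBy(key)
      .map { case (k, group) => k -> f(group.iterator).toList }
}
```

For example, `groupAndMapValueStream(Seq(("north", 1), ("south", 2), ("north", 3)))(_._1)(it => Iterator.single(it.map(_._2).sum))` yields `Map("north" -> List(4), "south" -> List(2))`: the function sees only one group at a time and must return an `Iterator`, never a pipe.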

Inside clusterRecords, I need to convert the iterator that's passed in into a TypedPipe so that I can (1) sample it and (2) take its cross product with the sample:

// materialize the iterator once: an Iterator can only be traversed a single time
val recs : List[GeoRecord] = xs.map(_._2).toList

// turn the records into a pipe so we can sample it
val sample : TypedPipe[Centroid] = TypedPipe.from(recs.map(r => Centroid(r.coreActivity)))
  .sample(0.11)
  .distinct

// turn the records into a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(recs)

records
  .cross(sample)   // cartesian product of records and centroids: (record, centroid)
  .groupBy(_._1)   // group by the user record so we get all (record, centroid) pairs for it
  .minBy( x => score(x._1.coreActivity, x._2.core) ) // keep the centroid with the lowest score for each record
  .values
  .groupBy( x => x._2 )   // now group by centroid to get the clusters
  .values
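Because the value stream arrives as a single-pass `Iterator`, any code that uses it twice (once to sample, once for the cross product) has to materialize it first. This stdlib-only sketch, with the hypothetical names `SinglePassDemo`, `countTwice` and `countTwiceSafely`, shows what happens otherwise:

```scala
// Hypothetical demo of Iterator's single-traversal behaviour.
object SinglePassDemo {
  // Uses the same Iterator twice: the first toList drains it,
  // so the second traversal sees no elements at all.
  def countTwice(xs: Iterator[Int]): (Int, Int) = {
    val first = xs.map(_ * 2).toList
    val second = xs.map(_ + 1).toList
    (first.size, second.size)
  }

  // Materializes the elements once, then reuses the List as often as needed.
  def countTwiceSafely(xs: Iterator[Int]): (Int, Int) = {
    val all = xs.toList
    (all.map(_ * 2).size, all.map(_ + 1).size)
  }
}
```

`countTwice(Iterator(1, 2, 3))` returns `(3, 0)`, while `countTwiceSafely(Iterator(1, 2, 3))` returns `(3, 3)`.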

The problem is that mapValueStream expects the mapping function to return an Iterator, but what I have is a TypedPipe. I know how to turn an Iterator into a pipe, but not the other way around. Do I need to execute the pipe, write it to disk, and then read it back in?

And if so, what's the best way to accomplish that?


Solution

  • It looks like you can convert a pipe to an iterator by running it as an Execution and collecting the results. That can be accomplished like this:

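    val georecs : TypedPipe[GeoRecord] = getRecords

    // Run the pipe as an Execution and block until it finishes;
    // waitFor returns a Try, and .get rethrows the failure if the run did not succeed.
    val i : Iterator[GeoRecord] = georecs
      .toIterableExecution
      .waitFor(this.scaldingConfig, this.mode)
      .get
      .iterator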
    

    (This typechecks, but I haven't tested it yet.)
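If each region's records fit in memory on a single reducer, another option is to avoid `TypedPipe` inside `clusterRecords` altogether and do the sampling and the cross product with ordinary Scala collections, returning an `Iterator` as `mapValueStream` expects. This sketch is an illustration under assumptions, not the asker's code: the `GeoRecord` and `Centroid` case classes carry only the fields used in the question, `score` is a hypothetical squared Euclidean distance, sampling is a per-record coin flip with a seeded `scala.util.Random`, and falling back to a single centroid when the sample is empty is a choice made for this sketch.

```scala
import scala.util.Random

// Hypothetical stand-ins for the question's types: only the fields used above.
final case class GeoRecord(id: String, coreActivity: Vector[Double])
final case class Centroid(core: Vector[Double])

object LocalClustering {
  // Hypothetical score: squared Euclidean distance (lower means closer).
  def score(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // One group's (region, record) pairs in, clusters of records out,
  // using only in-memory collections instead of TypedPipe.
  def clusterRecords(xs: Iterator[(String, GeoRecord)],
                     fraction: Double = 0.11,
                     seed: Long = 42L): Iterator[List[GeoRecord]] = {
    val records = xs.map(_._2).toList // materialize once; the Iterator is single-pass
    if (records.isEmpty) Iterator.empty
    else {
      val rng = new Random(seed)
      // Keep each record with probability `fraction`, then dedupe the centroids.
      val sampled = records
        .filter(_ => rng.nextDouble() < fraction)
        .map(r => Centroid(r.coreActivity))
        .distinct
      // Sketch's own fallback: never cluster with zero centroids.
      val centroids =
        if (sampled.nonEmpty) sampled else List(Centroid(records.head.coreActivity))
      // Same result as cross + groupBy(record) + minBy(score): nearest centroid per record.
      val assigned = records.map(r => (r, centroids.minBy(c => score(r.coreActivity, c.core))))
      // Same as the final groupBy(centroid): one List of records per cluster.
      assigned.groupBy(_._2).values.map(_.map(_._1)).iterator
    }
  }
}
```

Picking the minimum-score centroid for each record does the same work as `cross` followed by grouping on the record and `minBy`, without building a pipe; the cost is proportional to the number of records times the number of sampled centroids in each region.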