scalding

How to check if a TypedPipe or a ValuePipe is empty in Scalding?


In Scalding, suppose you have a TypedPipe[Long] or a ValuePipe[Long]. What is the most elegant and efficient way to check whether it is empty?

I am currently testing the following:

val isTPEmpty: Boolean = typedPipe.equals(TypedPipe.empty)
val isVPEmpty: Boolean = valuePipe.equals(EmptyValue)

Or, to make it more generic:

def isTypedPipeEmpty[A](typedPipe: TypedPipe[A]): Boolean = {
  val emptyTP: TypedPipe[A] = TypedPipe.empty
  typedPipe.equals(emptyTP)
}

UPDATE: This doesn't work: it returns false even for an empty TypedPipe. Any input is appreciated.


Solution

  • After speaking to several people about this, the conclusion is that there is no straightforward solution. A TypedPipe is distributed, so checking whether it is empty is "expensive" and should be avoided as much as possible.

    If you absolutely have no choice, what worked for me was something "ugly": creating a temporary single-element TypedPipe, calling mapWithValue on it with my ValuePipe, and doing X if the value is empty and Y otherwise. The temporary pipe needs exactly one element, because mapping over an empty pipe never invokes the function. Something like:

    TypedPipe.from(List(())).mapWithValue(valuePipe) { case (_, valueOpt) => if (valueOpt.isEmpty) doX else doY }
    

    But again, cumbersome.
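
For anyone who wants this packaged up, here is a minimal sketch of the workaround, plus an alternative that assumes a Scalding version with the Execution API. The object and method names (EmptinessChecks, branchOnValue, isPipeEmpty, isValueEmpty) are made up for illustration and are not part of Scalding. Both variants still run a job to find the answer, so the "expensive" caveat applies.

```scala
import com.twitter.scalding.Execution
import com.twitter.scalding.typed.{TypedPipe, ValuePipe}

// Hypothetical helpers, not part of the Scalding API.
object EmptinessChecks {

  // Branch inside the flow. The one-element dummy pipe makes the function
  // run exactly once; it receives None when the ValuePipe is empty.
  def branchOnValue[A, B](value: ValuePipe[A])(ifEmpty: B)(ifPresent: A => B): TypedPipe[B] =
    TypedPipe.from(List(())).mapWithValue(value) { (_, opt) =>
      opt match {
        case None    => ifEmpty
        case Some(a) => ifPresent(a)
      }
    }

  // Bring the answer back to the submitting process. limit(1) asks for at
  // most one record, so the whole pipe is not shipped back.
  def isPipeEmpty[A](pipe: TypedPipe[A]): Execution[Boolean] =
    pipe.limit(1).toIterableExecution.map(_.isEmpty)

  // Same idea for a ValuePipe, which holds at most one value.
  def isValueEmpty[A](value: ValuePipe[A]): Execution[Boolean] =
    value.toOptionExecution.map(_.isEmpty)
}
```

In an Execution-based job, the result can then drive what runs next, for example isPipeEmpty(pipe).flatMap { empty => if (empty) executionX else executionY }, where executionX and executionY stand in for whatever Executions you want to run.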