Tags: hadoop, protocol-buffers, thrift, scalding, lzo

Is there a Scalding source I can use for lzo-compressed binary data?


I am writing serialized Thrift records to a file using Elephant Bird's splittable LZO compression. To achieve this, I am using their ThriftBlockWriter class. My Scalding job then uses the FixedPathLzoThrift source to process the records. This all works fine; the problem is that I am limited to records of a single Thrift class.
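
For reference, a minimal sketch of the read side as it works today (job and argument names are made up; MyThriftClass stands in for my generated Thrift type, and FixedPathLzoThrift comes from scalding-commons):

    import com.twitter.scalding._
    import com.twitter.scalding.commons.source.FixedPathLzoThrift

    // Reads LZO block-compressed Thrift records and dumps them as text
    class DumpRecordsJob(args: Args) extends Job(args) {
      TypedPipe.from(FixedPathLzoThrift[MyThriftClass](args("input")))
        .map(_.toString)
        .write(TypedTsv[String](args("output")))
    }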

I want to start using RawBlockWriter instead of ThriftBlockWriter[MyThriftClass]. So instead of LZO-compressed Thrift records, my input will be LZO-compressed raw byte arrays. My question is: what should I use instead of FixedPathLzoThrift[MyThriftClass]?
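
To illustrate what I mean by the write side, here is a rough sketch only: RawBlockWriter comes from Elephant Bird and LzopCodec from hadoop-lzo, but the exact stream setup and flush call (finish below) are assumptions on my part and may differ by version.

    import java.io.OutputStream

    import com.hadoop.compression.lzo.LzopCodec
    import com.twitter.elephantbird.mapreduce.io.RawBlockWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object WriteRawBlocks {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val fs = FileSystem.get(conf)
        val codec = new LzopCodec()
        codec.setConf(conf)

        // LZOP-compressed stream wrapping the HDFS output file
        val out: OutputStream = codec.createOutputStream(fs.create(new Path(args(0))))
        val writer = new RawBlockWriter(out)

        // Each record is an arbitrary byte array rather than a Thrift struct
        Seq("one", "two", "three").foreach(s => writer.write(s.getBytes("UTF-8")))

        writer.finish() // flush the last partial block (assumed method name)
        out.close()
      }
    }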

Explanation of "protocol-buffers" tag: Elephant Bird uses a Protocol Buffers SerializedBlock class to wrap the raw input, as seen here.


Solution

  • I solved this by creating a FixedPathLzoRaw class to use in place of FixedPathLzoThrift:

    // Imports assume scalding-commons and elephant-bird-cascading2 (paths may vary by version)
    import cascading.scheme.Scheme
    import com.twitter.elephantbird.cascading2.scheme.LzoByteArrayScheme
    import com.twitter.scalding._
    import com.twitter.scalding.commons.source.LocalTapSource
    import com.twitter.scalding.typed.TypedSink

    case class FixedPathLzoRaw(path: String*) extends FixedPathSource(path: _*) with LzoRaw

    // Corresponds to the LzoThrift trait, but yields raw Array[Byte] records
    trait LzoRaw extends LocalTapSource with SingleMappable[Array[Byte]] with TypedSink[Array[Byte]] {
      override def setter[U <: Array[Byte]] = TupleSetter.asSubSetter[Array[Byte], U](TupleSetter.singleSetter[Array[Byte]])
      override def hdfsScheme = HadoopSchemeInstance((new LzoByteArrayScheme()).asInstanceOf[Scheme[_, _, _, _, _]])
    }