I am writing serialized Thrift records to a file using Elephant Bird's splittable LZO compression. To do this I am using their `ThriftBlockWriter` class, and my Scalding job then reads the records back with the `FixedPathLzoThrift` source. This all works fine. The problem is that I am limited to records of a single Thrift class.
I want to start using `RawBlockWriter` instead of `ThriftBlockWriter[MyThriftClass]`, so instead of LZO-compressed Thrift records, my input will be LZO-compressed raw byte arrays. My question is: what should I use instead of `FixedPathLzoThrift[MyThriftClass]`?
Explanation of the "protocol-buffers" tag: Elephant Bird uses a Protocol Buffers `SerializedBlock` class to wrap the raw input, as seen in the Elephant Bird source.
I solved this by creating a `FixedPathLzoRaw` class to use in place of `FixedPathLzoThrift`:
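```scala
import cascading.scheme.Scheme
import com.twitter.elephantbird.cascading2.scheme.LzoByteArrayScheme
import com.twitter.scalding._
import com.twitter.scalding.typed.TypedSink

case class FixedPathLzoRaw(path: String*) extends FixedPathSource(path: _*) with LzoRaw
// Corresponds to the LzoThrift trait, but each record is a raw Array[Byte]
trait LzoRaw extends LocalTapSource with SingleMappable[Array[Byte]] with TypedSink[Array[Byte]] {
  override def setter[U <: Array[Byte]] = TupleSetter.asSubSetter[Array[Byte], U](TupleSetter.singleSetter[Array[Byte]])
  override def hdfsScheme = HadoopSchemeInstance((new LzoByteArrayScheme()).asInstanceOf[Scheme[_, _, _, _, _]])
}
```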
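For illustration, here is a minimal sketch of how the new source might be consumed from a typed Scalding job. It assumes the `FixedPathLzoRaw` class above is on the classpath and that the job is launched with `--input` and `--output` arguments; the job name `RawRecordLengthJob` and its output (one record length per line) are hypothetical, chosen only to show that each element of the pipe is an `Array[Byte]`.

```scala
import com.twitter.scalding._

// Hypothetical example job: reads raw LZO byte-array records via FixedPathLzoRaw
// and writes the length (in bytes) of each record to a TSV file.
class RawRecordLengthJob(args: Args) extends Job(args) {
  TypedPipe.from(FixedPathLzoRaw(args("input")))
    .map { bytes: Array[Byte] => bytes.length } // each element is one raw record
    .write(TypedTsv[Int](args("output")))
}
```

Each element should be one record as it was passed to `RawBlockWriter`, so any deserialization (for example into one of several Thrift classes) would happen inside the `map`.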