scala apache-spark hbase bulk-load

Spark issue when creating HFiles: "Added a key not lexically larger than previous cell"


I am trying to create HFiles to bulk load into HBase, and it keeps throwing a key-ordering error even though the row keys look fine to me. I am using the following code:

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

import sqlContext.implicits._

val DF2 = df.filter($"company".isNotNull)
  .dropDuplicates(Array("company"))
  .sortWithinPartitions("company").sort("company")

// cols (the column qualifiers as byte arrays), COLUMN_FAMILY and fconf
// are defined elsewhere.
val rdd = DF2.flatMap(x => {
  val rowKey = Bytes.toBytes(x(0).toString)
  for (i <- 0 to cols.length - 1) yield {
    val index = x.fieldIndex(new String(cols(i)))
    val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
    (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, COLUMN_FAMILY, cols(i), value))
  }
})

rdd.saveAsNewAPIHadoopFile("HDFS location", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], fconf)

and I am using the following data:

company,date,open,high,low,close,volume
ABG,01-Jan-2010,11.53,11.53,11.53,11.53,0
ABM,01-Jan-2010,20.66,20.66,20.66,20.66,0
ABR,01-Jan-2010,1.99,1.99,1.99,1.99,0
ABT,01-Jan-2010,53.99,53.99,53.99,53.99,0
ABX,01-Jan-2010,39.38,39.38,39.38,39.38,0
ACC,01-Jan-2010,28.1,28.1,28.1,28.1,0
ACE,01-Jan-2010,50.4,50.4,50.4,50.4,0
ACG,01-Jan-2010,8.25,8.25,8.25,8.25,0
ADC,01-Jan-2010,27.25,27.25,27.25,27.25,0

It throws the following error:

java.io.IOException: Added a key not lexically larger than previous. Current cell = ADC/data:high/1505862570671/Put/vlen=5/seqid=0, lastCell = ADC/data:open/1505862570671/Put/vlen=5/seqid=0
    at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:265)
    at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:992)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:199)

I even tried sorting the data, but the error is still thrown.


Solution

  • After spending a couple of hours I found the solution: the root cause is that the columns are not sorted.

    An HFile needs its KeyValues in lexicographically sorted order, and in your case, while writing, HFileOutputFormat2 -> AbstractHFileWriter found "Added a key not lexically larger than previous. Current cell". You have already applied sorting at the row level; once you also sort the columns, it will work (a quick check of this ordering is shown after the code below).

    This question has a good explanation of why: why-hbase-keyvaluesortreducer-need-to-sort-all-keyvalue.

    Solution:

    // Sort the column qualifiers: cells within a row must be written in
    // lexicographic qualifier order.
    val cols = companyDs.columns.sorted

    // The rest of the code stays the same (companyDs is the deduplicated,
    // row-sorted DataFrame, i.e. DF2 above).
    val output = companyDs.rdd.flatMap(x => {
      val rowKey = Bytes.toBytes(x(0).toString)
      val hkey = new ImmutableBytesWritable(rowKey)
      for (i <- 0 to cols.length - 1) yield {
        val index = x.fieldIndex(cols(i))
        val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
        val kv = new KeyValue(rowKey, COLUMN_FAMILY, cols(i).getBytes(), System.currentTimeMillis() + i, value)
        (hkey, kv)
      }
    })

    output.saveAsNewAPIHadoopFile("<path>",
      classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[HFileOutputFormat2], config)
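
    A quick way to see the ordering requirement the exception refers to (this snippet is only an illustration, not part of the job): within one row, HFile cells are compared by column qualifier, and "high" sorts before "open", so writing data:open and then data:high for row ADC trips the check.

    import org.apache.hadoop.hbase.util.Bytes

    // Negative result: "high" < "open", so the cell data:high must be
    // written before data:open within the same row.
    val cmp = Bytes.compareTo(Bytes.toBytes("high"), Bytes.toBytes("open"))
    println(cmp < 0) // true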
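
    Once the HFiles are generated in the correct order, they still have to be loaded into the table. Below is a minimal sketch of that step, assuming an HBase 1.x client; the table name "company" and the connection setup are placeholders, not part of the original code, and the configuration passed to saveAsNewAPIHadoopFile is assumed to have been prepared with HFileOutputFormat2.configureIncrementalLoad so the HFiles match the table's regions.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

    val hbaseConf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(hbaseConf)
    val tableName = TableName.valueOf("company") // placeholder table name
    val table = connection.getTable(tableName)
    val regionLocator = connection.getRegionLocator(tableName)
    val admin = connection.getAdmin

    // Move the generated HFiles into the table's regions.
    new LoadIncrementalHFiles(hbaseConf).doBulkLoad(new Path("<path>"), admin, table, regionLocator)

    table.close()
    admin.close()
    connection.close()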