elasticsearch | hadoop | hive | elasticsearch-hadoop

Elasticsearch Hive SerializationError handler


Using Elasticsearch version 6.8.0:

hive> select * from provider1;
OK
{"id","k11",}
{"id","k12",}
{"id","k13",}
{"id","k14",}
{"id":"K1","name":"Ravi","salary":500}
{"id":"K2","name":"Ravi","salary":500}
{"id":"K3","name":"Ravi","salary":500}
{"id":"K4","name":"Ravi","salary":500}
{"id":"K5","name":"Ravi","salary":500}
{"id":"K6","name":"Ravi","salary":"sdfgg"}
{"id":"K7","name":"Ravi","salary":"sdf"}
{"id":"k8"}
{"id":"K9","name":"r1","salary":522}
{"id":"k10","name":"r2","salary":53}
Time taken: 0.179 seconds, Fetched: 14 row(s)
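
The first four rows of this output are not valid JSON: `{"id","k11",}` uses a comma where a colon is required between field name and value, which is exactly what the parser error below complains about. A quick way to check the rows, sketched in Python (the sample strings are copied from the output above):

```python
import json

rows = [
    '{"id","k11",}',                           # malformed: comma instead of colon
    '{"id":"K1","name":"Ravi","salary":500}',  # valid
    '{"id":"k8"}',                             # valid, just missing optional fields
]

for row in rows:
    try:
        json.loads(row)
        print("OK     ", row)
    except json.JSONDecodeError as e:
        # For the malformed rows this reports "Expecting ':' delimiter",
        # the Python equivalent of Jackson's "was expecting a colon" error.
        print("INVALID", row, "->", e.msg)
```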

ADD JAR /home/smrafi/elasticsearch-hadoop-6.8.0/dist/elasticsearch-hadoop-6.8.0.jar;
CREATE EXTERNAL TABLE hive_es_with_handler( data STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.resource' = 'test_eshadoop/healthCareProvider',
  'es.nodes' = 'vpc-pid-pre-prod-es-cluster-b7thvqfj3tp45arxl34gge3yyi.us-east-2.es.amazonaws.com',
  'es.input.json' = 'yes',
  'es.index.auto.create' = 'true',
  'es.write.operation'='upsert',
  'es.nodes.wan.only' = 'true',
  'es.port' = '443',
  'es.net.ssl'='true',
  'es.batch.size.entries'='1',
  'es.mapping.id' ='id',
  'es.batch.write.retry.count'='-1',
  'es.batch.write.retry.wait'='60s',
  'es.write.rest.error.handlers' = 'es, ignoreBadRecords',
  'es.write.data.error.handlers' = 'customLog',
  'es.write.data.error.handler.customLog' = 'com.verisys.elshandler.CustomLogOnError',
  'es.write.rest.error.handler.es.client.resource'='error_es_index/error',
  'es.write.rest.error.handler.es.return.default'='HANDLED',
  'es.write.rest.error.handler.log.logger.name' = 'BulkErrors',
  'es.write.data.error.handler.log.logger.name' = 'SerializationErrors',
  'es.write.rest.error.handler.ignoreBadRecords' = 'com.verisys.elshandler.IgnoreBadRecordHandler',
  'es.write.rest.error.handler.es.return.error'='HANDLED');
insert into hive_es_with_handler select * from provider1;

Below is the exception trace; the job failed, complaining that the error-handler index is not present:

Caused by: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value  at [Source: [B@1e3f0aea; line: 1, column: 7]
    at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:95)
    at org.elasticsearch.hadoop.serialization.ParsingUtils.doFind(ParsingUtils.java:168)
    at org.elasticsearch.hadoop.serialization.ParsingUtils.values(ParsingUtils.java:151)
    at org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors.process(JsonFieldExtractors.java:213) 
    at org.elasticsearch.hadoop.serialization.bulk.JsonTemplatedBulk.preProcess(JsonTemplatedBulk.java:64)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:54)
    at org.elasticsearch.hadoop.hive.EsSerDe.serialize(EsSerDe.java:171)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:725)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
    at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
    ... 9 more
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value  at [Source: [B@1e3f0aea; line: 1, column: 7]
    at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433)
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521)
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:442)
    at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:500)
    at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:93)
    ... 22 more

I tried to use the custom SerializationErrorHandler, but it is of no use: the handler never comes into context. The job stops completely instead of continuing with the good records, even though the handler returns the default (HANDLED) constant.


Solution

  • It seems you have invalid JSON (e.g. `{"id","k11",}`).

    As mentioned in the docs, this case is not handled by Hive:

    "Serialization Error Handlers are not yet available for Hive. Elasticsearch for Apache Hadoop uses Hive's SerDe constructs to convert data into bulk entries before being sent to the output format. SerDe objects do not have a cleanup method that is called when the object ends its lifecycle. Because of this, we do not support serialization error handlers in Hive as they cannot be closed at the end of the job execution."
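
Since Hive cannot route serialization failures to a handler, a practical workaround is to clean the data before the `INSERT`. A minimal pre-filter sketch in Python (the requirement that each record be a JSON object carrying an `id` field, needed here because of `es.mapping.id`, is an assumption for illustration):

```python
import json

def is_valid_record(line: str) -> bool:
    """Keep only lines that parse as a JSON object with an 'id' field."""
    try:
        doc = json.loads(line)
    except json.JSONDecodeError:
        return False
    return isinstance(doc, dict) and "id" in doc

def filter_records(lines):
    """Split raw lines into (good, bad) so bad rows can be inspected later."""
    good, bad = [], []
    for line in lines:
        (good if is_valid_record(line) else bad).append(line)
    return good, bad

# Sample rows copied from the question's output
sample = ['{"id","k11",}', '{"id":"K5","name":"Ravi","salary":500}', '{"id":"k8"}']
good, bad = filter_records(sample)
print(len(good), "good,", len(bad), "bad")
```

Loading only the `good` partition into the ES-backed table lets the valid records through, which is the behavior the error handlers were expected to provide.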