streamsetsbuffer-overrun

Streamsets gives this error trying to parse a valid JSON


I am setting up streamsets for a project.It has Kafka consumer as its origin. It was working fine for smaller messages but when the message size is larger it throws this error.

com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191

I have already set Max Object Length (chars) to 1000000 and parser.limit property to 10335040. I am unable to figure out this issue.

NA

Complete stack trace is

KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
com.streamsets.pipeline.api.base.OnRecordErrorException: KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:265)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:224)
    at com.streamsets.pipeline.stage.origin.kafka.StandaloneKafkaSource.produce(StandaloneKafkaSource.java:86)
    at com.streamsets.pipeline.api.base.configurablestage.DSource.produce(DSource.java:38)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:283)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:235)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:298)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:219)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:810)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:554)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:527)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:109)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:703)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:483)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName2(ReaderBasedJsonParser.java:1716)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName(ReaderBasedJsonParser.java:1700)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:493)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.readObjectFromStream(JsonObjectReaderImpl.java:199)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl.readObjectFromStream(OverrunJsonObjectReaderImpl.java:196)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.read(JsonObjectReaderImpl.java:111)
    at com.streamsets.pipeline.lib.parser.json.JsonCharDataParser.parse(JsonCharDataParser.java:70)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.lambda$parse$0(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.parse(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:244)
    ... 29 more

This json fails:-

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68372":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring managers":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}

This Json gives success:-

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring managers":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}


Solution

  • I wrote a quick pipeline to try to replicate this, but it works as I would expect. I had to set Max Object Length (chars) in the Kafka Consumer's Data Format configuration, as you did, and it read and parsed the data just fine.

    Check whether the data is being retrieved from Kafka intact: duplicate the pipeline, change the Data Format of the Kafka Consumer to Text, and send the output to a file. You should be able to see if all of the data is being read from the Kafka topic. It's possible that the maximum message size in Kafka has been set to 4k, and this results in messages being truncated.

    Another thing to check is that you are using the correct stage library. In fact, as explained in the comments, this was the fix - Deep was using the CDH 2.x consumer; when he changed it to Kafka 0.11.0.0 it started working correctly.