elasticsearch, apache-flink, flink-streaming, flink-sql, pyflink

Pyflink->Elastic converts Varchar to Long?


I started working with PyFlink last week and have hit a roadblock. Basically, I import data from source A and sink it to Elasticsearch, which works great, except for one particular field that is not handled properly.

The field is a 10-character string that my PyFlink job parses, runs through an encryption routine, and converts to hex, which turns it into a 128-character string.
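
For context, the transformation looks roughly like this (the real encryption routine is different; this is just a stand-in UDF that also produces a 128-character hex string):

    import hashlib

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # Stand-in for the actual encryption step: SHA-512 of the 10-char id,
    # hex-encoded, which yields a 128-character string.
    @udf(result_type=DataTypes.STRING())
    def encrypt_id(raw_id: str) -> str:
        return hashlib.sha512(raw_id.encode("utf-8")).hexdigest()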

While sinking to Elasticsearch, somewhere along the way the system seems to treat my string as a "long" type.

The following error is thrown while importing:

Caused by: ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [some_encrypted_id] of type [long] in document with id '10'. Preview of field's value: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=For input string: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"]];

My sink definition:

CREATE TABLE es_sink(
                some_encrypted_id VARCHAR
            ) with (
                'connector' = 'elasticsearch-7',
                'hosts' = 'x',  -- normally not x
                'index' = 'x',  -- normally not x
                'document-id.key-delimiter' = '$',
                'sink.bulk-flush.max-size' = '42mb',
                'sink.bulk-flush.max-actions' = '32',
                'sink.bulk-flush.interval' = '1000',
                'sink.bulk-flush.backoff.delay' = '1000',
                'format' = 'json'
            )

I tried replacing VARCHAR with TEXT, but while creating the job I get the following error:

java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: TEXT

I am honestly out of ideas here. I tried several other fields and everything works as expected; only this one does not.

I also don't see why the system tries to sink it as a "long" type; I never defined anything as "long".

Hopefully someone can figure out what I am doing wrong here and point me in the right direction. If more info is needed, please let me know!


Solution

  • Did you specify a data type for your fields in your Elasticsearch index? If you did not, ES guesses a type for the field from the first value you insert, and sometimes it is not the one you expect.

    E.g. suppose you have an index AA with a field aa that does not have a type mapping yet, and your program inserts '57', 'abc', ... sequentially. When ES first sees 57, it guesses that the field might be numeric and maps it to something like integer or long, so your subsequent insertions of non-numeric values fail.

    You can try putting an explicit mapping on the index before writing, using the PUT mapping API; see the sketch below.
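
    As a minimal sketch (assuming a local cluster at localhost:9200 and an index named my_index; adjust both to your setup), you could create the index with the encrypted field explicitly mapped as keyword before the Flink job writes to it:

        import requests

        # Assumed local Elasticsearch endpoint and index name; adjust to your cluster.
        ES_URL = "http://localhost:9200"
        INDEX = "my_index"

        # Create the index with an explicit mapping so ES never has to guess the type.
        # "keyword" stores the 128-char hex string as an exact, non-analyzed string.
        mapping = {
            "mappings": {
                "properties": {
                    "some_encrypted_id": {"type": "keyword"}
                }
            }
        }

        resp = requests.put(f"{ES_URL}/{INDEX}", json=mapping)
        resp.raise_for_status()
        print(resp.json())

    Note that if the index already exists with the wrong mapping, you typically have to delete and recreate it (or reindex), because an existing field mapping cannot be changed in place.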