I have a table "vertices" with a column of the custom datatype "Properties", which wraps a HashMap<String, PropertyValue> and is interpreted by Flink as the
RAW('org...impl.properties.Properties', '...')
datatype. PropertyValue is also a custom datatype. Neither the Properties class nor the PropertyValue class is a POJO.
vertex_id | vertex_properties | event_time |
---|---|---|
v1 | Relevance=2:Integer, Weekday=Tuesday:String | 2021-04-27 10:21:09.999 |
Now I want to extract each property value (Relevance, Weekday) and save it into its own column, which is done by a UDF "ExtractPropertyValue".
public class ExtractPropertyValue extends ScalarFunction {

    private final String propertyKey;

    public ExtractPropertyValue(String propertyKey) {
        this.propertyKey = propertyKey;
    }

    // returns the value stored under the configured key
    public PropertyValue eval(Properties p) {
        return p.get(propertyKey);
    }
}
which leads to the table
vertex_id | Relevance | Weekday | event_time |
---|---|---|---|
v1 | 2 | Tuesday | 2021-04-27 10:21:09.999 |
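For reference, the select that produces this table looks roughly like the following (a sketch: the alias names, the "ExtractRelevance"/"ExtractWeekday" function names and registering one UDF instance per property key are only illustrative; registration itself is discussed below):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import org.apache.flink.table.api.Table;

// one UDF instance per property key, registered under illustrative names
Table result = vertices.select(
    $("vertex_id"),
    call("ExtractRelevance", $("vertex_properties")).as("Relevance"),
    call("ExtractWeekday", $("vertex_properties")).as("Weekday"),
    $("event_time"));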
When I register the UDF "ExtractPropertyValue" with the deprecated method StreamTableEnvironment.registerFunction() and call it via the Table API, for example with the "Relevance" key
StreamTableEnvironment.registerFunction("ExtractPropertyValue", new ExtractPropertyValue("Relevance"));
vertices.select(call("ExtractPropertyValue", $("vertex_properties")));
I get the correct result table, as described above, but the datatype of the relevance column is now
LEGACY('RAW', 'ANY<org.gradoop.common.model.impl.properties.PropertyValue, rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5WYWx1ZVR5cGVJbmZvAAAAAAAAAAECAAFMAAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLlR5cGVJbmZvcm1hdGlvbpSNyEi6s3rrAgAAeHB2cgA2b3JnLmdyYWRvb3AuY29tbW9uLm1vZGVsLmltcGwucHJvcGVydGllcy5Qcm9wZXJ0eVZhbHVlAAAAAAAAAAECAAFMAAV2YWx1ZXQAEkxqYXZhL2xhbmcvT2JqZWN0O3hw>')
and not a RAW(....) type anymore. Later on, there are problems with converting those LEGACY datatypes, so I tried to register the function with the non-deprecated method and call it the same way.
StreamTableEnvironment.createTemporaryFunction("ExtractPropertyValue", new ExtractPropertyValue("Relevance"));
vertices.select(call("ExtractPropertyValue", $("vertex_properties")));
But now an org.apache.flink.table.api.ValidationException is thrown, because Flink can no longer extract the datatype of the Properties class.
org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org...impl.properties.Properties'. Please pass the required data type manually or allow RAW types.
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org...impl.properties.Properties'. Interpreting it as a structured type was also not successful.
Caused by: org.apache.flink.table.api.ValidationException: Field 'properties' of class 'org...impl.properties.Properties' is neither publicly accessible nor does it have a corresponding getter method.
As I said, I cannot change the underlying Properties and PropertyValue classes. I also tried to add @FunctionHint annotations to my "ExtractPropertyValue" class like
@FunctionHint(
    input = @DataTypeHint(bridgedTo = Properties.class, allowRawPattern = "TRUE"),
    output = @DataTypeHint(bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {
    ...
}
But none of the annotations I tried fixed the error. Is the problem that the updated Table API only supports POJOs as custom datatypes? I'm really grateful for every answer, since I don't know what to change anymore.
In case someone stumbles over this question: the Flink annotations were the correct solution, I had just placed them wrong.
I have UDFs like "ExtractPropertyValue" with the eval method described here: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/
One function hint has to go above the class itself:
@FunctionHint(output = @DataTypeHint(value = "RAW", bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {...}
where PropertyValue is the custom datatype this function extracts from the table column. The other function hint goes above the eval method itself:
@FunctionHint(input = @DataTypeHint(inputGroup = InputGroup.ANY))
public PropertyValue eval(Object propertiesO) {
    Properties properties = (Properties) propertiesO;
    return properties.get(propertyKey);
}
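Putting both hints together, the whole scalar function looks roughly like this (a sketch: the imports and the Gradoop package of the Properties class are assumptions inferred from the LEGACY type string above):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.common.model.impl.properties.PropertyValue;

// Class-level hint: the return type is a RAW type backed by PropertyValue,
// so Flink does not try to interpret PropertyValue as a structured type.
@FunctionHint(output = @DataTypeHint(value = "RAW", bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {

    private final String propertyKey;

    public ExtractPropertyValue(String propertyKey) {
        this.propertyKey = propertyKey;
    }

    // Method-level hint: accept any argument type, so Flink does not attempt
    // to extract a data type from the non-POJO Properties class.
    @FunctionHint(input = @DataTypeHint(inputGroup = InputGroup.ANY))
    public PropertyValue eval(Object propertiesO) {
        Properties properties = (Properties) propertiesO;
        return properties.get(propertyKey);
    }
}

With these hints in place, registering the function via createTemporaryFunction and calling it as shown in the question no longer throws the ValidationException.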
I also had this problem with aggregation functions that implement an accumulate method. There, a function hint bridging the accumulator was necessary:
@FunctionHint(
    accumulator = @DataTypeHint(value = "RAW", bridgedTo = AvgAcc.class),
    input = @DataTypeHint(inputGroup = InputGroup.ANY))
public void accumulate(AvgAcc acc, Object iValueO) {...}
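For illustration, a complete aggregate function with the hints in these places could look roughly like this (a sketch: the class name, the accumulator fields and the numeric handling are assumptions; only the hint placement is taken from my working code):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.AggregateFunction;

public class AveragePropertyValue extends AggregateFunction<Double, AveragePropertyValue.AvgAcc> {

    // Illustrative accumulator; the RAW hint below makes Flink skip
    // structured-type extraction for it, which is what I needed for my
    // actual accumulator class.
    public static class AvgAcc {
        public double sum;
        public long count;
    }

    @Override
    public AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    @Override
    public Double getValue(AvgAcc acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    // The accumulator hint forces a RAW type for AvgAcc; the input hint
    // accepts any argument type (e.g. the RAW PropertyValue column).
    @FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = AvgAcc.class),
        input = @DataTypeHint(inputGroup = InputGroup.ANY))
    public void accumulate(AvgAcc acc, Object valueO) {
        // Unwrap the actual payload here; treating it as a Number is only
        // for illustration.
        acc.sum += ((Number) valueO).doubleValue();
        acc.count++;
    }
}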
Hope this helps someone!