Tags: cassandra, user-defined-functions, user-defined-aggregate

Instantiating a tuple value in a Cassandra UDA state function whose state is a map of tuples (for a daily average)


I am trying to create a function which counts and sums values by day (to later calculate the average). I got this far:

-- State function for the group_count_and_sum aggregate: keeps a per-day
-- (count, sum) tuple so that an average can be derived later.
--   state     : map from day (the row timestamp with hour, minute, second and
--               millisecond set to 0) to a tuple<bigint, double> of (count, sum)
--   timestamp : timestamp of the current row
--   value     : value of the current row
-- Returns the updated state map.
-- The day is computed with Calendar.getInstance(), i.e. using the default
-- time zone and locale of the JVM that runs the function.
-- NOTE: state.get(date) returns null for a day that is not in the map yet, so
-- the tupleValue.getLong(0) call fails on the first row of every new day.
CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
CALLED ON NULL INPUT
RETURNS map<timestamp, frozen<tuple<bigint,double>>>
LANGUAGE java AS '
Date date = (Date) timestamp;
Calendar cal = Calendar.getInstance(); // locale-specific
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
date = cal.getTime();

TupleValue tupleValue = state.get(date);

Long count = (Long) tupleValue.getLong(0);
if (count == null) count = 1L;
else count = count + 1L;

Double sum = (Double) tupleValue.getDouble(1);
if (sum == null) sum = value;
else sum = sum + value;

//if (tupleValue == null) ?
tupleValue.setLong(0, count);
tupleValue.setDouble(1, sum);
state.put(date, tupleValue);
return state; ' ;

-- Aggregate over (timestamp, double) rows. It folds every row into a map of
-- day -> (count, sum) through state_group_count_and_sum, starting from an
-- empty map. No FINALFUNC is declared, so the state map itself is the result.
CREATE OR REPLACE AGGREGATE group_count_and_sum(timestamp, double)
    SFUNC state_group_count_and_sum
    STYPE map<timestamp, frozen<tuple<bigint, double>>>
    INITCOND {};

This fails because `tupleValue` is null for every new day that is not yet in the map. How do I instantiate a tuple value inside a UDA state function?


Solution

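  • Fixed it. When the day is not in the map yet, the lookup returns null, so the state function now builds a `TupleType` for `tuple<bigint, double>` and creates the initial tuple with `tupleType.newValue(0L, 0.0)` before updating it: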

    -- State function for the group_count_and_sum aggregate: keeps a per-day
    -- (count, sum) tuple so that an average can be derived later.
    --   state     : map from day (midnight UTC of the row timestamp) to a
    --               tuple<bigint, double> holding (count, sum) for that day
    --   timestamp : timestamp of the current row; rows where it is null are skipped
    --   value     : value of the current row; rows where it is null are skipped
    -- Returns the updated state map.
    -- Days are cut at midnight UTC rather than in the JVM default time zone, so
    -- the buckets do not depend on how the node running the function is configured.
    CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
    CALLED ON NULL INPUT
    RETURNS map<timestamp, frozen<tuple<bigint,double>>>
    LANGUAGE java AS '
    // The function is CALLED ON NULL INPUT, so every argument may be null.
    if (state == null) state = new HashMap<>();
    // A row without a timestamp or a value has nothing to add to any day.
    if (timestamp == null || value == null) return state;

    // Truncate the timestamp to the start of its day (UTC) to get the map key.
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.setTime(timestamp);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    Date day = cal.getTime();

    TupleValue dayTotals = state.get(day);
    if (dayTotals == null) {
        // First row seen for this day: start from an empty (count = 0, sum = 0.0) tuple.
        // NOTE(review): on Cassandra versions that expose udfContext,
        // udfContext.newTupleValue("tuple<bigint,double>") may be a simpler way - verify.
        com.datastax.driver.core.TupleType tupleType = com.datastax.driver.core.TupleType.of(
            com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED,
            com.datastax.driver.core.CodecRegistry.DEFAULT_INSTANCE,
            com.datastax.driver.core.DataType.bigint(),
            com.datastax.driver.core.DataType.cdouble());
        dayTotals = tupleType.newValue(0L, 0.0);
    }

    // getLong/getDouble return primitives, so no null checks are needed here.
    dayTotals.setLong(0, dayTotals.getLong(0) + 1L);
    dayTotals.setDouble(1, dayTotals.getDouble(1) + value);
    state.put(day, dayTotals);
    return state; ' ;
    
    -- Aggregate over (timestamp, double) rows. It folds every row into a map of
    -- day -> (count, sum) through state_group_count_and_sum, starting from an
    -- empty map. No FINALFUNC is declared, so the state map itself is the result.
    CREATE OR REPLACE AGGREGATE group_count_and_sum(timestamp, double)
        SFUNC state_group_count_and_sum
        STYPE map<timestamp, frozen<tuple<bigint, double>>>
        INITCOND {};