I am trying to create a function which counts and sums values by day (to later calculate the average). I got this far:
CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
CALLED ON NULL INPUT
RETURNS map<timestamp, frozen<tuple<bigint,double>>>
LANGUAGE java AS '
Date date = (Date) timestamp;
Calendar cal = Calendar.getInstance(); // locale-specific
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
date = cal.getTime();
TupleValue tupleValue = state.get(date);
Long count = (Long) tupleValue.getLong(0);
if (count == null) count = 1L;
else count = count + 1L;
Double sum = (Double) tupleValue.getDouble(1);
if (sum == null) sum = value;
else sum = sum + value;
//if (tupleValue == null) ?
tupleValue.setLong(0, count);
tupleValue.setDouble(1, sum);
state.put(date, tupleValue);
return state; ' ;
CREATE OR REPLACE AGGREGATE group_count_and_sum(timestamp, double)
SFUNC state_group_count_and_sum
STYPE map<timestamp, frozen<tuple<bigint,double>>>
INITCOND {};
This fails because tupleValue is null at every new day which is not in the map yet. How do I instantiate a tuple value in a UDA?
Fixed it.
CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
CALLED ON NULL INPUT
RETURNS map<timestamp, frozen<tuple<bigint,double>>>
LANGUAGE java AS '
Date date = (Date) timestamp;
Calendar cal = Calendar.getInstance(); // locale-specific
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
date = cal.getTime();
TupleValue tupleValue = state.get(date);
if (tupleValue == null) {
com.datastax.driver.core.TupleType tupleType = com.datastax.driver.core.TupleType.of(com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED, com.datastax.driver.core.CodecRegistry.DEFAULT_INSTANCE, com.datastax.driver.core.DataType.bigint(), com.datastax.driver.core.DataType.cdouble());
tupleValue = tupleType.newValue(0L, 0.0);
}
Long count = (Long) tupleValue.getLong(0);
if (count == null) count = 1L;
else count = count + 1L;
Double sum = (Double) tupleValue.getDouble(1);
if (sum == null) sum = value;
else sum = sum + value;
tupleValue.setLong(0, count);
tupleValue.setDouble(1, sum);
state.put(date, tupleValue);
return state; ' ;
CREATE OR REPLACE AGGREGATE group_count_and_sum(timestamp, double)
SFUNC state_group_count_and_sum
STYPE map<timestamp, frozen<tuple<bigint,double>>>
INITCOND {};