verticavsql

Create function in vertica


I checked through many sources but cudnt get result.

How to create a function in vertica to return count of all sessions in database?

Can someone povide some idea or possible expamples in this topic.


Solution

  • create C++(count_it.cpp )

    #include "Vertica.h"
    #include <time.h>
    #include <sstream>
    #include <iostream>
    
    using namespace Vertica;
    using namespace std;
    
    class Average : public AggregateFunction
    {
        virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)
        {
            try {
                VNumeric &sum = aggs.getNumericRef(0);
                sum.setZero();
    
                vint &cnt = aggs.getIntRef(1);
                cnt = 0;
            } catch(exception& e) {
                 vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
            }
        }
    
        void aggregate(ServerInterface &srvInterface, BlockReader &argReader, IntermediateAggs &aggs)
        {
            try {
                VNumeric &sum = aggs.getNumericRef(0);
                vint     &cnt = aggs.getIntRef(1);
    
                do {
                    const VNumeric &input = argReader.getNumericRef(0);
                    if (!input.isNull()) {
                        sum.accumulate(&input);
                        sum.setZero();
                        cnt++;
                    }
                } while (argReader.next());
            } catch(exception& e) {
                vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
            }
        }
    
        virtual void combine(ServerInterface &srvInterface,IntermediateAggs &aggs,MultipleIntermediateAggs &aggsOther)
        {
            try {
                VNumeric       &mySum      = aggs.getNumericRef(0);
                vint           &myCount    = aggs.getIntRef(1);
    
                do {
                    const VNumeric &otherSum   = aggsOther.getNumericRef(0);
                    const vint     &otherCount = aggsOther.getIntRef(1);
    
                    mySum.accumulate(&otherSum);
                    mySum.setZero();
                    myCount += otherCount;
    
                } while (aggsOther.next());
            } catch(exception& e) {
                vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
            }
        }
    
        virtual void terminate(ServerInterface &srvInterface, BlockWriter &resWriter, IntermediateAggs &aggs)
        {
            try {
                const VerticaType  &numtype = aggs.getTypeMetaData().getColumnType(0);
                const VNumeric     &sum     = aggs.getNumericRef(0);
                sum.setZero();
    
                uint64* tmp = (uint64*)malloc(numtype.getMaxSize() / sizeof(uint64));
                VNumeric cnt(tmp, numtype.getNumericPrecision(), numtype.getNumericScale());
                cnt.copy(aggs.getIntRef(1));
                VNumeric &out = resWriter.getNumericRef();
                if (cnt.isZero())
                    out.setZero();
                else
                    out.add(&cnt,&sum);
            } catch(exception& e) {
                vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
            }
        }
    
        InlineAggregate()
    };
    
    
    class count_itFactory : public AggregateFunctionFactory
    {
        virtual void getPrototype(ServerInterface &srvfloaterface,ColumnTypes &argTypes,ColumnTypes &returnType)
        {
            argTypes.addNumeric();
            returnType.addNumeric();
        }
    
        virtual void getReturnType(ServerInterface &srvfloaterface,const SizedColumnTypes &inputTypes,SizedColumnTypes &outputTypes)
        {
            int int_part = inputTypes.getColumnType(0).getNumericPrecision();
            int frac_part = inputTypes.getColumnType(0).getNumericScale();
            outputTypes.addNumeric(int_part+frac_part, frac_part);
        }
    
        virtual void getIntermediateTypes(ServerInterface &srvInterface,const SizedColumnTypes &inputTypes,SizedColumnTypes &intermediateTypeMetaData)
        {
            int int_part = inputTypes.getColumnType(0).getNumericIntegral();
            int frac_part = inputTypes.getColumnType(0).getNumericFractional();
            intermediateTypeMetaData.addNumeric(int_part+frac_part, frac_part);
            intermediateTypeMetaData.addInt();
        }
    
        virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface)
        { return vt_createFuncObject<Average>(srvfloaterface.allocator); }
    
    };
    
    RegisterFactory(count_itFactory);
    

    compile C++

    g++ -D HAVE_LONG_INT_64 -I /opt/vertica/sdk/include -Wall -shared -Wno-unused-value -fPIC -o count_it.so /home/dbadmin/libs/count_it.cpp /opt/vertica/sdk/include/Vertica.cpp
    

    create library

    CREATE LIBRARY count_it AS '/home/dbadmin/libs/count_it.so';
    

    create function

    CREATE AGGREGATE FUNCTION count_it AS LANGUAGE 'C++' NAME 'count_itFactory' LIBRARY count_it;
    

    use function

      select count_it(client_id) from sesions;