fiwarefiware-cygnusfiware-cosmos

Cygnus Cosmos integration


We manage to integrate idas-cygnus-orion flow, everything is working perfectly. (Thanks for your help Francisco)

Now we want to add the Cosmos to the integration and I've configured Cygnus with your answer posted here (Part starting with * is added for Cosmos configuration)

When I start Cygnus, we get this error msg

        [root@idas-new centos]# tail -500f nohup.out
Warning: No configuration directory set! Use --conf <dir> to override.
Warning: JAVA_HOME is not set!
+ exec /usr/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/usr/cygnus/lib/*:/usr/cygnus/plugins.d/cygnus/lib/*:/usr/cygnus/agent_test.conf -n cygnusagent
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/cygnus/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/cygnus/plugins.d/cygnus/lib/cygnus-0.12.0_SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
16/05/18 09:26:46 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/05/18 09:26:46 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/usr/cygnus/conf/agent_test.con
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: cygnusagent
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [cygnusagent]
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Creating channels
16/05/18 09:26:46 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel type memory
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel
16/05/18 09:26:46 INFO source.DefaultSourceFactory: Creating instance of source http-source, type org.apache.flume.source.http.HTTPS
16/05/18 09:26:46 INFO handlers.OrionRestHandler: Cygnus version (0.12.0_SNAPSHOT.d6ee0e4548ca6bf9116c1dee4c4cbcc7da8b1752)
16/05/18 09:26:46 INFO handlers.OrionRestHandler: Startup completed
16/05/18 09:26:46 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: com.telefonica.iot.cygnus.sinks.OrionHDF
16/05/18 09:26:46 ERROR node.AbstractConfigurationProvider: Sink hdfs-sink has been removed due to an error during configuration
java.lang.NullPointerException
        at com.telefonica.iot.cygnus.sinks.OrionHDFSSink.configure(OrionHDFSSink.java:339)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurati
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Channel hdfs-channel connected to [http-source]
16/05/18 09:26:46 INFO node.Application: Starting new configuration:{ sourceRunners:{http-source=EventDrivenSourceRunner: { source:oyChannel{name: hdfs-channel}} }
16/05/18 09:26:46 INFO node.Application: Starting Channel hdfs-channel
16/05/18 09:26:46 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: hdfs-channel, regist
16/05/18 09:26:46 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel started
16/05/18 09:26:46 INFO node.Application: Starting Source http-source
16/05/18 09:26:46 INFO interceptors.GroupingRules: No grouping rules have been read
Exception in thread "Thread-1" java.lang.NullPointerException
        at java.io.File.<init>(Unknown Source)
        at com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$ConfigurationReader.run(GroupingInterceptor.java:244)
16/05/18 09:26:46 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
16/05/18 09:26:46 INFO mortbay.log: jetty-6.1.26
16/05/18 09:26:47 INFO mortbay.log: Started SocketConnector@0.0.0.0:5050

We have this grouping_rules.conf file in our path, but inside, everything is commented(since we didnt need this file for the CKAN integration) do we need to edit this file for Cosmos?(if yes, how?)

Would you please help us on this issue?

Many thanks Omer

cygnusagent.sources = http-source
cygnusagent.sinks = ckan-sink hdfs-sink
cygnusagent.channels = ckan-channel hdfs-channel

cygnusagent.sources.http-source.channels = ckan-channel hdfs-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.events_ttl = 2
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /etc/cygnus/conf/grouping_rules.conf

cygnusagent.channels.ckan-channel.type = memory
cygnusagent.channels.ckan-channel.capacity = 1000
cygnusagent.channels.ckan-channel.transactionCapacity = 100

cygnusagent.sinks.ckan-sink.channel = ckan-channel
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
cygnusagent.sinks.ckan-sink.enable_grouping = false
cygnusagent.sinks.ckan-sink.api_key = XXXXXXXXXX
cygnusagent.sinks.ckan-sink.ckan_host = data.lab.fiware.org
cygnusagent.sinks.ckan-sink.ckan_port = 443
cygnusagent.sinks.ckan-sink.orion_url = localhost:1026
cygnusagent.sinks.ckan-sink.attr_persistence = row
cygnusagent.sinks.ckan-sink.ssl = true
cygnusagent.sinks.ckan-sink.batch_size = 1
cygnusagent.sinks.ckan-sink.batch_timeout = 10

cygnusagent.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.OrionHDFSSink
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
cygnusagent.sinks.hdfs-sink.enable_grouping = false
cygnusagent.sinks.hdfs-sink.hdfs_host = cosmos.lab.fiware.org
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
cygnusagent.sinks.hdfs-sink.hdfs_username = XXXXX
cygnusagent.sinks.hdfs-sink.hdfs_password = XXX
cygnusagent.sinks.hdfs-sink.oauth2_token = XXXXXXXXX
cygnusagent.sinks.hdfs-sink.file_format = json-column


cygnusagent.channels.hdfs-channel.type = memory
cygnusagent.channels.hdfs-channel.capacity = 1000
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

Solution

  • Everything seems to be OK with the configuration except for:

    cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /etc/cygnus/conf/grouping_rules.
    conf
    

    Try this changes and let see.