fiwarefiware-cygnusfiware-sth-comet

Fiware Cygnus - Error: collection already exists when Cygnus receives notifications two or more times


I am using Fiware Cygnus configured with STH. When Cygnus receives two or more notifications of changes in any entity it can't send the new value to STH. It logs the error collection already exists and the change is not saved. In the first notification everything goes well.

I have performed the following steps:

At the first time, the Cygnus works fine, so I change the attribute again and the Cygnus shows me the error.

How to solve this problem?

The full message is:

* I am using cygnus in a container docker as described in the documentation.

* I used mongodb-ip:27017 instead real ip to post here.

time=2017-03-08T11:51:05.164Z | lvl=ERROR |
corr=53d86140-03f5-11e7-a70e-080027f6529d |
trans=236416c2-776e-4cc0-91dc-29bca203ea2a | srv=red | subsrv=/red/red |
comp=cygnus-ngsi | op=processRollbackedBatches |msg=com.telefonica.iot.cygnus.sinks.NGSISink[394] :
Persistence error. Message: -, Command failed with error -1:
'collection already exists' on server <mongodb-ip>:27017. The full response is
{ "ok" : 0.0, "errmsg" : "collection already exists" }, Stack trace:
 [com.telefonica.iot.cygnus.sinks.NGSISTHSink.persistOne(NGSISTHSink.java:158),
com.telefonica.iot.cygnus.sinks.NGSISTHSink.persistBatch(NGSISTHSink.java:93), 
com.telefonica.iot.cygnus.sinks.NGSISink.processRollbackedBatches(NGSISink.java:387), 
com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:370),
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68), 
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147),
java.lang.Thread.run(Thread.java:745)]
time=2017-03-08T11:51:05.164Z | lvl=INFO | corr=53d86140-03f5-11e7-a70e-080027f6529d | trans=236416c2-776e-4cc0-91dc-29bca203ea2a | srv=red | subsrv=/red/red | comp=cygnus-ngsi | op=doRollbackAgain | msg=com.telefonica.iot.cygnus.sinks.NGSISink[464] : Finishing internal transaction (53d86140-03f5-11e7-a70e-080027f6529d), this was retry #10

The configuration that I am using in my agent.conf (Cygnus) is:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = sth-sink
cygnus-ngsi.channels = sth-channel

cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.channels = sth-channel
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = default
cygnus-ngsi.sources.http-source.handler.default_service_path = /
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf

cygnus-ngsi.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.NGSISTHSink
cygnus-ngsi.sinks.sth-sink.channel = sth-channel
#cygnus-ngsi.sinks.sth-sink.enable_encoding = false
#cygnus-ngsi.sinks.sth-sink.enable_grouping = false
#cygnus-ngsi.sinks.sth-sink.enable_name_mappings = false
#cygnus-ngsi.sinks.sth-sink.enable_lowercase = false
cygnus-ngsi.sinks.sth-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.sth-sink.mongo_hosts = <mongodb-ip>:27017
cygnus-ngsi.sinks.sth-sink.mongo_username =
cygnus-ngsi.sinks.sth-sink.mongo_password =
cygnus-ngsi.sinks.sth-sink.db_prefix = sth_
cygnus-ngsi.sinks.sth-sink.collection_prefix = sth_
cygnus-ngsi.sinks.sth-sink.resolutions = day,hour,minute
#cygnus-ngsi.sinks.sth-sink.batch_size = 1
#cygnus-ngsi.sinks.sth-sink.batch_timeout = 30
#cygnus-ngsi.sinks.sth-sink.batch_ttl = 10
#cygnus-ngsi.sinks.sth-sink.data_expiration = 0
#cygnus-ngsi.sinks.sth-sink.ignore_white_spaces = true


cygnus-ngsi.channels.sth-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.sth-channel.capacity = 1000
cygnus-ngsi.channels.sth-channel.transactionCapacity = 100

In the STH side I have the following conf:

var config = {};

// STH server configuration
//--------------------------
config.server = {

  host: '10.0.2.15',

  port: '8666',


  defaultService: 'testservice',


  defaultServicePath: '/testservicepath',


  filterOutEmpty: 'true',

  aggregationBy: ['day', 'hour', 'minute'],

  temporalDir: 'temp'
};

// Database configuration
//------------------------
config.database = {

  dataModel: 'collection-per-entity',
 user: '',

  password: '',

  URI: 'localhost:27017',

  replicaSet: '',

  prefix: 'sth_',


  collectionPrefix: 'sth_',

  poolSize: '5',


  shouldStore: 'both',
  truncation: {


    expireAfterSeconds: '0',
    size: '0',

    max: '0'
  },

  ignoreBlankSpaces: 'true',
  nameMapping: {

    enabled: 'false',

    configFile: './name-mapping.json'
  },

  nameEncoding: 'false'
};

// Logging configuration
//------------------------
config.logging = {
  level: 'info',
  NODE_ENV variable is set to 'development'.

  format: 'pipe',


  proofOfLifeInterval: '60'
};

module.exports = config;

Solution

  • MongoDB error handling is based on exception messages that may vary from version to vesion.

    Cygnus has been coded to detect the string "code" : 48 within those exception messages, which means the collection already exists and nothing is done. "Nothing is done" means no collection is created (because it already exists) and the original exception is not progressed.

    The problem is when a MongoDB version returns a message not containing "code" : 48 string. In that case, the exception is assumed to be different than "collection already existent", thus it is progressed. This is what has happen in your case.