amazon-kinesis snowplow

Snowplow Scala collector: Kinesis stream pockinesisfirehose doesn't exist


I am working on a click-tracking project using Snowplow (open source). I use the Snowplow Scala collector to collect the data and route it to Amazon Kinesis. However, when I launch it with this configuration:

collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 5555

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Optional cross domain policy configuration.
  # To disable, remove the "crossDomain" configuration and the collector will respond with a 404 to
  # the /crossdomain.xml route.
  crossDomain {
    # Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domain = "*"
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = 365 days
    # Network cookie name
    name = sp
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    #domain = "{{cookieDomain}}"
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = pockinesisfirehose

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = pockinesisfirehosee

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kinesis
      # enabled = stdout

      # Region where the streams are located
      region = us-east-1

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = env
        secretKey = env
      }

      # Minimum and maximum backoff periods
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 5000
      }

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 1024
      recordLimit = 10 # Not supported by Kafka; will be ignored
      timeLimit = 250
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  #loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = []

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}

It fails with this error:

[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Kinesis stream pockinesisfirehose doesn't exist
        at scala.Predef$.require(Predef.scala:224)
        at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.<init>(KinesisSink.scala:114)
        at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:48)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.run(Collector.scala:80)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.main(Collector.scala:63)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector.main(Collector.scala)

I have specified all the parameters correctly (the Kinesis stream name, the AWS region, and so on), but the collector cannot connect to my stream. What might I be doing wrong?


Solution

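  • Based on the stream name, "pockinesisfirehose", it looks like you may be using Kinesis Firehose rather than a regular Kinesis stream. This collector needs a regular Kinesis stream. A Firehose delivery stream is a different kind of resource and is not visible through the Kinesis stream API, which would explain why the collector reports that the stream doesn't exist. Kinesis Firehose takes the data coming into a Kinesis stream and puts it into another Amazon service (S3, Redshift, etc.).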
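
One way to confirm this diagnosis is to ask both AWS APIs about the name. The following is a minimal sketch, not part of Snowplow: the helper names `classify_stream` and `report` are made up for illustration, and it assumes boto3 is installed and that credentials are available through the same environment variables the collector uses. It relies on the boto3 calls `describe_stream_summary` (Kinesis) and `describe_delivery_stream` (Firehose), both of which raise `ResourceNotFoundException` when the name is unknown.

```python
def classify_stream(name, kinesis_client, firehose_client):
    """Return 'data-stream', 'firehose', or 'missing' for the given name.

    Both arguments are expected to behave like boto3 clients, i.e. they
    raise client.exceptions.ResourceNotFoundException for unknown names.
    """
    try:
        # Kinesis stream API: the resource type the collector's sink needs.
        kinesis_client.describe_stream_summary(StreamName=name)
        return "data-stream"
    except kinesis_client.exceptions.ResourceNotFoundException:
        pass

    try:
        # Firehose API: delivery streams are only listed here.
        firehose_client.describe_delivery_stream(DeliveryStreamName=name)
        return "firehose"
    except firehose_client.exceptions.ResourceNotFoundException:
        return "missing"


def report(names, region):
    """Print the classification of each name (hypothetical helper).

    Needs boto3 (third-party) and valid AWS credentials, so the import
    is kept local and nothing runs until this function is called.
    """
    import boto3

    kinesis = boto3.client("kinesis", region_name=region)
    firehose = boto3.client("firehose", region_name=region)
    for name in names:
        print(f"{name}: {classify_stream(name, kinesis, firehose)}")
```

Calling `report(["pockinesisfirehose", "pockinesisfirehosee"], "us-east-1")` with the names and region from the configuration above should show which case applies. If a name comes back as `firehose` or `missing`, the collector needs a regular Kinesis stream created under that name (or the config pointed at an existing one) before it will start.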