
twitterStream not found


I'm trying to compile my first Scala program, which uses twitterStream to get tweets. Here is a snippet of my code:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
  def main(args: Array[String]) {

    // Location of the Spark directory 
    val sparkHome = "/home/shaza90/spark-1.1.0"

    // URL of the Spark cluster
    val sparkUrl = TutorialHelper.getSparkUrl()

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

    // HDFS directory for checkpointing
    val checkpointDir = TutorialHelper.getHdfsUrl() + "/checkpoint/"

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))

    val tweets = ssc.twitterStream()
    val statuses = tweets.map(status => status.getText())
    statuses.print()
    ssc.checkpoint(checkpointDir)
    ssc.start()
  }
}

When compiling I'm getting this error message:

value twitterStream is not a member of org.apache.spark.streaming.StreamingContext

Do you know if I'm missing any library or dependency?


Solution

  • In this case you want a stream of tweets. Spark provides streams in general, so let's check whether Spark itself provides something for interacting with Twitter specifically.

    Open the Spark API docs -> http://spark.apache.org/docs/1.2.0/api/scala/index.html#package

    Now search for "twitter" and bingo... there is something called TwitterUtils in the package org.apache.spark.streaming.twitter. Since it is called TwitterUtils and lives in that package, it presumably provides helpers to create streams from the Twitter API.
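    TwitterUtils is not part of the core spark-streaming artifact: in Spark 1.x it ships in the separate spark-streaming-twitter module, which also pulls in Twitter4J. A minimal build.sbt sketch, assuming sbt and a version matching the Spark 1.1.0 install from the question's sparkHome path (adjust both versions to your setup):

```scala
// build.sbt (sketch): Scala 2.10 and Spark 1.1.0 are assumptions taken from the question's paths
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"         % "1.1.0",
  // Provides the org.apache.spark.streaming.twitter package (TwitterUtils) and Twitter4J
  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"
)
```

    Note that spark-streaming-twitter is not bundled in the Spark assembly, so when running on a cluster it has to be shipped along with the application jar (for example as part of a fat jar).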

    Now let's click on TwitterUtils and go to -> http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream

    And yup... it has a method with the following signature:

    def createStream(
                     ssc: StreamingContext,
                     twitterAuth: Option[Authorization],
                     filters: Seq[String] = Nil,
                     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
                    ): ReceiverInputDStream[Status]
    

    It returns a ReceiverInputDStream[Status], where Status is twitter4j.Status.
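    Applied to the question's code, the fix is to replace ssc.twitterStream() with TwitterUtils.createStream(ssc, None). The sketch below is a self-contained version under stated assumptions: a local master ("local[2]", since the receiver occupies one thread) stands in for TutorialHelper's cluster URL, and the Twitter credentials are assumed to be set already as twitter4j.oauth.* system properties.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object Tutorial {
  def main(args: Array[String]): Unit = {
    // Assumed local master; the question builds its context from TutorialHelper instead
    val conf = new SparkConf().setAppName("Tutorial").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // None: Twitter4J falls back to its default OAuth authorization,
    // read from the twitter4j.oauth.* system properties
    val tweets = TwitterUtils.createStream(ssc, None)

    // Each element is a twitter4j.Status; keep only the tweet text
    val statuses = tweets.map(status => status.getText)
    statuses.print()

    ssc.start()
    // Block the driver so the streaming job keeps running
    ssc.awaitTermination()
  }
}
```

    In the original program only the line creating tweets has to change; the TutorialHelper setup and checkpointing can stay as they are.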

    The parameters are explained as follows:

    ssc
    StreamingContext object

    twitterAuth
    Twitter4J authentication, or None to use Twitter4J's default OAuth authorization; this uses the system properties twitter4j.oauth.consumerKey, twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and twitter4j.oauth.accessTokenSecret

    filters
    Set of filter strings to get only those tweets that match them

    storageLevel
    Storage level to use for storing the received objects

    This means you will also need to look at the Twitter4J documentation, at least the getting-started part.
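    Instead of passing None, you can build the Twitter4J authorization yourself and pass it as Some(...). A minimal sketch using Twitter4J's ConfigurationBuilder and OAuthAuthorization; the TwitterAuth object and its build helper are hypothetical names, and the four credential strings are placeholders you must supply:

```scala
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder

// Hypothetical helper: builds an explicit OAuth authorization
// instead of relying on the twitter4j.oauth.* system properties
object TwitterAuth {
  def build(consumerKey: String, consumerSecret: String,
            accessToken: String, accessTokenSecret: String): Authorization = {
    val conf = new ConfigurationBuilder()
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .build()
    new OAuthAuthorization(conf)
  }
}
```

    It can then be combined with the filters parameter, e.g. TwitterUtils.createStream(ssc, Some(TwitterAuth.build(key, secret, token, tokenSecret)), Seq("spark", "scala")), to receive only tweets matching those terms.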