I have a .txt file, say list.txt, which consists of a list of source and destination URLs in the format
google.de/2011/10/Extract-host link.de/2011/10/extact-host
facebook.de/2014/11/photos facebook.de/2014/11/name.jpg
community.cloudera.com/t5/ community.cloudera.com/t10/
facebook.de/2014/11/photos link.de/2011/10/extact-host
With the help of this post, How to create a VertexId in Apache Spark GraphX using a Long data type?, I tried to create the nodes and edges like this:
val test = sc.textFile("list.txt") //running
val arrayForm = test.map(_.split("\t")) // running
val nodes: RDD[(VertexId, Option[String])] = arrayForm.flatMap(array => array).
  map((_.toLong, None))   // fails: the URL strings can't be parsed as Long
val edges: RDD[Edge[String]] = arrayForm.
  map(line => Edge(line(0), line(1), ""))   // fails: Edge expects VertexId (Long), not String
The problem is that I don't know how to create a VertexId (and, similarly, an Edge) from a String. Please let me know how to resolve this.
The answer is hashing. Since your vertex IDs are strings, you can hash them with MurmurHash3, build the graph, run whatever computation you need, and then join the hash values back to the original strings.
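As a quick sanity check before the full example: MurmurHash3.stringHash is deterministic, so the same URL always maps to the same vertex ID, which is what makes edges that share a URL share a vertex. (Being a 32-bit hash, collisions are theoretically possible on very large URL sets.) A minimal sketch:

```scala
import scala.util.hashing.MurmurHash3

object HashCheck extends App {
  // The same string always hashes to the same Int, across calls and across RDDs.
  val a = MurmurHash3.stringHash("facebook.de/2014/11/photos")
  val b = MurmurHash3.stringHash("facebook.de/2014/11/photos")
  println(a == b) // stable: edges referencing the same URL get the same vertex ID
}
```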
Example code
package com.void

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId

import scala.util.hashing.MurmurHash3

object Main {

  def main( args: Array[ String ] ): Unit = {

    val conf =
      new SparkConf()
        .setAppName( "SO Spark" )
        .setMaster( "local[*]" )
        .set( "spark.driver.host", "localhost" )

    val sc = new SparkContext( conf )

    val file = sc.textFile( "data/pr_data.txt" )

    // Hash both endpoints of every edge; MurmurHash3.stringHash returns an Int,
    // which widens to Long (VertexId) here.
    val edgesRDD: RDD[(VertexId, VertexId)] =
      file
        .map( line => line.split( "\t" ) )
        .map( line => (
          MurmurHash3.stringHash( line( 0 ) ),
          MurmurHash3.stringHash( line( 1 ) )
        ) )

    val graph = Graph.fromEdgeTuples( edgesRDD, 1 )

    // graph.triplets.collect.foreach( println )
    // println( "####" )

    val ranks =
      graph
        .pageRank( 0.0001 )
        .vertices

    ranks.foreach( println )
    println( "####" )

    // Map each distinct URL to its hash so the results can be joined
    // back to the original strings afterwards.
    val identificationMap =
      file
        .flatMap( line => line.split( "\t" ) )
        .distinct
        .map( url => ( MurmurHash3.stringHash( url ).toLong, url ) )

    identificationMap.foreach( println )
    println( "####" )

    // Join the PageRank results with the identification map: (hash, (rank, url))
    val fullMap =
      ranks
        .join( identificationMap )

    fullMap.foreach( println )

    sc.stop()
  }
}
Results
(-1578471469,1.2982456140350878)
(1547760250,0.7017543859649124)
(1657711982,1.0000000000000002)
(1797439709,0.7017543859649124)
(996122257,0.7017543859649124)
(-1127017098,1.5964912280701753)
####
(1547760250,community.cloudera.com/t5/)
(-1127017098,link.de/2011/10/extact-host)
(1657711982,facebook.de/2014/11/name.jpg)
(1797439709,facebook.de/2014/11/photos)
(-1578471469,community.cloudera.com/t10/)
(996122257,google.de/2011/10/Extract-host)
####
(-1578471469,(1.2982456140350878,community.cloudera.com/t10/))
(1797439709,(0.7017543859649124,facebook.de/2014/11/photos))
(1547760250,(0.7017543859649124,community.cloudera.com/t5/))
(996122257,(0.7017543859649124,google.de/2011/10/Extract-host))
(1657711982,(1.0000000000000002,facebook.de/2014/11/name.jpg))
(-1127017098,(1.5964912280701753,link.de/2011/10/extact-host))
You can remove the hashed IDs from the RDD by mapping them out, but since I suspect PageRank isn't your end goal, you'll probably need them later.
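If you do want the hash-free view, the final map is a one-liner. A sketch using plain Scala collections for clarity (the identical `map` works on the `fullMap` RDD above; the entry shown is taken from the results):

```scala
object DropHashes extends App {
  // fullMap entries have the shape (hashId, (rank, url));
  // mapping the hash out leaves (url, rank).
  val fullMap = Seq(
    (-1127017098L, (1.5964912280701753, "link.de/2011/10/extact-host"))
  )
  val ranksByUrl = fullMap.map { case (_, (rank, url)) => (url, rank) }
  ranksByUrl.foreach(println) // (link.de/2011/10/extact-host,1.5964912280701753)
}
```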