scala akka akka-stream alpakka

Code not working for FileTailSource in Akka Streams


I have a log file that receives an entry for every visit to a set of websites (it is actually just a Python program that simulates this), and I want to count the number of visits per website. I'm trying to use FileTailSource, but it doesn't print anything to the output file. Can you help? Thanks. `

import akka._
import akka.actor._
import akka.stream._
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LogFileAnalyzer {
  def main(args: Array[String]): Unit = {
    // Create actor system and materializer
    implicit val system = ActorSystem("LogFileAnalyzer")
    implicit val materializer = ActorMaterializer()

    // Read the log file as a source of lines
    val logFile = Paths.get("./src/main/scala/log-generator.log")
    val source =
      FileTailSource(logFile, maxChunkSize = 4096, startingPosition = 0L, pollingInterval = 250.millis)
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = false))
        // Parse each line of the log file to extract the website name
        .map { line =>
          val fields = line.utf8String.split(" ")
          val website = fields(2)
          (website, 1)
        }
        // Group the website names and count the number of visits for each website
        .groupBy(8, _._1)
        .mapConcat()
        .reduce { (entry1, entry2) =>
          val (website1, visits1) = entry1
          val (website2, visits2) = entry2
          (website1, visits1 + visits2)
        }
        .map { case (website, count) => s"($website, $count)" }
        .map(s => ByteString(s))

    val sink = FileIO.toPath(Paths.get("./f.txt"))
    source.to(sink).run()
  }
}

`

When I used FileIO.fromPath, the code worked fine, but it ran only once and did not process newly streamed data. I tried to locate the problem, and I think it has something to do with the groupBy or reduce functions.


Solution

  • The main problem (besides the fact that your main method does not wait for your stream to complete and thus your program will terminate too early) is:

    reduce emits a single value, and only when upstream completes. Here the stream never completes, because FileTailSource keeps polling the input file for new lines forever.

    You might want to use scan instead of reduce (because scan will emit a new item whenever a new item is received from upstream).
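
    As a rough sketch of that suggestion (not a drop-in fix), the pipeline from the question could be rewritten along the following lines. The file paths, the field index 2 and the limit of 8 substreams are taken from the question. The name countVisits, the guard against short lines, the trailing newline on each output entry, the mergeSubstreams call and blocking on system.whenTerminated are assumptions added for illustration.

```scala
import java.nio.file.Paths

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString

object LogFileAnalyzer {

  // Hypothetical helper: turns raw log bytes into running "(website, count)" entries.
  val countVisits: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = false))
      .map(_.utf8String.split(" "))
      // Assumption: skip lines that have no third field instead of failing the stream.
      .collect { case fields if fields.length > 2 => (fields(2), 1) }
      // One substream per website; the stream fails if more than 8 distinct websites appear.
      .groupBy(8, _._1)
      // scan emits an updated total for every incoming element, unlike reduce.
      .scan(("", 0)) { case ((_, total), (website, n)) => (website, total + n) }
      // scan first emits its seed value ("", 0); discard it.
      .drop(1)
      // Merge the per-website substreams back into a single stream.
      .mergeSubstreams
      .map { case (website, count) => ByteString(s"($website, $count)\n") }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("LogFileAnalyzer")
    // Kept from the question; deprecated in Akka 2.6, where the implicit system is enough.
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    FileTailSource(
      Paths.get("./src/main/scala/log-generator.log"),
      maxChunkSize = 4096,
      startingPosition = 0L,
      pollingInterval = 250.millis
    ).via(countVisits)
      .runWith(FileIO.toPath(Paths.get("./f.txt")))

    // Block main until the actor system is shut down, since the tailing stream never completes.
    Await.result(system.whenTerminated, Duration.Inf)
  }
}
```

    With this shape, f.txt receives one new entry per log line, each carrying the running total for that website, rather than a single final count per website.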