I am trying to write a piece of code that does the following:
Sample record in input csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
My input case class which represents a record in input csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Sample record in output csv (that needs to be written):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
My output case class which represents a record in output csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Reading a record using akka stream csv (uses Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
  S3.download(s3Object.bucket, s3Object.path)
    .runWith(Sink.head)
    // This is then converted to csv
Now I have a function to process the records:
def process(input: InputRecord): OutputRecord =
  // if salary > avg(salary) then Manager
  // else Programmer
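The rule in the comments above could be sketched roughly as follows. This is only an illustration: it assumes the average salary is computed beforehand and passed in through a hypothetical `avgSalary` parameter (the original signature takes a single record, which cannot know the average), and it follows the commented rule only, so it never produces a designation such as "Web Developer".

```scala
object ProcessSketch {
  // Case classes repeated from the question so the sketch is self-contained.
  case class InputRecord(recordId: String, name: String, salary: Long)
  case class OutputRecord(recordId: String, name: String, designation: String)

  // Assumption: avgSalary is computed up front; it is not part of the original signature.
  def process(avgSalary: Double)(input: InputRecord): OutputRecord = {
    // Above-average salary maps to Manager, everything else to Programmer.
    val designation = if (input.salary > avgSalary) "Manager" else "Programmer"
    OutputRecord(input.recordId, input.name, designation)
  }

  // The three sample rows from the input csv.
  val records: List[InputRecord] = List(
    InputRecord("1", "Aiden", 20000L),
    InputRecord("2", "Tom", 18000L),
    InputRecord("3", "Jack", 25000L)
  )

  // Average over all records, then apply the rule to each one.
  val avg: Double = records.map(_.salary).sum.toDouble / records.size
  val output: List[OutputRecord] = records.map(process(avg))
}
```

Computing the average requires seeing every record first, so in a streaming setup it would need a separate pass (or a fold) before `process` can be applied.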
Function to write the OutputRecord as csv
def writeOutput: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload(s3Object.bucket,
    s3Object.path,
    metaHeaders = MetaHeaders(Map()))
Function to send email notification:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Stitching it all together
readAsCSV.flatMap { recordSource =>
  recordSource.map { record =>
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}
On lines 15 and 16 I am getting an error. I am able to add either line 15 or line 16, but not both, since both notify and writeOutput need outputRecord. Once notify is called I lose my outputRecord.
Is there a way I can add both notify and writeOutput to the same graph?
I am not looking for parallel execution, as I want to call notify first and only then writeOutput. So this is not helpful: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
The use case seems very simple to me, but somehow I am not able to find a clean solution.
The output of notify is a PushResult, but the input of writeOutput is a ByteString. Once you make those types line up, it will compile. If you need a ByteString, derive it from the OutputRecord.
BTW, in the sample code that you have provided, a similar type mismatch exists between readAsCSV and process.
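One way to apply the suggestion above is to have the notification stage emit the record together with its PushResult, and then turn the record into a ByteString just before the sink. The following is a minimal sketch, not the original code: it assumes Akka Streams 2.6 or later on the classpath, and `sendNotification`, `toCsvLine`, the `PushResult` fields and the in-memory sink are hypothetical stand-ins for the real email call and for `S3.multipartUpload`. The flow is named `notifyFlow` here to avoid any clash with the `notify` method that every JVM object inherits.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object NotifyThenWrite {
  case class OutputRecord(recordId: String, name: String, designation: String)
  // Hypothetical stand-in for the question's PushResult type.
  case class PushResult(info: String)

  // Hypothetical notification call; a real one would send the email.
  def sendNotification(r: OutputRecord): Future[PushResult] =
    Future.successful(PushResult(s"notified ${r.name}"))

  // Emits the record alongside its PushResult, so the record is not lost.
  // mapAsync(1) completes each notification before the record moves downstream.
  val notifyFlow: Flow[OutputRecord, (OutputRecord, PushResult), NotUsed] =
    Flow[OutputRecord].mapAsync(1) { r =>
      sendNotification(r).map(result => (r, result))
    }

  // Renders one record as a CSV line; no quoting or escaping is attempted.
  def toCsvLine(r: OutputRecord): ByteString =
    ByteString(s"${r.recordId},${r.name},${r.designation}\n")

  // In-memory stand-in for S3.multipartUpload: any Sink[ByteString, _] fits here.
  val writeOutput: Sink[ByteString, Future[ByteString]] =
    Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)

  // Notify first, then convert to ByteString, then write.
  def run(records: List[OutputRecord])(implicit system: ActorSystem): Future[ByteString] =
    Source(records)
      .via(notifyFlow)
      .map { case (record, _) => toCsvLine(record) }
      .toMat(writeOutput)(Keep.right)
      .run()
}
```

Because each element passes through `notifyFlow` before it reaches the sink, a record is written only after its notification has completed, which matches the "notify first, then write" requirement without any parallel graph. The sketch does not write the CSV header line; that could be prepended to the stream as a single ByteString if needed.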