I have a basic Scala Akka HTTP CRUD application. See below for the relevant classes.
I'd simply like to write an entity id and some data (as JSON) to a Kafka topic whenever, for example, an entity is created or updated.
I'm looking at http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, but I am new to Scala and Akka and unsure how to integrate it into my application.
For example, the docs above give this example of a producer writing to Kafka, so I think I need to do something similar, but where in my application should it go? Can I just add another map call in the create method in my service after I have created the user?
val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))
Or do I need to do something like the example here https://github.com/hseeberger/accessus in the bindAndHandle() method in my Server.scala?
object System {
  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()
}
object WebServer extends App {
  import System._
  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)
}
def routes(): Route = {
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) => {
          throw new ValidationException(y)
        }
        case Valid(u: User) => {
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }
      }
    }
  } ~
  // More routes here, left out for example
}
def create(user: User): Future[MaybeValid[User]] = {
  for {
    validating <- userValidation.validateCreate(user)
    result <- validating match {
      case Valid(x: User) =>
        .map(dbUser => Valid(UserConverters.fromUserRow(x)))
      case y: DefInvalid =>
    }
  } yield result
}
def create(user: User): Future[User] = {
  userTable returning userTable.map(_.userId)
    into ((user, id) => user.copy(userId = id)) +=
    user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
}
Since your Route unmarshals just one User from the entity, I don't think you need Producer.plainSink. Rather, I think KafkaProducer.send will work just as well. Also, as a side note, throwing exceptions is not idiomatic Scala, so I changed the logic for an invalid user:
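import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
val producer = producerSettings.createKafkaProducer() // assumes producerSettings: akka.kafka.ProducerSettings[Array[Byte], String]
// KafkaProducer.send returns a java.util.concurrent.Future, so bridge it to a Scala Future via the send callback.
def send(record: ProducerRecord[Array[Byte], String]): Future[RecordMetadata] = {
  val promise = Promise[RecordMetadata]()
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception == null) promise.success(metadata) else promise.failure(exception)
  })
  promise.future
}
val routes: Route =
  post {
    entity(as[User]) { user =>
      onSuccess(userService.create(user)) {
        case Invalid(y: NonEmptyList[Err]) =>
          complete(StatusCodes.BadRequest -> "invalid user")
        case Valid(u: User) =>
          val producerRecord = new ProducerRecord[Array[Byte], String]("topic1", s"""{"userId" : ${u.userId}, "entity" : "User"}""")
          onComplete(send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
      }
    }
  }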
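On the question of whether the write could instead go in the service's create method: it probably can, as long as the publish step is chained with flatMap (or a for comprehension) rather than map, since publishing is itself asynchronous. The following is only a minimal sketch under stated assumptions. `EventPublisher`, `InMemoryPublisher`, the simplified `User`, and the stand-in `insert` method are all hypothetical names introduced for illustration, not part of the question's code or of any Kafka or Akka API; a real `EventPublisher` would wrap the Kafka producer.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical, simplified stand-in for the question's User type.
final case class User(userId: Long = 0L, name: String)

// Hypothetical abstraction over "write one message to a topic".
// A real implementation would delegate to the Kafka producer.
trait EventPublisher {
  def publish(topic: String, payload: String): Future[Unit]
}

// In-memory publisher so the sketch runs without a broker.
final class InMemoryPublisher extends EventPublisher {
  private val buffer = scala.collection.mutable.ListBuffer.empty[(String, String)]

  def publish(topic: String, payload: String): Future[Unit] = {
    buffer.synchronized { buffer += (topic -> payload) }
    Future.successful(())
  }

  def published: List[(String, String)] = buffer.synchronized(buffer.toList)
}

final class UserService(publisher: EventPublisher)(implicit ec: ExecutionContext) {
  // Stand-in for the DAO insert: pretends the database assigned id 1.
  private def insert(user: User): Future[User] =
    Future.successful(user.copy(userId = 1L))

  // The for comprehension desugars to flatMap, so the returned Future
  // completes only after the event has been handed to the publisher.
  def create(user: User): Future[User] =
    for {
      saved <- insert(user)
      _     <- publisher.publish("topic1", s"""{"userId": ${saved.userId}, "entity": "User"}""")
    } yield saved
}

object Demo extends App {
  import ExecutionContext.Implicits.global

  val publisher = new InMemoryPublisher
  val service   = new UserService(publisher)
  val created   = Await.result(service.create(User(name = "alice")), 5.seconds)

  println(created)
  println(publisher.published)
}
```

With this shape the route stays unaware of Kafka, and a failed publish fails the Future returned by create, so the route can decide how to respond. Whether a failed publish should fail the whole request is a design choice the sketch does not settle.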