I want to make a streaming JSON response via HTTP. The purpose is to send current time in a given city every second.
TL;DR: I need help with sending a result in effect F
and a function, which returns this effect, should be called every 1 second. Is there a simple way you know of?
I've already tried several approaches and all of them do not work.
def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => Ok(throttling.map(_ => time.asJson))
case Left(error) => BadRequest(throttling.map(_ => error.asJson))
}
} yield resp
}
}
Here, my times.get(city.toUpperCase)
function has the following signature:
def get(city: String): F[Either[Times.CurrentTimeError, Times.CurrentTime]]
, where CurrentTimeError
and CurrentTime
are my custom case classes.The problem is that I get time only once in timeOrErr <- times.get(city.toUpperCase)
line. So, every second it sends absolutely identical value (like, 2:43:31, 2:43:31, etc. And I want it to be 2:43:31, 2:43:32, etc.). And I have no idea how to make this function being called every second.
def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
val payload = Stream(
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => Ok(time.asJson)
case Left(error) => BadRequest(error.asJson)
}
} yield resp
)
val stream = throttling.zipRight(payload)
Ok(stream)
}
}
The problem here is the hell with nested monads. stream
has the Stream[F, F[Response[F]]]
type. And I can't make it a proper F[Response[F]]
because fs2 Stream does not provide functions like sequence
or traverse
. If I try to return Ok(stream)
, then Circe cannot serialize F
because it's abstract, so it is not even compiled.
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
val payload = Stream(
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => time.asJson
case Left(error) => error.asJson
}
} yield resp
)
val stream = throttling.map(_ => payload)
Ok(stream)
}
Well, number 3 is not compiled either. Primarily because I can't compose monads in payload
. That is, case Right(time) => time.asJson
and case Left(error) => error.asJson
must be something like case Right(time) => SomethingThatcanBeUsedAsALastWrapperInThisForComprehension(time.asJson)
.
Unfortunately, official docs has little info about it. I'll be glad to hear any suggestions!
For what I could understand you want something like this:
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import org.http4s.{EntityEncoder, HttpRoutes}
import org.http4s.circe.streamJsonArrayEncoderOf
import org.http4s.dsl.Http4sDsl
import scala.concurrent.duration._ // Provides the second extension method.
type Error = String
final case class Time(data: Long)
trait Times {
def get(cityName: String): IO[Either[Error, Time]]
}
object MyService extends Http4sDsl[IO] {
// JSON Encoders.
private implicit final val CirceEncoder: Encoder[Time] =
deriveEncoder
private implicit final val timeOrErrorCirceEncoder: Encoder[Either[Error, Time]] =
Encoder.either[Error, Time](leftKey = "error", rightKey = "time")
private implicit final val timeOrErrorEntityEncoder: EntityEncoder[IO, Stream[IO, Either[Error, Time]] =
streamJsonArrayEncoderOf
/** Returns the service itself. */
def apply(times: Times)
(implicit ev1: ContextShift[IO], ev2: Timer[IO]):HttpRoutes[IO] = HttpRoutes.of[IO] {
Root / "streaming" / city =>
Ok(Stream.awakeEvery[IO](1.second).evalMap(_ => times.get(city.toUpperCase)))
}
}