I am consuming messages from Kafka in an RxScala application and would like to process the messages in a sliding window of dynamic size, i.e. I want to group all messages that were created within the same 2 seconds. RxScala does not provide this kind of operator as far as I know. This repository (https://github.com/davidmoten/rxjava-extras) does provide a function “toListWhile” but in the form of Java Code.
A simplified try to call the needed function fails:
import com.github.davidmoten.rx.Transformers
import rx.functions.Func1
import rx.lang.scala.JavaConversions._
import rx.lang.scala.Observable
val o = Observable.from(List(1, 2, 3, 4, 5, 6))
val predicate = new Func1[java.lang.Integer, java.lang.Boolean] {
override def call(t: Integer): Boolean = true
}
val p = o.compose(Transformers.toListWhile(predicate))
This is the error I am getting:
Error:(75, 45) type mismatch;
found : rx.functions.Func1[Integer,Boolean]
required: rx.functions.Func1[_ >: _$7, Boolean] where type _$7 <: Int
Note: Integer <: Any, but Java-defined trait Func1 is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
val p = o.compose(Transformers.toListWhile(predicate))
^
Could some one help me with calling that code from Scala, please? Thanks!
The solution
import java.lang.Boolean
import com.github.davidmoten.rx.Transformers
import rx.functions.Func1
import rx.lang.scala.JavaConversions._
import rx.lang.scala.Observable
val o = Observable.from(List[java.lang.Integer](1, 2, 3, 4, 5, 6))
val predicate = new Func1[java.lang.Integer, java.lang.Boolean] {
override def call(t: Integer): Boolean = true
}
val p = o.compose[java.util.List[java.lang.Integer]](Transformers.toListWhile(predicate))
java.lang.Integer
and Int
are different types, though they are convertible to each other. Use either
val o = Observable.from(List[java.lang.Integer](1, 2, 3, 4, 5, 6))
or
val predicate = new Func1[Int, java.lang.Boolean] ...
The first should definitely work, the second might not (there are problems with using Java-defined generics with type arguments extending AnyVal
, like Int
).