javascalarx-javascala-java-interoprx-scala

Call Java library (rxjava-extras) code in RxScala application


I am consuming messages from Kafka in an RxScala application and would like to process the messages in a sliding window of dynamic size, i.e. I want to group all messages that were created within the same 2 seconds. RxScala does not provide this kind of operator as far as I know. This repository (https://github.com/davidmoten/rxjava-extras) does provide a function “toListWhile” but in the form of Java Code.

A simplified try to call the needed function fails:

import com.github.davidmoten.rx.Transformers
import rx.functions.Func1
import rx.lang.scala.JavaConversions._
import rx.lang.scala.Observable

val o = Observable.from(List(1, 2, 3, 4, 5, 6))

val predicate = new Func1[java.lang.Integer, java.lang.Boolean] {
  override def call(t: Integer): Boolean = true
}

val p = o.compose(Transformers.toListWhile(predicate))

This is the error I am getting:

Error:(75, 45) type mismatch;
 found   : rx.functions.Func1[Integer,Boolean]
 required: rx.functions.Func1[_ >: _$7, Boolean] where type _$7 <: Int
Note: Integer <: Any, but Java-defined trait Func1 is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
val p = o.compose(Transformers.toListWhile(predicate))
                                           ^

Could some one help me with calling that code from Scala, please? Thanks!

The solution

import java.lang.Boolean

import com.github.davidmoten.rx.Transformers
import rx.functions.Func1
import rx.lang.scala.JavaConversions._
import rx.lang.scala.Observable

val o = Observable.from(List[java.lang.Integer](1, 2, 3, 4, 5, 6))

val predicate = new Func1[java.lang.Integer, java.lang.Boolean] {
  override def call(t: Integer): Boolean = true
}

val p = o.compose[java.util.List[java.lang.Integer]](Transformers.toListWhile(predicate))

Solution

  • java.lang.Integer and Int are different types, though they are convertible to each other. Use either

    val o = Observable.from(List[java.lang.Integer](1, 2, 3, 4, 5, 6))
    

    or

    val predicate = new Func1[Int, java.lang.Boolean] ...
    

    The first should definitely work, the second might not (there are problems with using Java-defined generics with type arguments extending AnyVal, like Int).