phantom-dsl

Executing a high load of reads on Cassandra, getting BusyPoolException


I am trying to execute a high load of queries using Phantom version 2.14.1, e.g.:

case class Foo(id: String, x: Long, y: Long)

val list: List[Foo] = _ // list.size == 100000

def find(id: String, x: Long, y: Long) = {
  select
    .where(_.id eqs id)
    .and(_.ts >= x)
    .and(_.ts < y)
    .fetch()
}

list.map(f => find(f.id, f.x, f.y))

I am getting this exception:

[pool-2-thread-91] ERROR com.outworkers.phantom - Failed to execute query SELECT * FROM my_table WHERE id = 'some_uuid' AND x >= 1503501104 AND y < 1503501224;
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/0:0:0:0:0:0:0:1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [localhost/0:0:0:0:0:0:0:1] Pool is busy (no available connection and the queue has reached its max size 256)))
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:220)
    at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:291)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:358)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:313)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:283)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:118)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:98)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    at com.outworkers.phantom.ScalaGuavaAdapter$.statementToPromise(ScalaGuavaAdapter.scala:70)
    at com.outworkers.phantom.ScalaGuavaAdapter$.statementToFuture(ScalaGuavaAdapter.scala:32)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:90)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
    at com.outworkers.phantom.builder.query.execution.GuavaAdapter$class.fromGuava(ExecutableStatements.scala:44)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
    at com.outworkers.phantom.builder.query.execution.QueryInterface.future(QueryInterface.scala:71)
    at com.outworkers.phantom.builder.query.execution.ResultQueryInterface.fetch(ResultQueryInterface.scala:131)

I tried to configure the pooling options as suggested, but any number I put in the configuration seems to have zero impact:

val connector = ContactPoint.local
  .withClusterBuilder(_.withoutJMXReporting()
    .withoutMetrics()
    .withPoolingOptions(
      new PoolingOptions()
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 1)
        .setMaxConnectionsPerHost(HostDistance.REMOTE, 2)
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 100)
        .setMaxRequestsPerConnection(HostDistance.REMOTE, 200)
    )
  ).keySpace(KeySpaceSerializer(keyspace).ifNotExists()
    .`with`(replication eqs SimpleStrategy.replication_factor(1))
    .and(durable_writes eqs true))

Solution

  • I don't think what you are seeing here is related to streaming per se; the problem is that the high volume of requests is monopolising the internal thread pool of the DataStax Java Driver, so the driver is effectively running out of concurrent connections to Cassandra.

    This is tunable to a degree:

    PoolingOptions.setMaxRequestsPerConnection(HostDistance, int): maximum number of requests per connection;
    
    PoolingOptions.setMaxConnectionsPerHost(HostDistance, int): maximum number of connections in the pool;
    
    PoolingOptions.setMaxQueueSize(int): maximum number of enqueued requests before this exception is thrown.
    

    You would set them via the cluster builder like this:

      val connector = ContactPoint.local
        .noHeartbeat()
        .withClusterBuilder(_.withoutJMXReporting()
          .withoutMetrics()
          .withPoolingOptions(
            new PoolingOptions()
              .setMaxConnectionsPerHost(HostDistance.LOCAL, 15)
              .setMaxRequestsPerConnection(HostDistance.LOCAL, 100)
          )
        ).keySpace(KeySpaceSerializer(space).ifNotExists()
          .`with`(replication eqs SimpleStrategy.replication_factor(1))
          .and(durable_writes eqs true)
        )
    

    I haven't actually checked whether the values themselves are sensible; I'm merely showing you the DSL configuration.
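
    Beyond tuning the pool, you can also throttle the number of in-flight queries on the client side, since `list.map(f => find(...))` fires all 100,000 requests at once and no pool setting will absorb that. Below is a minimal sketch of batched execution using plain Scala futures; `find` here is a hypothetical stand-in for the phantom query, which in your code would return a `Future` of the result set:

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical stand-in for the phantom `find` query; in the real code
    // this would be the Cassandra read returning a Future of rows.
    def find(id: String): Future[Int] = Future(id.length)

    val ids = (1 to 1000).map(_.toString).toList

    // Run at most `batchSize` queries concurrently: fire one group, wait for
    // it to complete, then fire the next. foldLeft chains groups sequentially.
    def throttled[A, B](xs: List[A], batchSize: Int)(f: A => Future[B]): Future[List[B]] =
      xs.grouped(batchSize).foldLeft(Future.successful(List.empty[B])) { (acc, group) =>
        acc.flatMap(done => Future.traverse(group)(f).map(done ++ _))
      }

    val results = Await.result(throttled(ids, 64)(find), 30.seconds)
    ```

    With a batch size kept below maxRequestsPerConnection × connections per host, the driver's request queue should no longer overflow into `BusyPoolException`.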