I am using datastax java driver 3.1.0 to connect to cassandra cluster and my cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
My above save method will be called from multiple threads at very fast speed.
Question:
I want to throttle the request to executeAsync
method which writes asynchronously into Cassandra. If I write at very high speed than my Cassandra cluster can handle then it will start throwing errors and I want all my writes should go successfully into cassandra without any loss.
I saw this post where solution is to use Semaphore
with fixed number of permits. But I am not sure how and what is the best way to implement that. I have never used Semaphor before. This is the logic. Can anyone provide an example with Semaphore basis on my code or if there is any better way/option, then let me know as well.
In the context of writing a dataloader program, you could do something like the following:
- To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (but may want to introduce a pool of # cpu cores size that does this) that acquires the permits from the Semaphore and executes queries. It will just block on acquire until there is an available permit.
- Use Futures.addCallback for the future returned from executeAsync. The callback should call Sempahore.release() in both onSuccess and onFailure cases. By releasing a permit, this should allow your thread in step 1 to continue and submit the next request.
Also I have seen couple of other post where they have talked about using RingBuffer
or Guava RateLimitter
so which one is better and I should be using? Below are the options I can think of:
Can anyone help me with an example of how we can throttle the request or get backpressure for cassandra writes and making sure all writes goes successfully into cassandra?
Not an authoritative answer but maybe it would be helpful. First you should consider what would you do when query cannot be executed right away. No matter which rate limiting you chose if you get requests at higher rate than you can write to Cassandra eventually you'll get your process clogged with waiting requests. And at that moment you would need to tell your clients to hold their requests for a while ("push back"). E.g. if they are coming via HTTP then response status would be 429 "Too Many Requests". If you generate requests in same process then decide what longest timeout is acceptable. That said if Cassandra cannot keep up then it's time to scale (or tune) it.
Maybe before implementing rate limits it's worth to experiment a little and add artificial delays to your threads before call to save
method (e.g. using Thread.sleep(...)) and see whether it is indeed your problem or something else is needed.
Query returning error is back-pressure from Cassandra. But you may choose or implement RetryPolicy to determine when to retry failed queries.
Also you may look at connection pool options (and especially Monitoring and tuning the pool). One can tune number of asynchronous requests per connection. However documentation says that for Cassandra 2.x this parameter caps to 128 and one should not change it (I'd experiment with it though :)
Implementation with Semaphore looks like
/* Share it among all threads or associate with a thread for per-thread limits
Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20);
public void save(String process, int clientid, long deviceid) {
....
queryPermits.acquire(); // Blocks until a permit is available
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
queryPermits.release();
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
queryPermits.release(); // Permit should be released in all cases.
logger.logError("error= ", t);
}
}, executorService);
....
}
(In real code I'd create a decorator that would call wrapped method and then release the permits.)
Guava's RateLimiter is similar to semaphore but allows temporary bursts after underutilization periods and limits requests based on timing (not total number of active queries).
However requests will fail for various reasons anyway so probably it's better to have a plan how to retry them (in case of intermittent errors).
It might not be appropriate in your case but I'd try to use some queue or buffer to enqueue requests (e.g. java.util.concurrent.ArrayBlockingQueue
). "Buffer full" would mean that clients should wait or give up the request. Buffer would also be used to re-enqueue failed requests. However to be more fair failed requests probably should be put to front of queue so they are retried first. Also one should somehow handle situation when queue is full and there are new failed requests at the same time. A single-threaded worker then would pick requests form queue and send them to Cassandra. Since it should not do much it's unlikely that it becomes a bottle-neck. This worker can also apply it's own rate limits, e.g. based on timing with com.google.common.util.concurrent.RateLimiter
.
If one would want to avoid losing messages as much as possible they can put a message broker with persistence (e.g. Kafka) in front of Cassandra. This way incoming messages can survive even long outages of Cassandra. But, I guess, it's overkill in your case.