I am trying to add Elasticsearch HTTP access to the Titan ES client using Jest. titan-es only supports ES' local and transport (TCP) modes, but I would like to support communication over ES' HTTP interface. That would allow client libraries like titan-es to use AWS Elasticsearch, which only provides an HTTP(S) interface, as an indexing backend. See this post for more information.
I am looking for some feedback on the approach I am considering so far:
1. Create a new class, ElasticsearchHttpClient, that implements the org.elasticsearch.client.Client interface. The new class will use the JestClient as its internal client, so it will communicate with ES over HTTP. The new class will likely extend ES' AbstractClient to reduce the methods that have to be implemented to: admin(), settings(), execute(), threadPool(), and close().
2. Add a new value, HTTP_CLIENT, to ElasticSearchSetup.
3. The connect() method on HTTP_CLIENT returns an instance of Connection which contains proper values for node and client. The client member would be an instance of the new ElasticsearchHttpClient class.
4. The ElasticSearchIndex.interfaceConfiguration() method retrieves the correct instance of Connection (containing the new ElasticsearchHttpClient) if the INTERFACE is configured as HTTP_CLIENT.

From that point on, the rest of the code should continue to work over the new protocol. Does that sound like it should work? The first step is my biggest concern: I am not confident that I can implement all Client methods using the JestClient.
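To make steps 2 to 4 concrete, here is a minimal sketch of the wiring, assuming ElasticSearchSetup is an enum whose constants each provide their own connect(). Every type below (EsSetup, EsConnection, EsClient, HttpEsClient, TransportEsClient, SetupDemo) is a simplified, hypothetical stand-in and not Titan's or Elasticsearch's real class. The real connect() presumably takes a Titan configuration object rather than a host and port.

```java
// Hypothetical stand-in for org.elasticsearch.client.Client.
interface EsClient {
    String protocol();
}

// Hypothetical stand-in for the proposed ElasticsearchHttpClient.
final class HttpEsClient implements EsClient {
    final String baseUrl;

    HttpEsClient(String host, int port) {
        this.baseUrl = "http://" + host + ":" + port;
    }

    public String protocol() {
        return "http";
    }
}

// Hypothetical stand-in for the existing transport (TCP) client.
final class TransportEsClient implements EsClient {
    public String protocol() {
        return "transport";
    }
}

// Stand-in for Connection: holds a node and a client.
final class EsConnection {
    final Object node;      // assumed null when no embedded node is started
    final EsClient client;

    EsConnection(Object node, EsClient client) {
        this.node = node;
        this.client = client;
    }
}

// Stand-in for ElasticSearchSetup: each constant knows how to connect.
enum EsSetup {
    TRANSPORT_CLIENT {
        EsConnection connect(String host, int port) {
            return new EsConnection(null, new TransportEsClient());
        }
    },
    HTTP_CLIENT {
        EsConnection connect(String host, int port) {
            return new EsConnection(null, new HttpEsClient(host, port));
        }
    };

    abstract EsConnection connect(String host, int port);
}

class SetupDemo {
    // Stand-in for interfaceConfiguration(): picks the setup by its configured name.
    static EsConnection interfaceConfiguration(String configuredInterface, String host, int port) {
        return EsSetup.valueOf(configuredInterface).connect(host, port);
    }

    public static void main(String[] args) {
        EsConnection connection = interfaceConfiguration("HTTP_CLIENT", "localhost", 9200);
        System.out.println(connection.client.protocol());
    }
}
```

With this shape, code that only reads connection.client does not need to know which protocol was selected.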
package com.thinkaurelius.titan.diskstorage.es;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

public class ElasticsearchHttpClient extends AbstractClient {

    private final JestClient internalClient;
    private final ThreadPool pool;

    public ElasticsearchHttpClient(String hostname, int port) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(String.format("http://%s:%d", hostname, port))
                .multiThreaded(true)
                .build());
        JestClient client = factory.getObject();
        this.pool = new ThreadPool("jest");
        this.internalClient = client;
    }

    @Override
    public AdminClient admin() {
        return null;
    }

    @Override
    public Settings settings() {
        return null;
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>>
    ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        try {
            JestResult response = internalClient.execute(convertRequest(action, request));
            return convertResponse(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>>
    void execute(Action<Request, Response, RequestBuilder, Client> action, Request request,
                 ActionListener<Response> listener) {
        execute(action, request);
    }

    private <Response extends ActionResponse> ActionFuture<Response> convertResponse(JestResult result) {
        // TODO How to convert a JestResult to an Elasticsearch ActionResponse/ActionFuture?
        return null;
    }

    private <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>>
    io.searchbox.action.Action<JestResult> convertRequest(
            Action<Request, Response, RequestBuilder, Client> action, Request request) {
        // TODO How to convert an Elasticsearch Action<..> and Request to a Jest Action<JestResult>?
        return null;
    }

    @Override
    public ThreadPool threadPool() {
        return pool;
    }

    @Override
    public void close() throws ElasticsearchException {
        pool.shutdownNow();
    }
}
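
One gap in the skeleton above: the listener overload of execute() calls the blocking overload and never notifies the listener. A minimal sketch of forwarding the outcome to a callback follows. ResultListener and ListenerBridge are hypothetical stand-ins written for this illustration, not Elasticsearch's ActionListener, and the blocking call is modelled as a plain Callable.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a response listener with success and failure callbacks.
interface ResultListener<T> {
    void onResponse(T response);

    void onFailure(Throwable e);
}

final class ListenerBridge {
    // Runs the blocking call and reports its outcome to the listener.
    static <T> void executeAndNotify(Callable<T> blockingCall, ResultListener<T> listener) {
        T result;
        try {
            result = blockingCall.call();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        // Called outside the try block so that an exception thrown by
        // onResponse itself is not reported back as a request failure.
        listener.onResponse(result);
    }
}
```

This still runs on the caller's thread. Handing the call to a thread pool would be needed for the overload to be asynchronous.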
[I also asked this on the Titan mailing list and Elasticsearch forum.]
I've posted an answer in the Titan mailing list.
What you'd need to do from a Titan perspective is implement the IndexProvider interface. My guess is that it isn't feasible to make Jest look like a full Elasticsearch client.
I think you would use JestHttpClient -- you don't need to implement the Jest interface. IndexProvider has methods to create/drop/mutate/query an index, which you should be able to do over HTTP. Check the Elasticsearch HTTP documentation to see if you can do all the required methods on IndexProvider with JestHttpClient.
There's already an ElasticSearchIndex implementation of IndexProvider, which does NODE and TRANSPORT. You're trying to add an HTTP or JEST option. So you might consider shoehorning your changes into ElasticSearchIndex, but I'm not sure how well that will work out since the 2 existing impls are both full Elasticsearch clients. Perhaps consider creating a separate ElasticSearchHttpIndex implements IndexProvider if it's cleaner.
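
As a rough illustration of why the HTTP route looks feasible, here is a sketch that maps the four kinds of operation mentioned above (create, drop, mutate, query) onto Elasticsearch REST endpoints. EsHttpRequest and EsHttpRequests are hypothetical helper names, not part of Titan or Jest. The code only describes the requests and does not send them, and the search path assumes an Elasticsearch version that still uses mapping types in URLs.

```java
// Hypothetical value object describing one HTTP call to Elasticsearch.
final class EsHttpRequest {
    final String method;
    final String path;
    final String body;

    EsHttpRequest(String method, String path, String body) {
        this.method = method;
        this.path = path;
        this.body = body;
    }

    @Override
    public String toString() {
        return method + " " + path;
    }
}

// Hypothetical helpers: one factory method per kind of index operation.
final class EsHttpRequests {
    // Create an index: PUT /{index}
    static EsHttpRequest createIndex(String index) {
        return new EsHttpRequest("PUT", "/" + index, "");
    }

    // Drop an index: DELETE /{index}
    static EsHttpRequest dropIndex(String index) {
        return new EsHttpRequest("DELETE", "/" + index, "");
    }

    // Apply a batch of mutations: POST /_bulk with a newline-delimited JSON body
    // that the caller has already assembled.
    static EsHttpRequest bulkMutate(String newlineDelimitedJson) {
        return new EsHttpRequest("POST", "/_bulk", newlineDelimitedJson);
    }

    // Run a query: POST /{index}/{type}/_search with a JSON query body.
    static EsHttpRequest search(String index, String type, String queryJson) {
        return new EsHttpRequest("POST", "/" + index + "/" + type + "/_search", queryJson);
    }

    public static void main(String[] args) {
        System.out.println(createIndex("titan"));
        System.out.println(search("titan", "vertex", "{\"query\":{\"match_all\":{}}}"));
    }
}
```

A separate ElasticSearchHttpIndex could build requests like these and hand them to the Jest client, without ever needing a full Elasticsearch Client implementation.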