I am writing data to accumulo storage natively using Geomesa Native Client. Here is my java code
package org.locationtech.geomesa.api;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;
import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class WorkerBeta {
public static void main(String[] args){
try {
DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
"aj_v14",
"localhost:2181",
"hps",
"root", "9869547580",
false,
dovs,
new SimpleFeatureView<DomainObject>() {
AttributeTypeBuilder atb = new AttributeTypeBuilder();
private List<AttributeDescriptor> attributeDescriptors =
Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
, atb.binding(String.class).buildDescriptor("dId")
, atb.binding(Integer.class).buildDescriptor("s")
, atb.binding(Integer.class).buildDescriptor("a")
, atb.binding(Integer.class).buildDescriptor("e")
);
@Override
public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
f.setAttribute("rId", domainObject.rideId);
f.setAttribute("dId", domainObject.deviceId);
f.setAttribute("s", domainObject.speed);
f.setAttribute("a", domainObject.angle);
f.setAttribute("e", domainObject.error);
}
@Override
public List<AttributeDescriptor> getExtraAttributes() {
return attributeDescriptors;
}
}
);
//Inserting
final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
System.out.println(index.insert(
one,
gf.createPoint(new Coordinate(-74.0, 34.0)),
date("2017-03-31T01:15:00.000Z")
));
//Read
GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
.within(-90.0, -180, 90, 180)
.during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
.build();
Iterable<DomainObject> results = index.query(q);
int counter = 0;
for(DomainObject dm : results){
counter += 1;
System.out.println("result counter: " + counter);
dovs.toBytes(dm);
}
}
catch (Exception ex){
ex.printStackTrace();
}
index.close();
}
public static class DomainObject {
public final int rideId;
public final String deviceId;
public final int angle;
public final int speed;
public final int error;
public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
this.rideId = rideId;
this.deviceId = deviceId;
this.angle = angle;
this.speed = speed;
this.error = error;
}
}
public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
public static final Gson gson = new Gson();
@Override
public byte[] toBytes(DomainObject o) {
return gson.toJson(o).getBytes();
}
@Override
public DomainObject fromBytes(byte[] bytes) {
return gson.fromJson(new String(bytes), DomainObject.class);
}
}
public static Date date(String s) {
return Date.from(ZonedDateTime.parse(s).toInstant());
}
}
The problem with this code is, I need to create index
object every time for a new insert request and call index.close()
to reflect the same. But I can't execute insert()
agin, once index.close()
is called. However i will be accepting insert request from queue at very high rate and I don't want to create index
object every time. How can i do that?
In short how i can flush writes without calling close()
.
I created geomesa Client class file to use geomesa natively. Below is the partial implementation of the same which shows how you can flush with AccumuloAppendFeatureWriter
without calling to close.
public class GeomesaClient {
private AccumuloDataStore ds = null;
private AccumuloAppendFeatureWriter fw = null;
private SimpleFeatureSource sfs = null;
private String tableName = "";
private FeatureStore fst = null;
private SimpleFeatureType sft;
public GeomesaClient(Map<String, String> dsConf) throws Exception {
this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf);
this.tableName = dsConf.get("tableName");
sft = createFeatureType();
if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){
ds.createSchema(sft);
}
this.fst = (FeatureStore)sfs;
this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(),
Transaction.AUTO_COMMIT));
this.sfs = ds.getFeatureSource(sft.getTypeName());
}
/*
Flush with AccumuloAppendFeatureWriter
*/
public void flush(boolean force) {
fw.flush();
}
}