2

I am trying to insert and read data from accumulo storage using GeoMesa Native API. I have created a class file to use geomesa accumulo storage natively. Here is my java code :

package org.locationtech.geomesa.api;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class WorkerBeta {
    public static void main(String[] args){
        try {
            DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
            final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
            "aj_v14",
            "localhost:2181",
            "hps",
            "root", "9869547580",
            false,
            dovs,
            new SimpleFeatureView<DomainObject>() {
              AttributeTypeBuilder atb = new AttributeTypeBuilder();
              private List<AttributeDescriptor> attributeDescriptors =
                Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
                    , atb.binding(String.class).buildDescriptor("dId")
                    , atb.binding(Integer.class).buildDescriptor("s")
                    , atb.binding(Integer.class).buildDescriptor("a")
                    , atb.binding(Integer.class).buildDescriptor("e")
                );
              @Override
              public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
                f.setAttribute("rId", domainObject.rideId);
                f.setAttribute("dId", domainObject.deviceId);
                f.setAttribute("s", domainObject.speed);
                f.setAttribute("a", domainObject.angle);
                f.setAttribute("e", domainObject.error);
              }

              @Override
              public List<AttributeDescriptor> getExtraAttributes() {
                return attributeDescriptors;
              }
            }
        );

        //Inserting 
        final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
        final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
        System.out.println(index.insert(
                one,
                gf.createPoint(new Coordinate(-74.0, 34.0)),
                date("2017-03-31T01:15:00.000Z")
            ));

            //Read 
            GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
                .within(-90.0, -180, 90, 180)
                .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
                .build();
            Iterable<DomainObject> results = index.query(q);
            int counter = 0;
            for(DomainObject dm : results){
                counter += 1;
                System.out.println("result counter: " + counter);
                dovs.toBytes(dm);
            }
        }
        catch (Exception ex){
      ex.printStackTrace();
        }
    }
    public static class DomainObject {
      public final int rideId;
      public final String deviceId;
      public final int angle;
      public final int speed;
      public final int error;

      public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
          this.rideId = rideId;
          this.deviceId = deviceId;
          this.angle = angle;
          this.speed = speed;
          this.error = error;
      }
    }
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
        public static final Gson gson = new Gson();
        @Override
        public byte[] toBytes(DomainObject o) {
            return gson.toJson(o).getBytes();
        }
        @Override
        public DomainObject fromBytes(byte[] bytes) {
            return gson.fromJson(new String(bytes), DomainObject.class);
        }
    }
    public static Date date(String s) {
        return Date.from(ZonedDateTime.parse(s).toInstant());
    }
}

Logs for command is :

suresh@hpss-MacBook-Air:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.accumulo.fate.zookeeper.ZooSession$ZooWatcher@73eb439a
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0
50fa12fb-11f8-4776-bb35-95b32da9225d
[]

But when i try to verify the inserted record, I am unable to find any specific entries in created related to data inserted in Tables of accumulo web interface. Here is the screen shot for the accumulo tablesenter image description here. Please correct me if I am missing anything. Tons of thanks in advance.

Suresh Prajapati
  • 3,991
  • 5
  • 26
  • 38

2 Answers2

1

Two quick notes:

  1. Your type doesn't haven't a field named 'dtg' and GeoMesaQuery assumes one. To work around this easily, you can use 'GeoMesaQuery.GeoMesaQueryBuilder.builder().include().build()'. Long term, the native api could use some improvements to make it easy do to what you want fluently.

  2. To see if the records were written to Accumulo, you can use the Accumulo shell and scan the individual tables. If there is nothing in the tables, it might be worth debugging this code.

GeoJim
  • 1,320
  • 7
  • 12
1

Likely your insert is not getting flushed to disk. Accumulo uses a batch writer for performance - this will periodically write to disk once its internal buffer is filled. Since you are only inserting a single record, this isn't happening. To fix, you can call close on your GeoMesaIndex instance. This will flush any existing records to disk. You would then need to instantiate a new instance to do your query.

Emilio Lahr-Vivaz
  • 1,439
  • 6
  • 5
  • Great!!. It's working. Can you give me the reference of this info or how you came to this answer. – Suresh Prajapati Apr 03 '17 at 19:03
  • Can i use flush() without closing the index?. I am listening a queue channel for insert request and don't want create index object for each request – Suresh Prajapati Apr 04 '17 at 06:03
  • I tried calling flush(), but it doesn't work. As per the [story](https://geomesa.atlassian.net/browse/GEOMESA-885?focusedCommentId=16309&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16309) we can send mutations to accumulo without closing the writer. – Suresh Prajapati Apr 04 '17 at 06:14
  • I looked into the source code to figure out the problem - I'm a developer on the project, so I had an idea where to look. `flush` isn't implemented in the native-api yet. If you'd like to contribute, feel free to open a PR! As a work-around, you can configure the underlying batch writer properties to flush periodically through [system properties](http://www.geomesa.org/documentation/user/accumulo/configuration.html#batch-writer-properties) – Emilio Lahr-Vivaz Apr 04 '17 at 16:24