I'm working on generalizing the DynamoDB + JanusGraph tutorial from AWS such that, given a standard .txt file with a standard convention, the program can ingest the data (as a Triple) and create the vertices, properties, and edges. Normally I wouldn't post such a lengthy problem, but it appears that these are all related to 4-5 lines within the same class, ObjectCreationCommand, which I created.
An example Triple looks like this: "name:Jim Henson \t isCreatorOf \t televisionshow:The Muppets"
Although the program compiles and runs, I get several exceptions thrown that prevent the graph from being filled. When I run the Factory program, it reads all of my triples and puts them into a Hash Set, but then the following error occurs (10 times, but this is only 1 example):
57338 [pool-10-thread-2] ERROR org.janusgraph.graphdb.database.StandardJanusGraph - Could not commit transaction [10] due to exception
org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181408349015 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Next, a similar exception is thrown:
57427 [pool-10-thread-10] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Could not commit transaction due to exception during persistence tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181403f69b77 tried to lock
org.janusgraph.core.JanusGraphException: Could not commit transaction due to exception during persistence
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1374)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.janusgraph.core.JanusGraphException: Unexpected exception
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:798)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
... 6 more
Caused by: org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181403f69b77 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
... 7 more
And then a schema-related exception is thrown:
58030 [pool-10-thread-4] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName]
org.janusgraph.core.SchemaViolationException: Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName]
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:791)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:720)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makeSchemaVertex(StandardJanusGraphTx.java:847)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makePropertyKey(StandardJanusGraphTx.java:867)
at org.janusgraph.graphdb.types.StandardPropertyKeyMaker.make(StandardPropertyKeyMaker.java:100)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:47)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
And lastly, an exception that I really don't understand is thrown:
58512 [pool-10-thread-8] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Could not find type for id: 11529
java.lang.IllegalStateException: Could not find type for id: 11529
at com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.name(JanusGraphSchemaVertex.java:59)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.asIndexType(JanusGraphSchemaVertex.java:177)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndexDirect(ManagementSystem.java:412)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndex(ManagementSystem.java:422)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:55)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Because the final transaction is Null, a NullPointerException is thrown and the transaction is never committed; thus, my graph is initialized but empty.
Normally I wouldn't post such a lengthy problem, but it appears that these are all related to 4-5 lines within the same class, ObjectCreationCommand, which I created.
package com.amazon.janusgraph.creator;
import com.amazon.janusgraph.example.TravelGraphFactory;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraph;
import com.amazon.janusgraph.triple.Triple;
import org.janusgraph.core.Multiplicity;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class ObjectCreationCommand implements Runnable {
public static JanusGraph graph;
private static Triple triple;
private static MetricRegistry REGISTRY;
public static Logger LOG;
private static final String TIMER_LINE = "TravelGraphFactory.line";
private static final String TIMER_CREATE = "TravelGraphFactory.create_";
private static final String COUNTER_GET = "TravelGraphFactory.get_";
public ObjectCreationCommand(JanusGraph graph, Triple triple, MetricRegistry REGISTRY, Logger LOG) {
this.graph = graph;
this.triple = triple;
this.REGISTRY = REGISTRY;
this.LOG = LOG;
}
@Override
public void run() {
JanusGraphManagement mgmt = graph.openManagement();
if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
}
if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
}
if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
}
mgmt.commit();
long start = System.currentTimeMillis();
String RIGHT_OBJECT_PROPERTY = triple.getRightObjectProperty();
Vertex rightObject = graph.addVertex();
rightObject.property(RIGHT_OBJECT_PROPERTY, triple.getRightObject());
REGISTRY.counter(COUNTER_GET + RIGHT_OBJECT_PROPERTY).inc();
String LEFT_OBJECT_PROPERTY = triple.getLeftObjectProperty();
Vertex leftObject = graph.addVertex();
rightObject.property(LEFT_OBJECT_PROPERTY, triple.getLeftObject());
REGISTRY.counter(COUNTER_GET + LEFT_OBJECT_PROPERTY).inc();
try {
processRelationship(graph, triple);
} catch (Throwable e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
String rootCauseMessage = null == rootCause ? "" : rootCause.getMessage();
LOG.error("Error processing line {} {}", e.getMessage(), rootCauseMessage, e);
}
long end = System.currentTimeMillis();
long time = end - start;
REGISTRY.timer(TIMER_CREATE + RIGHT_OBJECT_PROPERTY).update(time, TimeUnit.MILLISECONDS);
}
private static void processRelationship(JanusGraph graph, Triple triple) {
Vertex left = get(graph, triple.getLeftObjectProperty(), triple.getLeftObject());
if (null == left) {
REGISTRY.counter("error.missingLeftObject." + triple.getLeftObject()).inc();
left = graph.addVertex();
left.property(triple.getLeftObjectProperty(), triple.getLeftObject());
}
Vertex right = get(graph, triple.getRightObjectProperty(), triple.getRightObject());
if (null == right) {
REGISTRY.counter("error.missingRightObject." + triple.getRightObject()).inc();
right = graph.addVertex();
right.property(triple.getRightObjectProperty(), triple.getRightObject());
}
left.addEdge(triple.getRelationship(), right);
}
private static Vertex get(final JanusGraph graph, final String key, final String value) {
final GraphTraversalSource g = graph.traversal();
final Iterator<Vertex> it = g.V().has(key, value);
return it.hasNext() ? it.next() : null;
}
}
The exceptions above show that all the errors come from lines 47, 55 or 59 of that class:
JanusGraphManagement mgmt = graph.openManagement();
if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
[47] final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
}
if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
}
[55] if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
}
[59] mgmt.commit();
Can anyone help identify what I'm doing wrong in this class? Whatever I'm doing is locking up tables and creating schema problems.
First, your original run() method is executed for each line in the text file. If the first thing you do is roll back, the vertices and edges you created in the previous iteration get blown away.
Second, on the next line you tried to setup the schema for each and every line, even though you only need to set the schema once for the graph right after you instantiate it. This was the source of your Schema exceptions, as index names must be unique (and do not necessarily need to be named after the properties they are indexing).
Third, on lines 78 and 83, you assumed you were creating the vertices anew for every line of the text file you processed. The uniqueness constraint applies there just as much as it did on lines 107 and 114, where you respected the uniqueness constraint while processing the relationships.
Finally, for a graph with order ~100 vertices and ~100 edges, it is a bit over the top to setup batching and an executor. I submitted a PR that fixes all of these issues and proposes two different ways to approach the data load at hand.