I am trying to create an edge between two vertices which are already part of OreintDB. My edge data is in a MySQL table.
Here is my oetl json.
{
"config": {
"log": "info"
},
"source": { "file": { "path": "/Users/RP/user_invited_data.csv" } },
"extractor": { "csv": {"columnsOnFirstLine": true, "columns":["user_id:string", "invited_by:string", "invited_date:datetime"] } },
"transformers" : [
{ "vertex": { "class": "User", "skipDuplicates": true} },
{ "edge": { "class": "INVITED", "direction" : "in",
"joinFieldName": "invited_by",
"lookup":"select expand(u) from (match {class: User, as: u} return u) where u.user_id = ?;",
"unresolvedLinkAction":"NOTHING",
"edgeFields": { "invited_date": "${input.invited_date}" },
"skipDuplicates": true
}
},
{ "field":
{ "fieldNames":
[ "invited_by", "invited_date"],
"operation": "remove"
}
}
],
"loader" : {
"orientdb": {
"dbURL": "remote:localhost/abcd_graph",
"dbUser": "root",
"dbPassword": "root",
"dbType": "graph",
"dbAutoCreate": false,
"batchCommit": 1000
}
}
}
When I run the above json, it is throwing ORecordDuplicatedException
for the User vertex. I have a unique index created on user_id
and have the skipDuplicates = true
. Any suggestions would be greatly appreciated.
UPDATE:
Gem of OrientDB, skipDuplicates
actually works when your log
level is not DEBUG
. But the problem is not solved yet. No errors now but the edges are not created. I will keep debugging it and see if I can fix it tonight.
UPDATE After debugging a bit deeper, I got an exception deeper at the storage level.
com.orientechnologies.orient.core.exception.ODatabaseException: Impossible to serialize invalid link #-1:-1
DB name="abcd_graph"
at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.writeOptimizedLink(ORecordSerializerBinaryV0.java:867)
at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serializeValue(ORecordSerializerBinaryV0.java:754)
at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serialize(ORecordSerializerBinaryV0.java:385)
at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinary.toStream(ORecordSerializerBinary.java:99)
at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:2381)
at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:664)
at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.executeSaveRecord(ODatabaseDocumentTx.java:2183)
at com.orientechnologies.orient.core.tx.OTransactionNoTx.saveRecord(OTransactionNoTx.java:191)
at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:2758)
at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:102)
at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1805)
at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1801)
at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:242)
at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:137)
at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:741)
at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:688)
at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.createEdge(OEdgeTransformer.java:203)
at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.executeTransform(OEdgeTransformer.java:123)
at com.orientechnologies.orient.etl.transformer.OAbstractTransformer.transform(OAbstractTransformer.java:39)
at com.orientechnologies.orient.etl.OETLPipeline.execute(OETLPipeline.java:110)
at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:620)
at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:601)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
UPDATE I have changed the extractor from DB to CSV, so that it will be easier to reproduce.
Create Schema:
CREATE class User IF NOT EXISTS extends V;
create property User.user_id IF NOT EXISTS String;
create property User.name IF NOT EXISTS String;
create index user_idx on User(user_id) unique;
insert into User set user_id = '1000_USER1', name = 'Bob';
insert into User set user_id = '1001_USER2', name = 'Robert';
Sample CSV:
user_id, ivited_by, invited_date
1001_USER2, 1000_USER1,
After some struggle and reread the whole ETL documentation and some debugging, I figured it out.
We need to use MERG
transformer instead of VERTEX
. Merge transformer will lookup for a Vertex
instead of creating it.
Here is my json looks like
"transformers" : [
{ "merge": { "joinFieldName": "user_id", "lookup": "User.user_id" } },
{ "edge": { "class": "INVITED", "direction" : "out",
"joinFieldName": "invited_by",
"lookup": "SELECT expand(u) from (match {class: User, as: u} return u) where u.user_id = ?",
"unresolvedLinkAction":"NOTHING",
"edgeFields": { "invited_date": "${input.invited_date}" },
"skipDuplicates": true
}
},
{ "field":
{ "fieldNames":
[ "invited_by", "invited_date"],
"operation": "remove"
}
}
]
I still have one another problem, but I will take it as a separate thing and research about it. The problem is it is creating duplicate edges between the same two vertices
I will tackle it as a separate problem.
One thing I observed all the times with OrientDB is, the stuff is there, but it is hard to figure it out.