I am trying to load a CSV file into JanusGraph. As I understand it, I need to create my graph and its schema, then use the BulkLoaderVertexProgram with my own custom Groovy script to parse the CSV file. This partly works: I can see the vertices, but the edges are not created.

My configuration seems to be pretty much the same as all the examples I can find, but there must be something I'm missing.

Is it possible to bulk load edges from a CSV file?


Here is my setup:

I'm starting Cassandra with the default bin/janusgraph.sh script.

My Gremlin commands:

gremlin> :load data/defineNCBIOSchema.groovy
==>true
gremlin> graph = JanusGraphFactory.open('conf/gremlin-server/socket-janusgraph-apr-test.properties')
==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
gremlin> defineNCBIOSchema(graph)
==>null
gremlin> graph.close()
==>null

gremlin> graph = GraphFactory.open('conf/hadoop-graph/apr-test-hadoop-script.properties')
==>hadoopgraph[scriptinputformat->graphsonoutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().bulkLoader(OneTimeBulkLoader).writeGraph('conf/gremlin-server/socket-janusgraph-apr-test.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader, vertexIdProperty=bulkLoader.vertex.id, userSuppliedIds=false, keepOriginalIds=true, batchSize=0]
gremlin> graph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
==>result[hadoopgraph[scriptinputformat->graphsonoutputformat],memory[size:0]]
gremlin> graph.close()
==>null

gremlin> graph = GraphFactory.open('conf/hadoop-graph/apr-test-hadoop-load.properties')
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.E() <--- returns nothing

My JanusGraph configuration (conf/gremlin-server/socket-janusgraph-apr-test.properties):

gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cassandrathrift
storage.hostname=127.0.0.1
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.25
index.search.backend=elasticsearch
index.search.directory=/tmp/searchindex
index.search.elasticsearch.client-only=false
index.search.elasticsearch.local-mode=true
index.search.hostname=127.0.0.1

My graph for the bulk loader (conf/hadoop-graph/apr-test-hadoop-script.properties):

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true

gremlin.hadoop.inputLocation=data/apr-test-doc.csv
gremlin.hadoop.scriptInputFormat.script=data/apr-test-CSVInputScript.groovy
gremlin.hadoop.outputLocation=output

query.fast-property=false

spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

The read graph (conf/hadoop-graph/apr-test-hadoop-load.properties):

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
janusgraphmr.ioformat.conf.storage.backend=cassandra
janusgraphmr.ioformat.conf.storage.hostname=localhost
janusgraphmr.ioformat.conf.storage.port=9160
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=janusgraph
cassandra.thrift.framed.size_mb=60
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
spark.master=local[*]
spark.serializer=org.apache.spark.serializer.KryoSerializer

My Groovy script:

class Globals {
    static String[] h = [];
    static int lineNumber = 0;
}

def parse(line, factory) {
    def vertexType = 'Disease'
    def edgeLabel = 'parent'
    def parentsIndex = 2;
    
    Globals.lineNumber++

    // columns ignoring quoted ,
    def c = line.split(/,(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)/)
    
    // If the first column is ClassID, this is the header line: remember it and skip it
    if (c[0] == /ClassID/) {
        Globals.h = c
        return null
    }
    
    def v1 = graph.addVertex(T.id, c[0], T.label, vertexType)
    
    for (i = 0; i < c.length; ++i) {
        if (i != parentsIndex) { // Ignore parent
            def f = removeInvalidChar(c[i])
            if (f?.trim()) {
                v1.property(Globals.h[i], f)
            }
        }
    }
    
    def parents = []    
    if (c.length > parentsIndex) {
        parents = c[parentsIndex].split(/\|/)
    }
    
    for (i = 0; i < parents.size(); ++i) {
        def v2 = graph.addVertex(T.id, parents[i], T.label, vertexType)
        v1.addInEdge(edgeLabel, v2)             
    }

    return v1
}

def removeInvalidChar(col) {

    def f = col.replaceAll(/^\"|\"$/, "") // Remove quotes
    f = f.replaceAll(/\{/, /(/) // Replace { with (
    f = f.replaceAll(/\}/, /)/) // Replace } with )
    
    if (f == /label/) {
        f = /label2/
    }

    return f
}

The schema:

def defineNCBIOSchema(graph) {
    
    mgmt = graph.openManagement()
    
    // vertex labels
    vertexLabel = mgmt.makeVertexLabel('Disease').make()
    
    // edge labels
    parent = mgmt.makeEdgeLabel('parent').multiplicity(MULTI).make()
    
    // vertex and edge properties
    blid = mgmt.makePropertyKey('bulkLoader.vertex.id').dataType(String.class).make()
    classID = mgmt.makePropertyKey('ClassID').dataType(String.class).cardinality(Cardinality.SINGLE).make()
    preferedLabel = mgmt.makePropertyKey('PreferredLabel').dataType(String.class).cardinality(Cardinality.SINGLE).make()
    
    // global indices
    mgmt.buildIndex('ClassIDIndex', Vertex.class).addKey(classID).unique()

    mgmt.commit()
}
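
A side note on the schema above: in JanusGraph, buildIndex(...) returns an index builder, and the index is only created once buildCompositeIndex() (or buildMixedIndex(...)) is called on that builder. The sketch below is a variant of the schema function with that call added. The function name defineNCBIOSchemaWithIndex is hypothetical, and the code assumes a fresh graph with no existing schema. Whether this has any bearing on the missing edges is not established here.

```groovy
import org.apache.tinkerpop.gremlin.structure.Vertex
import org.janusgraph.core.Cardinality
import org.janusgraph.core.Multiplicity

// Hypothetical variant of defineNCBIOSchema; assumes a fresh graph with no existing schema.
def defineNCBIOSchemaWithIndex(graph) {
    def mgmt = graph.openManagement()

    // vertex and edge labels, as in the original function
    mgmt.makeVertexLabel('Disease').make()
    mgmt.makeEdgeLabel('parent').multiplicity(Multiplicity.MULTI).make()

    // property keys, as in the original function
    mgmt.makePropertyKey('bulkLoader.vertex.id').dataType(String.class).make()
    def classID = mgmt.makePropertyKey('ClassID').dataType(String.class).cardinality(Cardinality.SINGLE).make()
    mgmt.makePropertyKey('PreferredLabel').dataType(String.class).cardinality(Cardinality.SINGLE).make()

    // buildCompositeIndex() is the call that actually registers the index;
    // without it, the builder chain is simply discarded.
    mgmt.buildIndex('ClassIDIndex', Vertex.class).addKey(classID).unique().buildCompositeIndex()

    mgmt.commit()
}
```

It would be loaded and called from the Gremlin console the same way as the original function, against an empty keyspace.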

The CSV

ClassID,PreferredLabel,Parents
Vertex3,Prefered Label 3,
Vertex2,Prefered Label 2,Vertex3
Vertex1,Prefered Label 1,Vertex2|Vertex3
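
To rule out the parsing step as the cause, the column and parent splitting can be checked on its own, without any graph classes. The following is a minimal sketch that reuses the two regular expressions from the script; the helper names splitColumns and parentsOf are hypothetical and exist only for this check. It runs the three data rows above through the same logic and prints the parents found for each row.

```groovy
// Standalone check of the splitting logic; no TinkerPop or JanusGraph classes involved.

// Hypothetical helper: split on commas that sit outside double-quoted fields
// (same regex as in the parse script).
def splitColumns(String line) {
    line.split(/,(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)/)
}

// Hypothetical helper: return the parent IDs of a row as a list.
def parentsOf(String line, int parentsIndex = 2) {
    def c = splitColumns(line)
    // String.split drops trailing empty strings, so a row whose Parents
    // column is empty yields only two columns and therefore no parents.
    c.length > parentsIndex ? c[parentsIndex].split(/\|/).toList() : []
}

def rows = [
    'Vertex3,Prefered Label 3,',
    'Vertex2,Prefered Label 2,Vertex3',
    'Vertex1,Prefered Label 1,Vertex2|Vertex3'
]

// Print each vertex ID together with the parents parsed for it.
rows.each { println "${splitColumns(it)[0]} -> ${parentsOf(it)}" }
```

If each row reports the parents expected from the CSV, the parsing is probably not where the edges are lost, and the question narrows to how the edges are attached to the vertex returned by parse.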
mazaneicha
    You mention the config is 'pretty much the same as the example'. Could you perhaps try to run an actual example and indicate if the problem also occurs in that situation? Then we know whether the problem is in the logic, or the context. – Dennis Jaheruddin Nov 21 '21 at 10:49
