I am using Spark SQLContext DataFrames to parallelize graph operations. Briefly, I read a CSV into a DataFrame df, then call df.foreachPartition(testFunc) to perform a get-or-create operation on the graph (the traversal itself is inside testFunc).
I am not sure if the cluster and session need to be defined in the function that is passed to the data frame (testFunc) in this instance, as I’m having a bit of trouble locating an example of a graph traversal being done this way.
I can run the graph traversal manually just fine. I can also print out the values in the rows when I call df.foreachPartition(testFunc) if I omit the session.execute_graph("g.V()....") portion.
On top of that, if I just explicitly call testFunc and pass a list of rows (say, testFunc(df.collect())) it will run successfully and get or create the appropriate vertices in the graph.
So I assume the problem has something to do with the way the graph traversal is called inside the function once it is passed to the different threads?
test_sc_CSV.py:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from dse.cluster import Cluster, GraphExecutionProfile, EXEC_PROFILE_GRAPH_DEFAULT, EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT
from dse.cluster import GraphOptions
# Spark configuration: only the Cassandra contact point is set.
# Settings left disabled:
#   .set("spark.cassandra.connection.native.port", "9042")
#   .set('spark.cores.max', 2).set('spark.executor.memory', '8g')
conf = SparkConf()
conf.set("spark.cassandra.connection.host", "*.*.*.*")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Load the CSV into a DataFrame via the spark-csv data source, with the
# header option enabled.
# (A single parenthesised argument prints the same as the bare print
# statement on Python 2.)
print('\n--- Loading data...')
df = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header="true")
    .load("file:/home/centos/datacube/notebooks/animals.csv")
)
def testFunc(rows):
    """Get or create a 'name'-labelled vertex in 'toy_data' for each row.

    Written to be handed to ``foreachPartition``: every call opens its own
    DSE connection, runs one get-or-create traversal per row, and closes
    the connection before returning.

    Args:
        rows: iterable of row objects exposing ``app_id``, ``id``, ``name``
            and ``contact`` attributes.

    Raises:
        ValueError, TypeError: if ``app_id`` or ``id`` cannot be converted
            with ``int()``.
        Any exception raised by the DSE driver while connecting or while
        executing the traversal is propagated unchanged.
    """
    graph_name = 'toy_data'

    # Get-or-create: coalesce() returns the existing vertex if has() finds
    # one, otherwise falls through to addV(). Built once, outside the loop.
    # NOTE(review): the schema listed alongside this script declares the
    # property key 'name' on label 'name', not 'n_val' -- confirm which key
    # is intended.
    query = ("g.inject(1).coalesce(V().has('name','n_val',n_val),"
             "addV('name').property('n_val',n_val))")

    # NOTE(review): the reported server error "No such property: g" presumably
    # means the request arrived without a graph name bound -- confirm that
    # this execution profile is actually applied when run on the executors.
    ep = GraphExecutionProfile(graph_options=GraphOptions(graph_name=graph_name))
    cluster = Cluster(['*.*.*.*'],
                      execution_profiles={EXEC_PROFILE_GRAPH_DEFAULT: ep})
    try:
        session = cluster.connect()
        for row in rows:
            values = [int(row.app_id), int(row.id),
                      str(row.name), str(row.contact)]
            # Single parenthesised argument: same output on Python 2 and 3.
            print(values)
            session.execute_graph(query, {'n_val': row.name})
    finally:
        # Without this, every partition call leaves a Cluster (its
        # connections and driver threads) open on the worker.
        cluster.shutdown()
# Invoke testFunc once per partition of df; each call receives an iterator
# over that partition's rows.
df.foreachPartition(testFunc)
# Disabled alternative: repartition the underlying RDD into 2 partitions
# before applying testFunc.
# df.rdd.repartition(2).foreachPartition(testFunc)
error logs:
--- Loading data...
[1001, 1, 'dog', 'monkey']
ERROR 2016-12-08 14:35:48,130 org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 759, in func
File "/home/centos/datacube/spark_graph/test_sc_CSV.py", line 31, in testFunc
addV('name').property('n_val',n_val))",{'n_val':row.name})
File "/home/centos/anaconda/lib/python2.7/site-packages/dse/cluster.py", line 177, in execute_graph
return self.execute_graph_async(query, parameters, trace, execution_profile).result()
File "/home/centos/anaconda/lib/python2.7/site-packages/cassandra/cluster.py", line 3781, in result
raise self._final_exception
InvalidRequest: Error from server: code=2200 [Invalid query] message="No such property: g for class: Script10549"
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.scheduler.Task.run(Task.scala:89) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) ~[spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
WARN 2016-12-08 14:35:48,142 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 759, in func
File "/home/centos/datacube/spark_graph/test_sc_CSV.py", line 31, in testFunc
addV('name').property('n_val',n_val))",{'n_val':row.name})
File "/home/centos/anaconda/lib/python2.7/site-packages/dse/cluster.py", line 177, in execute_graph
return self.execute_graph_async(query, parameters, trace, execution_profile).result()
File "/home/centos/anaconda/lib/python2.7/site-packages/cassandra/cluster.py", line 3781, in result
raise self._final_exception
InvalidRequest: Error from server: code=2200 [Invalid query] message="No such property: g for class: Script10549"
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ERROR 2016-12-08 14:35:48,144 org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
File "/home/centos/datacube/spark_graph/test_sc_CSV.py", line 33, in <module>
df.foreachPartition(testFunc)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 370, in foreachPartition
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 764, in foreachPartition
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
File "/usr/share/dse/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/usr/share/dse/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 759, in func
File "/home/centos/datacube/spark_graph/test_sc_CSV.py", line 31, in testFunc
addV('name').property('n_val',n_val))",{'n_val':row.name})
File "/home/centos/anaconda/lib/python2.7/site-packages/dse/cluster.py", line 177, in execute_graph
return self.execute_graph_async(query, parameters, trace, execution_profile).result()
File "/home/centos/anaconda/lib/python2.7/site-packages/cassandra/cluster.py", line 3781, in result
raise self._final_exception
InvalidRequest: Error from server: code=2200 [Invalid query] message="No such property: g for class: Script10549"
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1838)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1851)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1864)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 759, in func
File "/home/centos/datacube/spark_graph/test_sc_CSV.py", line 31, in testFunc
addV('name').property('n_val',n_val))",{'n_val':row.name})
File "/home/centos/anaconda/lib/python2.7/site-packages/dse/cluster.py", line 177, in execute_graph
return self.execute_graph_async(query, parameters, trace, execution_profile).result()
File "/home/centos/anaconda/lib/python2.7/site-packages/cassandra/cluster.py", line 3781, in result
raise self._final_exception
InvalidRequest: Error from server: code=2200 [Invalid query] message="No such property: g for class: Script10549"
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
WARN 2016-12-08 14:35:48,346 org.apache.spark.rpc.netty.Dispatcher: Message RemoteProcessDisconnected(ip-172-31-13-58.ec2.internal:43418) dropped.
java.lang.IllegalStateException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:109) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rpc.netty.NettyRpcHandler.connectionTerminated(NettyRpcEnv.scala:622) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.network.server.TransportRequestHandler.channelUnregistered(TransportRequestHandler.java:94) [spark-network-common_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:89) [spark-network-common_2.10-1.6.2.2.jar:1.6.2.2]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:731) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:674) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:329) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.confirmShutdown(SingleThreadEventExecutor.java:628) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:362) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
WARN 2016-12-08 14:35:48,347 org.apache.spark.rpc.netty.Dispatcher: Message RemoteProcessDisconnected(ip-172-31-13-58.ec2.internal:43418) dropped.
java.lang.IllegalStateException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:109) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.rpc.netty.NettyRpcHandler.connectionTerminated(NettyRpcEnv.scala:622) [spark-core_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.network.server.TransportRequestHandler.channelUnregistered(TransportRequestHandler.java:94) [spark-network-common_2.10-1.6.2.2.jar:1.6.2.2]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:89) [spark-network-common_2.10-1.6.2.2.jar:1.6.2.2]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:731) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:674) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:329) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.confirmShutdown(SingleThreadEventExecutor.java:628) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:362) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) [netty-all-4.0.34.Final.jar:4.0.34.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Graph schema:
// Property keys (all single-cardinality): one Text key and four Int keys.
schema.propertyKey('name').Text().single().create()
schema.propertyKey('person_id').Int().single().create()
schema.propertyKey('app_id').Int().single().create()
schema.propertyKey('score').Int().single().create()
schema.propertyKey('date').Int().single().create()
// Vertex labels 'name', 'id' and 'application', each with one property and
// a materialized index on that property.
schema.vertexLabel('name').properties('name').create()
schema.vertexLabel('name').index('personByName').materialized().by('name').add()
schema.vertexLabel('id').properties('person_id').create()
schema.vertexLabel('id').index('IdById').materialized().by('person_id').add()
schema.vertexLabel('application').properties('app_id').create()
schema.vertexLabel('application').index('appByName').materialized().by('app_id').add()
// Edge labels (all multiple-cardinality) and the vertex-label pairs they
// connect; only 'is_match' carries properties ('score', 'date').
schema.edgeLabel('has_name').multiple().create()
schema.edgeLabel('has_name').connection('id', 'name').add()
schema.edgeLabel('has_contact').multiple().create()
schema.edgeLabel('has_contact').connection('application', 'id').add()
schema.edgeLabel('is_match').multiple().properties('score','date').create()
schema.edgeLabel('is_match').connection('id', 'id').add()
schema.edgeLabel('has_application').multiple().create()
schema.edgeLabel('has_application').connection('id', 'application').add()
// 'bin' vertices keyed by an Int 'hash', indexed, and linked from 'name'
// vertices through 'has_hash' edges.
schema.propertyKey('hash').Int().single().create()
schema.vertexLabel('bin').properties('hash').create()
schema.vertexLabel('bin').index('binByHash').materialized().by('hash').add()
schema.edgeLabel('has_hash').multiple().create()
schema.edgeLabel('has_hash').connection('name', 'bin').add()
CSV:
,app_id,contact,id,name
0,1001,monkey,1,dog
1,1002,monkey,2,cat
2,1003,mad gorilla,3,mad cat
3,1004,good gorilla,4,good dog
4,1005,bad monkey,5,bad dog
5,1006,dog,6,chicken
6,1007,gorilla,7,chicken
7,1008,hawk,8,cat
8,1009,monkey,9,dog
9,1010,monkey,10,dog
10,1011,monkey,11,chicken
11,1012,cat,12,cat
12,1013,monkey,13,dog
13,1014,dog,14,dog
14,1015,hawk,15,cat