
EDIT: I tried what was suggested in the comments, calling .toDF on the row RDD, and got this error:

                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@333e01c6
import sqlContext.implicits._
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n using Map()
<console>:48: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
       val df = rdd.toDF()
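
A note on the `toDF` error: the conversion brought in by `sqlContext.implicits._` applies to RDDs of case classes, tuples, and primitives, not to `RDD[Row]`. For an `RDD[Row]`, one option is `SQLContext.createDataFrame` with an explicit schema. This is a minimal sketch, assuming each row holds two scalar columns; the helper name `rowsToDataFrame` and the column names `id` and `name` are hypothetical and do not come from the query in this question.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical helper: turns an RDD[Row] into a DataFrame by supplying the schema by hand.
// Assumption: every row has exactly two values, a Long followed by a String.
def rowsToDataFrame(sqlContext: SQLContext, rows: RDD[Row]): DataFrame = {
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = true)
  ))
  // createDataFrame(RDD[Row], StructType) is the overload intended for row RDDs
  sqlContext.createDataFrame(rows, schema)
}
```

This only helps when each row holds plain values. Rows that wrap a whole Neo4j node, as in the query here, would likely not fit such a schema.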

I am running this simple scala code:

import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf



val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
val sc = new SparkContext(conf)
val neo = Neo4j(sc)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

//val rdd = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadRowRdd

val df = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadDataFrame

It gives a few errors. The conf lines produce an error, yet the script seems to work when I load an RDD. Here I get that error too; the call still gives me a count of items, but then I get serialization errors. I am not sure whether there are steps I am missing. I am working from this sample:

https://blog.knoldus.com/2016/10/05/neo4j-with-scala-awesome-experience-with-spark/
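
On the `not found: value SparkConf` error: `new SparkConf.setMaster(...)` has no parentheses after `SparkConf`, so the compiler looks for a value named `SparkConf` instead of calling the constructor. A sketch of the corrected lines follows. The use of `SparkContext.getOrCreate` is an assumption on my part: the script appears to be loaded into spark-shell, where a context named `sc` already exists, which would also explain why `Neo4j(sc)` still worked after the two errors.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The parentheses after SparkConf invoke the constructor before the setters are chained
val conf = new SparkConf().setMaster("local").setAppName("neo4jspark")

// Assumption: running inside spark-shell; getOrCreate reuses the shell's
// existing context instead of trying to start a second one
val sc = SparkContext.getOrCreate(conf)
```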

Loading neo4jspark.scala...
import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
<console>:38: error: not found: value SparkConf
       val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
                      ^
<console>:38: error: not found: value conf
       val sc = new SparkContext(conf)
                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH (p:POINT) RETURN p using Map()
res0: Long = 53118
17/08/25 14:31:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/08/25 14:31:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 79 elided
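
The trace says the job fails when Spark serializes task results that contain `org.neo4j.driver.internal.InternalNode`, which is not serializable. The count still succeeds, presumably because only a number is sent back. One thing that might be worth trying is returning scalar values from Cypher instead of the node itself. This is an untested sketch: the function name `loadPointProperties` is hypothetical, and the choice of `id(p)` plus the `toDateFormatLong` property (taken from the query in this question) is only an illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.neo4j.spark.Neo4j

// Returns plain values (node id and one property) rather than the node object,
// so no InternalNode instance ends up inside a Row
val pointPropertiesQuery =
  "MATCH (p:POINT) RETURN id(p) AS id, p.toDateFormatLong AS toDateFormatLong"

// Hypothetical helper; assumes the Neo4j connection settings are already in the Spark config
def loadPointProperties(sc: SparkContext): DataFrame =
  Neo4j(sc).cypher(pointPropertiesQuery).loadDataFrame
```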
Codejoy
  • Looks like you forgot to import the SQL implicits: `val sqlCtx = new SQLContext(sc)` followed by `import sqlCtx.implicits._` – sgireddy Aug 26 '17 at 06:31
  • Updated the code with what I have; I got the same error. Maybe I implemented it wrong? – Codejoy Aug 28 '17 at 02:37
  • I don't think this is a SparkConf issue, unless Neo4j expects specific configuration for its DataFrame API. Could you try the following (extracting the df from the RDD), just to identify whether the root cause is Neo4j or Spark? Also, could you try a different Neo4j query to rule out the query being at fault? `val rdd = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadRowRdd` then `val df = rdd.toDF` – sgireddy Aug 28 '17 at 03:30
  • I tried toDF, but on the big query. I will try another query tomorrow (though the query I entered did work in the Neo4j browser). – Codejoy Aug 28 '17 at 06:39
  • ran it with a much simpler query and same error: `val df = neo.cypher("MATCH (p:POINT) RETURN p").loadDataFrame` – Codejoy Aug 28 '17 at 22:49
  • I found out that the issue is that the query is indeed too complex. https://github.com/neo4j-contrib/neo4j-spark-connector/issues/40 – Codejoy Aug 29 '17 at 16:20
