
I am doing a Spark project. In the following code, I have a string which I use to collect my results in order to write them to a file later on (I know this is not the correct way; I am just checking what is inside a Tuple3 returned by a method). The string gets truncated after a foreach loop. Here is the relevant part of my code:

val newLine = sys.props("line.separator") // also tried "\n". I am using OS X.

var str = s"*** ${newLine}"

for (tuple3 <- ArrayOfTuple3s) {
  for (list <- tuple3._3) {
    for (strItem <- list) {
      str += s"${strItem}, "
    }
    str += s"${newLine}"
  }
  str += s"${newLine}"
  println(tempStr)
}

print("str=" + str)

The first println method call prints the correct value of the string (the concatenated result), but when the loop ends, the value of str is *** (the same value assigned to it before the first loop).

Edit: I replaced the immutable String str with a StringBuilder, but the result did not change:

val newLine: String = sys.props("line.separator")

var str1: StringBuilder = new StringBuilder(15000)

for (tuple3 <- ArrayOfTuple3s) {
  for (list <- tuple3._3) {
    for (str <- list) {
      str1.append(s"${str}, ")
    }
    str1.append(s"${newLine}")
  }
  str1.append(s"${newLine}")
  println(str1.toString())
}

print("resulting str1=" + str1.toString())

Edit 2: I mapped the RDD to take the Tuple3's third field directly, so I now have an RDD of Arrays of Lists. I changed the code accordingly, but I am still getting the same result (the resulting string is empty, although inside the for loop it is not).

val rddOfArraysOfLists = getArrayOfTuple3s(mainRdd).map(_._3)

for (arrayOfLists <- rddOfArraysOfLists) {
  for (list <- arrayOfLists) {
    for (field <- list) {
      str1.append(s"${field}, ")
    }
    str1.append(" -- ")
  }
  str1.append(s"${newLine}")
  println(str1.toString())
}

Edit 4: I think the problem is not with strings at all. There seems to be a problem with all types of variables.

var count = 0

for (arrayOfLists <- myArray) {
  count = arrayOfLists.last(3).toInt
  println(s"count=$count")
}

println(s"count=$count")

The value is non-zero inside the loop, but it is 0 outside the loop. Any idea?

Edit 5: I cannot publish the whole code (due to confidentiality restrictions), but here is the major part of it. If it matters, I am running Spark on my local machine in IntelliJ IDEA (for debugging).

System.setProperty("spark.cores.max", "8")
System.setProperty("spark.executor.memory", "15g")    
val sc = new SparkContext("local", getClass.getName)            
val samReg = sc.objectFile[Sample](sampleLocation, 200).distinct

val samples = samReg.filter(f => f.uuid == "dce03545e8034242").sortBy(_.time).cache()

val top3Samples = samples.take(3)
for (sample <- top3Samples) {
  print("sample: ")
  println(s"uuid=${sample.uuid}, time=${sample.time}, model=${sample.model}")
}

val firstTimeStamp = samples.first.time
val targetTime = firstTimeStamp + 2592000 // + 30 days (~1 month) in seconds (samples during the first month)

val rddOfArrayOfSamples = getCountsRdd(samples.filter(_.time <= targetTime)).map(_._1).cache()
// Due to confidentiality matters, I cannot reveal the code, 
// but here is a description:
// I have an array of samples. Each sample has a few String fields 
// and is represented by a List[String]
// The above RDD is of the type RDD[Array[List[String]]]. 
// It contains only a single array of samples
// (because I passed a filtered set of samples to the function), 
// but it may contain more.
// The fourth field of each sample (list) is an increasing number (count)

println(s"number of arrays in the RDD: ${rddOfArrayOfSamples.count()}")

var maxCount = 0
for (arrayOfLists <- rddOfArrayOfSamples) {
  println(s"Last item of the array (a list)=${arrayOfLists.last}")
  maxCount = arrayOfLists.last(3).toInt
  println(s"maxCount=${maxCount}")
}
println(s"maxCount=${maxCount}")

The output:

sample: uuid=dce03545e8034242, time=1360037324, model=Nexus 4

sample: uuid=dce03545e8034242, time=1360037424, model=Nexus 4

sample: uuid=dce03545e8034242, time=1360037544, model=Nexus 4

number of arrays in the RDD: 1

Last item of the array (a list)=List(dce03545e8034242, Nexus 4, 1362628767, 32, 2089, 0.97, 0.15999999999999992, 0)

maxCount=32

maxCount=0

  • The first cannot be the code you actually run (as it mentions tempStr in the println). Can you post a complete example? Most likely you have two things named str1. – The Archetypal Paul Jan 16 '15 at 07:53
  • Also, I think your loops could be replaced with `ArrayOfTuple3s.map(t => t._3.mkString("", ", ", newLine)).mkString("", newLine, "")` – The Archetypal Paul Jan 16 '15 at 08:11
  • Thank you @Paul. The naming is correct in my main code; I did a partial rename when I was creating this post (to make the name shorter and less distracting), but in my main code all of the names are identical (tempStr). – Javad Jan 16 '15 at 22:27
  • Regarding the mkString, I think I cannot use it, because my RDD is not flat. See the code in my last edit. The third object of the tuple itself is an RDD **(list) of arrays of lists**. – Javad Jan 16 '15 at 22:53
  • @Paul, please see my last edit. – Javad Jan 17 '15 at 04:12
  • There's something odd going on but it's not easy to work out because of the lack of info about types. Can you please post a *complete* example? – The Archetypal Paul Jan 17 '15 at 08:29
  • @Paul please see the new edit. – Javad Jan 19 '15 at 03:26
  • What happens if you println just the variable (i.e. without string interpolation)? – The Archetypal Paul Jan 19 '15 at 08:32
  • Nothing changes; the same result (a zero value outside the loop). – Javad Jan 19 '15 at 15:09
  • It would really help if you could produce a runnable example. I understand the confidentiality point, but if we can't run something to reproduce your problem it's really difficult to make progress. – The Archetypal Paul Jan 19 '15 at 15:56
  • So I don't really know the architecture of Spark, but what I think is happening is: in general the `map` (that the `for` translates into) can execute in multiple nodes. So there can't really be one `maxCount` that's shared by all executions of the body of the for/map. So the `maxCount` you're modifying isn't the same `maxCount` you're declaring. Imagine if there were many lines in your rddOfArrayOfSamples, and the map/for was distributed across nodes, and for every row, a single/shared maxCount was updated - it would be fairly random which one got updated. – The Archetypal Paul Jan 19 '15 at 16:02
  • That seems to be the only plausible explanation. If a for-each loop translates to a map, then that might be true, because Spark's map happens on the worker nodes, not on the master node. Thank you for your comment! – Javad Jan 19 '15 at 19:12
  • I added it as an answer, with a bit more explanation – The Archetypal Paul Jan 19 '15 at 19:13

2 Answers


Promoting my explanation in a comment to an answer:

See this answer to a somewhat-related question:

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter, and others), your transformation code (closure) is:

serialized on the driver node,
shipped to the appropriate nodes in the cluster,
deserialized,
and finally executed on the nodes

The for in your code is just syntactic sugar for a foreach over the RDD, which goes through the same process.

Because of this, the maxCount that each execution updates is not the same maxCount in your invoking program. That one never changes.
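
Concretely, the loop body becomes a function value that Spark serializes and ships; the maxCount it sees inside the task is a copy, not the driver's variable. A rough sketch of what the loop amounts to (names taken from the question; not the exact compiler output):

// Roughly what `for (arrayOfLists <- rddOfArrayOfSamples) { ... }` turns into:
rddOfArrayOfSamples.foreach { arrayOfLists =>
  // This closure is serialized on the driver, shipped to an executor,
  // deserialized there, and run inside a task. The maxCount it updates
  // is the deserialized copy, so the driver's maxCount is left untouched.
  maxCount = arrayOfLists.last(3).toInt
  println(s"maxCount=$maxCount") // prints the task-local value
}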

The lesson here: don't use closures (blocks) that update vars declared outside the block.
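
If the goal is the maximum count, a var-free sketch along the lines of the comments below (names taken from the question):

// Compute the value with RDD operations instead of mutating a driver-side var.
val maxCount = rddOfArrayOfSamples
  .map(_.last(3).toInt)  // one count per array: the fourth field of the last list
  .reduce(math.max)      // or .max(), which uses the implicit Ordering for Int

println(s"maxCount=$maxCount")

For side-effect style counters Spark also offers accumulators (see the shared-variables documentation linked in the comments below), but a maximum is most naturally computed with map plus reduce/max.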

  • I am wondering why this is happening even when I am running Spark on a single node (my local machine). Probably even when you run Spark on a single machine, there are two distinct threads/processes: a master process and a worker process, and probably they use different segments of the memory and they don't share memory? – Javad Jan 19 '15 at 19:20
  • By the way, then what approach do you suggest for handling this problem? – Javad Jan 19 '15 at 19:23
  • Of course one approach (or more accurately, a hack) that I used was to write the value I am fetching into a file, inside the closure/block. – Javad Jan 19 '15 at 19:25
  • If your use case is to calculate the max, `map` the RDD to just the counts, then use `reduce`. Or call `.max()`, passing in an appropriate Ordering. – The Archetypal Paul Jan 19 '15 at 19:57
  • Wonderful! I also found this useful: http://spark.apache.org/docs/1.2.0/programming-guide.html#shared-variables – Javad Jan 19 '15 at 21:25

Since you didn't post a complete example, I had to improvise some portion of the code.

For your fourth edit, I made up this test data:

val myArray = Array(
  List(List(0, 0, 0, 0), List(0, 0, 0, 0), List(0, 0, 0, 0)),
  List(List(1, 1, 1, 1), List(1, 1, 1, 1), List(1, 1, 1, 1)),
  List(List(2, 2, 2, 2), List(2, 2, 2, 2), List(2, 2, 2, 2))
)

Running in the REPL:

var count = 0

for (arrayOfLists <- myArray) {
  count = arrayOfLists.last(3).toInt
  println(s"count=$count")
}

println(s"count=$count")

I get:

scala> for (arrayOfLists <- myArray) {
     |   count = arrayOfLists.last(3).toInt
     |   println(s"count=$count")
     | }
count=0
count=1
count=2

scala> println(s"count=$count")
count=2

The value is non-zero inside the loop and non-zero outside the loop.

If you post a complete example, maybe we can help you more.
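
For contrast, here is a sketch of the same loop once myArray is wrapped in an RDD (assuming a SparkContext named sc). The body then runs inside Spark tasks, so the driver's count is generally left unchanged, which matches what the question observes:

val myRdd = sc.parallelize(myArray)

var count = 0
for (arrayOfLists <- myRdd) {   // desugars to myRdd.foreach(...)
  count = arrayOfLists.last(3).toInt
  println(s"count=$count")      // non-zero inside the task
}
println(s"count=$count")        // typically still 0 on the driver

// The var-free way to get the same number:
println(s"max count=${myRdd.map(_.last(3).toInt).reduce(math.max)}")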