
I have a data frame with two columns: id and value. I want to update the value column based on another map.

 df.collect.foreach({
    df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value]
    })

Is this the correct way to do it?

I tried:

import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo


val mappingPath = "s3://.../"    
val input = sc.textFile(mappingPath)

The input is a list of JSONs, one JSON per line, each of which I map to the POJO class CountryInfo using MappingUtils, which takes care of JSON parsing and conversion:

val MappingsList = input.map(x=> {
                    val countryInfo = MappingUtils.getCountryInfoString(x);
                    (countryInfo.getItemId(), countryInfo)
                 }).collectAsMap

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match {
      case Some(s) => s
   }


val events = sqlContext.sql("select itemId from EventList")

val itemList = events.map(row => {
    val itemId = row.getAs[String](0)
    val countryInfo = showCountryInfo(MappingsList.get(itemId))
    val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
    val language = countryInfo.getLanguage()

    Row(itemId, country, language)
})

But I keep getting this error:

    org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
        at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I am using Spark 1.6.

1 Answer


Your question is a bit ambiguous.

Don’t collect large RDDs unnecessarily.

When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in memory; take or takeSample can be used to retrieve only a capped number of elements instead.
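For example (a minimal sketch with placeholder data, using the same sc as in your code):

// Cap what is brought back to the driver instead of collecting everything.
val bigRdd = sc.parallelize(1 to 1000000)
val firstTen  = bigRdd.take(10)                                     // first 10 elements only
val randomTen = bigRdd.takeSample(withReplacement = false, num = 10) // random sample of 10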

The way you are doing it with the collect method is not correct (if it is a large DataFrame it may lead to OOM).

1) To update any column or add a new column, you can use withColumn:

DataFrame   withColumn(java.lang.String colName, Column col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
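For instance (a minimal sketch with placeholder data, assuming a SQLContext named sqlContext is available as in the spark shell):

import org.apache.spark.sql.functions._

val df = sqlContext.createDataFrame(Seq(("1", "a"), ("2", "b"))).toDF("id", "value")
// withColumn with an existing column name replaces that column; a new name would add one.
val updated = df.withColumn("value", upper(col("value")))
updated.show()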

2) To check a condition based on another data structure,

you can use when/otherwise syntax, like below.

Example of adding a "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame in Apache Spark:

import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""",
      """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}"""
    )))

val makeSIfTesla = udf {(make: String) => 
  if(make == "Tesla") "S" else make
}
df1.withColumn("make", makeSIfTesla(df1("make"))).show
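The same replacement can also be expressed with when/otherwise directly, with no UDF (a minimal sketch reusing df1 and the functions._ import from above):

df1.withColumn("make", when(col("make") === "Tesla", "S").otherwise(col("make"))).show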

The above can also be achieved like this:

 import org.apache.spark.sql.{Row, SQLContext}

 val rdd = sc.parallelize(
      List( (2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt"))
  )
  val sqlContext = new SQLContext(sc)

  // this is used to implicitly convert an RDD to a DataFrame.
  import sqlContext.implicits._

  val dataframe = rdd.toDF()

  dataframe.foreach(println)

 dataframe.map(row => {
    val row1 = row.getAs[String](1)
    val make = if (row1.toLowerCase == "tesla") "S" else row1
    Row(row(0),make,row(2))
  }).collect().foreach(println)

//[2012,S,S]
//[1997,Ford,E350]
//[2015,Chevy,Volt]
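Applied to your itemId/country case, a rough sketch (assuming your MappingsList map, that CountryInfo is serializable, and a table/schema like the one in your query; adjust names to your actual schema) could look like this:

import org.apache.spark.sql.functions.udf

// Look the country up inside a UDF instead of collecting the events DataFrame.
val countryFor = udf { (itemId: String) =>
  MappingsList.get(itemId)
    .map(_.getCountry())
    .filter(_ != "unknown")
    .getOrElse("US")
}

val events = sqlContext.sql("select itemId from EventList")
val withCountry = events.withColumn("country", countryFor(events("itemId")))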
  • It works with small queries, but when I do complex ones I keep getting the error – Swetha Sep 12 '16 at 15:51
  • You mean to say that using `collect` you are getting the error with large data? – Ram Ghadiyaram Sep 12 '16 at 15:55
  • @Swetha: The question was about the approach of what you are doing using `collect` and the other ways I explained... but this Thrift error is definitely not correlated with that. I think you are using Zeppelin... it's due to something else. – Ram Ghadiyaram Sep 12 '16 at 16:13