I have a data frame with two columns : id
and value
. I want to update the value based on another map.
df.collect.foreach({
df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value]
})
Is this correct way of using ?
I tried :
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo
val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)
The input is list of jsons where each line is json which I am mapping to the POJO class CountryInfo using MappingUtils which takes care of JSON parsing and conversion:
val MappingsList = input.map(x=> {
val countryInfo = MappingUtils.getCountryInfoString(x);
(countryInfo.getItemId(), countryInfo)
}).collectAsMap
MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]
def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}
val events = sqlContext.sql( "select itemId EventList")
val itemList = events.map(row => {
val itemId = row.getAs[String](1);
val çountryInfo = showTitleInfo(MappingsList.get(itemId));
val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry()
val language = countryInfo.getLanguage()
Row(itemId, country, language)
})
But I keep getting this error :
org.apache.thrift.transport.TTransportException at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) at
org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362) at
org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284) at
org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205) at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211) at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207) at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
I am using Spark 1.6