Questions tagged [scala-spark]

49 questions
2 votes • 2 answers

How to move files from one S3 bucket directory to another directory in the same bucket? (Scala/Java)

I want to move all files under a directory in my S3 bucket to another directory within the same bucket, using Scala. Here is what I have: def copyFromInputFilesToArchive(spark: SparkSession): Unit = { val sourcePath = new…
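
A minimal sketch of one common approach, using the Hadoop FileSystem API that is already on Spark's classpath (the bucket name and directory layout below are hypothetical):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    def moveToArchive(spark: SparkSession): Unit = {
      // Resolve the S3A filesystem from Spark's Hadoop configuration
      val fs = FileSystem.get(
        new java.net.URI("s3a://my-bucket"), spark.sparkContext.hadoopConfiguration)
      val source  = new Path("s3a://my-bucket/input")
      val archive = new Path("s3a://my-bucket/archive")
      if (!fs.exists(archive)) fs.mkdirs(archive)
      // On S3 a "move" is a copy-then-delete under the hood; rename() does both
      fs.listStatus(source).foreach { st =>
        fs.rename(st.getPath, new Path(archive, st.getPath.getName))
      }
    }
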
1 vote • 1 answer

How to improve Spark filter() performance on an array of structs?

I am working on a Spark project and have a performance issue I am struggling with; any help will be appreciated. I have a column Collection that is an array of structs: root |-- Collection: array (nullable = true) | |-- element: struct…
Yue Wang
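
If the current filter goes through a UDF, the built-in higher-order functions added in Spark 3.0 can usually replace it and avoid the serialization overhead. A hedged sketch, assuming a DataFrame df and a struct field named id (both invented here):

    import org.apache.spark.sql.functions.{col, exists}

    // exists() evaluates the predicate in one pass over the array, inside the
    // JVM row format, with no UDF round-trip
    val matched = df.filter(exists(col("Collection"), e => e.getField("id") === 42))
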
1 vote • 2 answers

Performance implications of Spark read

I am trying to understand whether there is any difference between the following approaches in terms of memory usage, optimisation, parallelism, etc. Scenario: CSV files in an S3 bucket, 100 columns, more than 200 million rows in total. Read option 1: val df =…
Shiv Konar
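
Whichever read variant wins out, at 200M+ rows one cost worth controlling is schema inference, which forces an extra scan of the S3 data when inferSchema is enabled. A sketch with a hypothetical path and columns:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // An explicit schema skips the inference scan and still gives typed columns
    val schema = StructType(Seq(
      StructField("c0", StringType),
      StructField("c1", IntegerType)
      // ... remaining columns ...
    ))
    val df = spark.read
      .option("header", "true")
      .schema(schema)
      .csv("s3a://my-bucket/data/")
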
1 vote • 1 answer

MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-27c9b887c306cb9c-conf-map" not found

I am trying to run distributed Scala code using spark-submit in cluster mode on minikube. 1. I used this Dockerfile: FROM datamechanics/spark:2.4.6-hadoop-3.1.0-java-8-scala-2.12-python-3.7-dm18 WORKDIR /opt/application RUN mkdir /tmp/data-pvc COPY…
1 vote • 0 answers

Exporting a file from a Google Dataproc job

I'm trying to export a PNG file using a Google Cloud Storage bucket path, but when I execute the job it fails with java.io.FileNotFoundException: gs://...//test.png. The same code works in local mode. Here it is: val test = LinePlot.series(data, "WSSSE…
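
Libraries that save through java.io.File cannot write to gs:// URIs directly; a common workaround is to render to the driver's local disk and then copy the file out via the Hadoop FileSystem API that the GCS connector plugs into (bucket and paths below are hypothetical):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // 1. Save the PNG to the driver's local filesystem first
    val localPng = "/tmp/test.png"
    // ... render the plot to localPng ...

    // 2. Copy it into the bucket through the GCS connector
    val fs = FileSystem.get(
      new java.net.URI("gs://my-bucket"), spark.sparkContext.hadoopConfiguration)
    fs.copyFromLocalFile(new Path(localPng), new Path("gs://my-bucket/plots/test.png"))
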
1 vote • 1 answer

Performance degraded after upgrading from spark-cassandra-connector 2.4.0 to 3.1.0

Context: working on a message-processing application which processes millions of messages every day. The application is built with Scala and Spark and uses Kafka and Cassandra. Multiple DB queries are executed while processing each message. The…
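
One connector-3.x change worth ruling out: key lookups that were cheap under 2.4 can degrade to scans unless the connector's Catalyst extensions are registered, which enable its direct-join optimization. A sketch of the setting, offered as a guess rather than a confirmed diagnosis:

    import org.apache.spark.sql.SparkSession

    // With the extensions registered, joins against Cassandra primary keys can
    // be converted into targeted reads instead of full-table scans
    val spark = SparkSession.builder()
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()
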
1 vote • 2 answers

Get the column names of the max and second-highest values for each record in a Scala DataFrame

I have a dataframe with a lot of columns, but for this example we can use this one: val dfIn = sqlContext.createDataFrame(Seq(("r0", 0, 2, 3, "a"), ("r1", 1, 0, 0, "a"), ("r2", 0, 2, 2, "a"))).toDF("prev_column", "c0", "c1", "c2",…
isaga
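
One UDF-free way is to pack each (value, column-name) pair into a struct, sort the array descending, and read the names off the top two entries. A sketch against the example's c0..c2 columns:

    import org.apache.spark.sql.functions._

    val valueCols = Seq("c0", "c1", "c2")
    // Structs compare by their first field, so putting the value first makes
    // sort_array order the pairs by value
    val packed = array(valueCols.map(c => struct(col(c).as("v"), lit(c).as("name"))): _*)
    val dfOut = dfIn
      .withColumn("sorted", sort_array(packed, asc = false))
      .withColumn("max_col", col("sorted")(0)("name"))
      .withColumn("second_col", col("sorted")(1)("name"))
      .drop("sorted")
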
1 vote • 2 answers

Different behaviour of the same query in Spark 2.3 vs Spark 3.2

I am running a simple query in two versions of Spark, 2.3 and 3.2. The code is as below: spark-shell --master yarn --deploy-mode client val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5))).toDF("id","col2","col3","col4","col5") val op_cols =…
ASR
1 vote • 1 answer

spark-submit error loading class with a fat JAR on macOS

I am trying to run a simple hello-world Spark application. This is my code: package com.sd.proj.executables import org.apache.spark.sql.functions.lit import org.apache.spark.sql.{DataFrame, SparkSession} class SparkConn { def…
dsam05
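
One frequent cause of a class-loading error with spark-submit is declaring the entry point as a class: --class must name an object, which compiles to the static main() that spark-submit looks for. A minimal sketch reusing the package from the excerpt:

    package com.sd.proj.executables

    import org.apache.spark.sql.SparkSession

    // An object (not a class) exposes a static main() to spark-submit
    object SparkConn {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("hello-spark").getOrCreate()
        spark.range(1).show()
        spark.stop()
      }
    }
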
1 vote • 2 answers

How to set Spark config in an AWS Glue job using Scala Spark?

When running my job, I am getting the following exception: Exception in User Class: org.apache.spark.SparkException : Job aborted due to stage failure: Task 32 in stage 2.0 failed 4 times, most recent failure: Lost task 32.3 in stage 2.0 (TID 50)…
jamesbascle
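
One way to set Spark configuration from inside a Scala Glue job is to build the SparkContext yourself before creating the GlueContext; the settings below are placeholders, not a fix for this particular stage failure:

    import com.amazonaws.services.glue.GlueContext
    import org.apache.spark.{SparkConf, SparkContext}

    // Configure Spark before GlueContext wraps the context
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.shuffle.partitions", "200")
    val sc = new SparkContext(conf)
    val glueContext = new GlueContext(sc)
    val spark = glueContext.getSparkSession
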
1 vote • 0 answers

Spark groupBy throws StateSchemaNotCompatible exception with different "Existing key schema"

I am reading and writing events from Event Hub in Spark, aggregating on a few keys like this: val df1 = df0 .groupBy( colKey, colTimestamp ) .agg( collect_list( struct( …
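
StateSchemaNotCompatible generally means the streaming state in the checkpoint was written with different grouping keys than the current query uses, and old state cannot be reinterpreted. If the key change is intentional, the usual way out is a fresh checkpoint directory (the path below is hypothetical):

    // New grouping keys => new state schema => point at a new checkpoint
    val query = df1.writeStream
      .outputMode("update")
      .option("checkpointLocation", "/mnt/checkpoints/agg-v2")
      .format("console")
      .start()
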
1 vote • 2 answers

Spark: merge two columns that are arrays of different structs with overlapping fields

I have a question I was unable to solve when working with Scala Spark (or PySpark): how can we merge two fields that are arrays of structs with different fields? For example, if I have a schema like so: df.printSchema() root |-- arrayOne: array…
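
Since concat requires both arrays to share an element type, one approach is to transform each array's structs to a common shape first, null-filling the fields the other side lacks. The field names a, b, c below are invented for illustration:

    import org.apache.spark.sql.functions._

    // Reshape both sides to struct<a, b, c>, then concatenate the arrays
    val merged = df.withColumn("merged", concat(
      transform(col("arrayOne"), x => struct(
        x.getField("a").as("a"),
        x.getField("b").as("b"),
        lit(null).cast("string").as("c"))),
      transform(col("arrayTwo"), x => struct(
        x.getField("a").as("a"),
        lit(null).cast("string").as("b"),
        x.getField("c").as("c")))
    ))
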
1 vote • 1 answer

CSV comma-delimiter split in a Spark RDD, without splitting commas inside double quotes

I have a CSV file with data as below: id,name,comp_name 1,raj,"rajeswari,motors" 2,shiva,amber kings My requirement is to read this file into a Spark RDD, then map-split on the comma delimiter, but this code splits on every comma: val splitdata =…
Roy John
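
A split regex with a lookahead keeps quoted commas intact; alternatively, the DataFrame CSV reader handles quoting natively. A sketch of both, assuming an RDD[String] named data:

    // Split only on commas followed by an even number of quotes, i.e. commas
    // that sit outside any double-quoted section
    val splitdata = data.map(_.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1))

    // Or let the CSV reader do the quote handling
    val df = spark.read.option("header", "true").csv("path/to/file.csv")
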
1 vote • 1 answer

Which method is more memory-efficient: createOrReplaceTempView or saveAsTable?

I have a dataframe from a Hive table. I'm making some changes to it, then saving it back to Hive as a new table; which method should I use? Assume this dataframe has 70 million records, and I want to make the saving process memory- and time-efficient. For eg.…
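
The two calls are not really alternatives: createOrReplaceTempView only registers a name for the DataFrame's lazy logical plan, while saveAsTable is what actually writes data. A short sketch of the distinction (table names are placeholders):

    // No data moves here: the view is just a name for the (unevaluated) plan
    df.createOrReplaceTempView("staging_view")

    // This is the step that materializes all 70M records to storage
    df.write.mode("overwrite").saveAsTable("db.new_table")
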
0 votes • 1 answer

Scala - Create a DataFrame with only 1 row from a List using a for comprehension

For some weird reason I need to get the column names of a dataframe and insert them as the first row (I cannot just import without the header). I tried using a for comprehension to create a dataframe that only has 1 row and 30 columns (there are 30 headers)…
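
A for comprehension isn't needed here: a single Row built from the header list does it directly. A sketch, assuming the source DataFrame is called df:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val headers = df.columns.toList
    // All-string schema, since this row holds the column names themselves
    val schema = StructType(headers.map(h => StructField(h, StringType)))
    val headerRow = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(headers: _*))), schema)
    // Prepend it (works when df's columns are strings, as with header-less CSV reads)
    val withHeaderFirst = headerRow.union(df)
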