29

I am migrating from Impala to SparkSQL, using the following code to read a table:

my_data = sqlContext.read.parquet('hdfs://my_hdfs_path/my_db.db/my_table')

How do I invoke SparkSQL on the DataFrame above, so it can run something like:

'select col_A, col_B from my_table'
Edamame

4 Answers

48

After creating a DataFrame from the parquet file, you have to register it as a temp table to run SQL queries on it.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.parquet("src/main/resources/peopleTwo.parquet")

df.printSchema

// after registering as a table you will be able to run sql queries
df.registerTempTable("people")

sqlContext.sql("select * from people").collect.foreach(println)
bob
  • Is collect necessary (or a good idea)? Because if the data is big, we don't want to collect everything to the driver? – Edamame Dec 21 '16 at 02:57
  • 1
    It's just an example of how SQL can be used. It depends on how you want to use it; you may change the query or use .take() to get the required data on the driver – bob Dec 21 '16 at 04:40
  • btw `Symbol SQLContext is deprecated. Use SparkSession.builder instead` – THIS USER NEEDS HELP Aug 05 '20 at 17:56
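
As the comments above discuss, collect() materialises every row on the driver. A small sketch of bounded alternatives, using the same temp table:

// Sketch: bounded alternatives to collect() for large tables
sqlContext.sql("select * from people").show(20)                  // prints at most 20 rows
val firstRows = sqlContext.sql("select * from people").take(10)  // Array of the first 10 rows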
38

With plain SQL

JSON, ORC, Parquet, and CSV files can be queried with SQL directly, without first creating a table or a DataFrame in Spark.

// This is Spark 2.x code; you can do the same with sqlContext as well
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("set_the_master").getOrCreate

spark.sql("select col_A, col_B from parquet.`hdfs://my_hdfs_path/my_db.db/my_table`")
   .show()
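
The same file-path syntax works for the other formats mentioned above; a minimal sketch with hypothetical HDFS paths:

// Sketch: the format.`path` syntax also works for other built-in sources
// (the paths below are hypothetical)
spark.sql("select * from json.`hdfs://my_hdfs_path/events.json`").show()
spark.sql("select * from csv.`hdfs://my_hdfs_path/data.csv`").show()
spark.sql("select * from orc.`hdfs://my_hdfs_path/my_orc_dir`").show()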
mrsrinivas
  • I do see this error "File not found. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved." How do I resolve this? – passionate Nov 04 '17 at 00:05
  • Doesn't help if I do spark.sqlContext().setConf("spark.sql.parquet.cacheMetadata", "false"); – passionate Nov 04 '17 at 00:06
  • 3
    Works! Just replace `hdfs://my_hdfs_path/my_db.db/my_table` with your file path. :) – Cherry Dec 29 '17 at 12:59
  • This is awesome, could you point to some further documentation on this type of behavior? – MichaelChirico Jan 15 '18 at 10:13
  • 1
    I found it from spark code base in github. Not sure about documentation on it. – mrsrinivas Aug 15 '18 at 01:56
  • How do you set the schema with this approach? – Union find Jun 18 '19 at 17:13
  • It's not possible to set the schema explicitly. If no schema is found, all columns are of `String` type during DataFrame creation. – mrsrinivas Jun 19 '19 at 00:58
  • Can we query a TSV file directly using Spark SQL? – Benjamin Du Jul 02 '19 at 17:56
  • Thanks so much for posting this example. I notice this works also with `csv` but I can't find documentation on the functionality anywhere. For options like csv, there are a couple parameters like headers, delimiters, etc, and I don't know how to configure them or change the defaults. – aaronsteers Nov 19 '19 at 19:49
  • This is great, thanks! Any idea how this can be adapted for parquet files split across sub-directories according to a partition (from Impala)? The field is not present in the actual parquet files since its value is in the format dir_name=value... Not sure how to include this into the dataframe or the sql query... – sg1234 Jun 17 '21 at 21:31
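
Regarding the comments above about CSV/TSV reader options such as headers and delimiters: one way to pass options in plain SQL is to create a temporary view through the data source API. A hedged sketch, with a hypothetical path and option values:

// Sketch: a temporary view over a delimited file with explicit reader options
// (the path and option values are hypothetical)
spark.sql("""
  CREATE TEMPORARY VIEW my_tsv
  USING csv
  OPTIONS (path 'hdfs://my_hdfs_path/data.tsv', header 'true', sep '\t')
""")
spark.sql("select * from my_tsv").show()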
1

Suppose that you have the parquet file ventas4 in HDFS:

hdfs://localhost:9000/sistgestion/sql/ventas4

In this case, the steps are:

  1. Load the SQL context:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
  2. Read the parquet file:

    val ventas=sqlContext.read.parquet("hdfs://localhost:9000/sistgestion/sql/ventas4")
    
  3. Register a temporary table:

    ventas.registerTempTable("ventas")
    
  4. Execute the query (here you can use toJSON to get JSON output, or collect() to get an array of rows):

    sqlContext.sql("select * from ventas").toJSON.foreach(println(_))
    
    sqlContext.sql("select * from ventas").collect().foreach(println(_))
    
0

Use the following code in IntelliJ:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

def groupPlaylistIds(): Unit = {
    // silence Spark's INFO logging
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder.appName("FollowCount")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sqlContext

    val d = sc.read.format("parquet").load("/Users/CCC/Downloads/pq/file1.parquet")
    d.printSchema()

    // drop placeholder values, then keep rows whose col1 starts with the search prefix
    val d1 = d.select("col1").filter(col("col1") =!= "-")
    val d2 = d1.filter(col("col1").startsWith("searchcriteria"))
    d2.groupBy("col1").count().sort(col("count").desc).show(100, false)
  }
KayV
  • The question is to use SparkSQL. What this answer uses is the structured API, which is not in line with the ask. Also, I do not think that IntelliJ or any IDE is relevant here – Ed Bighands Jul 03 '21 at 06:04
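
As the comment points out, the same aggregation can also be expressed through Spark SQL by registering the DataFrame as a view first. A rough sketch, reusing the spark session and DataFrame d from the function above (column and view names are illustrative):

// Sketch: the SQL equivalent of the groupBy/count/sort pipeline above
d.createOrReplaceTempView("playlists")
spark.sql("""
  SELECT col1, count(*) AS cnt
  FROM playlists
  WHERE col1 <> '-' AND col1 LIKE 'searchcriteria%'
  GROUP BY col1
  ORDER BY cnt DESC
""").show(100, false)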