I'm trying to do a very basic thing, but I get an error for which I can't find a solution.

What I want to do is execute SQL queries on a dataframe. To do this, I saw in the documentation that you need to create a view from the dataframe. The problem is that although I do exactly what the documentation says, IntelliJ keeps flagging an error.

This is my code:

    @Override
    public Long execute() {
        log.info("Starting processing query");
        Instant start = Instant.now();

        Dataset<Row> dataframe = this.hdfsIO.readParquetAsDataframe(vaccineAdministrationSummaryFile);
        // Dataset<Row> dataframe = this.sparkSession.read().parquet(this.hdfsUrl + inputDir + "/" + filename);
        dataframe.createOrReplaceTempView("query");

        Dataset<Row> sqlDF = sparkSession.sql("SELECT * FROM query");

The line sparkSession.sql("SELECT * FROM query"); gives me an error and a warning:

  • Warning: No data sources are configured to run this SQL and provide advanced code assistance. Disable this inspection via problem menu (alt+enter).
  • Error: Unable to resolve table 'query'.

Why does this happen if my code is pretty much the same as the one in the documentation?

Running sparkSession.catalog().listTables().show(false); after dataframe.createOrReplaceTempView("query"); I get this:

21/05/31 09:36:29 INFO CodeGenerator: Code generated in 27.591936 ms
21/05/31 09:36:29 INFO CodeGenerator: Code generated in 11.52196 ms
21/05/31 09:36:29 INFO CodeGenerator: Code generated in 15.643281 ms
+-----+--------+-----------+---------+-----------+
|name |database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|query|null    |null       |TEMPORARY|true       |
+-----+--------+-----------+---------+-----------+

Instead, running the same command before dataframe.createOrReplaceTempView("query"); I get this:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The function readParquetAsDataframe() is defined as follows; the string passed is the path of the file in HDFS:

    public Dataset<Row> readParquetAsDataframe(String filename){
        return this.sparkSession.read().parquet(this.hdfsUrl + inputDir + "/" + filename);
    }

I also imagined that the problem could lie in how I build the SparkSession, because I don't specify any configuration, but if so I wouldn't even know how to configure it properly. The SparkSession is built as follows:

    SparkSession spark = SparkSession
        .builder()
        .appName("Project 1")
        .getOrCreate();
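
For reference, this is the kind of explicit configuration I imagine could be added; the master URL below is only a guess on my part, not something taken from my actual setup:

    SparkSession spark = SparkSession
        .builder()
        .appName("Project 1")
        // guess: run locally using all available cores; when the job is launched
        // with spark-submit, the master is normally supplied there instead
        .master("local[*]")
        .getOrCreate();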
fabianod
  • Does this answer your question? [How does createOrReplaceTempView work in Spark?](https://stackoverflow.com/questions/44011846/how-does-createorreplacetempview-work-in-spark) – mazaneicha May 30 '21 at 14:00
  • No, I had already read that answer, but it contains the information that is already available in the documentation, nothing more. – fabianod May 30 '21 at 14:08
  • Do you _really_ need raw SQL? Using the available dataframe selection methods is more optimized at compilation time – OneCricketeer May 30 '21 at 15:02
  • Honestly, I think I can do without it, but I tried to write this statement, got this error, and don't understand why, so I posted this question to understand what is happening. – fabianod May 30 '21 at 15:16
  • What do you see if you put `spark.catalog.listTables.show(false)` right after `createOrReplaceTempView(...)`? – mazaneicha May 30 '21 at 16:04
  • @mazaneicha I added what I get... it seems that the view has been created... – fabianod May 31 '21 at 09:40
  • That seems to indicate that the problem is with `this.hdfsIO.readParquetAsDataframe(...)`. Can you now do a `dataframe.show(false)` before defining a view? – mazaneicha May 31 '21 at 12:43
  • @mazaneicha I added more details. Why do you say the problem is in `this.hdfsIO.readParquetAsDataframe(...)`? – fabianod May 31 '21 at 13:37

1 Answer

Looks like you have Java code. Check out this JDBC example; reading Parquet is no different once you have your Dataset<Row>. Please pay attention to the comments.

    public static void main(String[] args) throws Exception {

        SparkSession sparkSession = SparkSession.builder()
                .master("local")
                .appName("Test App")
                .getOrCreate();

        // JDBC connection details
        String driver = "com.mysql.cj.jdbc.Driver";
        String url = "jdbc:mysql://192.168.1.113:3306/db";
        String user = "user";
        String pass = "password";

        // JDBC connection: load the table into a dataframe
        Dataset<Row> trans = sparkSession.read()
                .format("jdbc")
                .option("driver", driver)
                .option("url", url)
                // your table name here
                .option("dbtable", "<table name>")
                .option("user", user)
                .option("password", pass)
                .load();

        // your table view name here
        trans.createOrReplaceTempView("<table_view_name>");

        Dataset<Row> sqlResult = sparkSession
                // your same table view name here
                .sql("select firstname,lastname from <table_view_name>");

        // the cast picks the Java ForeachFunction overload of foreach
        // (import org.apache.spark.api.java.function.ForeachFunction)
        sqlResult.foreach((ForeachFunction<Row>) f -> {
            // print the firstname from your table
            System.out.println("firstname -> " + f.get(0));
        });
    }
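
Since the question reads Parquet rather than JDBC, here is a minimal sketch of the same pattern with a Parquet source; the HDFS path below is a placeholder, not the actual location from the question:

    // Parquet instead of JDBC: only the read changes, the view and the SQL stay the same
    Dataset<Row> parquetDf = sparkSession.read()
            // placeholder path, substitute the real HDFS location
            .parquet("hdfs://namenode:8020/path/to/file.parquet");

    parquetDf.createOrReplaceTempView("query");

    Dataset<Row> sqlDF = sparkSession.sql("SELECT * FROM query");
    sqlDF.show(false);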
Chandan