0

I want to convert a DataFrame to an array of JSON using Java and Spark 1.6, for which I am converting the data from DataFrame -> JSON -> RDD -> Array, where the data looks like this:

[  
   {  
      "prtdy_pgm_x":"P818_C",
      "prtdy_pgm_x":"P818",
      "prtdy_attr_c":"Cost",
      "prtdy_integer_r":0,
      "prtdy_cds_d":"prxm",
      "prtdy_created_s":"2018-05-12 04:12:19.0",
      "prtdy_created_by_c":"brq",
      "prtdy_create_proc_x":"w_pprtdy_security_t",
      "snapshot_d":"2018-05-12-000018"
   },
   {  
      "prtdy_pgm_x":"P818_I",
      "prtdy_pgm_x":"P818",
      "prtdy_attr_c":"Tooling",
      "prtdy_integer_r":0,
      "prtdy_cds_d":"prxm",
      "prtdy_created_s":"2018-05-12 04:12:20.0",
      "prtdy_created_by_c":"brq",
      "prtdy_create_proc_x":"w_pprtdy_security_t",
      "snapshot_d":"2018-05-12-000018"
   },
   {  
      "prtdy_pgm_x":"P818_W",
      "prtdy_pgm_x":"P818",
      "prtdy_attr_c":"Weight",
      "prtdy_integer_r":0,
      "prtdy_cds_d":"prxm",
      "prtdy_created_s":"2018-05-12 04:12:20.0",
      "prtdy_created_by_c":"brq",
      "prtdy_create_proc_x":"w_pprtdy_security_t",
      "snapshot_d":"2018-05-12-000018"
   },
   ......
]

So I wrote my code something like this:

if(cmnTableNames != null && cmnTableNames.length > 0)
    {
        for(int i=0; i < cmnTableNames.length; i++)
        {
            String cmnTableName = cmnTableNames[i];
            DataFrame cmnTableContent = null;

            if(cmnTableName.contains("PTR_security_t"))
            {
                cmnTableContent = hiveContext.sql("SELECT * FROM " + cmnTableName + " where fbrn04_snapshot_d = '" + snapshotId + "'");
            }
            else
            {
                cmnTableContent = hiveContext.sql("SELECT * FROM " + cmnTableName);
            }
            String cmnTable = cmnTableName.substring(cmnTableName.lastIndexOf(".") + 1);
            if (cmnTableContent.count() > 0)
            {
                String cmnStgTblDir = hdfsPath + "/staging/" + rptName + "/common/" + cmnTable;
                JavaRDD<String> cmnTblCntJson = cmnTableContent.toJSON().toJavaRDD();
                String result = cmnTblCntJson.reduce((ob1, ob2) -> (String)ob1+","+(String)ob2); //This Part, takes more time than usual contains large set of data.
                String output = "["+result+"]";

                ArrayList<String> outputList = new ArrayList<String>();
                outputList.add(output);
                JavaRDD<String> finalOutputRDD = sc.parallelize(outputList);

                String cmnStgMrgdDir = cmnStgTblDir + "/mergedfile";

                if(dfs.exists(new Path(cmnStgTblDir + "/mergedfile"))) dfs.delete(new Path(cmnStgTblDir + "/mergedfile"), true);

                finalOutputRDD.coalesce(1).saveAsTextFile(cmnStgMrgdDir, GzipCodec.class);

                fileStatus = dfs.getFileStatus(new Path(cmnStgMrgdDir + "/part-00000.gz"));
                dfs.setPermission(fileStatus.getPath(),FsPermission.createImmutable((short) 0770));

                dfs.rename(new Path(cmnStgMrgdDir + "/part-00000.gz"), new Path(CommonPath + "/" + cmnTable + ".json.gz"));
            }
            else
            {
                System.out.println("There are no records in " + cmnTableName);
            }
        }

    }
    else
    {
        System.out.println("The common table lists are null.");
    }

    sc.stop();

but when the reduce function is applied, it takes a long time:

JavaRDD<String> cmnTblCntJson = cmnTableContent.toJSON().toJavaRDD();

String result = cmnTblCntJson.reduce((ob1, ob2) -> (String)ob1+","+(String)ob2); //This Part, takes more time than usual contains large set of data.

The partitioned table "PTR_security_t" is huge and takes far longer than the other, unpartitioned tables (roughly 40–50 minutes for 588 KB).

I tried applying a lambda, but I ended up with a "Task not serializable" error. See the code below.

if(cmnTableNames != null && cmnTableNames.length > 0)
        {
            List<String> commonTableList = Arrays.asList(cmnTableNames);

            DataFrame commonTableDF = sqc.createDataset(commonTableList,Encoders.STRING()).toDF();
            commonTableDF.toJavaRDD().foreach(cmnTableNameRDD -> {
            DataFrame cmnTableContent = null;
                String cmnTableName = cmnTableNameRDD.mkString();

                        if(cmnTableName.contains("PTR_security_t"))
                        {
                            cmnTableContent = hiveContext.sql("SELECT * FROM " + cmnTableName + " where fbrn04_snapshot_d = '" + snapshotId + "'");
                        }
                        else
                        {
                            cmnTableContent = hiveContext.sql("SELECT * FROM " + cmnTableName);
                        }

                        String cmnTable = cmnTableName.substring(cmnTableName.lastIndexOf(".") + 1);

                        if (cmnTableContent.count() > 0)
                        {   
                            String cmnStgTblDir = hdfsPath + "/staging/" + rptName + "/common/" + cmnTable;
                            JavaRDD<String> cmnTblCntJson = cmnTableContent.toJSON().toJavaRDD();
                            String result = cmnTblCntJson.reduce((ob1, ob2) -> (String)ob1+","+(String)ob2);
                            String output = "["+result+"]";

                            ArrayList<String> outputList = new ArrayList<String>();
                            outputList.add(output);
                            JavaRDD<String> finalOutputRDD = sc.parallelize(outputList);
                            String cmnStgMrgdDir = cmnStgTblDir + "/mergedfile";
                            if(dfs.exists(new Path(cmnStgTblDir + "/mergedfile"))) dfs.delete(new Path(cmnStgTblDir + "/mergedfile"), true);

                            finalOutputRDD.coalesce(1).saveAsTextFile(cmnStgMrgdDir, GzipCodec.class);

                            fileStatus = dfs.getFileStatus(new Path(cmnStgMrgdDir + "/part-00000.gz"));
                            dfs.setPermission(fileStatus.getPath(),FsPermission.createImmutable((short) 0770));
                            dfs.rename(new Path(cmnStgMrgdDir + "/part-00000.gz"), new Path(CommonPath + "/" + cmnTable + ".json.gz"));
                        }

                        else
                        {
                            System.out.println("There are no records in " + cmnTableName);
                        }
                });
        }
        else
        {
            System.out.println("The common table lists are null.");
        }
        sc.stop();

Is there any efficient way I can improve the performance?

Nisarg
  • 1,631
  • 6
  • 19
  • 31
Irthiza Khan
  • 23
  • 1
  • 7
  • 1
    I think this would be a better fit for [codereview.stackexchange.com](https://codereview.stackexchange.com/) as your code is seemingly working but you want to enhance the performance. – Ben May 15 '18 at 13:16
  • Check [Spark performance for Scala vs Python](https://stackoverflow.com/q/32464122), specifically "What is wrong with code provided in the question" part. `.reduce((ob1, ob2) -> (String)ob1+","+(String)ob2)` is a bad idea – zero323 May 15 '18 at 13:40
  • The code is running fine it's just that .reduce((ob1, ob2) -> (String)ob1+","+(String)ob2) this particular part is taking more time for very less memory. Am trying to apply the whole thing in Java alone. – Irthiza Khan May 15 '18 at 13:46

0 Answers