
I am trying to use Spark-SQL to read and select data from a JSON string.

Here is what I did:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("aaa");
sparkConf.setMaster("local[*]");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SparkSession sc = SparkSession.builder().sparkContext(javaSparkContext.sc()).getOrCreate();

String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
String querySql = "select env.lux as abc from testData";

System.out.println("start 01, time is"+System.currentTimeMillis());
List<String> dataList = Arrays.asList(data);
Dataset<String> dataset = sc.createDataset(dataList, Encoders.STRING());
dataset.printSchema();
System.out.println("start 02, time is"+System.currentTimeMillis());
Dataset<Row> df = sc.read().json(dataset);
System.out.println("start 03, time is"+System.currentTimeMillis());
List<String> queryResultJson = null;
try{
  df.createOrReplaceTempView("testData");
  System.out.println("start 04, time is"+System.currentTimeMillis());
  Dataset<Row> queryData = sc.sql(querySql);
  System.out.println("start 05, time is"+System.currentTimeMillis());
  queryResultJson = queryData.toJSON().collectAsList();
  System.out.println("start 06, time is"+System.currentTimeMillis());
}catch (Exception e) {
  e.printStackTrace();
} finally {
  sc.catalog().dropTempView("testData");
}

The result looks like this:

start 01, time is1543457455652
start 02, time is1543457458766
start 03, time is1543457459993
start 04, time is1543457460190
start 05, time is1543457460334
start 06, time is1543457460818

It seems that the dataset creation process takes too much time. I want to use this in a streaming data processing flow, but the performance is too poor to be usable.

Is there any way to make dataset creation faster? Or is there another method to query JSON data with a SQL-like language?

  • Using Spark Structured Streaming, you won't create your dataset like that; you will read it directly from the stream. It's also recommended to use a predefined schema instead of relying on Spark's inference. – JEY Nov 29 '18 at 09:50
  • Thank you very much! Big help. So with the streaming mode, I can send all the data to a socket and then build a stream processing all the data received from the socket. Is this right? Besides, are there examples of how I can use a predefined schema? – Newman Nov 29 '18 at 10:53
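
A side note on the predefined-schema suggestion: the batch reader also accepts an explicit schema, which lets Spark skip the inference pass that runs inside read().json() (part of the start 02 → start 03 gap above). A minimal sketch against the sample record, with the schema hand-written here rather than taken from the post:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema matching the sample JSON; passing it avoids the inference scan.
StructType schema = new StructType()
        .add("temp", DataTypes.IntegerType)
        .add("hum01", DataTypes.IntegerType)
        .add("env", new StructType()
                .add("lux", DataTypes.IntegerType)
                .add("geo", DataTypes.createArrayType(DataTypes.DoubleType, false)));

// Same read as in the question, but with the explicit schema.
Dataset<Row> df = sc.read().schema(schema).json(dataset);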

1 Answer


You won't create your dataset the same way when using Spark Structured Streaming. For example, if your source is a socket, you can define a schema describing your data and use it to parse the incoming JSON:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder()
    .appName("Simple Application")
    .master("local[*]")
    .getOrCreate();

// Schema describing the sensor records.
StructType sensorSchema = new StructType()
        .add("temp", DataTypes.IntegerType)
        .add("hum01", DataTypes.IntegerType)
        .add("env", new StructType()
                .add("lux", DataTypes.IntegerType)
                .add("geo", DataTypes.createArrayType(DataTypes.FloatType, false)));

// The socket source emits a single string column named "value";
// parse it with the predefined schema instead of letting Spark infer one.
Dataset<Row> socketDF = spark
    .readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .select(from_json(col("value"), sensorSchema).as("data"))
    .selectExpr("data.temp", "data.hum01", "data.env");

Then you can start to benchmark your algorithm.
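
To actually run SQL against the stream and see output, one option (a sketch using the console sink for local testing) is to register the streaming Dataset as a view and start a streaming query:

import org.apache.spark.sql.streaming.StreamingQuery;

// Register the streaming Dataset and reuse the SQL from the question.
socketDF.createOrReplaceTempView("testData");
Dataset<Row> result = spark.sql("select env.lux as abc from testData");

// Print each micro-batch to the console; the query runs until stopped.
// (awaitTermination throws StreamingQueryException.)
StreamingQuery query = result.writeStream()
    .format("console")
    .outputMode("append")
    .start();
query.awaitTermination();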

  • Hi, do I always need a schema if I want to use structured streaming to process JSON data? – Newman Dec 11 '18 at 05:41
  • No, you don't, but it's recommended. – JEY Dec 11 '18 at 05:49
  • From https://stackoverflow.com/questions/43297973/how-to-read-records-in-json-format-from-kafka-using-structured-streaming, it is stated that there are two ways to parse JSON data with structured streaming. But the first way needs a schema, and the second way only parses a few items. What I want is to parse data with a SQL-like language (where functions could be used), and I don't know the schema. There may be different types of JSON data to process. Is this possible? – Newman Dec 11 '18 at 11:10
  • If you have heterogeneous data in your stream, it's going to be quite complex. You will need a way to tell, within the stream, what kind of data each record holds so you know how to handle it. But there is nothing built into Spark for such a use case (in JSON at least). Maybe a third-party lib could help you with that, but I have no idea. – JEY Dec 11 '18 at 12:40
  • Thanks a lot. So maybe the sample code is the only way I can use, although it is quite slow... – Newman Dec 12 '18 at 01:25
  • You may need to tweak your cluster size. – JEY Dec 12 '18 at 07:47
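
For the schema-less case discussed in these comments, one built-in that may help in part: Spark SQL's get_json_object function extracts a value from a raw JSON string by path and returns null when the path is missing, so mixed record types can still be filtered in SQL. A sketch, reusing the spark session from the answer and the sample data string from the question (view and column names are illustrative):

// Keep each payload as a plain string column; no schema or inference needed.
Dataset<Row> raw = spark.createDataset(
        Arrays.asList(data), Encoders.STRING()).toDF("json");
raw.createOrReplaceTempView("rawData");

// get_json_object returns NULL for absent paths, so heterogeneous
// records can be filtered with ordinary WHERE clauses.
Dataset<Row> lux = spark.sql(
    "select get_json_object(json, '$.env.lux') as abc from rawData " +
    "where get_json_object(json, '$.temp') is not null");
lux.show();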