-4
aws s3api list-objects-v2 --bucket cw-milenko-tests | grep 'tick_c'

The output shows:

    "Key": "Json_gzips/tick_calculated_3_2020-05-27T11-50-22.json.gz",
    "Key": "Json_gzips/tick_calculated_3_2020-05-27T11-52-59.json.gz",
    "Key": "Json_gzips/tick_calculated_3_2020-05-27T11-55-08.json.gz",
    "Key": "Json_gzips/tick_calculated_3_2020-05-27T11-57-30.json.gz",
    "Key": "Json_gzips/tick_calculated_3_2020-05-27T11-59-59.json.gz",
    "Key": "Json_gzips/tick_calculated_4_2020-05-27T09-14-28.json.gz",
    "Key": "Json_gzips/tick_calculated_4_2020-05-27T11-35-38.json.gz",  

With wc -l:

aws s3api list-objects-v2 --bucket cw-milenko-tests | grep 'tick_c' | wc -l
457

I can read one file into a DataFrame.

val path = "tick_calculated_2_2020-05-27T00-01-21.json"
scala> val tick1DF = spark.read.json(path)
tick1DF: org.apache.spark.sql.DataFrame = [aml_barcode_canc: string, aml_barcode_payoff: string ... 70 more fields]

I was surprised to see negative votes. What I want to know is how to load all 457 files into an RDD. I saw this SO question. Is it possible at all? What are the limitations? This is what I tried so far.

val rdd1 = sc.textFile("s3://cw-milenko-tests/Json_gzips/tick_calculated*.gz")

If I go for s3a:

val rdd1 = sc.textFile("s3a://cw-milenko-tests/Json_gzips/tick_calculated*.gz")
rdd1: org.apache.spark.rdd.RDD[String] = s3a://cw-milenko-tests/Json_gzips/tick_calculated*.gz MapPartitionsRDD[3] at textFile at <console>:27

Doesn't work either.

I try to inspect my RDD:

scala> rdd1.take(1)
java.io.IOException: No FileSystem for scheme: s3
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)

The FileSystem was not recognized.

My GOAL:

s3://json.gz -> rdd -> parquet
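Roughly, the end-to-end shape I have in mind is something like this (only a sketch, not working yet; the Parquet output prefix below is made up):

    // sketch: read every gzipped JSON file with one glob, then write Parquet
    val ticksDF = spark.read.json("s3a://cw-milenko-tests/Json_gzips/tick_calculated*.gz")
    val ticksRDD = ticksDF.rdd   // RDD[Row], if an RDD is really needed
    ticksDF.write.parquet("s3a://cw-milenko-tests/parquet_ticks/")   // hypothetical output path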
  • like any other JSON file in Spark, I'd say? Did you read the docs? https://spark.apache.org/docs/latest/sql-data-sources-json.html - it's pretty straightforward to implement and the docs are quite nice and should help you here – UninformedUser Jun 10 '20 at 14:57
  • @UninformedUser Take a look at my new edit,I explained what I really want to achieve. – miki_cloud Jun 11 '20 at 08:59

2 Answers

-1

Try this:

  /**
      * /Json_gzips
      * |-  spark-test-data1.json.gz
      * --------------------
      * {"id":1,"name":"abc1"}
      * {"id":2,"name":"abc2"}
      * {"id":3,"name":"abc3"}
      */

    /**
      * /Json_gzips
      * |-   spark-test-data2.json.gz
      * --------------------
      * {"id":1,"name":"abc1"}
      * {"id":2,"name":"abc2"}
      * {"id":3,"name":"abc3"}
      */
    val path = getClass.getResource("/Json_gzips").getPath
    // path to the root directory which contains all the .gz files
    spark.read.json(path).show(false)

    /**
      * +---+----+
      * |id |name|
      * +---+----+
      * |1  |abc1|
      * |2  |abc2|
      * |3  |abc3|
      * |1  |abc1|
      * |2  |abc2|
      * |3  |abc3|
      * +---+----+
      */

You can convert this DataFrame to an RDD if required.
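For example (a minimal sketch; the output path is only an illustration of the asker's json.gz -> rdd -> parquet goal):

    val df  = spark.read.json(path)          // same read as above
    val rdd = df.rdd                         // RDD[org.apache.spark.sql.Row]
    df.write.parquet("/tmp/ticks_parquet")   // hypothetical local output directory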

Som
-1
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

# Read all files under the Json_gzips key in S3
df = spark.read.json("s3a://cw-milenko-tests/Json_gzips/tick_calculated*.gz")
df.show()
rdd = df.rdd  # convert it to an RDD

Use s3a instead of s3.

why s3a over s3?

Also add the dependency for hadoop-aws 2.7.3 and the AWS SDK: Add AWS S3 supporting JARs
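For instance, the connector jar can be pulled in with --packages org.apache.hadoop:hadoop-aws:2.7.3 when launching spark-shell or spark-submit, and the credentials handed to the s3a connector through the Hadoop configuration (a sketch in the Scala shell the asker is using; the key values are placeholders):

    // point the s3a connector at the AWS credentials (placeholders, not real keys)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")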

QuickSilver