1

I need to read 150 files from my S3 bucket:

df1 = spark.read.json('s3://mybucket/f1')
df2 = spark.read.json('s3://mybucket/f2')
...
df150 = spark.read.json('s3://mybucket/f150')

How can I automate this process?

spark.read.json produces a Spark DataFrame.

If I try what Oscar suggested, I get an AttributeError:

import spark
your_dfs_list = [spark.read.json("s3://cw-mybuc/RECORDS/FULL_RECEIVED/2020/07/01/00"+str(x)) for x in range(1,38)]

AttributeError: module 'spark' has no attribute 'read'
MikiBelavista

3 Answers

4

I think you can use a list comprehension that returns a list of DataFrames, and then iterate over them. Note that range excludes its upper bound, so it has to be 151 to include f150:

your_dfs_list = [spark.read.json("s3://mybucket/f" + str(x)) for x in range(1, 151)]

In Scala it may be easier, since you can apply a map (or a foreach later) to the DataFrames, but that depends on your preference :)

(1 to 150).map(v => spark.read.json(s"s3://mybucket/f$v"))
Oscar Lopez M.
  • Take a look at my edit, spark.read in Python code should be resolved. – MikiBelavista Jul 14 '20 at 12:07
  • The problem is that my files have different names. How to read them one by one? – MikiBelavista Jul 14 '20 at 13:11
  • If their names are completely different (I mean not like f1, f2... but like fileA1, anotherName, nameDifferent, etc.) you can prepare a list with their names and traverse that list instead of the range(1,150). You could even keep the names in a CSV or a database table and build the list by reading it – Oscar Lopez M. Jul 14 '20 at 18:13
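The suggestion in the last comment can be sketched as follows. This is a minimal illustration, not a definitive implementation: `build_paths`, `read_each`, the bucket prefix and the two file names are all hypothetical, and `spark` is assumed to be an already created SparkSession.

```python
def build_paths(names, prefix="s3://mybucket/"):
    """Prepend the bucket prefix (a placeholder here) to each file name."""
    return [prefix + name for name in names]


def read_each(spark, paths):
    """Read every path into its own DataFrame, keyed by path.

    `spark` is expected to be an existing SparkSession.
    """
    return {path: spark.read.json(path) for path in paths}


# Hypothetical file names; replace with your own list, CSV column, or table.
names = ["01-06.json.gz", "21-08.json.gz"]
paths = build_paths(names)

# With an active SparkSession called `spark`:
# dfs = read_each(spark, paths)
```

Keying the result by path instead of returning a bare list makes it easier to tell later which DataFrame came from which file.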
2

I think you should provide more detail: how often do you want to read, and why? With some context we might be able to help better.

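From your code snippet it seems a loop that collects the DataFrames in a list would be the easiest approach:

dfs = []  # avoid naming this `list`, which would shadow the built-in

for i in range(150):
    # str() is required: Python cannot concatenate a str and an int
    dfs.append(spark.read.json('s3://mybucket/f' + str(i + 1)))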

However, if you provide some more detail, I am pretty sure this answer can be improved.

Edit based on comments

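If you want to union the DataFrames into a single one, the easiest way is to read the first file and union the rest onto it. (import spark.implicits._ and spark.emptyDataFrame are Scala; the PySpark version needs neither, and a union onto a DataFrame with no columns would fail on the column-count mismatch anyway.)

# Start from the first file
df = spark.read.json('s3://mybucket/f1')

# Then union files f2 .. f150 onto it
for i in range(2, 151):
    df = df.union(spark.read.json('s3://mybucket/f' + str(i)))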

Note this should work with Spark 2.x and above:

https://sparkbyexamples.com/spark/spark-how-to-create-an-empty-dataframe/
https://sparkbyexamples.com/spark/spark-dataframe-union-and-union-all/

godzsa
  • Should I import spark.implicits? Is the list a Python list? – MikiBelavista Jul 14 '20 at 11:29
  • It depends on what you want to do with the results later. A python list might be just as sufficient and you don't need something fancy. Depends on what you want, what is the data, how you want to process it. – godzsa Jul 14 '20 at 11:31
  • This is OK. The problem is that my files are in json.gz format but have different names, such as 01-06 or 21-08. That makes things more complicated. – MikiBelavista Jul 14 '20 at 13:15
  • You could define a list with the file names, `names = ["f01-06", "f123"]`, and index it in the for loop: use `names[i]` instead of `(i+1)`. Or write an algorithm that generates the correct file names... – godzsa Jul 14 '20 at 13:21
  • If you are planning to union a large number of DataFrames, be aware that it can create a long lineage in the Spark execution plan, which will hurt performance. To reduce it, run union on buckets of 10 or 20 DataFrames and cache the result of each bucket union – Oscar Lopez M. Jul 14 '20 at 18:19
2

Step 1: Create a list of all the json.gz files. Current versions of Spark decompress .gz files automatically when reading them, so the compression is not an issue. If you're reading all the files in an S3 bucket, you can use the following solution:

from boto.s3.connection import S3Connection

fs = []  # Empty list of file paths

conn = S3Connection('access-key', 'secret-access-key')
bucket = conn.get_bucket('bucket')
for key in bucket.list():
    # append is a method call, and Spark needs the full path, not just the key
    fs.append('s3://bucket/' + key.name)
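The snippet above uses the legacy boto package. For anyone on boto3 instead, the same listing step might look like the sketch below. The helper name `list_s3_paths`, its parameters, the `RECORDS/` prefix in the usage note and the `s3://` scheme are assumptions for illustration (some non-EMR setups need `s3a://`); the S3 client is passed in so the function does not create one itself.

```python
def list_s3_paths(client, bucket, prefix="", suffix=".json.gz"):
    """Return s3:// paths of all objects under `prefix` ending in `suffix`.

    `client` is a boto3 S3 client, e.g. boto3.client("s3").
    """
    paginator = client.get_paginator("list_objects_v2")
    paths = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when a page has no matching objects.
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(suffix):
                paths.append("s3://{}/{}".format(bucket, obj["Key"]))
    return paths


# Usage (needs boto3 installed and AWS credentials configured):
# import boto3
# fs = list_s3_paths(boto3.client("s3"), "mybucket", prefix="RECORDS/")
```

The paginator matters here because a single list call returns at most 1,000 keys.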

Step 2: Iterate over the files from Step 1, unioning each resulting DataFrame as you go. A version of godzsa's solution should do the trick:

# Read first file
df = spark.read.json(fs[0]) 

# Then union the rest
for f in fs[1:]:
  df = df.union(spark.read.json(f))
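An alternative to the union loop, offered as a sketch rather than as part of the original answer: PySpark's `spark.read.json` also accepts a list of paths, so all files can be loaded into one DataFrame in a single call. The helper name `read_as_one` is hypothetical, and `spark` is assumed to be an existing SparkSession.

```python
def read_as_one(spark, paths):
    """Load all paths into a single DataFrame with one reader call."""
    paths = list(paths)
    if not paths:
        raise ValueError("no input paths given")
    # DataFrameReader.json takes either a single path or a list of paths.
    return spark.read.json(paths)


# With an active SparkSession and the list `fs` from Step 1:
# df = read_as_one(spark, fs)
```

With this approach Spark infers one schema across all files, and there is no chain of 150 unions in the plan.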
Lars Skaug
  • This is also a good solution, but keep in mind that running a union over a lot of DataFrames creates a long lineage, which can hurt Spark's performance. To prevent that, consider using union in buckets of 10 or 20 DataFrames and caching/uncaching each bucket to break the lineage – Oscar Lopez M. Jul 14 '20 at 18:17
  • @OscarLopezM. It seems that you are right. Should I add 20 by 20 dataframes? How to cache? – MikiBelavista Jul 15 '20 at 08:07
  • I don't think there is a "magic number" (like 20) for that, because it depends on the size of the DataFrames, cluster resources, etc. But I think it's worth starting with that number and then trying others. To cache intermediate results, call `cache()` on each bucket once you have built it. Also, remember to call `unpersist()` on the buckets once you no longer need them, because the cache may consume resources that you need later for further processing – Oscar Lopez M. Jul 15 '20 at 08:23
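The batching described in these comments could be sketched as below. The function names and the default batch size of 20 are illustrative assumptions, and whether caching each batch actually helps depends on the data and the cluster. The code only relies on the DataFrame methods `union`, `cache` and `unpersist`.

```python
from functools import reduce


def chunked(items, size):
    """Split a list into consecutive slices of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def union_in_batches(dfs, batch_size=20):
    """Union DataFrames batch by batch, caching each batch result.

    Returns the final DataFrame and the cached partial results, so the
    caller can unpersist them once the final result has been used.
    """
    if not dfs:
        raise ValueError("no DataFrames given")
    partials = []
    for batch in chunked(dfs, batch_size):
        merged = reduce(lambda a, b: a.union(b), batch)
        partials.append(merged.cache())  # cache() returns the DataFrame
    return reduce(lambda a, b: a.union(b), partials), partials


def release(partials):
    """Free the cached batch results when they are no longer needed."""
    for partial in partials:
        partial.unpersist()


# result, partials = union_in_batches(list_of_dataframes, batch_size=20)
# result.write.parquet(...)   # or any other action on the result
# release(partials)
```

Calling `release` only after an action has run on the result keeps the cached batches available while they are still being read.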