
I have about 200 files in S3, e.g. a_file.json.bz2. Each line of these files is a record in JSON format, but some fields were serialised with pickle.dumps, e.g. a datetime field. Each file is about 1 GB after bzip2 compression. Now I need to process these files in Spark (PySpark, actually), but I couldn't even get each record out. So what would be the best practice here?

The ds.take(10) gives

[(0, u'(I551'),
 (6, u'(dp0'),
 (11, u'Vadv_id'),
 (19, u'p1'),
 (22, u'V479883'),
 (30, u'p2'),
 (33, u'sVcpg_id'),
 (42, u'p3'),
 (45, u'V1913398'),
 (54, u'p4')]

Apparently the input is not being split by record.

Thank you.

shuaiyuancn

3 Answers


I had this issue reading gpg-encrypted files. You can use wholeTextFiles as Daniel suggests, but you have to be careful when reading large files, as the entire file is loaded into memory before processing. If the file is too large, it can crash the executor. I used parallelize and flatMap instead. Maybe something along the lines of:

import bz2
import glob

def read_fun_generator(filename):
    # Stream one bz2 file and yield its lines lazily,
    # so the whole file is never held in memory at once.
    with bz2.open(filename, 'rb') as f:
        for line in f:
            yield line.strip()

bz2_filelist = glob.glob("/path/to/files/*.bz2")
rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)
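
If each line really is standalone JSON, the resulting RDD can then be parsed in the same pipeline; a minimal sketch, assuming the lines are valid JSON:

import json

# assumes each yielded line is a complete JSON document
records = rdd_from_bz2.map(json.loads)
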
santon

You can access the input file by file (instead of line by line) via SparkContext.wholeTextFiles. You can then use flatMap to decompress and parse the lines in your own code.
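
As an illustration, a minimal sketch of that approach, assuming a hypothetical s3a path; because the .bz2 content is binary, the sketch uses sc.binaryFiles (which returns raw bytes) rather than wholeTextFiles:

import bz2

def split_records(pair):
    # pair is (file_path, file_bytes); decompress in the worker
    # and split the result into lines
    _path, raw = pair
    return bz2.decompress(raw).splitlines()

# hypothetical bucket and path
lines = sc.binaryFiles("s3a://my-bucket/path/*.json.bz2").flatMap(split_records)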

Daniel Darabos

In fact it is a problem caused by pickle. Looking at the file content after decompression, it is indeed

(I551
(dp0
Vadv_id
p1
V479883
p2
sVcpg_id
p3
V1913398
p4

which gives me trouble parsing it. I know I can just pickle.load(file) multiple times to get the objects out, but I cannot find a quick way to do this in Spark, where I can only access the loaded files line by line. Also, the records in this file have variable fields and lengths, which makes it even harder to hack around.
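
For reference, a minimal sketch of that repeated-pickle.load idea, combined with the parallelize/flatMap approach from santon's answer; it assumes each file is a plain pickle stream readable from the workers:

import bz2
import pickle

def unpickle_stream(filename):
    # call pickle.load repeatedly until the stream is exhausted
    with bz2.BZ2File(filename, 'rb') as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                break

# bz2_filelist as in santon's answer, e.g. built with glob
records = sc.parallelize(bz2_filelist).flatMap(unpickle_stream)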

I ended up re-generating these bz2 files from the source because it was actually easier and faster. I also learnt that Spark and Hadoop support bz2 compression out of the box, so no additional action is required.
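
Once the regenerated files are proper newline-delimited JSON, reading them becomes a one-liner; a minimal sketch, assuming a hypothetical s3a path:

import json

# Hadoop's codecs decompress .bz2 transparently when using textFile
records = sc.textFile("s3a://my-bucket/path/*.json.bz2").map(json.loads)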

shuaiyuancn