I am trying to read multiple parquet files from multiple partitions via PySpark and concatenate them into one big data frame. The files look like this:

 hdfs dfs -ls /data/customers/odysseyconsultants/logs_ch_blade_fwvpn
Found 180 items
drwxrwxrwx   - impala impala          0 2018-03-01 10:31 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/_impala_insert_staging
drwxr-xr-x   - impala impala          0 2017-08-23 17:55 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822
drwxr-xr-x   - impala impala          0 2017-08-24 05:57 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170823
drwxr-xr-x   - impala impala          0 2017-08-25 06:00 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170824
drwxr-xr-x   - impala impala          0 2017-08-26 06:04 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170825

Each partition has one or more parquet files, e.g.

hdfs dfs -ls /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822
Found 1 items
-rw-r--r--   2 impala impala   72252308 2017-08-23 17:55 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822/5b4bb1c5214fdffd-cc8dbcf600000008_1393229110_data.0.parq

What I'm trying to create is a generic function that takes a from/to date range as arguments, then loads and concatenates all the parquet files in that range into one big data frame.

I can build the list of paths to read:

def read_files(table,from1,to):
     s1 = ', '.join('/data/customers/odysseyconsultants/' + table + '/' + 'cdateint=' + str(i) for i in range(from1, to+1))
     return s1.split(', ')

If I attempt to read the files one by one, as follows, I get an exception:

for i in read_files('logs_ch_blade_fwvpn', 20170506, 20170510):
    sqlContext.read.parquet(i).show()

If I try to read them all in a single call,

x = read_files('logs_cs_blade_fwvpn', 20180109, 20180110)
d1 = sqlContext.read.parquet(*x)

I get the error

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://nameservice1/data/customers/odysseyconsultants/logs_cs_blade_fwvpn/cdateint=20180109;'
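
One thing worth checking with the path list above: `range()` over integer dates also produces values that are not calendar dates whenever the range crosses a month boundary (for example 20170832), and any such path will not exist. A minimal sketch of building the paths from real dates instead is shown below; `partition_paths` is a hypothetical helper name, not part of Spark. It only guarantees valid dates, so a day with no partition directory on HDFS would still raise the same error.

```python
from datetime import datetime, timedelta


def partition_paths(base_path, start, end):
    """Return one 'cdateint=YYYYMMDD' path per calendar day, start to end inclusive."""
    d1 = datetime.strptime(str(start), '%Y%m%d').date()
    d2 = datetime.strptime(str(end), '%Y%m%d').date()
    # Step through real calendar days rather than consecutive integers
    return ['{0}/cdateint={1}'.format(base_path, (d1 + timedelta(days=n)).strftime('%Y%m%d'))
            for n in range((d2 - d1).days + 1)]


paths = partition_paths('/data/customers/odysseyconsultants/logs_ch_blade_fwvpn',
                        20170830, 20170902)
for p in paths:
    print(p)
```

Here the four days from 30 August to 2 September yield four paths, whereas `range(20170830, 20170902 + 1)` yields 73 integers, most of which are not dates.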

Sotos
  • Any info about the exception you are getting? – Vladislav Varslavans Mar 01 '18 at 11:05
  • @VladislavVarslavans, It complains about basepath so I read [this](https://stackoverflow.com/questions/37257111/reading-parquet-files-from-multiple-directories-in-pyspark) but still no luck. – Sotos Mar 01 '18 at 11:07

2 Answers

Here is one way of doing it, although I am open to alternatives:

import subprocess
from datetime import date, timedelta
from pyspark.sql import SQLContext


def read_data(customer, table, start_date, end_date):
    def run_cmd(args_list):
        #Run linux commands
        print('Running system command: {0}'.format(' '.join(args_list)))
        proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        s_output, s_error = proc.communicate()
        s_return = proc.returncode
        return s_return, s_output, s_error

    #Generate a list with the dates to access the parquet files
    d1 = date(int(start_date[0:4]), int(start_date[4:6]), int(start_date[6:8]))
    d2 = date(int(end_date[0:4]), int(end_date[4:6]), int(end_date[6:8]))
    dates = [d1 + timedelta(days=x) for x in range((d2-d1).days + 1)]
    #Loop through the dates and collect the parquet files of each partition
    files = []
    for i in dates:
        path = '/data/customers/' + customer + '/' + table + '/cdateint=' + str(i).replace('-','')
        (ret, out, err) = run_cmd(['hdfs','dfs','-find',path,'-name','*.parq'])
        #Decode the command output (bytes on Python 3) and skip empty lines
        files.extend(f for f in out.decode('utf-8').split('\n') if f)
    #Read the first file, then union every following file onto it
    #(sqlContext is assumed to exist already, as it does in the pyspark shell)
    df = None
    for j in files:
        print(j)
        df_temp = sqlContext.read.parquet(j)
        df = df_temp if df is None else df.union(df_temp)
    #df stays None if no parquet files were found in the date range
    return df
Sotos

What about using the directory names as partitions? For example:

from pyspark.sql.functions import col

table = 'logs_ch_blade_fwvpn'
sqlContext.read.parquet('/data/customers/odysseyconsultants/' + table) \
    .where(col('cdateint').between('20170822', '20170825')).show()
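
If you would rather pass an explicit list of partition directories, Spark's `basePath` reader option tells partition discovery where the table root is, so `cdateint` should still come back as a column. The following is only a sketch under that assumption: `read_partitions` and its parameter names are hypothetical, and it expects every listed directory to exist.

```python
def read_partitions(sql_context, base_path, partition_dirs):
    """Read several partition directories under base_path into one DataFrame."""
    if not partition_dirs:
        raise ValueError('no partition directories given')
    # basePath marks the table root, so cdateint=... is treated as a partition column
    reader = sql_context.read.option('basePath', base_path)
    # parquet() accepts several paths and returns a single DataFrame
    return reader.parquet(*partition_dirs)


# Example call, assuming an existing sqlContext (e.g. in the pyspark shell):
# base = '/data/customers/odysseyconsultants/logs_ch_blade_fwvpn'
# df = read_partitions(sqlContext, base,
#                      [base + '/cdateint=20170822', base + '/cdateint=20170823'])
```

Compared with filtering on `cdateint` after reading the table root, this gives you direct control over which directories are touched, at the cost of having to build and validate the list yourself.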
Mariusz