
I would like to create a dataframe from a csv file that I will retrieve via streaming:

import requests

url = "https://{0}:8443/gateway/default/webhdfs/v1/{1}?op=OPEN".format(host, filepath)

r = requests.get(url, 
                 auth=(username, password), 
                 verify=False, 
                 allow_redirects=True, 
                 stream=True)

chunk_size = 1024
for chunk in r.iter_content(chunk_size):
    pass  # how to load the data into Spark?

How can the data be loaded into Spark from the HTTP stream?

Note that it isn't possible to retrieve the data over the native HDFS protocol - WebHDFS must be used.

Chris Snow
  • You are looking to create a DataFrame while streaming the results you are receiving, correct ? You might want to look into Spark Streaming capabilities. – Jonathan Taws Jul 04 '16 at 13:18
  • One the file has been imported the data is going to be analysed using core spark. – Chris Snow Jul 04 '16 at 13:24

1 Answer


You can pre-generate an RDD of chunk boundaries, then use it to download and process the file inside the workers. For example:

def process(start, finish):
    # Download the byte range [start, finish) of the file
    # Parse the downloaded content into records
    # Return a list of items (e.g. row tuples)
    ...

partition_size = file_size // num_partitions
boundaries = [(start, min(start + partition_size, file_size))
              for start in range(0, file_size, partition_size)]
rdd = sc.parallelize(boundaries).flatMap(process)
df = sqlContext.createDataFrame(rdd)
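One way to flesh out `process()` is to use the `offset` and `length` parameters of the WebHDFS `OPEN` operation, so that each worker fetches only its own byte range. A minimal sketch, reusing the `host`, `filepath`, `username` and `password` placeholders from the question; note the line handling here naively drops a trailing partial line, so records split across a boundary would need extra care in a real job:

```python
import requests

def make_boundaries(file_size, num_partitions):
    """Split [0, file_size) into contiguous (start, finish) byte ranges."""
    step = max(1, file_size // num_partitions)
    return [(start, min(start + step, file_size))
            for start in range(0, file_size, step)]

def fetch_range(start, finish):
    """Fetch bytes [start, finish) via WebHDFS OPEN with offset/length.
    host, filepath, username and password are the question's placeholders."""
    url = ("https://{0}:8443/gateway/default/webhdfs/v1/{1}"
           "?op=OPEN&offset={2}&length={3}").format(
               host, filepath, start, finish - start)
    r = requests.get(url, auth=(username, password),
                     verify=False, allow_redirects=True)
    return r.content

def process(start, finish):
    """Parse a fetched byte range into row tuples. Incomplete lines at
    the range edges are simply skipped in this sketch."""
    lines = fetch_range(start, finish).decode("utf-8").split("\n")
    return [tuple(line.split(",")) for line in lines if line]
```

The boundary helper guarantees the ranges cover the whole file without overlap, which is what lets `flatMap(process)` reassemble the records in parallel.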
Kien Truong
  • Much nicer than my solution, which results in a stackoverflow: http://stackoverflow.com/questions/38187333/unionall-resulting-in-stackoverflow – Chris Snow Jul 04 '16 at 14:43