43

I have a CSV file in S3 and I'm trying to read the header line to get the size (these files are created by our users, so they could be almost any size). Is there a way to do this using boto? I thought maybe I could use a Python BufferedReader, but I can't figure out how to open a stream from an S3 key. Any suggestions would be great. Thanks!

John Rotenstein
gignosko
  • 1
    Would `key.size` not work for you? Returns the key size in bytes. If you want the header, you could also stream the first chunk only, like this: http://stackoverflow.com/a/7625197/786559 . – Ciprian Tomoiagă Apr 07 '17 at 01:46
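
For reference, here is a minimal sketch of the `key.size` suggestion from the comment above, using boto 2 (the bucket and key names are made up):

import boto

conn = boto.connect_s3()
bucket = conn.get_bucket('my-bucket')       # hypothetical bucket name
key = bucket.get_key('users/upload.csv')    # get_key issues a HEAD request and populates metadata
print(key.size)                             # object size in bytes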

10 Answers

39

Here's a solution which actually streams the data line by line:

from io import TextIOWrapper
from gzip import GzipFile
...

# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
# if gzipped
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
data = TextIOWrapper(gzipped)

for line in data:
    ...  # process line
kooshywoosh
    The gzip requirement wasn't in the initial question, but this is exactly what I needed for my use case. Thanks! – killdash9 Mar 27 '18 at 17:53
  • This looks like a much neater way to go than using an external package. Sad that 1) your answer did not get checked as the actual answer and 2) that the smart_open solution got so many ups. – Christophe Sep 08 '19 at 11:12
  • 3
    It does not work if we don't use `GzipFile` to decompress it, as `StreamingBody` does not have a `readable` property which `TextIOWrapper` requires – peon Oct 31 '19 at 03:58
  • This should be the accepted answer... smart_open appears to be significantly slower: https://github.com/RaRe-Technologies/smart_open/issues/457. In my own testing, files stream about 5x faster using the approach described in this answer than using smart_open. It's a super cool library, but it doesn't seem to be up to par for this particular use case. – zyd Nov 11 '20 at 02:44
34

You may find https://pypi.python.org/pypi/smart_open useful for your task.

From documentation:

for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
    print(line)
Michael Korbakov
  • 4
    This is so much easier! And it supports gzipped files by default! – Ciprian Tomoiagă Apr 07 '17 at 01:50
  • 1
    Thanks for sharing! – oshaiken May 10 '17 at 16:09
  • @Michael Korbakov This is giving me an exception TypeError: a bytes-like object is required, not 'str' – Shek Sep 07 '17 at 15:19
  • 8
    What is the cost? Do you end up downloading the entire file just to read a few lines? – Leonid Dec 03 '17 at 19:07
  • 5
  • Using external libraries is usually bad practice - you want to have minimal dependencies so that you can upgrade and move forward easily. – Ivailo Bardarov Mar 16 '20 at 21:11
  • @IvailoBardarov is it possible to read data from S3 without keeping all the bytes in memory, and without smart_open? – Amundeep Singh Dec 03 '20 at 20:07
  • 1
    @AmundeepSingh Yes, you just do s3.get_object(), then pass the response["Body"] to an io.TextIOWrapper and read it line by line. Memory stays constant, based on the buffers set. Here is how I read compressed gz files line by line: https://gist.github.com/gudata/da5d0553a309836d998a56c73c60575c . It is a few lines of code using only what's in Python and boto. – Ivailo Bardarov Dec 06 '20 at 12:42
  • What about reading .dat or other complex file formats? Can you help with this: https://stackoverflow.com/questions/72287301/how-to-read-dat-file-from-aws-s3-using-mdfreader ? – Aakash Basu May 18 '22 at 12:58
29

I know it's a very old question.

But as of now, we can just use s3_conn.get_object(Bucket=bucket, Key=key)['Body'].iter_lines()
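
A fuller sketch of that one-liner (the bucket and key names are made up; default credentials are assumed):

import boto3

s3_conn = boto3.client('s3')
response = s3_conn.get_object(Bucket='my-bucket', Key='path/to/file.csv')

# iter_lines() streams the object and yields one line at a time, as bytes
for line in response['Body'].iter_lines():
    print(line.decode('utf-8'))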

peon
  • Yeah, if this question were asked today, this and .iter_chunks() would be the answer. +1 – zyd Aug 28 '20 at 15:04
  • The `iter_lines()` method is part of the botocore.response package: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html – Amit Tendulkar Mar 26 '21 at 09:23
  • One question/concern I have: Does `iter_lines` make another GET request every time it needs the next chunk? (Also asked here: https://stackoverflow.com/q/60422708) – Dominus.Vobiscum Nov 22 '22 at 19:57
  • 1
    @Dominus.Vobiscum It loads a chunk of a fixed size (in bytes) for each request; if there is another line in the loaded content cache, the `__next__` call to the generator returns it, otherwise it makes another request until there is no remaining content. See https://github.com/boto/botocore/blob/dfda41c08e3ed5354dce9f958b6db06e6cce99ed/botocore/response.py#L135-L148 – peon Nov 28 '22 at 03:03
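
If the default chunk is too small for your case, iter_lines appears to also accept a chunk_size argument; a sketch, assuming response is a fresh get_object() result as in the sketch above (verify the parameter name against your botocore version):

# larger chunks mean fewer reads from the underlying stream
for line in response['Body'].iter_lines(chunk_size=64 * 1024):
    print(line.decode('utf-8'))
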
14

The codecs module in the stdlib provides a simple way to decode a stream of bytes into a stream of text, and provides a generator to retrieve this text line by line. It can be used with S3 without much hassle:

import codecs

import boto3


s3 = boto3.resource("s3")
s3_object = s3.Object('my-bucket', 'a/b/c.txt')
line_stream = codecs.getreader("utf-8")

for line in line_stream(s3_object.get()['Body']):
    print(line)
alukach
10

It appears that boto has a read() function that can do this. Here's some code that works for me:

>>> import boto
>>> from boto.s3.key import Key
>>> conn = boto.connect_s3('ap-southeast-2')
>>> bucket = conn.get_bucket('bucket-name')
>>> k = Key(bucket)
>>> k.key = 'filename.txt'
>>> k.open()
>>> k.read(10)
'This text '

The call to read(n) returns the next n bytes from the object.

Of course, this won't automatically return "the header line", but you could call it with a large enough number to return the header line at a minimum.
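
For example, a rough sketch of that idea with a fresh Key object, in the same boto 2 style as the session above (the 4096-byte guess is arbitrary and assumes the header fits within it; with boto3 the chunk would be bytes rather than str):

k2 = Key(bucket)
k2.key = 'filename.txt'
k2.open()
chunk = k2.read(4096)              # read a generous chunk from the start of the object
header = chunk.split('\n', 1)[0]   # everything up to the first newline
print(header)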

John Rotenstein
  • Thanks, John. This was going to be my fallback solution if I couldn't find a way to stream the file. I'll just take a guess about the max size of the header and go from there. – gignosko Feb 20 '15 at 13:40
  • @John Rotenstein - don't you have to close the file after read()? – bartekch Jan 04 '19 at 14:39
9

With boto3 you can access a raw stream and read it line by line. Just note that the raw stream is a private property, for some reason:

import boto3

s3 = boto3.resource('s3', aws_access_key_id='xxx', aws_secret_access_key='xxx')
obj = s3.Object('bucket name', 'file key')

# keep a single response; each obj.get() call would start a new download from the beginning
body = obj.get()['Body']._raw_stream
body.readline()  # line 1
body.readline()  # line 2
body.readline()  # line 3...
robertzp
  • 8
    As is hinted by `_raw_stream` property beginning with an underscore, this is not how you should access the stream contents. They can be read in entirety by calling `obj.get()["body"].read()` or iterated as a generator with `obj.get()["body"].iter_lines()` – Alex Jan 28 '19 at 18:54
  • For some python APIs, this is the only way to go (e.g. `pickle.load` which expects to find both `.read()` and `.readline()` defined on its argument) – Max Gasner Mar 22 '19 at 23:42
  • @Alex it's `obj.get()["Body"].read()` note the capital B in Body – John_Krampf Mar 22 '21 at 21:23
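
To illustrate the pickle.load case from the comment above, a rough sketch (the bucket and key names are made up, and this leans on the private `_raw_stream`, so it may break between botocore versions):

import pickle

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('my-bucket', 'model.pkl')  # hypothetical object holding a pickled value

# pickle.load expects read() and readline(); the underlying urllib3 response provides both
value = pickle.load(obj.get()['Body']._raw_stream)
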
7

Using boto3:

import boto3

s3 = boto3.resource('s3')
obj = s3.Object(BUCKET, key)
for line in obj.get()['Body']._raw_stream:
    ...  # do something with line
hansaplast
6

If you want to read multiple files (line by line) with a specific bucket prefix (i.e., in a "subfolder") you can do this:

import boto3

s3 = boto3.resource('s3', aws_access_key_id='<key_id>', aws_secret_access_key='<access_key>')

bucket = s3.Bucket('<bucket_name>')
for obj in bucket.objects.filter(Prefix='<your prefix>'):
    for line in obj.get()['Body'].read().splitlines():
        print(line.decode('utf-8'))

Here the lines are bytes, so I decode them; if they are already strings, you can skip that.

oneschilling
2

The most flexible, low-cost way to read the file is to read it byte by byte until you find the number of lines you need.

line_count = 0
line_data_bytes = b''

# read one byte at a time until the desired number of lines (two here) have been seen
while line_count < 2:
    incoming = correlate_file_obj['Body'].read(1)
    if not incoming:  # stop if the object ends before the expected number of lines
        break
    if incoming == b'\n':
        line_count = line_count + 1
    line_data_bytes = line_data_bytes + incoming

logger.debug("read bytes:")
logger.debug(line_data_bytes)

line_data = line_data_bytes.split(b'\n')

You won't need to guess at the header size even if it can change, you won't end up downloading the whole file, and you don't need third-party tools. Granted, you need to make sure the line delimiter in your file is correct and that you are reading the right number of bytes to find it.

KiteCoder
1

Expanding on kooshywoosh's answer: using TextIOWrapper (which is very useful) on a StreamingBody from a plain binary file directly isn't possible, as you'll get the following error:

"builtins.AttributeError: 'StreamingBody' object has no attribute 'readable'"

However, you can use the following hack mentioned in this long-standing issue on botocore's GitHub page and define a very simple wrapper class around StreamingBody:

from io import RawIOBase
...

class StreamingBodyIO(RawIOBase):
    """Wrap a boto StreamingBody in the IOBase API."""

    def __init__(self, body):
        self.body = body

    def readable(self):
        return True

    def read(self, n=-1):
        n = None if n < 0 else n
        return self.body.read(n)

Then, you can simply use the following code:

from io import TextIOWrapper
...

# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
data = TextIOWrapper(StreamingBodyIO(response['Body']))
for line in data:
    ...  # process line
Dean Gurvitz