
I have an iterable of bytes, such as

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n1',
    b',"val',
    b'ue"\n',
)

(though typically this would not be hard-coded or available all at once, but supplied from, say, a generator), and I want to convert it to an iterable of str lines, where the line breaks are unknown up front but could be any of \r, \n or \r\n. So in this case it would be:

lines_iter = (
    'col_1,col_2',
    '1,"value"',
)

(but again, just as an iterable, not so it's all in memory at once).

How can I do this?

Context: my aim is to then pass the iterable of str lines to csv.reader (which I think needs whole lines?), but I'm interested in the answer in general.

Michal Charemza
  • Why is the `csv` tag related? – balderman Jan 09 '22 at 08:09
  • @balderman I've added some more detail on that – Michal Charemza Jan 09 '22 at 08:12
  • Did you look at generators? – https://www.programiz.com/python-programming/generator – balderman Jan 09 '22 at 08:15
  • Read piece by piece from the bytes generator, concatenate the pieces together, detect line breaks and if you find some, `yield` the accumulated string…? – deceze Jan 09 '22 at 08:18
  • Is there a reason you do not want to process the input all at once, e.g. `map(bytes.decode, b"".join(bytes_iter).splitlines())`? – hilberts_drinking_problem Jan 09 '22 at 08:22
  • I think the trickiest part is that it's possible for a `'\r\n'` sequence to be split across two input chunks. If you don't handle that correctly, that'll be treated as two line breaks. – user2357112 Jan 09 '22 at 08:23
  • @hilberts_drinking_problem Yes: memory. The data could be several GBs. – Michal Charemza Jan 09 '22 at 08:24
  • An [incremental decoder](https://docs.python.org/3/library/codecs.html#incrementaldecoder-objects) would be the best way to handle non-ASCII input. A single character could easily be split across multiple input chunks, and with non-ASCII-compatible encodings, searching for `b'\r'` or `b'\n'` in the bytestream is incorrect. – user2357112 Jan 09 '22 at 08:25
  • @user2357112supportsMonica, thanks for the suggestion to use an incremental decoder, I put that together along with o11c's ReadableIterator class to get what I think is a good solution. – Zach Young Jan 10 '22 at 05:46

4 Answers


Use the io module to do most of the work for you:

import io

class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')
    def readable(self):
        return True

then just call io.TextIOWrapper(ReadableIterator(some_iterable_of_bytes)).
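For example, a minimal sketch using the question's bytes_iter (newline='' keeps the original line-break characters, which matters when the lines are fed to csv.reader; see the comments below):

lines = io.TextIOWrapper(ReadableIterator(bytes_iter), encoding='utf-8', newline='')
for line in lines:
    print(repr(line))
# 'col_1,col_2\n'
# '1,"value"\n'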

o11c
  • Honestly I'm kind of surprised my own code actually worked here. – o11c Jan 09 '22 at 08:28
  • Thanks for teaching me something new. As an aside, I get `['1\n', '2']` with the following input: `iter((b'1\r\n', b'2'))`. – hilberts_drinking_problem Jan 09 '22 at 08:33
  • For a file-like object that will be passed to the `csv` module, the TextIOWrapper should be created with `newline=''`, to avoid converting all line break sequences to `'\n'`. (`csv` needs to do its own newline handling to make sure quoted line breaks are handled properly.) – user2357112 Jan 09 '22 at 08:36
  • Would there be any problems from the fact that the `n` parameter to `read` is ignored? – Michal Charemza Jan 09 '22 at 09:41
  • If the iterable has an empty input chunk, i.e. `b''`, then no lines after it would be yielded? – Michal Charemza Jan 09 '22 at 10:59
  • Just realised also, this answer doesn't remove the newline characters from each string? – Michal Charemza Jan 09 '22 at 11:28
  • On not respecting `n`, I've asked a question at https://stackoverflow.com/questions/70640983/textiowrapper-what-if-the-buffers-read-method-returns-more-than-whats-asked – Michal Charemza Jan 09 '22 at 11:52
  • @MichalCharemza: Not removing the newlines is good if you're planning to use this with the `csv` module. – user2357112 Jan 09 '22 at 16:51
  • Seems buggy. If I insert a `b''` in the middle of the input, it splits there, and if I enter *two* consecutive `b''`, it stops there, failing to provide the rest. [Demo](https://tio.run/##ZVHBasMwDL37K0R3qFNCodtlDHrZrafBtmMhdRK1Ebi2kZVt@frMadOsI8ZgSU9Pfn4OnTTePT0H7ns6B88C5JUqO8FYkCDDFoZDawVplcvK22KTL/NbOkXzIHU@7t1myvPFl7HzthYXe5eyLFOqsiZGeEdTm9LiLl1sxLMmv969vZqI2cuFVOMRioIcSVHoiPaYJ5EjNqyhtCa5aU/YROM0e6S4O8YD0Ml5RjB8as/oJMG@9HUHppLWWNtBZRjjHcF5QZDGJMfSjrCqmIQqY1fX6jdCbENIpJgKCIcP8eH6JPLuAA0yTuMYpWUHDn9Ej@rzwaT/wgdTLvCd8pH5yS0qFZicaEtR9MzEvz/NsqzvfwE). – no comment Jan 09 '22 at 17:27
  • @nocomment `filter(None, it)` (to drop the empty chunks) should fix that I think. – o11c Jan 09 '22 at 18:04
  • I used your class alongside a special decoder, see my answer below. Thanks! – Zach Young Jan 10 '22 at 05:45

I used `yield` and `re.finditer`.

The yield expression is used when defining a generator function or an asynchronous generator function and thus can only be used in the body of a function definition. Using a yield expression in a function’s body causes that function to be a generator function

Return an iterator yielding match objects over all non-overlapping matches for the RE pattern in string. The string is scanned left-to-right, and matches are returned in the order found. Empty matches are included in the result.

The regular expression ([^\r\n]*)(\r\n|\r|\n)? has two groups: the first matches the data without \r or \n, and the second matches \r, \n or \r\n.
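
To see the pattern in action, including the trailing empty match that the code below relies on, here is a quick illustrative check:

import re

for m in re.finditer("([^\r\n]*)(\r\n|\r|\n)?", "a\r\nb"):
    print(repr(m.group()), m.groups())
# 'a\r\n' ('a', '\r\n')
# 'b' ('b', None)
# '' ('', None)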

import re

find_rule = re.compile("([^\r\n]*)(\r\n|\r|\n)?")


def converter(byte_data):
    left_d = ""
    for d in byte_data:
        # Holds the previous match while we look one match ahead
        prev_result = None
        # Prepend the leftover of the previous chunk to the current data,
        # to deal with a `\r\n` sequence being split across two chunks
        d = left_d + d.decode()
        left_d = ""
        # `finditer` always yields a final empty match (""), which marks
        # the end of the chunk
        for match_result in find_rule.finditer(d):
            i = match_result.group()
            if not i:
                # i == "", so this is the final empty match: carry the
                # previous match (if any) over to the next chunk
                if prev_result is not None:
                    left_d, prev_result = prev_result.group(), None
                continue
            if prev_result:
                if prev_result.group(2) is None:
                    # The previous match has no line-break group: it is the
                    # last valid value of this chunk, so carry it over
                    left_d = prev_result.group()
                else:
                    # The previous match ends in a complete line break:
                    # yield it
                    yield prev_result.group()
            # Save the current match result
            prev_result = match_result
    # Flush whatever is left over after the last chunk
    yield left_d


for i in converter(iter((
        b'col_1,\r',
        b'\nc',
        b'ol_2\n1',
        b'\n,"val;\r',
        b'ue"\n',
))):
    print(repr(i))

Output:

'col_1,\r\n'
'col_2\n'
'1\n'
',"val;\r'
'ue"\n'
pppig

Maybe I'm missing something important (or subtle) because some of the upvoted answers seem a little more exotic than this, but I think you can decode and chain the bytes and use itertools.groupby to get a generator of strings:

from itertools import groupby, chain

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n',
    b'1,"val;',
    b'ue"\n'
)

def make_strings(G):
    strings = chain.from_iterable(map(bytes.decode, G))
    for k, g in groupby(strings, key=lambda c: c not in '\n\r'):
        if k:
            yield ''.join(g)

list(make_strings(bytes_iter))
# ['col_1,col_2', '1,"val;ue"']
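
As the comments below point out, this collapses blank lines, because consecutive line-break characters form a single groupby run. A quick illustrative check:

list(make_strings((b'a\n', b'\n', b'b\n')))
# ['a', 'b']  <- the empty line between 'a' and 'b' is lost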
Mark
  • This fails if the input contains blank lines, and it's very slow, since it processes one byte at a time. – user2357112 Jan 09 '22 at 08:47
  • Hmm, thanks for that @user2357112supportsMonica. Having trouble comparing it with timeit to the other answers...they all seem more or less the same speed even with large input. I wonder if it is slower than the write to disk. – Mark Jan 09 '22 at 08:55
  • @user2357112supportsMonica How much very slow? Please share your benchmark. I had a similar thing once, and was surprised that processing one character at a time was *not* that much slower than the fastest way I could find. – Kelly Bundy Jan 09 '22 at 17:37
  • @KellyBundy: Depends on how big chunks tend to be. If the 4.4-byte average chunk size from the question is actually realistic instead of being reduced for the example, then we're almost reading byte by byte anyway and this option outperforms o11c's. Even 11-byte chunks are enough for o11c's answer to overtake this one in my tests, and with larger chunk sizes, o11c's answer starts outperforming by over an order of magnitude. Timings [here](https://ideone.com/oFbcAU). – user2357112 Jan 09 '22 at 17:58
  • Of course, this answer doesn't handle blank lines correctly, and correctness trumps performance. – user2357112 Jan 09 '22 at 17:59
  • @user2357112supportsMonica Thanks, that's indeed more than in my old case (I think it was around a 4x speed difference). – Kelly Bundy Jan 09 '22 at 18:24
  • Interesting...thanks @user2357112supportsMonica. I'll file this under TIL and remove the answer. – Mark Jan 09 '22 at 18:26
  • If you delete, you'll prevent others from learning what you just learned... – Kelly Bundy Jan 09 '22 at 18:39

Putting @o11c's and @user2357112 supports Monica's contributions together:

import codecs
import csv
import io

def yield_bytes():
    chunks = [
        b'col_1,',
        b'c',
        b'ol_2\n1',
        b',"val',
        b'ue"\n',
        b'Hello,',
        b'\xe4\xb8',
        b'\x96',
        b'\xe7',
        b'\x95\x8c\n',
        b'\n',
    ]

    for chunk in chunks:
        yield chunk

decoder = codecs.getincrementaldecoder('utf-8')()

def yield_encoded_bytes():
    for chunk in yield_bytes():
        # Re-encode only the characters that are complete so far; the bytes
        # of a partially received character stay buffered in the decoder
        s = decoder.decode(chunk, final=False)
        if s:
            yield s.encode('utf-8')

class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')
    def readable(self):
        return True

# newline='' so that csv does its own line-break handling
# (see the comments on o11c's answer)
f = io.TextIOWrapper(ReadableIterator(yield_encoded_bytes()), newline='')

for row in csv.reader(f):
    print(row)

and I get:

['col_1', 'col_2']
['1', 'value']
['Hello', '世界']
[]
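
A possible refinement, if the stream may end mid-character: flush the decoder once the byte iterator is exhausted, so that truncated input raises UnicodeDecodeError instead of being silently dropped. A hypothetical variant of yield_encoded_bytes:

def yield_encoded_bytes():
    for chunk in yield_bytes():
        s = decoder.decode(chunk, final=False)
        if s:
            yield s.encode('utf-8')
    # final=True makes the decoder raise UnicodeDecodeError if any bytes
    # of an incomplete multi-byte character are still buffered
    s = decoder.decode(b'', final=True)
    if s:
        yield s.encode('utf-8')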
Zach Young