
I'm dipping my toes into Python threading. I've created a supplier thread that returns me character/line data from a *nix (serial) /dev via a Queue.

As an exercise, I would like to consume the data from the queue one line at a time (using '\n' as the line terminator).

My current (simplistic) solution is to put() only 1 character at a time into the queue, so the consumer will only get() one character at a time. (Is this a safe assumption?) This approach currently allows me to do the following:

...
return_buffer = []
while True:
    rcv_data = queue.get(block=True)
    return_buffer.append(rcv_data)        
    if rcv_data == "\n":
        return return_buffer

This seems to be working, but I can definitely cause it to fail when I put() 2 characters at a time.

I would like to make the receive logic more generic and able to handle multi-character put()s.

My next approach would be to rcv_data.partition("\n"), putting the "remainder" in yet another buffer/list, but that will require juggling the temporary buffer alongside the queue. (I guess another approach would be to only put() one line at a time, but where's the fun in that?)
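For reference, a rough sketch of what that partition-based buffering might look like (hypothetical code, names are my own; using the Python 3 `queue` module name, which is `Queue` in Python 2):

```python
import queue

def make_line_reader(q, sep="\n"):
    """Return a callable that gives back one complete line per call."""
    buf = [""]  # leftover partial line carried between calls
    def read_line():
        while True:
            line, found, rest = buf[0].partition(sep)
            if found:             # a full line is buffered
                buf[0] = rest     # keep the remainder for next time
                return line + found
            buf[0] += q.get(block=True)  # block until more data arrives
    return read_line
```

With this, putting `"hel"` then `"lo\nworld\n"` still yields `"hello\n"` on the first call and `"world\n"` on the second, regardless of how the chunks were split.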

Is there a more elegant way to read from a queue one line at a time?

JS.
  • Does this similar question that I previously answered cover this question at all? http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python – jdi Sep 05 '12 at 21:12
  • Yes, as I mentioned, that would work. It would also "tie" me to that approach for consuming the data. This is terminal data, so to start parsing the output at "prompt" characters would require re-writing the producer. I would like to keep a generic /dev reader thread and be able to change how I retrieve data by varying the queue consumer method. I know how to do this the "hard" way, but I'm hoping to learn a more elegant approach if one exists. – JS. Sep 05 '12 at 21:12
  • Oh, I assumed you wanted the queue to have multiple works processing it. Is this only for a single worker to consume characters? – jdi Sep 05 '12 at 21:13
  • Yes, there is only a single consumer. For multiple consumers I agree that put()ing per line would be the way to go. This is probably an overkill approach for this problem, but I'm using this approach to learn more about threading in order to attack bigger problems lurking on the horizon. – JS. Sep 05 '12 at 21:16

4 Answers


This may be a good use for a generator. It will pick up exactly where it left off after a yield, so it reduces the amount of storage and buffer-swapping you need (I cannot speak to its performance).

def getLineGenerator(queue, splitOn):
    return_buffer = []
    while True:
        rcv_data = queue.get(block=True) # We can pull any number of characters here.
        for c in rcv_data:
            return_buffer.append(c)
            if c == splitOn:
                yield ''.join(return_buffer)
                return_buffer = []


gen = getLineGenerator(myQueue, "\n")
for line in gen:
    print line.strip()

Edit:

Once J.F. Sebastian pointed out that the line separator could be multi-character, I had to solve that case as well. I also used StringIO from jdi's answer. Again I cannot speak to its efficiency, but I believe it is correct in all the cases I could think of. This is untested, so it would probably need some tweaks to actually run. Thanks to J.F. Sebastian and jdi for their answers, which ultimately led to this one.

from cStringIO import StringIO
# python3: from io import StringIO

def getlines(chunks, splitOn="\n"):
    r_buffer = StringIO()
    for chunk in chunks:
        r_buffer.write(chunk)
        pos = r_buffer.getvalue().find(splitOn) # can't use rfind; see the next comment
        while pos != -1: # A single chunk may contain more than one separator
            line = r_buffer.getvalue()[:pos + len(splitOn)]
            yield line
            rest = r_buffer.getvalue().split(splitOn, 1)[1]
            r_buffer.seek(0)
            r_buffer.truncate()
            r_buffer.write(rest)
            pos = rest.find(splitOn) # rest and r_buffer are equivalent at this point; use rest to avoid an extra getvalue() call

    line = r_buffer.getvalue()
    r_buffer.close() # just for completeness
    yield line # whatever is left over

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)
grieve
  • Brilliant! In my case, performance isn't really an issue. Great idea! – JS. Sep 05 '12 at 21:42
  • your generator produces lists of single characters. list has no .strip() method. You could add `yield ''.join(return_buffer)` to fix it – jfs Sep 05 '12 at 22:13
  • @J.F.Sebastian: I did indeed modify the 'yield' as you suggest above. – JS. Sep 05 '12 at 23:29
  • @J.F.Sebastian: If splitOn is more than a single character the code will not work correctly, and would require more changes making it more complicated. I may look at that when I have more time. As far as returning a list of characters. I was trying to modify the original posters code as little as possible. :) – grieve Sep 06 '12 at 01:35
  • @J.F.Sebastian: Just saw you answer below. That is indeed the extra complication I was thinking of. :) – grieve Sep 06 '12 at 01:39
  • @grieve: Your first code example is more elegant than your second example and the code in my answer. The first example is simple, readable and correct for `len(splitOn) == 1` (though it doesn't follow pep-8 for naming conventions). btw, you *can* use `.rfind()` even if there are multiple separators in a chunk e.g., [my answer](http://stackoverflow.com/a/12290485/4279) supports it. there is a typo: `s/getValue/getvalue/` – jfs Sep 06 '12 at 03:04
  • @J.F.Sebastian: My fingers always want to type in camel case, when coding. Thanks for catching that. I have corrected it. In your answer you can use rfind, but I cannot because I am using r_buffer.getvalue()[:pos + len(splitOn)], so pos has to point to the first occurrence. – grieve Sep 06 '12 at 03:14

If your specific use case requires the producer to put to the queue character by character, then I suppose I can't see anything wrong with getting them in a loop in the consumer. But you can probably get better performance by using a StringIO object as the buffer.

from cStringIO import StringIO
# python3: from io import StringIO

buf = StringIO()

The object is file-like, so you can write to it, seek it, and call getvalue() at any time to get the complete string value of the buffer. This will most likely give you much better performance than constantly growing a list, joining it into a string, and clearing it.

return_buffer = StringIO()
while True:
    rcv_data = queue.get(block=True)
    return_buffer.write(rcv_data)        
    if rcv_data == "\n":
        ret = return_buffer.getvalue()
        return_buffer.seek(0)
        # truncate, unless you are counting bytes and
        # reading the data directly each time
        return_buffer.truncate()

        return ret
jdi
  • rcv_data may contain multiple bytes. At least that was my impression of what the OP wanted to do. Feed in bytes as he gets them, instead of one at a time. – grieve Sep 05 '12 at 21:33
  • AH yea I overlooked that part. I was just focusing on the buffer I guess – jdi Sep 05 '12 at 21:38
  • +1 for teaching me about StringIO I hadn't run across that before. – grieve Sep 05 '12 at 21:46
  • @grieve: I think StringIO, combined with your answer would be great. Feel free to add it into yours if you want. – jdi Sep 05 '12 at 21:47
  • you probably want `return_buffer.seek(0)` before .truncate() instead of .reset(). There is no .reset() in io.StringIO or StringIO.StringIO. It is present only in cStringIO – jfs Sep 05 '12 at 22:27

The queue returns exactly what you put in it. If you put fragments you get fragments. If you put lines you get lines.
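A quick illustration of that point (using the Python 3 module name; it is `Queue` in Python 2):

```python
from queue import Queue  # Python 2: from Queue import Queue

q = Queue()
q.put("partial li")        # a fragment, not a full line
q.put("ne\nanother\n")     # the rest, plus a whole extra line

first = q.get()   # "partial li" -- the exact fragment that was put
second = q.get()  # "ne\nanother\n"
```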

To consume the input line by line when partial lines are allowed (and may be completed later), you need a buffer, explicit or implicit, to store the partial lines:

def getlines(fragments, linesep='\n'):
    buff = []
    for fragment in fragments:
        pos = fragment.rfind(linesep)
        if pos != -1: # linesep in fragment
            lines = fragment[:pos].split(linesep)
            if buff: # start of line from previous fragment
                lines[0] = ''.join(buff) + lines[0] # prepend
                del buff[:] # clear buffer
            rest = fragment[pos + len(linesep):]
            if rest:
                buff.append(rest)
            yield from lines
        elif fragment: # linesep not in fragment, fragment is not empty
            buff.append(fragment)

    if buff:
        yield ''.join(buff) # flush the rest

It allows fragments and linesep of arbitrary length; however, a linesep must not span several fragments.
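To illustrate the rfind()/split() step with a two-character linesep (a quick sketch, not part of the original answer):

```python
fragment = "one\r\ntwo\r\nthr"
linesep = "\r\n"

pos = fragment.rfind(linesep)            # index of the last separator: 8
lines = fragment[:pos].split(linesep)    # complete lines: ['one', 'two']
rest = fragment[pos + len(linesep):]     # 'thr' -- buffered until the next fragment
```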

Usage:

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)
jfs
  • Ok, so the Queue returns data exactly as one puts it in. Good to know. Thanks! – JS. Sep 05 '12 at 23:27
  • @JS.: I'd summarize it as: "you need a buffer" instead. If `len(linesep) == 1` and performance doesn't matter for large fragments then [@grieve's answer](http://stackoverflow.com/a/12290064/4279) is preferable. – jfs Sep 06 '12 at 00:28
  • What if linesep is split across two fragments? Also, I am a little confused with the function being named lines, and apparently there is a scoped variable named lines. – grieve Sep 06 '12 at 01:43
  • @grieve: Good catch. There are no recursive calls, so using `lines` accidentally might have worked. I've edited the answer. I've added a remark about linesep and multiple fragments. – jfs Sep 06 '12 at 02:06
  • Nice use of iter on top of queue.get. I stole that for my edit above. – grieve Sep 06 '12 at 02:38

It's important to note that there could be multiple lines in the queue. This function will return (and optionally print) all the lines from a given queue:

def getQueueContents(queue, printContents=True):
    contents = ''
    # drain the full queue contents, not just a single line
    while not queue.empty():
        line = queue.get_nowait()
        contents += line
        if printContents:
            # print without the trailing newline (if any)
            print line.rstrip('\n')
    return contents
blented