I have the code below, which runs four commands using subprocess.Popen to process log files. When I process the files sequentially, it works fine. I then created one thread per file for parallelism and bound the function below to each thread. Now some threads produce the desired output, while others throw an error.

Code:

import json
import subprocess

def process_log_file(file):
    # Stage 1: run the mapper on the log file (stderr is merged into stdout).
    proc = subprocess.Popen(['python27', 'countmapper.py', "C:\\pythonPrograms\\04-03-2014\\17IL\\" + file], cwd="C:\\pythonPrograms\\", stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = proc.communicate()
    # Stage 2: sort the mapper output.
    sortedop = subprocess.Popen(['sort'], cwd="C:\\pythonPrograms\\", stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = sortedop.communicate(out)
    # Stage 3: reduce the sorted output.
    countReducer = subprocess.Popen(['python27', 'countreducer.py'], cwd="C:\\pythonPrograms\\", stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = countReducer.communicate(out)
    # Stage 4: post-process the reducer output into JSON.
    countpostprocesser = subprocess.Popen(['python27', 'countpostprocesser.py'], cwd="C:\\pythonPrograms\\", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = countpostprocesser.communicate(out)
    jsondata2 = json.loads(out)
    # Write the result next to the input name, e.g. SAMPLE1.log.json.
    with open(file + ".json", "w") as fd:
        json.dump(jsondata2, fd, sort_keys=True, indent=2)

Error Received:

Exception in thread Thread-42:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\pythonPrograms\counts_batch_threading.py", line 45, in process_log_file
    jsondata2=json.loads(out)
  File "C:\Python27\lib\json\__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "C:\Python27\lib\json\decoder.py", line 365, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Python27\lib\json\decoder.py", line 383, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

Code used for thread creation:

import glob
import threading

threads = []
for file in glob.glob("SAMPLE*.log"):
    thread1 = threading.Thread(target=process_log_file, args=(str(file),))
    threads.append(thread1)
    thread1.start()

# Wait for all threads to complete
for t in threads:
    t.join()

Can someone help me with this?

Codelearner
  • Can you show the manner in which you are creating the thread? Could it be that the first output is not yet ready when you try to read it? – mdurant Jul 22 '14 at 20:42
  • What is the value of `out` at each stage? Is it what you expect? – Dunes Jul 22 '14 at 20:42
  • @mdurant added the requested code – Codelearner Jul 22 '14 at 20:46
  • Are any of these functions modifying anything shared—like, say, writing to a temporary file with a fixed pathname? – abarnert Jul 22 '14 at 20:47
  • @dunes I am able to get the desired output (a .json file) for a few threads. Some threads show this error. – Codelearner Jul 22 '14 at 20:47
  • @abarnert No. Nothing is shared between threads. Each generates a different output file. – Codelearner Jul 22 '14 at 20:49
  • I suggest wrapping the `loads` call in a try/except statement and logging the value you tried to decode, or raising another exception with that value as the message, e.g. `raise ValueError("could not decode {}".format(out))`. You are linking the processes correctly; the problem is that one or more of the subprocesses or one of the input files is buggy. – Dunes Jul 22 '14 at 20:53
  • @Dunes Yeah, you are correct. I added a `try` block. I think the next command is getting executed before the previous command completes. – Codelearner Jul 22 '14 at 21:04
  • @Codelearner: That doesn't make any sense. What did you see when you added the `try` block? What's in the `out`? – abarnert Jul 22 '14 at 21:38
  • @abarnert It's showing the error from the script: `could not decode Traceback (most recent call last): File "countpostprocesser.py", line 140, in <module> main(sys.stdin) File "countpostprocesser.py", line 92, in main key, value = line.split('\t') ValueError: need more than 1 value to unpack` – Codelearner Jul 22 '14 at 21:39
  • @Codelearner: Yes, you're showing us the error, but you're not showing us the `out` value, which is what Dunes asked for. – abarnert Jul 22 '14 at 21:40
  • @abarnert Since the script raised an exception, `out` does not have any value, which led to the error mentioned in the question. – Codelearner Jul 22 '14 at 21:45
  • How many files are you processing? You could be hitting an upper limit on the number of processes you can spawn. I believe a Win32 process can have at most 2048 child processes. – Dunes Jul 22 '14 at 22:10
  • @Codelearner: That's nonsense. The line that's raising the exception is `json.loads(out)`. That means it's successfully completed the previous line, which means `out` _does_ have a value, and that value is exactly what we need to see if you want us to debug why you're getting an exception. – abarnert Jul 22 '14 at 22:21
  • @Dunes I am processing 44 files. I don't think that exceeds the limit you mentioned. – Codelearner Jul 23 '14 at 03:11
  • @abarnert I think it's possible: if `countReducer` fails, it sets `out` to null and `err` to the exception, if any, which causes `countpostprocesser` to fail, in which case `out` is again set to null and `err` to the exception. Thus the line that actually throws the exception will be `json.loads(out)`. – Codelearner Jul 23 '14 at 15:15
  • @Codelearner: How about if, instead of just randomly guessing at what might happen, you just print out the `out` and `err` values and give them to us, as Dunes asked for yesterday? Even better, keep track of the `out` and `err` values from all of the calls and print them all out on exception so you can see exactly what went wrong and where. Then we can look at what `countreducer.py` got as its input and figure out why it produced no output. Most likely, it's either a bug in `countreducer.py`, or it's getting bogus input because of a bug in `countmapper.py`, and you can debug that once you know. – abarnert Jul 23 '14 at 17:55
  • @abarnert It is not practical to give all the `out` and `err` values here (44 log files, each with 3 values of `out` and `err`; moreover, the intermediate output is around 20 MB). As mentioned in the post, the issue occurs for only some of the threads, and if done sequentially, everything works fine (so there is no issue with the '.py' files). I think the issue may be with Popen; refer to [this](http://bugs.python.org/issue1236). – Codelearner Jul 23 '14 at 18:17
  • @Codelearner: Please read [How to create a Minimal, Complete, and Verifiable Example](http://stackoverflow.com/help/mcve). Nobody wants you to give us 20MB of data. Whether the bug is in your code or in the stdlib or whatever, knowing the inputs and outputs at each step when it fails means you can repro that step on its own and see if it still fails. That's the first step toward finding the problem, which is the first step toward solving it. – abarnert Jul 23 '14 at 18:32
  • @Codelearner: And meanwhile, you really think a bug in Python 2.5 that was fixed in 2.6/3.1 is responsible for your code not working in Python 2.7? – abarnert Jul 23 '14 at 18:34
  • `err` is **always** `None` in your code and `out` may contain an error message due to `stderr=subprocess.STDOUT`. **Show us `repr(out)` that leads to ValueError** e.g., `try: jsondata2=json.loads(out) except ValueError: print >>sys.stderr, repr(out); raise` – jfs Aug 06 '14 at 15:42
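
The debugging advice in the comments above can be sketched roughly as follows. This is a minimal sketch, not the asker's code: `StageError`, `run_stage`, `decode_json`, `ok_stage` and `bad_stage` are hypothetical names, and the two stand-in commands exist only so the example runs without the real scripts. It assumes each stage signals failure through a non-zero exit status. Capturing stderr separately (instead of `stderr=subprocess.STDOUT`) keeps a child's traceback from being passed on as input to the next stage.

```python
import json
import subprocess
import sys


class StageError(Exception):
    """Raised when one pipeline stage exits with a non-zero status (hypothetical)."""


def run_stage(name, cmd, data=None):
    """Run one stage, feed it `data` on stdin, and return its stdout as bytes.

    stderr is captured separately so an error message from the child is
    never mistaken for the data meant for the next stage.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate(data)  # blocks until the child has exited
    if proc.returncode != 0:
        raise StageError("%s exited with %d, stderr=%r, stdin started with %r"
                         % (name, proc.returncode, err, (data or b"")[:200]))
    return out


def decode_json(out):
    """Parse the final output; on failure, report what was actually received."""
    try:
        return json.loads(out.decode("utf-8"))
    except ValueError:
        raise ValueError("could not decode %r" % out[:200])


# Stand-in stages (hypothetical): one echoes stdin, one fails with a message.
ok_stage = [sys.executable, "-c",
            "import sys; sys.stdout.write(sys.stdin.read())"]
bad_stage = [sys.executable, "-c", "import sys; sys.exit('boom')"]
```

With helpers like these, a failing thread would report which stage failed and what that stage printed, rather than a bare "No JSON object could be decoded" from the last line of the function.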

1 Answer

Your error could come from the fact that Popen is non-blocking, so the code may try to read the results immediately instead of waiting for the program to terminate. Try using check_call instead, which is a blocking function and waits for the command to finish.

Philip Massey
  • You can make the Popen instance block by calling its `wait` or `communicate` method. See [here](http://stackoverflow.com/questions/21936597/blocking-and-non-blocking-subprocess-calls). – Codelearner Jul 22 '14 at 21:17
  • Ah, I didn't know `communicate()` was also blocking. Then your code looks fine. Are you sure there's no inter-thread communication or thrashing? No filename conflicts, or static functions being called twice and overwriting resources? – Philip Massey Jul 22 '14 at 21:23
  • Yes. I have posted the entire code. There are no resources being shared. – Codelearner Jul 22 '14 at 21:25
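
The point made in this thread, that `communicate()` blocks even though `Popen` itself returns immediately, can be checked with a small sketch. The child command here is a hypothetical stand-in that sleeps for half a second before writing one line; it is not one of the asker's scripts.

```python
import subprocess
import sys
import time

# Stand-in child (hypothetical): sleeps briefly, then writes one line.
child = [sys.executable, "-c",
         "import time; time.sleep(0.5); print('done')"]

start = time.time()
proc = subprocess.Popen(child, stdout=subprocess.PIPE)
# Popen has returned, but the child is normally still running at this point.
still_running = proc.poll() is None

out, _ = proc.communicate()   # blocks until the child has exited
elapsed = time.time() - start
# By now proc.returncode is set and `out` holds everything the child wrote.
```

Here `elapsed` comes out at roughly the child's sleep time or more, which suggests the sequence of `communicate()` calls in the question already waits for each stage before starting the next.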