5

I have a program that can be started or stopped at any moment. The program is used to download data from web pages. First, a user will define a bunch of web pages in a .csv file, save that .csv file, and then start the program. The program will read the .csv file and turn it into a list of jobs. Next, the jobs are split among 5 separate downloader functions that work in parallel but may take different amounts of time to finish.

After a downloader (there are 5 of them) finishes downloading a web page, I need it to open the .csv file and remove the link. This way, as time passes, the .csv file will get smaller and smaller. The issue is that sometimes two download functions will try to update the .csv file at the same time, which causes the program to crash. How can I deal with this?

user1367204
  • 4,549
  • 10
  • 49
  • 78
  • 2
    That seems like a particularly difficult way to handle the problem. What are these jobs? Perhaps that's the best way to handle the problem. Consume the csv once to create jobs (perhaps stored in a database or separate files) and manage those. If you want to do the csv thing, it should only be managed by a single entity (that master program dealing with the jobs perhaps) that is sent responses from the workers and does the csv file update. – tdelaney Jun 07 '17 at 20:01
  • 1
    You don't. Handling side-effects from multiple (parallel) points of control is a recipe for disaster. If it's the *only way to make it work* you can implement locks as some of your current answers suggest, however that doesn't seem to be the case here. You should implement a manager/worker pattern and have the manager handle IO, pass jobs off to the workers, and receive results from the workers. – Adam Smith Jun 07 '17 at 20:12
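For illustration, here is a minimal sketch of the manager/worker pattern described in the comment above (the links.csv file name and the download_one helper are placeholder assumptions, not from the question). Only the manager process ever rewrites the .csv, so no locking is needed:

import csv
from multiprocessing import Pool

def download_one(url):
    # ... fetch the page for `url` here (download logic omitted) ...
    return url  # report back which link finished

if __name__ == "__main__":
    with open("links.csv", newline="") as f:  # the manager reads the job list once
        jobs = [row[0] for row in csv.reader(f) if row]

    remaining = set(jobs)
    with Pool(processes=5) as pool:  # 5 parallel downloaders, as in the question
        for done in pool.imap_unordered(download_one, jobs):
            remaining.discard(done)  # only the manager mutates shared state
            with open("links.csv", "w", newline="") as f:  # rewrite the shrinking list
                csv.writer(f).writerows([u] for u in remaining)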

3 Answers

4

If this is a continuation of your project from yesterday, you already have your download list in memory - just remove the entries from the loaded list as their processes finish downloading, and only write the whole list back over the input file once you're exiting the 'downloader'. There is no reason to constantly write down the changes.

If you want to know (say, from an external process) when a URL gets downloaded even while your 'downloader' is running, write a new line to a downloaded.dat file each time a process reports that a download was successful.

Of course, in both cases, write from within your main process/thread so you don't have to worry about a mutex.

UPDATE - Here's how to do it with an additional file, using the same code base as yesterday:

from itertools import cycle  # round-robin distribution of downloader params
from multiprocessing import Pool  # process pool for the parallel downloaders
# Downloader is assumed to come from the same code base as yesterday

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return params  # job finished, return the same params for identification

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use a cycle for round-robin distribution

    with open("downloaded_links.dat", "a+") as diff_file:  # open your diff file
        diff_file.seek(0)  # rewind the diff file to the beginning to capture all lines
        diff_links = {row.strip() for row in diff_file}  # load downloaded links into a set
        with open("input_links.dat", "r+") as input_file:  # open your input file
            available_links = []
            download_jobs = []  # store our downloader parameters + a link here
            # read our file line by line and filter out downloaded links
            for row in input_file:  # loop through our file
                link = row.strip()  # remove the extra whitespace to get the link
                if link not in diff_links:  # make sure link is not already downloaded
                    available_links.append(link + "\n")  # keep a clean, newline-terminated entry
                    download_jobs.append([next(downloader_cycle), link])
            input_file.seek(0)  # rewind our input file
            input_file.truncate()  # clear out the input file
            input_file.writelines(available_links)  # store back the available links
            diff_file.seek(0)  # rewind the diff file
            diff_file.truncate()  # blank out the diff file now that the input is updated
        # and now let's get to business...
        if download_jobs:
            download_pool = Pool(processes=5)  # make our pool use 5 processes
            # run asynchronously so we can capture results as soon as they are available
            for response in download_pool.imap_unordered(init_downloader, download_jobs):
                # since it returns the same parameters, the second item is a link
                # add the link to our `diff` file so it doesn't get downloaded again
                diff_file.write(response[1] + "\n")
        else:
            print("Nothing left to download...")

The whole idea, as I wrote in the comment, is to use a file to store links as they get downloaded, and then on the next run to filter out the already-downloaded links and update the input file. That way, even if you forcibly kill the program, it will always resume where it left off (except for any partial downloads).

zwer
  • 24,943
  • 3
  • 48
  • 66
  • It is. I need to continuously update the file because there could be about 40,000 jobs to do, each taking about a minute, and the chance of my computer being shut off (or the program being interrupted) in the middle of this program is high. Therefore I think I should continuously update the file. – user1367204 Jun 07 '17 at 20:12
  • @user1367204 so you're having each worker process parse a 40k entry csv, find the *one* entry it's supposed to delete, and overwrite the file? That sounds like a horror show. There's a number of better implementations, one of which being zwer's second paragraph here about keeping the csv intact but populating a "done" list as well. – Adam Smith Jun 07 '17 at 20:15
  • @user1367204 - then use a 'diff' file - write a new line for each link you download in that file, then when you start your downloader the next time, after loading the 'input' file with the links, load this 'diff' file, remove the 'diff' links from 'input' links, save the 'input' file with this filtered list and blank the 'diff' file. I'll write some pseudo code a bit later if you need it better visualized... – zwer Jun 07 '17 at 20:22
  • @zwer, thanks for being so helpful. I think the diff file might have the same issue, namely, it might be a problem if several `downloader` functions try to write to it at the same time. – user1367204 Jun 07 '17 at 20:28
  • @user1367204 right, that's why we're all suggesting you not let the `downloader` functions touch the filesystem at all. – Adam Smith Jun 07 '17 at 20:31
  • @user1367204 - don't write to the diff file from your sub-processes, write from your main process (the 'dispatcher' that loads the 'input' links in the first place) when a process finishes downloading and returns successful. An alternative is to use `multiprocessing.Lock` and to pass it around your processes so that only one subprocess accesses the 'diff' file at the time, but that's needless complicating of something rather simple. – zwer Jun 07 '17 at 20:32
  • 1
    @zwer Thanks, I looked into what you are talking about and it made the most sense. I'm going to avoid locking and instead use a worker-manager setup. – user1367204 Jun 07 '17 at 22:30
  • @user1367204 - check the update as one way to do it... it can be more efficient and less verbose, but this is for demonstration purposes and I don't want to introduce new concepts. – zwer Jun 07 '17 at 23:23
1

Use a 'Lock' from the multiprocessing library to serialize operations on the file.

You will want to pass the lock into each process. Each process should 'acquire' the lock before it opens the file and 'release' the lock after it closes the file.

https://docs.python.org/2/library/multiprocessing.html
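For illustration, a minimal sketch of that approach (the download body and the downloaded.csv file name are placeholder assumptions, not from the question):

from multiprocessing import Process, Lock

def download(url, lock):
    # ... download the page for `url` here (logic omitted) ...
    lock.acquire()  # acquire before opening the shared file
    try:
        with open("downloaded.csv", "a") as f:
            f.write(url + "\n")
    finally:
        lock.release()  # always release, even if writing fails

if __name__ == "__main__":
    lock = Lock()  # one lock shared by all workers
    urls = ["http://example.com/a", "http://example.com/b"]
    workers = [Process(target=download, args=(url, lock)) for url in urls]
    for w in workers:
        w.start()
    for w in workers:
        w.join()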

J. Darnell
  • 519
  • 7
  • 15
0

Look into locking files in Python. Locking a file will make the next process wait until the file is unlocked before modifying it. File locking is platform-specific, so you will have to use whichever method works for the OS you are on. If you need to figure out the OS, branch on it like this (Python has no switch statement):

import os

def my_lock(f):
    if os.name == "posix":
        # Unix or OS X specific locking here
        pass
    elif os.name == "nt":
        # Windows specific locking here
        pass
    else:
        print("Unknown operating system, lock unavailable")

Then I would look at this article and figure out exactly how you want to implement your lock.
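As a hedged sketch of what those branches might contain (fcntl.flock on Unix and msvcrt.locking on Windows are one common choice; this is an assumption, not necessarily what the linked article recommends):

import os

def my_lock(f):
    if os.name == "posix":
        import fcntl
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)  # exclusive advisory lock on Unix/OS X
    elif os.name == "nt":
        import msvcrt
        f.seek(0)
        msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)  # lock the first byte on Windows
    else:
        raise OSError("Unknown operating system, lock unavailable")

def my_unlock(f):
    if os.name == "posix":
        import fcntl
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)  # release the lock
    elif os.name == "nt":
        import msvcrt
        f.seek(0)
        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)  # unlock the same byte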

Benjamin Commet
  • 350
  • 3
  • 8
  • Why implement a custom lock? [`multiprocessing.Lock`](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing.lock#multiprocessing.Lock) exists for this purpose. – Adam Smith Jun 07 '17 at 20:23