
I have a list of web URLs to image files. I wish to fetch all the image files and write each one to the appropriate directory. The images are all PNGs. In a test program I am able to successfully fetch a single image synchronously:

import urllib.request
import shutil

# This example will download a single traffic image.

# Spoof a well-known browser so servers won't think I am a bot.
class AppURLopener(urllib.request.FancyURLopener):
    version = "Mozilla/5.0"

def getTrafficImage(fromUrl, toPath): 
    baseUrl = "https://mass511.com/map/Cctv/"
    url = f"{baseUrl}{fromUrl}"
    opener = AppURLopener()
    # Request image file from remote server and save to disk locally.
    with opener.open(url) as response, open(toPath, 'wb') as out_file:
        shutil.copyfileobj(response, out_file)

# Camera label on MASS511:
#   I-93-SB-Somerville-Exit 26 Storrow Dr
url = "406443--1"

# Path to store the file
file_name = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds/I-93-SB-Somerville-Exit.png"

getTrafficImage(url, file_name)

How can I repeat this for many URLs and have each fetch performed asynchronously?

If any image cannot be fetched or has an error (like a timeout), I wish to log that error to the console but not stop processing the other files.

I am using Python 3.6.2. My preference is to use the new async/await approach with the aiohttp and asyncio libraries, but any popular async library (e.g. curio) will do. I have only been programming in Python for one week, so much of this is confusing. This answer looks useful, but I do not know how to make use of it: asyncio web scraping 101: fetching multiple urls with aiohttp

Goal: capture traffic camera images from many Boston cameras every few seconds, for a set period of time.

The following is the program I am trying to write, with TODO: marks at the places I am stumped. The program runs on a timer. Every few seconds it will capture another set of images from each of the traffic cameras. The timer loop is not asynchronous, but I want the image capture of many URLs to be async.

import sys, os, datetime, threading, time
import urllib.request
import shutil

# ==================
#    Configuration
# ==================

# Identify the name of the camera with its URL on Mass511 web site
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
  }

# All cameras have URLs that begin with this prefix
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Take this many pictures from each camera
SNAP_COUNT = 5

# Capture new set of pictures after this many seconds 
POLLING_INTERVAL_SECONDS = 2

# ==================
#      Classes
# ==================

def logMessage(msg):
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

# Change the presumed name of the browser to fool robot detectors
class AppURLopener(urllib.request.FancyURLopener):
    version = "Mozilla/5.0"

# Can read the image from one camera and save it to a file
class Camera(object):
  def __init__(self, sourceUrl, targetDirectory, name, extension):
    self.SourceUrl = sourceUrl
    self.TargetDirectory = targetDirectory
    self.Name = name
    self.Extension = extension

  def TargetFile(self, time):
    timeStamp = time.strftime("%Y-%m-%d-%H-%M-%S") 
    return f"{self.TargetDirectory}/{timeStamp}.{self.Extension}"

  def Get(self):
      fileName = self.TargetFile(datetime.datetime.now())
      logMessage(f"  - For camera {self.Name}, get {self.SourceUrl} and save as {fileName}")
      # TODO: GET IMAGE FILE FROM WEB AND SAVE IN FILE HERE

# Can poll every camera once
class CameraPoller(object):
  def __init__(self, urlMap, baseUrl, rootDir):
    self.CamerasToRead = []
    for cameraName, urlSuffix in urlMap.items():
      url = f"{baseUrl}{urlSuffix}"
      targetDir = f"{rootDir}/{cameraName}"
      if not os.path.exists(targetDir):
        os.makedirs(targetDir)
      camera = Camera(url, targetDir, cameraName, "png")
      self.CamerasToRead.append(camera)

  def Snap(self):
    # TODO: MAKE THIS LOOP ASYNC
    for camera in self.CamerasToRead:
      camera.Get()

# Repeatedly poll all cameras, then sleep
def get_images(poller, pollingInterval, snapCount):
    next_call = time.time()
    for i in range(0, snapCount):
        now = datetime.datetime.now()
        timeString = now.strftime("%Y-%m-%d-%H-%M-%S") 
        logMessage(f"\nPoll at {timeString}")
        poller.Snap()
        next_call = next_call + pollingInterval
        time.sleep(max(0.0, next_call - time.time()))  # never pass a negative value if a poll overran

# ==================
#    Application
# ==================

if __name__ == "__main__":

    cameraPoller = CameraPoller(CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)

    # Poll cameras in a separate thread, then wait for it to finish.
    timerThread = threading.Thread(target=get_images, args=(cameraPoller, POLLING_INTERVAL_SECONDS, SNAP_COUNT))
    timerThread.daemon = False
    timerThread.start()

    timerThread.join()

    endTime = datetime.datetime.now()
    endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S") 
    logMessage(f"Exiting Poller at {endTimeString}")
Paul Chernoch
  • What have you tried so far? What specific problem(s) are you having? – dirn Aug 05 '17 at 21:18
  • What I have tried is doing it synchronously. It is the async part that I can't figure out. The syntax and workflow are unfamiliar, and in the examples I have looked at I am not sure where to put the code that writes to a file. This includes the stackoverflow answer I linked to in my question. – Paul Chernoch Aug 05 '17 at 21:29
  • Your goal is to fetch URLs in parallel. Have you also looked at the `multiprocessing` module? E.g., as described in https://stackoverflow.com/questions/16181121/a-very-simple-multithreading-parallel-url-fetching-without-queue. – larsks Aug 05 '17 at 21:55
  • Your link looks promising. I will give it a try. – Paul Chernoch Aug 05 '17 at 23:52
  • @larsks - Instead of multiprocessing, I used ideas from another answer to the same question. – Paul Chernoch Aug 06 '17 at 00:32

2 Answers


Here is the same code, with the URL grabbing done using ThreadPoolExecutor. It required the fewest changes to my code. Thanks to @larsks for pointing me in the right direction.

import sys, os, datetime, threading, time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
import shutil

# ==================
#    Configuration
# ==================

# Identify the name of the camera with its URL on Mass511 web site
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
  }

# All cameras have URLs that begin with this prefix
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Take this many pictures from each camera
SNAP_COUNT = 5

# Capture new set of pictures after this many seconds 
POLLING_INTERVAL_SECONDS = 2

# ==================
#      Classes
# ==================

def logMessage(msg):
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

# Change the presumed name of the browser to fool robot detectors
class AppURLopener(urllib.request.FancyURLopener):
    version = "Mozilla/5.0"

# Can read the image from one camera and save it to a file
class Camera(object):
  def __init__(self, sourceUrl, targetDirectory, name, extension):
    self.SourceUrl = sourceUrl
    self.TargetDirectory = targetDirectory
    self.Name = name
    self.Extension = extension

  def TargetFile(self, time):
    timeStamp = time.strftime("%Y-%m-%d-%H-%M-%S") 
    return f"{self.TargetDirectory}/{timeStamp}.{self.Extension}"

  def Get(self):
      fileName = self.TargetFile(datetime.datetime.now())
      message = f"  - For camera {self.Name}, get {self.SourceUrl} and save as {fileName}"
      # Request image file from remote server and save to disk locally.
      opener = AppURLopener()
      with opener.open(self.SourceUrl) as response, open(fileName, 'wb') as out_file:
        shutil.copyfileobj(response, out_file)
      logMessage(message)
      return message

def snap_picture(camera):
  return camera.Get()


# Can poll every camera once
class CameraPoller(object):
  def __init__(self, urlMap, baseUrl, rootDir):
    self.CamerasToRead = []
    for cameraName, urlSuffix in urlMap.items():
      url = f"{baseUrl}{urlSuffix}"
      targetDir = f"{rootDir}/{cameraName}"
      if not os.path.exists(targetDir):
        os.makedirs(targetDir)
      camera = Camera(url, targetDir, cameraName, "png")
      self.CamerasToRead.append(camera)

  def Snap(self):
    with ThreadPoolExecutor(max_workers=10) as executor:
      results = executor.map(snap_picture, self.CamerasToRead)

# Repeatedly poll all cameras, then sleep
def get_images(poller, pollingInterval, snapCount):
    next_call = time.time()
    for i in range(0, snapCount):
        now = datetime.datetime.now()
        timeString = now.strftime("%Y-%m-%d-%H-%M-%S") 
        logMessage(f"\nPoll at {timeString}")
        poller.Snap()
        next_call = next_call + pollingInterval
        time.sleep(max(0.0, next_call - time.time()))  # never pass a negative value if a poll overran

# ==================
#    Application
# ==================

if __name__ == "__main__":

    cameraPoller = CameraPoller(CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)

    # Poll cameras in a separate thread, then wait for it to finish.
    timerThread = threading.Thread(target=get_images, args=(cameraPoller, POLLING_INTERVAL_SECONDS, SNAP_COUNT))
    timerThread.daemon = False
    timerThread.start()

    timerThread.join()

    endTime = datetime.datetime.now()
    endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S") 
    logMessage(f"Exiting Poller at {endTimeString}")
Paul Chernoch

Here is an asyncio version. Untested, but shouldn't be too far off.

With asyncio, you basically launch all your tasks and gather the results with asyncio.gather. But starting tons of requests concurrently won't work well, so I also added a Semaphore in CameraPoller: it ensures that at most 10 concurrent requests will run.

import asyncio
import aiohttp
import datetime
import os


# ==================
#    Configuration
# ==================

# Identify the name of the camera with its URL on Mass511 web site
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
}

# All cameras have URLs that begin with this prefix
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Take this many pictures from each camera
SNAP_COUNT = 5

# Capture new set of pictures after this many seconds 
POLLING_INTERVAL_SECONDS = 2

USER_AGENT = 'Mozilla/5.0'

# ==================
#      Classes
# ==================

def logMessage(msg):
  print(msg)

# Can read the image from one camera and save it to a file
class Camera:
  def __init__(self, session, sourceUrl, targetDirectory, name, extension):
    self.session = session
    self.SourceUrl = sourceUrl
    self.TargetDirectory = targetDirectory
    self.Name = name
    self.Extension = extension

  def TargetFile(self, time):
    timeStamp = time.strftime("%Y-%m-%d-%H-%M-%S") 
    return f"{self.TargetDirectory}/{timeStamp}.{self.Extension}"

  async def Get(self):
      fileName = self.TargetFile(datetime.datetime.now())
      # Request image file from remote server
      async with self.session.get(self.SourceUrl, headers={'User-Agent': USER_AGENT}) as resp:
        data = await resp.read()
      # and save to disk locally.
      with open(fileName, 'wb') as out_file:
        out_file.write(data)
      logMessage(f"  - For camera {self.Name}, get {self.SourceUrl} and save as {fileName}")


# Can poll every camera once
class CameraPoller:
  def __init__(self, session, urlMap, baseUrl, rootDir, concurrency=10):
    self.CamerasToRead = []
    for cameraName, urlSuffix in urlMap.items():
      url = f"{baseUrl}{urlSuffix}"
      targetDir = f"{rootDir}/{cameraName}"
      if not os.path.exists(targetDir):
        os.makedirs(targetDir)
      camera = Camera(session, url, targetDir, cameraName, "png")
      self.CamerasToRead.append(camera)

    self.sem = asyncio.BoundedSemaphore(concurrency)

  async def _snap(self, camera):
    async with self.sem:
      await camera.Get()

  async def Snap(self):
    await asyncio.gather(*(self._snap(cam) for cam in self.CamerasToRead))

  # Repeatedly poll all cameras, then sleep
  async def poll(self, pollingInterval, snapCount):
    loop = asyncio.get_event_loop()
    next_call = loop.time()
    for i in range(0, snapCount):
        now = datetime.datetime.now()
        timeString = now.strftime("%Y-%m-%d-%H-%M-%S") 
        logMessage(f"\nPoll at {timeString}")

        await self.Snap()

        next_call = next_call + pollingInterval
        await asyncio.sleep(max(0.0, next_call - loop.time()))

# ==================
#    Application
# ==================

async def main():
  async with aiohttp.ClientSession() as session:
    poller = CameraPoller(session, CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)
    await poller.poll(POLLING_INTERVAL_SECONDS, SNAP_COUNT)

  endTime = datetime.datetime.now()
  endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S") 
  logMessage(f"Exiting Poller at {endTimeString}")


if __name__ == "__main__":
  loop = asyncio.get_event_loop()
  loop.run_until_complete(main())
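As written, a single failed request propagates out of asyncio.gather and aborts the whole poll. To log each error and keep the other downloads running, as the question requires, a minimal sketch is to wrap the download inside _snap in a try/except (alternatively, asyncio.gather(..., return_exceptions=True) collects the exceptions instead of raising the first one):

  async def _snap(self, camera):
    async with self.sem:
      try:
        await camera.Get()
      except Exception as e:
        # Log the failure but let the other downloads keep running.
        logMessage(f"  ! Camera {camera.Name} failed: {e}")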
Arthur
  • Thank you. I will check it out tomorrow. – Paul Chernoch Aug 09 '17 at 22:46
  • Arthur notice that in your solution saving the file blocks the event loop while the file is being saved. To avoid it you should make use of the thread pool executor of asyncio. Check the explanation in Fluent Python: https://books.google.com/books?id=kYZHCgAAQBAJ&lpg=PA540&ots=isufUxFOOh&dq=threadpoolexecutor%20fluentpython%20asyncio&pg=PA582#v=onepage&q&f=false – Enrique Saez Aug 11 '17 at 19:04
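A minimal sketch of the pattern Enrique describes, assuming a hypothetical _writeFile helper: the blocking disk write is handed off to asyncio's default thread-pool executor via loop.run_in_executor, so the event loop stays free while the file is written.

  async def Get(self):
      fileName = self.TargetFile(datetime.datetime.now())
      # Request image file from remote server.
      async with self.session.get(self.SourceUrl, headers={'User-Agent': USER_AGENT}) as resp:
        data = await resp.read()
      # Hand the blocking disk write to the default thread pool so it does
      # not stall the event loop (_writeFile is a hypothetical helper).
      loop = asyncio.get_event_loop()
      await loop.run_in_executor(None, self._writeFile, fileName, data)
      logMessage(f"  - For camera {self.Name}, get {self.SourceUrl} and save as {fileName}")

  def _writeFile(self, fileName, data):
      with open(fileName, 'wb') as out_file:
        out_file.write(data)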