# Source code for merger.harmony.download_worker

"""A utility for downloading multiple granules simultaneously"""

from copy import deepcopy
from multiprocessing import Manager, Process
from os import cpu_count
from pathlib import Path
import queue
import re
from urllib.parse import urlparse

from harmony.logging import build_logger
from harmony.util import download


def multi_core_download(urls, destination_dir, access_token, cfg, process_count=None):
    """
    A method which automagically scales downloads to the number of CPU
    cores. For further explanation, see documentation on "multi-track
    drifting"

    Parameters
    ----------
    urls : list
        list of urls to download
    destination_dir : str
        output path for downloaded files
    access_token : str
        access token as provided in Harmony input
    cfg : dict
        Harmony configuration information
    process_count : int
        Number of worker processes to run (expected >= 1). Defaults to
        the number of CPU cores. The effective number of workers is
        clamped to the range [1, len(urls)].

    Returns
    -------
    list
        list of downloaded files as pathlib.Path objects (in completion
        order, which is not guaranteed to match the order of ``urls``)

    Raises
    ------
    RuntimeError
        if any worker process exits with a non-zero exit code
    """
    url_count = len(urls)
    if url_count == 0:
        # Nothing to fetch: skip starting a manager and worker processes
        return []

    if process_count is None:
        # cpu_count() returns None when the core count cannot be determined
        process_count = cpu_count() or 1

    # Extra workers would only find an empty queue; fewer than one worker
    # would silently download nothing.
    worker_count = max(1, min(process_count, url_count))

    with Manager() as manager:
        url_queue = manager.Queue(url_count)
        path_list = manager.list()

        for url in urls:
            url_queue.put(url)

        # Spawn worker processes
        processes = []
        for _ in range(worker_count):
            download_process = Process(
                target=_download_worker,
                args=(url_queue, path_list, destination_dir, access_token, cfg)
            )
            processes.append(download_process)
            download_process.start()

        # Wait for *every* worker before reporting a failure so that no
        # process is left running against a manager that is shutting down.
        exit_codes = []
        for process in processes:
            process.join()
            # exitcode must be read before close(); it is unavailable after
            exit_codes.append(process.exitcode)
            process.close()

        failed_codes = [code for code in exit_codes if code != 0]
        if failed_codes:
            raise RuntimeError(f'Download failed - exit code: {failed_codes[0]}')

        path_list = deepcopy(path_list)  # ensure GC can cleanup multiprocessing

    return [Path(path) for path in path_list]
def _download_worker(url_queue, path_list, destination_dir, access_token, cfg):
    """
    A method to be executed in a separate process which processes the
    url_queue and places paths to completed downloads into the path_list.
    Downloads are handled by harmony.util.download

    After each download the file is renamed to the filename found in the
    last segment of the URL path, when that segment looks like
    "name.ext". Otherwise a warning is logged and the name chosen by the
    download call is kept.

    Parameters
    ----------
    url_queue : queue.Queue
        URLs to process - should be filled from start and only decreases
    path_list : list
        paths to completed file downloads (appended to as strings)
    destination_dir : str
        output path for downloaded files
    access_token : str
        access token as provided in Harmony input
    cfg : dict
        Harmony configuration information
    """
    logger = build_logger(cfg)

    # Capture only the final path segment (trailing slashes ignored) and
    # require it to contain an interior dot. Excluding "/" from the capture
    # prevents a dotted directory name from being swallowed into the
    # filename, e.g. "/v1.0/granule" must not yield "v1.0/granule".
    filename_pattern = re.compile(r'.*/([^/]+\.[^/]+)/*$')

    while True:
        # get_nowait() alone is sufficient; a separate empty() check would
        # only add a round trip and cannot prevent the Empty race anyway.
        try:
            url = url_queue.get_nowait()
        except queue.Empty:
            break

        path = Path(download(url, destination_dir, logger=logger,
                             access_token=access_token, cfg=cfg))
        filename_match = filename_pattern.match(urlparse(url).path)

        if filename_match is not None:
            dest_path = path.parent.joinpath(filename_match.group(1))
            # Path.rename() only returns the new path on Python >= 3.8, so
            # do not rely on its return value.
            path.rename(dest_path)
            path = dest_path
        else:
            logger.warning('Origin filename could not be assertained - %s', url)

        path_list.append(str(path))