"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""
import math
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
import time
import os
import shutil
import netCDF4 as nc
import numpy as np
from podaac.merger.path_utils import resolve_dim, resolve_group
def shared_memory_size():
"""
    Try to determine the shared memory space size by reading /dev/shm on Linux
    machines; falls back to the SHARED_MEMORY_SIZE environment variable
    (default 60 MB) when /dev/shm does not exist.
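
    Examples
    --------
    Illustrative only; the result is the size of /dev/shm in bytes on Linux,
    or the environment/default fallback elsewhere:

    >>> shared_memory_size()  # doctest: +SKIP
    60000000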
"""
try:
stat = shutil.disk_usage("/dev/shm")
return stat.total
except FileNotFoundError:
# Get memory size via env or default to 60 MB
default_memory_size = os.getenv("SHARED_MEMORY_SIZE", "60000000")
return int(default_memory_size)
def max_var_memory(file_list, var_info, max_dims):
"""
    Get the maximum amount of shared memory that any single variable will
    require across all input files.

    Parameters
    ----------
    file_list : list
        List of file paths to be processed
    var_info : dict
        Dictionary of variable paths and associated VariableInfo
    max_dims : dict
        Dictionary of dimension paths and maximum dimensions found during preprocessing

    Returns
    -------
    int
        Size in bytes of the largest variable encountered
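
    Examples
    --------
    Sizing arithmetic only; the shape below is hypothetical. A float64 variable
    padded to a target shape of (1, 1800, 3600) occupies
    1 * 1800 * 3600 * 8 = 51,840,000 bytes (roughly 52 MB), which becomes the
    running maximum if it is the largest variable seen so far.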
"""
max_var_mem = 0
for file in file_list:
with nc.Dataset(file, 'r') as origin_dataset:
for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
ds_var = ds_group.variables.get(var_name)
if ds_var is None:
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
var_size = math.prod(target_shape) * var_meta.datatype.itemsize
max_var_mem = max(var_size, max_var_mem)
else:
var_size = math.prod(ds_var.shape) * var_meta.datatype.itemsize
max_var_mem = max(var_size, max_var_mem)
return max_var_mem
def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
"""
Automagically run merging in an optimized mode determined by the environment
Parameters
----------
merged_dataset : nc.Dataset
Destination dataset of the merge operation
file_list : list
List of file paths to be processed
var_info : dict
Dictionary of variable paths and associated VariableInfo
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
        Number of worker processes to run (expected >= 1)
    logger : logging.Logger
        Logger used to report merge progress
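
    Examples
    --------
    Minimal usage sketch; the file names are hypothetical and ``var_info`` and
    ``max_dims`` are assumed to come from this package's preprocessing step:

    >>> import logging  # doctest: +SKIP
    >>> import netCDF4 as nc  # doctest: +SKIP
    >>> merged = nc.Dataset('merged.nc', 'w')  # doctest: +SKIP
    >>> run_merge(merged, ['granule1.nc', 'granule2.nc'], var_info, max_dims,
    ...           2, logging.getLogger(__name__))  # doctest: +SKIP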
"""
if process_count == 1:
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
else:
        # Merging is bottlenecked by the write process, which is single threaded,
        # so spinning up more than 2 processes (one reader plus the writer) does
        # not scale the optimization any further
max_var_mem = max_var_memory(file_list, var_info, max_dims)
max_memory_size = round(shared_memory_size() * .95)
if max_var_mem < max_memory_size:
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)
else:
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
"""
Run the variable merge in the current thread/single-core mode
Parameters
----------
merged_dataset : nc.Dataset
Destination dataset of the merge operation
file_list : list
List of file paths to be processed
var_info : dict
Dictionary of variable paths and associated VariableInfo
max_dims : dict
        Dictionary of dimension paths and maximum dimensions found during preprocessing
    logger : logging.Logger
        Logger used to report merge progress
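
    Examples
    --------
    When a variable is absent from a granule, its slot in the merged dataset is
    filled with the variable's fill value; e.g. for a hypothetical 2 x 3 target
    shape and a fill value of -9999:

    >>> import numpy as np
    >>> np.full((2, 3), -9999)
    array([[-9999, -9999, -9999],
           [-9999, -9999, -9999]])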
"""
logger.info("Running single core ......")
for i, file in enumerate(file_list):
with nc.Dataset(file, 'r') as origin_dataset:
origin_dataset.set_auto_maskandscale(False)
for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
merged_group = resolve_group(merged_dataset, var_path)
ds_var = ds_group.variables.get(var_name)
merged_var = merged_group[0].variables[var_name]
if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
merged_var[i] = np.full(target_shape, fill_value)
continue
resized = resize_var(ds_var, var_meta, max_dims)
merged_var[i] = resized
def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
"""
    Run the variable merge in multi-core mode. This method creates (process_count - 1)
    read processes which read data from an origin granule, resize it, then queue it
    for the write process to write to disk. The write process runs in the current
    thread, so:

    number of write processes (1) + number of read processes (process_count - 1) = process_count
Parameters
----------
merged_dataset : nc.Dataset
Destination dataset of the merge operation
file_list : list
List of file paths to be processed
var_info : dict
Dictionary of variable paths and associated VariableInfo
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
        Number of worker processes to run (expected >= 2)
    logger : logging.Logger
        Logger used to report merge progress
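
    Examples
    --------
    Process accounting for a hypothetical ``process_count`` of 4: three read
    processes are spawned and the write loop runs in the calling process,
    i.e. 1 + (4 - 1) = 4.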
"""
logger.info("Running multicore ......")
total_variables = len(file_list) * len(var_info)
logger.info(f"total variables {total_variables}")
# Ensure SharedMemory doesn't get cleaned up before being processed
context = multiprocessing.get_context('forkserver')
with context.Manager() as manager:
in_queue = manager.Queue(len(file_list))
out_queue = manager.Queue((process_count - 1) * len(var_info)) # Store (process_count - 1) granules in buffer
memory_limit = manager.Value('i', 0)
lock = manager.Lock()
logger.info(file_list)
for i, file in enumerate(file_list):
in_queue.put((i, file))
processes = []
logger.info("creating read processes")
for _ in range(process_count - 1):
process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info, memory_limit, lock))
processes.append(process)
process.start()
processed_variables = 0
logger.info("Start processing variables in main process")
while processed_variables < total_variables:
try:
i, var_path, shape, memory_name = out_queue.get_nowait()
except queue.Empty:
_check_exit(processes)
continue
merged_var = merged_dataset[var_path]
var_meta = var_info[var_path]
shared_memory = SharedMemory(name=memory_name, create=False)
resized_arr = np.ndarray(shape, var_meta.datatype, shared_memory.buf)
merged_var[i] = resized_arr # The write operation itself
shared_memory.unlink()
shared_memory.close()
with lock:
memory_limit.value = memory_limit.value - resized_arr.nbytes
processed_variables = processed_variables + 1
for process in processes:
# Ensure that child processes properly exit before manager context
# gets GCed. Solves EOFError
process.join()
def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
"""
A method to be executed in a separate process which reads variables from a
granule, performs resizing, and queues the processed data up for the writer
process.
Parameters
----------
in_queue : Queue
Input queue of tuples of subset indexes and granule file paths respectively
out_queue : Queue
Output queue of tuples of subset indexes, variable path, variable shape, and shared memory name
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
var_info : dict
        Dictionary of variable paths and associated VariableInfo
    memory_limit : Value
        Manager-backed counter of bytes currently held in shared memory across workers
    lock : Lock
        Manager-backed lock guarding updates to memory_limit
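
    Examples
    --------
    Minimal standalone sketch of the shared-memory handoff this worker performs
    (no queues involved; the array here is arbitrary):

    >>> import numpy as np
    >>> from multiprocessing.shared_memory import SharedMemory
    >>> arr = np.arange(6, dtype=np.float64).reshape(2, 3)
    >>> shm = SharedMemory(create=True, size=arr.nbytes)
    >>> np.copyto(np.ndarray(arr.shape, arr.dtype, buffer=shm.buf), arr)
    >>> # a consumer re-attaches by name: SharedMemory(name=shm.name, create=False)
    >>> shm.close(); shm.unlink()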
"""
    # Use at most 95% of the shared memory space (/dev/shm or the configured fallback)
max_memory_size = round(shared_memory_size() * .95)
while not in_queue.empty():
try:
i, file = in_queue.get_nowait()
except queue.Empty:
break
with nc.Dataset(file, 'r') as origin_dataset:
origin_dataset.set_auto_maskandscale(False)
for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
ds_var = ds_group.variables.get(var_name)
if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
resized_arr = np.full(target_shape, fill_value)
else:
resized_arr = resize_var(ds_var, var_meta, max_dims)
if resized_arr.nbytes > max_memory_size:
raise RuntimeError(f'Merging failed - MAX MEMORY REACHED: {resized_arr.nbytes}')
# Limit to how much memory we allocate to max memory size
while memory_limit.value + resized_arr.nbytes > max_memory_size and not out_queue.empty():
time.sleep(.5)
# Copy resized array to shared memory
shared_mem = SharedMemory(create=True, size=resized_arr.nbytes)
shared_arr = np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=shared_mem.buf)
np.copyto(shared_arr, resized_arr)
with lock:
memory_limit.value = memory_limit.value + resized_arr.nbytes
out_queue.put((i, var_path, shared_arr.shape, shared_mem.name))
shared_mem.close()
def _check_exit(processes):
"""
Ensure that all processes have exited without error by checking their exitcode
if they're no longer running. Processes that have exited properly are removed
from the list
Parameters
----------
processes : list
List of processes to check
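
    Examples
    --------
    A read process terminated by a signal reports a negative exitcode
    (e.g. -9 for SIGKILL), which causes a RuntimeError to be raised here.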
"""
for process in processes.copy():
if not process.is_alive():
if process.exitcode == 0:
processes.remove(process)
else:
raise RuntimeError(f'Merging failed - exit code: {process.exitcode}')
def resize_var(var, var_info, max_dims):
"""
Resizes a variable's data to the maximum dimensions found in preprocessing.
    This method will never downscale a variable and only pads at the end of each
    dimension (bottom/right padding), mirroring the original Java implementation
Parameters
----------
var : nc.Variable
variable to be resized
    var_info : VariableInfo
        VariableInfo associated with the variable to be resized
max_dims : dict
dictionary of maximum dimensions found during preprocessing
Returns
-------
np.ndarray
An ndarray containing the resized data
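
    Examples
    --------
    Illustration of the padding direction only (plain ndarray instead of a
    netCDF variable):

    >>> import numpy as np
    >>> np.pad(np.ones((2, 2), dtype=np.int32), [[0, 1], [0, 1]],
    ...        mode='constant', constant_values=9)
    array([[1, 1, 9],
           [1, 1, 9],
           [9, 9, 9]], dtype=int32)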
"""
# special case for 0d variables
if var.ndim == 0:
return var[:]
    # generate ordered array of pad widths (target size minus current size per dimension)
dims = [resolve_dim(max_dims, var_info.group_path, dim.name) - dim.size for dim in var.get_dims()]
widths = [[0, dim] for dim in dims]
# Legacy merger doesn't explicitly define this behavior, but its resizer
# fills its resized arrays with 0s upon initialization. Sources:
# https://github.com/Unidata/netcdf-java/blob/87f37eb82b6f862f71e0d5767470500b27af5d1e/cdm-core/src/main/java/ucar/ma2/Array.java#L52
fill_value = 0 if var_info.fill_value is None else var_info.fill_value
resized = np.pad(var, widths, mode='constant', constant_values=fill_value)
return resized