Source code for gridr.core.grid.grid_resampling

# coding: utf8
#
# Copyright (c) 2025 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of GRIDR
# (see https://github.com/CNES/gridr).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Grid resampling
"""
# pylint: disable=C0413
import sys
from typing import NoReturn, Optional, Tuple, Union

import numpy as np

from gridr.cdylib import PyArrayWindow2, PyInterpolatorType, py_array1_grid_resampling_f64

PY311 = sys.version_info >= (3, 11)

if PY311:
    from typing import Self  # noqa: E402, F401
else:
    from typing_extensions import Self  # noqa: E402, F401
# pylint: enable=C0413


F64_F64_F64 = (np.dtype("float64"), np.dtype("float64"), np.dtype("float64"))

PY_ARRAY_GRID_RESAMPLING_FUNC = {
    F64_F64_F64: py_array1_grid_resampling_f64,
}

PY_INTERPOLATOR_TYPES = {
    "nearest": PyInterpolatorType.Nearest,
    "linear": PyInterpolatorType.Linear,
    "cubic": PyInterpolatorType.OptimizedBicubic,
}


[docs] def array_grid_resampling( interp: str, array_in: np.ndarray, grid_row: np.ndarray, grid_col: np.ndarray, grid_resolution: Tuple[int, int], array_out: Optional[np.ndarray], array_out_win: Optional[np.ndarray] = None, nodata_out: Optional[Union[int, float]] = 0, array_in_origin: Optional[Tuple[float, float]] = (0.0, 0.0), win: Optional[np.ndarray] = None, array_in_mask: Optional[np.ndarray] = None, grid_mask: Optional[np.ndarray] = None, grid_mask_valid_value: Optional[int] = 1, grid_nodata: Optional[float] = None, array_out_mask: Optional[Union[np.ndarray, bool]] = None, check_boundaries: bool = True, ) -> Tuple[Union[np.ndarray, NoReturn], Union[np.ndarray, NoReturn]]: """Resamples an input array based on target grid coordinates, applying an optional bilinear interpolation for low resolution grids. The method uses target grid coordinates (`grid_row` and `grid_col`) that may represent a lower resolution than the input array. Bilinear interpolation is applied internally to compute missing target coordinates. The oversampling factor is specified by the `grid_resolution` parameter, where a value of 1 indicates full resolution. The interpolation method is used through the `interp` parameter. This method wraps a Rust function (`py_array1_grid_resampling_*`) for efficient resampling. Parameters ---------- interp: str The interpolator name as string. See PY_INTERPOLATOR_TYPES keys for accepted values. array_in : np.ndarray The input array to be resampled. It must be a contiguous 2D (nrow, ncol) or 3D (nvar, nrow, ncol) array. grid_row : np.ndarray A 2D array representing the row coordinates of the target grid, with the same shape as `grid_col`. The coordinates target row positions in the `array_in` input array. grid_col : np.ndarray A 2D array representing the column coordinates of the target grid, with the same shape as `grid_row`. The coordinates target column positions in the `array_in` input array. grid_resolution : Tuple[int, int] A tuple specifying the oversampling factor for the grid for rows and columns. The resolution value of 1 represents full resolution, and higher values indicate lower resolution grids. array_out : Optional[np.ndarray] The output array where the resampled values will be stored. If `None`, a new array will be allocated. The shape of the output array is either determined based on the resolution and the input grid or by the optional `win` parameter. array_out_win : Optional[np.ndarray], default None An optional `np.ndarray` that designates the specific area in `array_out` to receive the resampled data. If `None`, the method will populate a default rectangular region starting from `array_out`'s top-left corner. This argument is only considered when `array_out` is passed, requiring `array_out` to be large enough to contain `array_out_win`. nodata_out : Optional[Union[int, float]], default 0 The value to be assigned to "NoData" in the output array. This value is used to fill in missing values where no valid resampling could occur or where a mask flag is set. array_in_origin : Optional[Tuple[float, float]], default (0., 0.) Bias to respectively apply to the `grid_row` and `grid_col` coordinates. The operation is performed by the wrapped Rust function. Its primary use cases include aligning with alternative grid origin conventions or handling situations where the provided `array_in` array corresponds to a subregion of the complete source raster. win : Optional[np.ndarray], default None A window (or sub-region) of the full resolution grid to limit the resampling to a specific target region. The window is defined as a list of tuples containing the first and last indices for each dimension. If `None`, the entire grid is processed. array_in_mask : Optional[np.ndarray], default None A mask for the input array that indicates which parts of `array_in` are valid for resampling. If not provided, the entire input array is considered valid. grid_mask : Optional[np.ndarray], default None An optional integer mask array for the grid. Grid cells corresponding to `grid_mask_valid_value` are considered **valid**; all other values indicate **invalid** cells and will result in `nodata_out` in the output array. If not provided, the entire grid is considered valid. The grid mask must have the same shape as `grid_row` and `grid_col`. grid_mask_valid_value : Optional[int], default 1 The value in `grid_mask` that designates a **valid** grid cell. All values in `grid_mask` that differ from this will be treated as **invalid**. This parameter is required if `grid_mask` is provided. grid_nodata : Optional[float], default None The value in `grid_row` and `grid_col` to consider as **invalid** cells. Please note this option is exclusive with `grid_mask`. The exclusivity is managed within the bound core method. array_out_mask : Optional[Union[np.ndarray, bool]], default None A mask for the output array that indicates where the resampled values should be stored. If `True`, a new array will be allocated and initially filled with 0. The shape of this output mask array is consistent with the `array_out` shape. If `None` or not `True`, the entire output array is assumed to be valid. check_boundaries : bool, default True Force a check at each iteration to ensure that the requreid data to perform interpolation is available in the source data. This parameter can be set to False for performance gain if you are sure that all the required data is available. Returns ------- Tuple[Union[np.ndarray, NoReturn], Union[np.ndarray, NoReturn]] A tuple containing: - The resampled array. If `array_out` was provided, this will be `None` (as the result is written in-place). - The resampled output mask. If `array_out_mask` was `False` or `None`, this will be `None`. Raises ------ Exception If the `py_array_grid_resampling_*` function (the underlying Rust binding) is not available for the provided input types. Notes ----- - This method is designed for resampling raster-like data using a grid of target coordinates. - This method is designed to be embedded in code that works on tiles, supporting both tiled inputs and outputs. - For correct results, ensure that the `grid_row` and `grid_col` values represent the desired target grid coordinates within the full resolution grid system. Limitations ----------- - The method assumes that all input arrays (`array_in`, `grid_row`, `grid_col`, etc.) are C-contiguous. If any are not, the method may raise an assertion error. - The method assumes that the grid-related arrays (`grid_row`, `grid_col`, `grid_mask`) have the same shapes. Mismatched shapes will raise an assertion error. - The `win` parameter, if provided, must be compatible with the resolution of the grid. If `win` exceeds the bounds of the grid, an error may occur. - The method does not handle invalid or missing values in the input arrays or masks beyond what's specified by `grid_mask` or `grid_nodata`. Users are responsible for ensuring any invalid or missing data is appropriately handled before calling the method. - For large grids or arrays, performance may degrade. Users should test the method's efficiency for their specific data sizes before using it in production. - This method assumes that the input grid is in a "full resolution" grid coordinate system. If the coordinate system is different, the resampling may produce incorrect results. Example ------- .. code-block:: python # Example usage with a 2D input array and grid: array_in = np.random.rand(100, 100) grid_row = np.linspace(0, 99, 50) grid_col = np.linspace(0, 99, 50) grid_resolution = (2, 2) array_out = None result, _ = array_grid_resampling( interp="cubic", array_in=array_in, grid_row=grid_row, grid_col=grid_col, grid_resolution=grid_resolution, array_out=array_out ) """ ret = None ret_mask = None interp_type = None try: interp_type = PY_INTERPOLATOR_TYPES[interp] except KeyError as err: raise Exception(f"Unknown interpolator {interp!r}") from err assert array_in.flags.c_contiguous is True assert grid_row.flags.c_contiguous is True assert grid_col.flags.c_contiguous is True array_in_shape = array_in.shape if len(array_in_shape) == 2: array_in_shape = (1,) + array_in_shape array_in = array_in.reshape(-1) assert np.all(grid_row.shape == grid_col.shape) assert len(grid_row.shape) == 2 grid_shape = grid_row.shape grid_row = grid_row.reshape(-1) grid_col = grid_col.reshape(-1) py_grid_win = None if win is not None: py_grid_win = PyArrayWindow2( start_row=win[0][0], end_row=win[0][1], start_col=win[1][0], end_col=win[1][1] ) # Allocate array_out if not given if array_out is None: if array_out_win is not None: # Ignore it array_out_win = None array_out_shape = None if win is not None: # Take the output shape from the window defined at full resolution array_out_shape = (win[0, 1] - win[0, 0] + 1, win[1, 1] - win[1, 0] + 1) else: # Take the output shape from the grid at full resolution array_out_shape = ( (grid_shape[0] - 1) * grid_resolution[0] + 1, (grid_shape[1] - 1) * grid_resolution[1] + 1, ) # Init the array array_out_shape = (array_in_shape[0],) + array_out_shape array_out = np.empty(array_out_shape, dtype=np.float64, order="C") ret = array_out assert array_out.flags.c_contiguous is True array_out_shape = array_out.shape if len(array_out_shape) == 2: array_out_shape = (1,) + array_out_shape # check same number of variables in array (first dim) assert array_out_shape[0] == array_in_shape[0] array_out = array_out.reshape(-1) py_array_out_win = None if array_out_win is not None: py_array_out_win = PyArrayWindow2( start_row=array_out_win[0][0], end_row=array_out_win[0][1], start_col=array_out_win[1][0], end_col=array_out_win[1][1], ) # Manage optional input mask # array_in_mask_dtype = np.dtype("uint8") if array_in_mask is not None: # array_in_mask_dtype = array_in_mask.dtype # check shape assert array_in_mask.dtype == np.dtype("uint8") assert array_in_mask.shape[0] == array_in_shape[1] assert array_in_mask.shape[1] == array_in_shape[2] # reshape array_in_mask = array_in_mask.reshape(-1) # Manage optional output mask if array_out_mask is not None: try: assert array_out_mask.dtype == np.dtype("uint8") assert array_out_mask.shape[0] == array_out_shape[1] assert array_out_mask.shape[1] == array_out_shape[2] array_out_mask = array_out_mask.reshape(-1) except AttributeError: # Not None and not a numpy array due to exception # Test if True if array_out_mask is True: array_out_mask = np.zeros(array_out_shape[1:], dtype=np.uint8, order="C").reshape( -1 ) ret_mask = array_out_mask else: array_out_mask = None func_types = (array_in.dtype, array_out.dtype, grid_row.dtype) nodata_out = array_out.dtype.type(nodata_out) # Manage grid_mask if grid_mask is not None: # grid mask must be c-contiguous assert grid_mask.flags.c_contiguous is True # grid mask must be encoded as unsigned 8 bits integer assert grid_mask.dtype == np.dtype("uint8") # grid mask shape must be the same has the grids assert np.all(grid_mask.shape == grid_shape) # Lets flat the grid mask view grid_mask = grid_mask.reshape(-1) try: func = PY_ARRAY_GRID_RESAMPLING_FUNC[func_types] except KeyError as err: raise Exception( f"py_array_grid_resampling_ function not available for types {func_types}" ) from err else: func( interp=interp_type, array_in=array_in, array_in_shape=array_in_shape, grid_row=grid_row, grid_col=grid_col, grid_shape=grid_shape, grid_resolution=grid_resolution, array_out=array_out, array_out_shape=array_out_shape, nodata_out=nodata_out, array_in_origin=array_in_origin, array_in_mask=array_in_mask, grid_mask=grid_mask, grid_mask_valid_value=grid_mask_valid_value, grid_nodata=grid_nodata, array_out_mask=array_out_mask, grid_win=py_grid_win, out_win=py_array_out_win, check_boundaries=check_boundaries, ) if ret is not None: ret = ret.reshape(array_out_shape).squeeze() if ret_mask is not None: ret_mask = ret_mask.reshape(array_out_shape[1:]) return ret, ret_mask