Source code for gridr.core.grid.grid_utils

# coding: utf8
#
# Copyright (c) 2025 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of GRIDR
# (see https://github.com/CNES/gridr).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Grid utils module
"""
import logging
from typing import NoReturn, Optional, Tuple, Union

import numpy as np
from scipy.interpolate import RegularGridInterpolator

from gridr.cdylib import (
    PyArrayWindow2,
    PyGeometryBoundsF64,
    PyGridGeometriesMetricsF64,
    py_array1_compute_resampling_grid_geometries_f64_f64,
    py_array1_compute_resampling_grid_src_boundaries_f64_f64,
)
from gridr.core.utils.array_utils import ArrayProfile, array_add
from gridr.core.utils.array_window import (
    window_apply,
    window_check,
    window_expand_ndim,
    window_extend,
    window_overflow,
    window_shape,
)

F64_F64 = (np.dtype("float64"), np.dtype("float64"))

# The first element in the tuple key represents the type used for
# PyGridGeometriesMetricsF64 members. Only float64 is considered by now.
PY_ARRAY_COMPUTE_RESAMPLING_GRID_GEOMETRIES_FUNC = {
    F64_F64: py_array1_compute_resampling_grid_geometries_f64_f64,
}

# The first element in the tuple key represents the type used for
# PyGeometryBoundsF64 members. Only float64 is considered by now.
PY_ARRAY_COMPUTE_RESAMPLING_GRID_SRC_BOUNDARIES_FUNC = {
    F64_F64: py_array1_compute_resampling_grid_src_boundaries_f64_f64,
}


[docs] def array_compute_resampling_grid_geometries( grid_row: np.ndarray, grid_col: np.ndarray, grid_resolution: Tuple[int, int], win: Optional[np.ndarray] = None, grid_mask: Optional[np.ndarray] = None, grid_mask_valid_value: Optional[int] = 1, grid_nodata: Optional[float] = None, ) -> Union[PyGridGeometriesMetricsF64, None]: """Computes resampling grid geometries metrics from given row and column grids. This function analyzes the validity of grid points (nodes) along rows and columns to determine bounding boxes and a transition matrix for resampling operations on grids. It returns detailed information about the source and destination grid bounds, as well as the transition matrix describing the linear transformation between source and destination grids. This method wraps a Rust function (`py_array1_compute_resampling_grid_geometries_f64_f64`) for grid metrics computation. Parameters ---------- grid_row : np.ndarray A 2D array representing the row coordinates of the target grid, with the same shape as `grid_col`. The coordinates target row positions in the `array_in` input array. grid_col : np.ndarray A 2D array representing the column coordinates of the target grid, with the same shape as `grid_row`. The coordinates target column positions in the `array_in` input array. grid_resolution : Tuple[int, int] A tuple specifying the oversampling factor for the grid along rows and columns. A resolution value of 1 represents full resolution, while higher values indicate lower resolution grids. win : Optional[np.ndarray], default None An optional window (or sub-region) of the grid to limit the computation to a specific target region. The window is defined as a list of tuples containing the first and last indices for each dimension. If `None`, the entire grid is processed. grid_mask : Optional[np.ndarray], default None An optional integer mask array for the grid. Grid cells corresponding to `grid_mask_valid_value` are considered **valid**; all other values indicate **invalid** cells and will result in `nodata_out` in the output array. If not provided, the entire grid is considered valid. The grid mask must have the same shape as `grid_row` and `grid_col`. grid_mask_valid_value : Optional[int], default 1 The value in `grid_mask` that designates a **valid** grid cell. All values in `grid_mask` that differ from this will be treated as **invalid**. This parameter is required if `grid_mask` is provided. grid_nodata : Optional[float], default None The value in `grid_row` and `grid_col` to consider as **invalid** cells. Note that this option is exclusive with `grid_mask`. This exclusivity is managed within the core bound method. Returns ------- Union[PyGridGeometriesMetricsF64, None] A structure containing the computed metrics (`PyGridGeometriesMetricsF64`) or `None` if no valid metrics can be computed (e.g., empty grid). Raises ------ Exception If the underlying Rust function `py_array1_compute_resampling_grid_geometries_f64_*` is not available for the provided input types. Exception If the `win` is outside of the array domain. Notes ----- - This method serves as a preprocessing step prior to resampling raster-like data onto a target coordinate grid. The resulting transition matrix facilitates effective anti-aliasing management. - The computed source bounds are intended to restrict the read window of the input raster, optimizing data access. Similarly, the destination bounds define the output grid window, preventing unnecessary processing of nodata regions. - The method is designed to support tiled processing workflows by accepting an optional window parameter, enabling integration within tile-based operations. Limitations ----------- - The method assumes that all input arrays (`grid_row`, `grid_col`, etc.) are C-contiguous. If any of them are not, the method may raise an assertion error. - The method assumes that the grid-related arrays (`grid_row`, `grid_col`, `grid_mask`) have the same shapes. Mismatched shapes will raise an assertion error. - The `win` parameter, if provided, must be compatible with the grid shape. If `win` exceeds the bounds of the grid, an error may occur. - The method does not handle invalid or missing values in the input arrays or masks beyond what's specified by `grid_mask` or `grid_nodata`. Users are responsible for ensuring any invalid or missing data is appropriately handled before calling this method. """ ret = None assert grid_row.flags.c_contiguous is True assert grid_col.flags.c_contiguous is True assert np.all(grid_row.shape == grid_col.shape) assert len(grid_row.shape) == 2 grid_shape = grid_row.shape grid_row = grid_row.reshape(-1) grid_col = grid_col.reshape(-1) py_grid_win = None if win is not None: py_grid_win = PyArrayWindow2( start_row=win[0][0], end_row=win[0][1], start_col=win[1][0], end_col=win[1][1] ) func_types = (np.dtype("float64"), grid_row.dtype) # Manage grid_mask if grid_mask is not None: # grid mask must be c-contiguous assert grid_mask.flags.c_contiguous is True # grid mask must be encoded as unsigned 8 bits integer assert grid_mask.dtype == np.dtype("uint8") # grid mask shape must be the same has the grids assert np.all(grid_mask.shape == grid_shape) # Lets flat the grid mask view grid_mask = grid_mask.reshape(-1) try: func = PY_ARRAY_COMPUTE_RESAMPLING_GRID_GEOMETRIES_FUNC[func_types] except KeyError as err: raise Exception( "py_array1_compute_resampling_grid_geometries_ function" f" not available for types {func_types}" ) from err else: ret = func( grid_row=grid_row, grid_col=grid_col, grid_shape=grid_shape, grid_resolution=grid_resolution, grid_mask=grid_mask, grid_mask_valid_value=grid_mask_valid_value, grid_nodata=grid_nodata, grid_win=py_grid_win, ) return ret
[docs] def array_compute_resampling_grid_src_boundaries( grid_row: np.ndarray, grid_col: np.ndarray, win: Optional[np.ndarray] = None, grid_mask: Optional[np.ndarray] = None, grid_mask_valid_value: Optional[int] = 1, grid_nodata: Optional[float] = None, ) -> Union[PyGeometryBoundsF64, None]: """Computes resampling grid source boundaries from given row and column grids. This function analyzes the validity of grid points (nodes) along rows and columns and determine the source bounding box for resampling operations on grids. This method wraps a Rust function (`py_array1_compute_resampling_grid_src_boundaries_f64_f64_f64_f64`). Parameters ---------- grid_row : np.ndarray A 2D array representing the row coordinates of the target grid, with the same shape as `grid_col`. The coordinates target row positions in the `array_in` input array. grid_col : np.ndarray A 2D array representing the column coordinates of the target grid, with the same shape as `grid_row`. The coordinates target column positions in the `array_in` input array. win : Optional[np.ndarray], default None An optional window (or sub-region) of the grid to limit the computation to a specific target region. The window is defined as a list of tuples containing the first and last indices for each dimension. If `None`, the entire grid is processed. grid_mask : Optional[np.ndarray], default None An optional integer mask array for the grid. Grid cells corresponding to `grid_mask_valid_value` are considered **valid**; all other values indicate **invalid** cells and will result in `nodata_out` in the output array. If not provided, the entire grid is considered valid. The grid mask must have the same shape as `grid_row` and `grid_col`. grid_mask_valid_value : Optional[int], default 1 The value in `grid_mask` that designates a **valid** grid cell. All values in `grid_mask` that differ from this will be treated as **invalid**. This parameter is required if `grid_mask` is provided. grid_nodata : Optional[float], default None The value in `grid_row` and `grid_col` to consider as **invalid** cells. Note that this option is exclusive with `grid_mask`. This exclusivity is managed within the core bound method. Returns ------- Union[PyGeometryBoundsF64, None] A structure containing the computed boundaries (`PyGeometryBoundsF64`) or `None` if no valid boundaries can be computed (e.g., empty grid). Raises ------ Exception If the underlying Rust function `py_array1_compute_resampling_grid_src_boundaries_f64_*` is not available for the provided input types. Exception If the `win` is outside of the array domain. Notes ----- - The computed source bounds are intended to restrict the read window of the input raster, optimizing data access. - The method is designed to support tiled processing workflows by accepting an optional window parameter, enabling integration within tile-based operations. Limitations ----------- - The method assumes that all input arrays (`grid_row`, `grid_col`, etc.) are C-contiguous. If any of them are not, the method may raise an assertion error. - The method assumes that the grid-related arrays (`grid_row`, `grid_col`, `grid_mask`) have the same shapes. Mismatched shapes will raise an assertion error. - The `win` parameter, if provided, must be compatible with the grid shape. If `win` exceeds the bounds of the grid, an error may occur. - The method does not handle invalid or missing values in the input arrays or masks beyond what's specified by `grid_mask` or `grid_nodata`. Users are responsible for ensuring any invalid or missing data is appropriately handled before calling this method. """ ret = None assert grid_row.flags.c_contiguous is True assert grid_col.flags.c_contiguous is True assert np.all(grid_row.shape == grid_col.shape) assert len(grid_row.shape) == 2 grid_shape = grid_row.shape grid_row = grid_row.reshape(-1) grid_col = grid_col.reshape(-1) py_grid_win = None if win is not None: py_grid_win = PyArrayWindow2( start_row=win[0][0], end_row=win[0][1], start_col=win[1][0], end_col=win[1][1] ) func_types = (np.dtype("float64"), grid_row.dtype) # Manage grid_mask if grid_mask is not None: # grid mask must be c-contiguous assert grid_mask.flags.c_contiguous is True # grid mask must be encoded as unsigned 8 bits integer assert grid_mask.dtype == np.dtype("uint8") # grid mask shape must be the same has the grids assert np.all(grid_mask.shape == grid_shape) # Lets flat the grid mask view grid_mask = grid_mask.reshape(-1) try: func = PY_ARRAY_COMPUTE_RESAMPLING_GRID_SRC_BOUNDARIES_FUNC[func_types] except KeyError as err: raise Exception( "py_array1_compute_resampling_grid_src_boundaries function" f" not available for types {func_types}" ) from err else: ret = func( grid_row=grid_row, grid_col=grid_col, grid_shape=grid_shape, grid_mask=grid_mask, grid_mask_valid_value=grid_mask_valid_value, grid_nodata=grid_nodata, grid_win=py_grid_win, ) return ret
[docs] def array_shift_grid_coordinates( grid_row: np.ndarray, grid_col: np.ndarray, grid_shift: Union[Tuple[int, int], Tuple[float, float]], win: Optional[np.ndarray] = None, grid_mask: Optional[np.ndarray] = None, grid_mask_valid_value: Optional[int] = 1, grid_nodata: Optional[float] = None, ) -> NoReturn: """Shift a resampling grid by adding a scalar values in both row and column dimentions. This method uses the core.utils.array_utils.array_add that wraps Rust functions (`py_array1_add_*`) in order to stricly oper inplace with no temporary memory allocation. Parameters ---------- grid_row : np.ndarray A 2D array representing the row coordinates of the target grid, with the same shape as `grid_col`. The coordinates target row positions in the `array_in` input array. grid_col : np.ndarray A 2D array representing the column coordinates of the target grid, with the same shape as `grid_row`. The coordinates target column positions in the `array_in` input array. grid_shift : Union[Tuple[int, int], Tuple[float, float]] A tuple specifying the scalar values to add to the grid coordinates along rows and columns. win : Optional[np.ndarray], default None An optional window (or sub-region) of the grid to limit the computation to a specific target region. The window is defined as a list of tuples containing the first and last indices for each dimension. If `None`, the entire grid is processed. grid_mask : Optional[np.ndarray], default None An optional integer mask array for the grid. Grid cells corresponding to `grid_mask_valid_value` are considered **valid**; all other values indicate **invalid** cells and will result in `nodata_out` in the output array. If not provided, the entire grid is considered valid. The grid mask must have the same shape as `grid_row` and `grid_col`. grid_mask_valid_value : Optional[int], default 1 The value in `grid_mask` that designates a **valid** grid cell. All values in `grid_mask` that differ from this will be treated as **invalid**. This parameter is required if `grid_mask` is provided. grid_nodata : Optional[float], default None The value in `grid_row` and `grid_col` to consider as **invalid** cells. Note that this option is exclusive with `grid_mask`. This exclusivity is managed within the core bound method. Returns ------- None Notes ----- - The method is designed to support tiled processing workflows by accepting an optional window parameter, enabling integration within tile-based operations. Limitations ----------- - The method assumes that all input arrays (`grid_row`, `grid_col`, etc.) are C-contiguous. If any of them are not, the method may raise an assertion error. - The method assumes that the grid-related arrays (`grid_row`, `grid_col`, `grid_mask`) have the same shapes. Mismatched shapes will raise an assertion error. - The `win` parameter, if provided, must be compatible with the grid shape. If `win` exceeds the bounds of the grid, an error may occur. """ assert grid_row.flags.c_contiguous is True assert grid_col.flags.c_contiguous is True assert np.all(grid_row.shape == grid_col.shape) assert len(grid_row.shape) == 2 grid_shape = grid_row.shape if win is not None: if not window_check(grid_row, win): raise Exception("window outside of output grid domain") # Manage grid_mask if grid_mask is not None: # grid mask must be c-contiguous assert grid_mask.flags.c_contiguous is True # grid mask must be encoded as unsigned 8 bits integer assert grid_mask.dtype == np.dtype("uint8") # grid mask shape must be the same has the grids assert np.all(grid_mask.shape == grid_shape) # Lets flat the grid mask view grid_mask = grid_mask.reshape(-1) # prepare call to array_add - here we use array_cond related options # note: val_cond is not used kwargs = { "val_cond": 0, "array_cond": grid_mask, "add_on_true": True, "array_cond_val": grid_mask_valid_value, "win": win, } # add shift on row coordinates array_add(array=grid_row, val_add=grid_shift[0], **kwargs) # add shift on column coordinates array_add(array=grid_col, val_add=grid_shift[1], **kwargs) elif grid_nodata is not None: # prepare call to array_add - here we use val_cond # we have to add the scalars on valid data, ie. different from val_cond, # hence add_on_true set to False kwargs = { "val_cond": grid_nodata, "array_cond": None, "add_on_true": False, "array_cond_val": None, "win": win, } # add shift on row coordinates array_add(array=grid_row, val_add=grid_shift[0], **kwargs) # add shift on column coordinates array_add(array=grid_col, val_add=grid_shift[1], **kwargs) else: # no mask is given, we directly use native numpy operations if win is not None: # Define the window slice - we have ensured previously that the window is valid win_slice = ( slice(win[0][0], win[0][1] + 1), slice(win[1][0], win[1][1] + 1), ) # The slice is 2D - reshape grid as 2D grid_row = grid_row.reshape(grid_shape) grid_col = grid_col.reshape(grid_shape) # Apply the shift on window grid_row[win_slice] += grid_shift[0] grid_col[win_slice] += grid_shift[1] else: grid_row += grid_shift[0] grid_col += grid_shift[1] return None
[docs] def read_win_from_grid_metrics( grid_metrics: PyGridGeometriesMetricsF64, array_src_profile_2d: ArrayProfile, margins: np.ndarray, logger: logging.Logger, logger_msg_prefix: str = "", ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Computes the source read window from grid metrics. This function determines the read window (`src_win_read`) from the source array based on the destination and source bounds contained in `grid_metrics`. It applies the necessary margins to ensure sufficient neighborhood data is available for operations such as interpolation, while handling cases where the window exceeds the array boundaries. Parameters ---------- grid_metrics : PyGridGeometriesMetricsF64 Object containing geometric metrics of the grid, including source and destination bounds. array_src_profile_2d : ArrayProfile Metadata/profile of the 2D source array, including shape and number of dimensions information. margins : numpy.ndarray of shape (2, 2) Margins to apply to the read window, formatted as ``[[top_margin, bottom_margin], [left_margin, right_margin]]``. logger : logging.Logger Logger instance used for debugging messages. logger_msg_prefix : str, optional Optional prefix to include in log messages for better traceability. Defaults to ''. Returns ------- src_win_read : numpy.ndarray of shape (2, 2) Final source read window adjusted for margins and boundary constraints. Format: ``[[row_start, row_end], [col_start, col_end]]``. This is the window that should be used for the raster read IO. src_win_marged : numpy.ndarray of shape (2, 2) Desired read window with margins applied, before boundary correction (padding). pad : numpy.ndarray of shape (2, 2) Amount of padding required if the marged window extends outside the source array. Format: ``[[top_pad, bottom_pad], [left_pad, right_pad]]``. Notes ----- This function is critical for operations requiring contextual data beyond the immediate processing area, such as filtering or interpolation, ensuring that no data loss or edge artifacts occur due to insufficient neighborhood information. """ def DEBUG(msg): if logger: logger.debug(f"{logger_msg_prefix} - {msg}") # The metrics have been computed, ie there is at least one # valid point in the grid regarding the masking options. # We can get the dst and src window : # - the dst bounds are given relative to the current strip # low resolute shape. # - the src bounds are absolute coordinates value in the # input array. dst_lowres_bounds = grid_metrics.dst_bounds src_bounds = grid_metrics.src_bounds DEBUG(f"dst low res bounds : {dst_lowres_bounds} ") DEBUG(f"src bounds : {src_bounds} ") # Define the strip low res computing window (upper limit included) dst_lowres_win = np.array( ( (dst_lowres_bounds.ymin, dst_lowres_bounds.ymax), (dst_lowres_bounds.xmin, dst_lowres_bounds.xmax), ) ) DEBUG(f"dst win : {dst_lowres_win}") src_win_read = None src_win_marged = None pad = None if ( (src_bounds.ymin < 0 and src_bounds.ymax < 0) or (src_bounds.xmin < 0 and src_bounds.xmax < 0) or ( src_bounds.ymin > array_src_profile_2d.shape[0] - 1 and src_bounds.ymax > array_src_profile_2d.shape[0] - 1 ) or ( src_bounds.xmin > array_src_profile_2d.shape[1] - 1 and src_bounds.xmax > array_src_profile_2d.shape[1] - 1 ) ): # The source is not readable from raster - fully outside for at least # one direction. DEBUG( "The source read window is not available ; it goes fully " "outside of raster in one direction at least" ) else: # Define the input read window src_win = np.array( ( (int(np.floor(src_bounds.ymin)), int(np.ceil(src_bounds.ymax))), (int(np.floor(src_bounds.xmin)), int(np.ceil(src_bounds.xmax))), ) ) DEBUG(f"src read win (preliminary) : {src_win}") # Here we got a preliminary read window, but : # - That window may overflow (ie. adress coordinates outside # of the raster # - We have to consider some margins : # -- margin required for the interpolation kernel # -- margin required for spline interpolation preprocessing. # -- margin that may be required for other processing (egg. # antialiasing filtering) # A marged window may also overflow but we may have to # manage edges here : the passed array has to cover for # margins. # # Strategy : # 1. compute overflow of the preliminary window and limit it # to the available region # 2. Add margins # 3. Compute overflow of the marged window # 4. Read the valid window and perform edge management if # needed # 1. compute overflow src_win_overflow = window_overflow(array_src_profile_2d, src_win) DEBUG(f"src read win (preliminary) overflow : {src_win_overflow}") # 2. compute marged window # 2.1 first crop the overflow if any src_win_marged = window_extend(src_win, src_win_overflow, reverse=True) DEBUG(f"src read win before margin : {src_win_marged}") # 2.2 apply the margin src_win_marged = window_extend(src_win_marged, margins, reverse=False) DEBUG(f"src read win after margin : {src_win_marged}") # 3. Compute overflow of the marged window src_win_marged_overflow = window_overflow(array_src_profile_2d, src_win_marged) DEBUG(f"src read win required pad : {src_win_marged_overflow}") # `cstrip_read_win` corresponds to the window to read from src array src_win_read = window_extend(src_win_marged, src_win_marged_overflow, reverse=True) src_win_read_shape = window_shape(src_win_read) DEBUG(f"src read win read : {src_win_read} with shape {src_win_read_shape}") pad = np.array([[0, 0], [0, 0]]) if not np.all(src_win_marged_overflow == 0): pad = src_win_marged_overflow return src_win_read, src_win_marged, pad
[docs] def interpolate_grid( grid: Optional[np.ndarray], grid_mask: Optional[np.ndarray], x: np.ndarray, y: np.ndarray, x_new: np.ndarray, y_new: np.ndarray, dtype: Optional[np.dtype] = None, mask_binarize_precision: float = 1e-6, mask_dtype: np.dtype = np.uint8, ) -> Tuple[np.ndarray]: """Interpolate a 3-dimensional grid and its associated 2-dimensional mask. The first dimension of the grid contains the variable. This function performs linear interpolation of both the grid data and its mask onto a new coordinate mesh defined by `x_new` and `y_new`. We assume here that the input mask follows the convention where all non-zero values are considered masked. The output mask will be binarized (i.e., values equal to 0 or 1) according to a binarization threshold applied to the interpolated mask values: .. math:: final\\_mask(i,j) = 0 \\quad \\text{only if} \\quad |interp\\_mask(i,j)| < threshold The grid is interpolated on the mesh generated by the `x_new` and `y_new` coordinates, using a linear interpolation method. Parameters ---------- grid : Optional[np.ndarray] Input 3-dimensional grid. Its first dimension should represent different variables or bands, while the subsequent two dimensions correspond to spatial (row, column) data. grid_mask : Optional[np.ndarray] Input 2-dimensional mask. This mask should have the same spatial dimensions as `grid` (i.e., its last two dimensions). Non-zero values are treated as masked. x : np.ndarray 1D coordinates associated with the input `grid` (and `grid_mask`) for column indexes. y : np.ndarray 1D coordinates associated with the input `grid` (and `grid_mask`) for row indexes. x_new : np.ndarray 1D coordinates associated with the interpolated grid and mask for columns. This defines the new horizontal sampling of the output grid. y_new : np.ndarray 1D coordinates associated with the interpolated grid and mask for rows. This defines the new vertical sampling of the output grid. dtype : Optional[np.dtype], default None Data type to use for computation and for the output interpolated grid. If `None`, it uses the same dtype as the input `grid`. mask_binarize_precision : float, default 1e-6 Threshold used for the interpolated mask's binarization. Values with an absolute magnitude less than this threshold will be set to 0 in the output mask. mask_dtype : np.dtype, default np.uint8 Data type of the output mask. This should typically be `np.uint8` (for 0 or 1 values) or `bool`. Returns ------- Tuple[np.ndarray, np.ndarray] A tuple containing: - The interpolated 3-dimensional grid. - The binarized 2-dimensional interpolated mask. """ interp_grid = None interp_grid_mask = None # Create the "sparse" coordinates grid in order to preserve memory x_new_sparse, y_new_sparse = np.meshgrid(x_new, y_new, indexing="xy", sparse=True) if dtype is None: dtype = grid.dtype # Init the output grid if grid is not None: n_vars = grid.shape[0] interp_grid = np.empty((grid.shape[0], len(y_new), len(x_new)), dtype=dtype) # Loop on each variable to perform interpolation for i in range(n_vars): # Create the interpolator interpolator = RegularGridInterpolator( (y, x), grid[i, :, :], method="linear", bounds_error=False, fill_value=np.nan ) # Perform the interpolation interp_grid[i, :, :] = interpolator((y_new_sparse, x_new_sparse)) # Perform interpolation on mask if grid_mask is not None: # Init the mask interp_grid_mask = np.empty(grid_mask.shape) interpolator = RegularGridInterpolator( (y, x), grid_mask[:, :], method="linear", bounds_error=False, fill_value=np.nan ) # Perform the interpolation interp_grid_mask = interpolator((y_new_sparse, x_new_sparse)) # The interpolator will generate interpolated values # If we are strict we will only consider unmasked data to have # strictly the mask_value (almost equal with a precision) interp_grid_mask = (np.abs(interp_grid_mask) >= mask_binarize_precision).astype(mask_dtype) return interp_grid, interp_grid_mask
[docs] def oversample_regular_grid( grid: Optional[np.ndarray], grid_oversampling_row: int, grid_oversampling_col: int, grid_mask: Optional[np.ndarray], dtype: Optional[np.dtype] = None, grid_mask_binarize_precision: Optional[float] = 1e-6, grid_mask_dtype: np.dtype = np.uint8, win: Optional[np.ndarray] = None, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Get a linearly interpolated oversampled grid from the input grid. This function takes an input grid and optionally an associated mask, then oversamples them to a higher resolution based on provided oversampling factors. It handles the definition of the target interpolation coordinates and leverages `interpolate_grid` for the actual resampling. The output grid and mask will have dimensions scaled by `grid_oversampling_row` and `grid_oversampling_col`. Parameters ---------- grid : Optional[np.ndarray] Input 3-dimensional grid. Its first dimension represents different variables or bands, while the subsequent two dimensions correspond to spatial (row, column) data. Can be `None` if only `grid_mask` is to be oversampled. grid_oversampling_row : int Integer factor by which to oversample the grid along the row dimension. A value of 1 means no oversampling in this dimension. grid_oversampling_col : int Integer factor by which to oversample the grid along the column dimension. A value of 1 means no oversampling in this dimension. grid_mask : Optional[np.ndarray] Optional 2-dimensional mask associated with the input grid. If `grid` is `None`, this mask's shape is used to determine the original grid dimensions. dtype : Optional[np.dtype], default None Data type to use for computation and for the output interpolated grid. If `None`, it attempts to infer from `grid.dtype` if `grid` is provided and floating-point; otherwise, a `ValueError` is raised. grid_mask_binarize_precision : Optional[float], default 1e-6 Threshold used for the interpolated mask's binarization. This parameter is passed directly to `interpolate_grid`. grid_mask_dtype : np.dtype, default np.uint8 Data type of the output mask (e.g., `np.uint8` for 0 or 1 values, or `bool`). This parameter is passed directly to `interpolate_grid`. win : Optional[np.ndarray], default None An optional window (or sub-region) of the *oversampled* grid to compute. If `None`, the entire oversampled grid is processed. The window should be defined in terms of the *oversampled* coordinates (e.g., ``((row_start, row_end), (col_start, col_end))``). Returns ------- Tuple[Optional[np.ndarray], Optional[np.ndarray]] A tuple containing: - `out_grid` (`Optional[np.ndarray]`): The oversampled 3-dimensional grid, or `None` if the input `grid` was `None`. - `out_grid_mask` (`Optional[np.ndarray]`): The oversampled and binarized 2-dimensional mask, or `None` if the input `grid_mask` was `None`. Raises ------ AssertionError If `grid` is provided but its number of dimensions (`ndim`) is not 3. AssertionError If the calculated or provided `win` is outside the domain of the oversampled grid (checked by an internal `window_check` function). ValueError If `dtype` is `None` and the function cannot infer a floating-point type from the input `grid`. """ # Check that the grid dimension is 3 out_grid, out_grid_mask = None, None nrows, ncols = None, None if grid is not None: assert grid.ndim == 3 nrows, ncols = grid.shape[1:] else: nrows, ncols = grid_mask.shape # Define the window to the full data if not given if win is None: # Define the window on full grid # - first idx is 0 # - last idx equals to (shape(axis) - 1) * oversampling # => the number of points along an axis is given by : # (shape(axis) -1) * oversampling + 1 win = np.asarray( [[0, (nrows - 1) * grid_oversampling_row], [0, (ncols - 1) * grid_oversampling_col]] ) # Check the window is OK - for that we pass an ArrayProfile in order to # mock the output array's profile out_array_profile = ArrayProfile( shape=((nrows - 1) * grid_oversampling_row + 1, (ncols - 1) * grid_oversampling_col + 1), ndim=2, dtype=float, ) if not window_check(out_array_profile, win): raise Exception("window outside of output grid domain") # Check dtype to use if dtype is None: if grid is not None and np.issubdtype(grid.dtype, np.floating): dtype = grid.dtype else: raise ValueError( "You must precise argument 'dtype'. Cannot tell " "if float32 or float64 should be used" ) # Create associated coordinates # Input grid coordinates # y = np.arange(0, nrows, dtype=xy_dtype) * grid_oversampling_row # x = np.arange(0, ncols, dtype=xy_dtype) * grid_oversampling_col # Target grid coordinates # y_new = np.arange(win[0,0], win[0,1]+1, dtype=xy_dtype) # x_new = np.arange(win[1,0], win[1,1]+1, dtype=xy_dtype) y = np.linspace(0, (nrows - 1) * grid_oversampling_row, nrows, dtype=dtype) x = np.linspace(0, (ncols - 1) * grid_oversampling_col, ncols, dtype=dtype) y_new = np.linspace(win[0, 0], win[0, 1], win[0, 1] - win[0, 0] + 1, dtype=dtype) x_new = np.linspace(win[1, 0], win[1, 1], win[1, 1] - win[1, 0] + 1, dtype=dtype) # Interpolate the grid out_grid, out_grid_mask = interpolate_grid( grid=grid, grid_mask=grid_mask, x=x, y=y, x_new=x_new, y_new=y_new, dtype=dtype, mask_binarize_precision=grid_mask_binarize_precision, mask_dtype=grid_mask_dtype, ) return out_grid, out_grid_mask
[docs] def build_grid( resolution: Tuple[int, int], grid: np.ndarray, grid_target_win: np.ndarray, grid_resolution: Tuple[int, int], out: np.ndarray, computation_dtype: Optional[np.dtype] = None, ) -> Optional[np.ndarray]: """Create the target resolution grid. This method generates a grid at a specified target resolution by resampling an input raster-like grid. It performs no I/O operations. The function supports providing a preallocated output buffer for efficiency. A preallocated output buffer can be passed to the method via the `out` argument. If provided, its shape must be consistent with the expected output shape determined by `grid_target_win` and `resolution`. The data type used for interpolation can also be specified with `computation_dtype`. Note that if `computation_dtype` differs from the output buffer's data type (or the input grid's data type if `out` is not provided), an implicit cast to the output data type will be performed during the final assignment. Parameters ---------- resolution : Tuple[int, int] The desired resolution of the output grid. Currently, only full resolution (i.e., `(1, 1)`) is implemented. This parameter influences the resampling (oversampling) of the input grid. grid : np.ndarray The input grid, expected to be a 3-dimensional NumPy array. Its first dimension typically represents different bands or variables. grid_target_win : np.ndarray The production window for the output grid. It's provided as a 2-dimensional array of shape (2, 2) defining ``((first_row, last_row), (first_col, last_col))``. This window is expressed in the output grid's coordinate system. grid_resolution : Tuple[int, int] The resolution (oversampling factors) of the input `grid` in row and column directions. For example, `(2, 2)` means the input grid is twice oversampled in both dimensions compared to its intrinsic resolution. out : np.ndarray An optional preallocated NumPy array buffer to store the result. If provided, its shape must perfectly match the expected output shape derived from `grid_target_win` and `resolution`. computation_dtype : Optional[np.dtype], default None An optional data type to use for the interpolation computations. If `None`, it defaults to the `out` array's data type if `out` is given, or the `grid` data type otherwise. Returns ------- Optional[np.ndarray] The computed grid as a NumPy array if `out` was `None`. If `out` was provided, the result is written directly into it, and this function returns `None`. Raises ------ ValueError If `resolution` or `grid` are `None`. ValueError If the input `grid` does not have 3 dimensions. ValueError If the desired output `resolution` is not `(1, 1)` (due to current implementation limitations). ValueError If `grid_target_win` is not a 2D window. ValueError If `grid_target_win` is outside the bounds of the full-resolution input grid. ValueError If `out` is provided but its shape does not match the expected output shape. """ ret = None # -- Perform some checks on arguments and init optional arguments if resolution is None: raise ValueError("You must provide both the 'shape' and 'resolution' " "arguments") if grid is None: raise ValueError("You must provide the 'grid' argument") if grid.ndim != 3: raise ValueError("Input grid must have 3 dimensions") if ~np.all(resolution == (1, 1)): raise ValueError( "Output resolution different from full resolution have" " not been implemented yet" ) grid_full_res_profile = ArrayProfile( shape=( grid.shape[0], (grid.shape[1] - 1) * grid_resolution[0] + 1, (grid.shape[2] - 1) * grid_resolution[1] + 1, ), ndim=grid.ndim, dtype=grid.dtype, ) if grid_target_win is None: # Compute full size grid_target_win = np.asarray( [[0, grid_full_res_profile.shape[1] - 1], [0, grid_full_res_profile.shape[2] - 1]] ) else: grid_target_win = np.asarray(grid_target_win) if grid_target_win.ndim != 2: raise ValueError("The argument 'grid_target_win' must be a 2d " "window") grid_target_win3 = window_expand_ndim(grid_target_win, (0, grid.shape[0] - 1)) # check that the target window lies in the full resolution grid if not window_check(arr=grid_full_res_profile, win=grid_target_win3, axes=None): raise ValueError( "Target window error is not contained in input grid : " f"\n\t Input grid : {grid_full_res_profile.shape}" f"\n\t Window : {grid_target_win3}" ) # Compute shape shape3 = window_shape(grid_target_win3) # Init output buffer if not given if out is None: out = np.zeros(shape3, dtype=grid.dtype) ret = out elif ~np.all(out.shape == shape3): raise ValueError("The values of the 2 arguments 'out' and 'shape' does " "not match.") if computation_dtype is None: computation_dtype = out.dtype # elif dtype != out.dtype: # raise ValueError(f"Ouput data type {out.dtype} does not match with the " # f"input argument 'dtype' {dtype}") # -- End of argument's checks if grid_resolution[0] != 1 or grid_resolution[1] != 1: # We have to oversample the grid to the output resolution # FUTURE_WARNING : if resolution_out != (1,1) we will have to # reimplement this part out[:, :, :], _ = oversample_regular_grid( grid=grid, grid_oversampling_row=grid_resolution[0], grid_oversampling_col=grid_resolution[1], grid_mask=None, win=grid_target_win, # the method takes a 2d window dtype=computation_dtype, ) else: # No computation to perform - just select the target window # FUTURE_WARNING : if resolution_out != (1,1) we will have to # reimplement this part # Please note the method takes a window with same ndim as grid out[:, :, :] = window_apply(arr=grid, win=grid_target_win3) return ret