# coding: utf8
#
# Copyright (c) 2025 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of GRIDR
# (see https://github.com/CNES/gridr).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Grid resampling
"""
# pylint: disable=C0413
import logging
import sys
from typing import Any, NoReturn, Optional, Tuple, Union
import numpy as np
from gridr.cdylib import PyArrayWindow2, py_array1_grid_resampling_f64
from gridr.core.grid.grid_commons import grid_full_resolution_shape, grid_resolution_window_safe
from gridr.core.grid.grid_mask import Validity
from gridr.core.grid.grid_utils import (
array_compute_resampling_grid_geometries,
array_compute_resampling_grid_src_boundaries,
read_win_from_grid_metrics,
)
from gridr.core.interp.bspline_prefiltering import array_bspline_prefiltering
from gridr.core.interp.interpolator import (
Interpolator,
InterpolatorIdentifier,
get_interpolator,
is_bspline,
)
from gridr.core.utils.array_pad import pad_inplace
from gridr.core.utils.array_utils import ArrayProfile
PY311 = sys.version_info >= (3, 11)
if PY311:
from typing import Self # noqa: E402, F401
else:
from typing_extensions import Self # noqa: E402, F401
# pylint: enable=C0413
F64_F64_F64 = (np.dtype("float64"), np.dtype("float64"), np.dtype("float64"))
PY_ARRAY_GRID_RESAMPLING_FUNC = {
F64_F64_F64: py_array1_grid_resampling_f64,
}
STANDALONE_SAFECHECK_SOURCE_BOUNDARIES = True
"""
Parameter activating an additional validation of the grid source boundaries to
ensure topological consistency in standalone mode.
This check computes the source boundaries from all valid grid data within the
current computed region, verifying that the source boundaries extracted from
grid metrics align with the hull border.
When using grid metrics only, we assumes that points inside the source hull
correspond to points within the target hull, maintaining topological integrity.
If this assumption is violated, the read window may be insufficient, potentially
causing a Rust panic when attempting to access out-of-bounds indices.
This safety check helps prevent such runtime errors by proactively extending
boundary conditions if required.
"""
[docs]
def calculate_source_extent(
interp: Interpolator,
array_in: np.ndarray,
grid_row: np.ndarray,
grid_col: np.ndarray,
grid_resolution: Tuple[int, int],
grid_nodata: Optional[Union[int, float]],
grid_mask: np.ndarray,
grid_mask_valid_value: Optional[int] = 1,
win: Optional[np.ndarray] = None,
safecheck_src_boundaries: Optional[bool] = True,
logger_msg_prefix: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Calculate the source array read window with margins for interpolation.
This function computes the minimal read window from the source array
required to resample data onto the target grid. It accounts for
interpolation margins and handles boundary cases where the required window
extends beyond the source array bounds.
The calculation proceeds in three steps:
1. Compute grid metrics from valid grid coordinates
2. Apply interpolation margins to determine the required source extent
3. Adjust for boundary conditions and compute necessary padding
Parameters
----------
interp : Interpolator
Interpolator instance that defines the required margins for
interpolation.
See `gridr.core.interp.interpolator` for details.
array_in : np.ndarray
Source array to be resampled. Must be a C-contiguous 3D array with shape
(nvar, nrow, ncol) or 2D array with shape (nrow, ncol).
grid_row : np.ndarray
2D array of row coordinates in the source array coordinate system.
Must have the same shape as `grid_col`.
grid_col : np.ndarray
2D array of column coordinates in the source array coordinate system.
Must have the same shape as `grid_row`.
grid_resolution : tuple of int
Oversampling factor as (row_resolution, col_resolution). Value of 1
indicates full resolution; higher values indicate coarser grids.
grid_nodata : int or float, optional
Value in `grid_row` and `grid_col` indicating invalid cells. Mutually
exclusive with `grid_mask` (exclusivity enforced in core method).
grid_mask : np.ndarray, optional
Integer mask array for the grid. Cells matching `grid_mask_valid_value`
are considered valid. Must have the same shape as `grid_row` and
`grid_col`.
grid_mask_valid_value : int, default=1
Value in `grid_mask` that designates valid grid cells. Required if
`grid_mask` is provided.
win : np.ndarray, optional
Target window in full-resolution grid coordinates, shape (2, 2):
``[[row_start, row_end], [col_start, col_end]]``. If None, processes
the entire grid.
safecheck_src_boundaries : bool, default=True
If True, computes the source boundaries from all valid grid data.
logger_msg_prefix : str, optional
Prefix for log messages generated by this function.
logger : logging.Logger, optional
Logger instance for diagnostic messages.
Returns
-------
array_src_win_read : np.ndarray, shape (2, 2)
Final source read window adjusted for margins and boundary constraints.
Format: ``[[row_start, row_end], [col_start, col_end]]``. Use this
window for raster IO operations.
array_src_win_marged : np.ndarray, shape (2, 2)
Desired read window with margins applied, before boundary correction.
Format: ``[[row_start, row_end], [col_start, col_end]]``.
pad : np.ndarray, shape (2, 2)
Padding required if the marged window extends outside the source array.
Format: ``[[top_pad, bottom_pad], [left_pad, right_pad]]``.
Notes
-----
- If no valid grid points are found, returns None for all outputs
- The `safecheck_src_boundaries` option is useful to detect grid topology
issues that could cause panic
- Padding may be required when the grid extends beyond source boundaries,
which should be filled using appropriate boundary conditions
See Also
--------
array_compute_resampling_grid_geometries : Computes grid metrics from
coordinates read_win_from_grid_metrics : Derives read window from grid
metrics source_extent_pad : Applies padding to source arrays
"""
def DEBUG(msg):
if logger:
logger.debug(f"{logger_msg_prefix} - {msg}")
def WARNING(msg):
if logger:
logger.warning(f"{logger_msg_prefix} - {msg}")
# Computing total required margins
# (top, bottom, left, right)
margin = np.asarray(interp.total_margins()).reshape((2, 2))
DEBUG(f"Required margins for interpolator {interp.shortname()} : {margin}")
# Compute explicit grid target window if it is not defined
if win is None:
full_shape_out = grid_full_resolution_shape(
shape=grid_row.shape, resolution=grid_resolution
)
win = np.array(((0, full_shape_out[0] - 1), (0, full_shape_out[1] - 1)))
# Determine the minimal coarse-grid window containing the oversampled window
# `oversamped_grid_win`.
grid_arr_win, _ = grid_resolution_window_safe(
resolution=grid_resolution, win=win, grid_shape=grid_row.shape
)
# Compute strip grid metrics for current tile
DEBUG("Computing grid metrics... ")
grid_metrics = array_compute_resampling_grid_geometries(
grid_row=grid_row,
grid_col=grid_col,
grid_resolution=grid_resolution,
win=grid_arr_win,
grid_mask=grid_mask,
grid_mask_valid_value=grid_mask_valid_value,
grid_nodata=grid_nodata, # TODO : check not None is supported
)
# Grid metrics may be None if no valid points
# First init returned values to None then proceed with defined grid_metrics
array_src_win_read, array_src_win_marged, pad = None, None, None
if grid_metrics:
if safecheck_src_boundaries:
DEBUG("SAFECHECK_SOURCE_BOUNDARIES : Computing source boundaries... ")
# Compute source boundaries from all valid coordinates
safe_src_boundaries = array_compute_resampling_grid_src_boundaries(
grid_row=grid_row,
grid_col=grid_col,
win=grid_arr_win,
grid_mask=grid_mask,
grid_mask_valid_value=grid_mask_valid_value,
grid_nodata=grid_nodata, # TODO : check not None is supported
)
DEBUG(f"SAFECHECK_SOURCE_BOUNDARIES : {safe_src_boundaries}")
# Check that the grid preserve the source topology
if (
safe_src_boundaries.xmin < grid_metrics.src_bounds.xmin
or safe_src_boundaries.xmax > grid_metrics.src_bounds.xmax
or safe_src_boundaries.ymin < grid_metrics.src_bounds.ymin
or safe_src_boundaries.ymax > grid_metrics.src_bounds.ymax
):
# Boundaries extend is required !
WARNING(
"SAFECHECK_SOURCE_BOUNDARIES : The grid does not respect the source topology"
" - the source boundaries have to be expanded"
)
# Replace the source boundaries
DEBUG("SAFECHECK_SOURCE_BOUNDARIES : Expanding grid metrics source boundaries... ")
grid_metrics.src_bounds = safe_src_boundaries
array_src_profile = array_in
if array_in.ndim == 3:
array_src_profile = array_in[0]
array_src_win_read, array_src_win_marged, pad = read_win_from_grid_metrics(
grid_metrics=grid_metrics,
array_src_profile_2d=array_src_profile,
margins=margin,
logger=logger,
logger_msg_prefix=logger_msg_prefix,
)
return array_src_win_read, array_src_win_marged, pad
[docs]
def get_array_padded_shape(
array_src: np.ndarray,
pad: Tuple[int, int],
) -> (Tuple[int], Tuple[slice]):
"""Compute padded array shape and source window slice for padding operations.
This utility function calculates the shape of an array after padding and
generates slice objects to position the original data within the padded array.
Parameters
----------
array_src : np.ndarray
Source array to be padded. Must be 2D (nrow, ncol) or 3D (nvar, nrow, ncol).
pad : tuple of tuple of int
Padding amounts as ``((top, bottom), (left, right))``.
Returns
-------
array_padded_shape : tuple of int
Shape of the padded array:
- For 3D input: (nvar, nrow + top + bottom, ncol + left + right)
- For 2D input: (nrow + top + bottom, ncol + left + right)
source_window : tuple of slice
Slice objects to position the original array within the padded array:
- For 3D: (slice(None), slice(top, top+nrow), slice(left, left+ncol))
- For 2D: (slice(top, top+nrow), slice(left, left+ncol))
Raises
------
ValueError
If input array has neither 2 nor 3 dimensions.
Notes
-----
This function does not allocate or modify arrays; it only computes metadata
for padding operations. Use `source_extent_pad` to apply actual padding.
Examples
--------
>>> import numpy as np
>>> from gridr import get_array_padded_shape
>>>
>>> # For a 2D array
>>> arr = np.ones((100, 100))
>>> pad = ((5, 5), (10, 10))
>>> shape, window = get_array_padded_shape(arr, pad)
>>> print(shape) # (110, 120)
>>> print(window) # (slice(5, 105), slice(10, 110))
>>>
>>> # For a 3D array
>>> arr = np.ones((3, 100, 100))
>>> shape, window = get_array_padded_shape(arr, pad)
>>> print(shape) # (3, 110, 120)
>>> print(window) # (slice(None), slice(5, 105), slice(10, 110))
"""
array_padded_shape, source_window = None, None
match array_src.ndim:
case 3:
array_padded_shape = (
array_src.shape[0],
array_src.shape[1] + pad[0][0] + pad[0][1],
array_src.shape[2] + pad[1][0] + pad[1][1],
)
source_window = (
slice(None, None),
slice(pad[0][0], pad[0][0] + array_src.shape[1]),
slice(pad[1][0], pad[1][0] + array_src.shape[2]),
)
case 2:
array_padded_shape = (
array_src.shape[0] + pad[0][0] + pad[0][1],
array_src.shape[1] + pad[1][0] + pad[1][1],
)
source_window = (
slice(pad[0][0], pad[0][0] + array_src.shape[0]),
slice(pad[1][0], pad[1][0] + array_src.shape[1]),
)
case _:
raise ValueError("Input array must have 2 or 3 dimensions")
return array_padded_shape, source_window
[docs]
def source_extent_pad(
array_src: np.ndarray,
pad,
boundary_condition,
fill: Optional[Any] = None,
) -> np.ndarray:
"""Apply padding to a source array with specified boundary conditions.
This function creates a padded version of the input array, optionally filling
the padded regions using boundary conditions (edge, reflect, symmetric, wrap)
or a constant fill value.
Parameters
----------
array_src : np.ndarray
Source array to pad. Must be 2D (nrow, ncol) or 3D (nvar, nrow, ncol),
C-contiguous.
pad : tuple of tuple of int
Padding amounts as ``((top, bottom), (left, right))``.
boundary_condition : str, optional
Boundary condition mode for padding the margins. If None, padded regions
are left uninitialized (except if `fill` is provided). Available modes:
- 'edge': Repeat edge values
- 'reflect': Mirror reflection without repeating edge
- 'symmetric': Mirror reflection with repeating edge
- 'wrap': Circular wrap-around
fill : scalar, optional
Value to initialize padded regions before applying boundary conditions.
If None, array is allocated uninitialized (faster but contains garbage
values in padded regions if `boundary_condition` is None).
Returns
-------
array_padded : np.ndarray
Padded array with the same dtype as `array_src`. Shape:
- For 3D input: (nvar, nrow + top + bottom, ncol + left + right)
- For 2D input: (nrow + top + bottom, ncol + left + right)
Notes
-----
- The original data is always copied into the center of the padded array
- If both `boundary_condition` and `fill` are provided, `fill` is applied
first, then the boundary condition overwrites the padded regions
- Uses an optimized in-place padding implementation (`pad_inplace`)
- The returned array is always C-contiguous
See Also
--------
get_array_padded_shape : Computes padded shape without allocation
pad_inplace : Low-level in-place padding function
Examples
--------
>>> import numpy as np
>>> from gridr import source_extent_pad
>>>
>>> # Pad with edge replication
>>> arr = np.arange(9).reshape(3, 3)
>>> padded = source_extent_pad(arr, pad=((1, 1), (1, 1)),
... boundary_condition='edge')
>>> print(padded.shape) # (5, 5)
>>>
>>> # Pad with constant fill value
>>> padded = source_extent_pad(arr, pad=((2, 2), (2, 2)),
... boundary_condition=None, fill=-999)
>>>
>>> # Pad 3D array with reflection
>>> arr_3d = np.random.rand(3, 100, 100)
>>> padded_3d = source_extent_pad(arr_3d, pad=((5, 5), (5, 5)),
... boundary_condition='reflect')
"""
array_padded_shape, source_window = get_array_padded_shape(array_src, pad)
# Allocate a new buffer
array_padded = None
if fill is not None:
array_padded = np.full(array_padded_shape, fill, dtype=array_src.dtype, order="C")
else:
array_padded = np.empty(array_padded_shape, dtype=array_src.dtype, order="C")
# Copy original data
array_padded[source_window] = array_src[:]
# Apply the boundary condition if any
if boundary_condition:
if array_padded.ndim == 3:
pad = ((0, 0),) + tuple(pad)
pad_inplace(
array=array_padded,
src_win=source_window,
pad_width=pad,
mode=boundary_condition,
strict_size=True,
)
return array_padded
[docs]
def array_grid_resampling(
interp: InterpolatorIdentifier,
array_in: np.ndarray,
grid_row: np.ndarray,
grid_col: np.ndarray,
grid_resolution: Tuple[int, int],
array_out: Optional[np.ndarray],
array_out_win: Optional[np.ndarray] = None,
nodata_out: Optional[Union[int, float]] = 0,
array_in_origin: Optional[Tuple[float, float]] = (0.0, 0.0),
win: Optional[np.ndarray] = None,
array_in_mask: Optional[np.ndarray] = None,
grid_mask: Optional[np.ndarray] = None,
grid_mask_valid_value: Optional[int] = 1,
grid_nodata: Optional[float] = None,
array_out_mask: Optional[Union[np.ndarray, bool]] = None,
check_boundaries: bool = True,
interp_kwargs: Optional[dict] = None,
standalone: Optional[bool] = True,
boundary_condition: Optional[bool] = None,
logger_msg_prefix: Optional[str] = None,
logger: Optional[logging.Logger] = None,
) -> Tuple[Union[np.ndarray, NoReturn], Union[np.ndarray, NoReturn]]:
"""Resamples an input array based on target grid coordinates, applying an
optional bilinear interpolation for low resolution grids.
The method uses target grid coordinates (`grid_row` and `grid_col`) that
may represent a lower resolution than the input array. Bilinear
interpolation is applied internally to compute missing target coordinates.
The oversampling factor is specified by the `grid_resolution` parameter,
where a value of 1 indicates full resolution.
The interpolation method is set through the `interp` parameter.
This method wraps a Rust function (`py_array1_grid_resampling_*`) for
efficient resampling. The underlying Rust implementation requires that:
1. All target positions in `array_in` referenced by grid coordinates must be
accessible, along with their neighborhoods needed for interpolation and
preprocessing (e.g., B-spline prefiltering).
2. Any required preprocessing (e.g., B-spline prefiltering) must be completed
before interpolation.
**Execution Modes**
The method supports two execution modes controlled by the `standalone` parameter:
**Standalone Mode (standalone=True)**:
Handles all preprocessing automatically, making the function fully self-contained:
- **Automatic Padding**: If `array_in` is too small to satisfy interpolation
requirements (e.g., neighborhood access for the interpolator or grid
coordinates falling near boundaries), the array is automatically padded
according to `boundary_condition`.
- **Mask Handling**: If `array_in_mask` is provided, it is padded consistently
with `array_in`:
* With `boundary_condition` set: Padded mask values are extrapolated from
the original mask according to the boundary condition (e.g., 'edge'
repeats boundary mask values, 'reflect' mirrors them without including the edge).
* With `boundary_condition=None`: Padded regions are marked as invalid
(typically set to 0).
* If no mask is provided and `boundary_condition=None`: A mask is created
with padded regions marked as invalid.
- **Preprocessing**: All required preprocessing steps (e.g., B-spline
coefficient calculation) are performed automatically.
Use this mode when calling the function independently or when you want
the function to handle all edge cases automatically.
**Integrated Mode (standalone=False)**:
Assumes preprocessing has been handled externally, offering maximum performance:
- **No Padding**: Assumes `array_in` is already large enough to satisfy all
interpolation requirements. The caller is responsible for ensuring adequate
array dimensions.
- **No Mask Preprocessing**: Assumes `array_in_mask` (if provided) is already
properly sized and formatted.
- **No Preprocessing**: Assumes all required preprocessing (e.g., B-spline
prefiltering) has been completed externally.
Use this mode within tiled processing pipelines where padding and preprocessing
are managed at a higher level to avoid redundant operations across tiles.
Parameters
----------
interp: InterpolatorIdentifier
The interpolator identifier. It can be:
- A string representing the interpolator name (e.g., "nearest", "linear"
, "cubic", etc.).
- A `PyInterpolatorType` enum value.
- An instance of an interpolator class.
See `gridr.core.interp.interpolator` for further details
array_in : np.ndarray
The input array to be resampled. It must be a contiguous 2D (nrow,
ncol) or 3D (nvar, nrow, ncol) array.
grid_row : np.ndarray
A 2D array representing the row coordinates of the target grid, with
the same shape as `grid_col`. The coordinates target row positions in
the `array_in` input array.
grid_col : np.ndarray
A 2D array representing the column coordinates of the target grid,
with the same shape as `grid_row`. The coordinates target column
positions in the `array_in` input array.
grid_resolution : Tuple[int, int]
A tuple specifying the oversampling factor for the grid for rows and
columns. The resolution value of 1 represents full resolution, and
higher values indicate lower resolution grids.
array_out : Optional[np.ndarray]
The output array where the resampled values will be stored.
If `None`, a new array will be allocated. The shape of the output array
is either determined based on the resolution and the input grid or by
the optional `win` parameter.
array_out_win : Optional[np.ndarray], default None
An optional `np.ndarray` that designates the specific area in
`array_out` to receive the resampled data. If `None`, the method will
populate a default rectangular region starting from `array_out`'s
top-left corner. This argument is only considered when `array_out` is
passed, requiring `array_out` to be large enough to contain
`array_out_win`.
nodata_out : Optional[Union[int, float]], default 0
The value to be assigned to "NoData" in the output array. This value
is used to fill in missing values where no valid resampling could occur
or where a mask flag is set.
array_in_origin : Optional[Tuple[float, float]], default (0., 0.)
Bias to respectively apply to the `grid_row` and `grid_col` coordinates.
The operation is performed by the wrapped Rust function. Its primary use
cases include aligning with alternative grid origin conventions or
handling situations where the provided `array_in` array corresponds to a
subregion of the complete source raster.
win : Optional[np.ndarray], default None
A window (or sub-region) of the full resolution grid to limit the
resampling to a specific target region. The window is defined as a list
of tuples containing the first and last indices for each dimension.
If `None`, the entire grid is processed.
array_in_mask : Optional[np.ndarray], default None
A mask for the input array that indicates which parts of `array_in`
are valid for resampling. If not provided, the entire input array is
considered valid.
grid_mask : Optional[np.ndarray], default None
An optional integer mask array for the grid. Grid cells corresponding to
`grid_mask_valid_value` are considered **valid**; all other values
indicate **invalid** cells and will result in `nodata_out` in the output
array. If not provided, the entire grid is considered valid. The grid
mask must have the same shape as `grid_row` and `grid_col`.
grid_mask_valid_value : Optional[int], default 1
The value in `grid_mask` that designates a **valid** grid cell.
All values in `grid_mask` that differ from this will be treated as
**invalid**. This parameter is required if `grid_mask` is provided.
grid_nodata : Optional[float], default None
The value in `grid_row` and `grid_col` to consider as **invalid**
cells. Please note this option is exclusive with `grid_mask`. The
exclusivity is managed within the bound core method.
array_out_mask : Optional[Union[np.ndarray, bool]], default None
A mask for the output array that indicates where the resampled values
should be stored. If `True`, a new array will be allocated and initially
filled with 0. The shape of this output mask array is consistent with
the `array_out` shape. If `None` or not `True`, the entire output array
is assumed to be valid.
check_boundaries : bool, default True
Force a check at each iteration to ensure that the required data to
perform interpolation is available in the source data.
This parameter adresses the core Rust function and can be set to False
for performance gain if you are sure that all the required data is
available.
interp_kwargs : Optional[dict], default=None
Optional keyword parameters that will be passed to the `get_interpolator`
function for interpolator creation. Used when `interp` is of type `str`
or `PyInterpolatorType`.
standalone : bool, default=True
Controls the execution mode:
- `True`: **Standalone mode** - Performs all preprocessing automatically,
including padding, mask handling, and any required interpolator-specific
preprocessing (e.g., B-spline prefiltering). Use when calling this
function independently.
- `False`: **Integrated mode** - Assumes all preprocessing has been handled
externally. Offers maximum performance for tiled processing pipelines.
The caller must ensure `array_in` is adequately sized and preprocessed.
boundary_condition : Optional[str], default=None
Defines how to handle boundary conditions when padding is required in
standalone mode. Ignored when `standalone=False`. Options:
- `'edge'`: Pad with the edge values of the array (repeat boundary values).
- `'wrap'`: Wrap around to the opposite edge (circular/periodic boundary).
- `'reflect'`: Mirror reflection without repeating the edge values.
- `'symmetric'`: Mirror reflection with edge values repeated.
- `None`: No padding is applied. If insufficient data is available for
interpolation, those regions will be marked as invalid in the mask.
The boundary condition applies to both `array_in` and `array_in_mask`
(if provided). For masks, the boundary values are extrapolated according
to the same rule, or marked as invalid if `boundary_condition=None`.
logger_msg_prefix : Optional[str], default=None
A prefix to add to all log messages generated by this function.
logger : Optional[logging.Logger], default=None
A logger instance for outputting diagnostic messages.
Returns
-------
Tuple[Union[np.ndarray, NoReturn], Union[np.ndarray, NoReturn]]
A tuple containing:
- The resampled array. If `array_out` was provided, this will be
`None` (as the result is written in-place).
- The resampled output mask. If `array_out_mask` was `False` or
`None`, this will be `None`.
Raises
------
ValueError
If incompatible parameters are provided (e.g., both `array_in_origin` and
`standalone=True`).
AssertionError
If input arrays are not C-contiguous.
AssertionError
If grid-related arrays have mismatched shapes.
AssertionError
If optional `array_in_mask` shape is not consistent with `array_in`.
Exception
If the `py_array_grid_resampling_*` function (the underlying Rust
binding) is not available for the provided input types.
Notes
-----
- This method is designed for resampling raster-like data using a grid of
target coordinates.
- With integrated mode (`standalone=False`) this method is designed to be
embedded in code that works on tiles, supporting both tiled inputs and
outputs.
- For correct results, ensure that the `grid_row` and `grid_col` values
represent the desired target grid coordinates within the full resolution
grid system.
- When `standalone=True`, the function may allocate temporary arrays
internally, which may increase memory usage.
Limitations
-----------
- The method assumes that all input arrays (`array_in`, `grid_row`,
`grid_col`, etc.) are C-contiguous. If any are not, the method may
raise an assertion error.
- The method assumes that the grid-related arrays (`grid_row`, `grid_col`,
`grid_mask`) have the same shapes. Mismatched shapes will raise an
assertion error.
- The `win` parameter, if provided, must be compatible with the resolution
of the grid. If `win` exceeds the bounds of the grid, an error may
occur.
- For large grids or arrays, performance may degrade. Users should test
the method's efficiency for their specific data sizes before using it
in production.
- This method assumes that the input grid is in a "full resolution" grid
coordinate system. If the coordinate system is different, the resampling
may produce incorrect results.
Example
-------
**Standalone mode with automatic padding**:
.. code-block:: python
>>> array_in = np.random.rand(100, 100)
>>> grid_row = np.linspace(0, 99, 50)
>>> grid_col = np.linspace(0, 99, 50)
>>> grid_resolution = (2, 2)
>>> result, mask = array_grid_resampling(
... interp="cubic",
... array_in=array_in,
... grid_row=grid_row,
... grid_col=grid_col,
... grid_resolution=grid_resolution,
... array_out=None,
... standalone=True,
... boundary_condition='reflect'
... )
**Integrated mode for tiled processing**:
.. code-block:: python
>>> # Assume array_in is already padded and preprocessed
>>> updated_array = preprocess(interp, array_in) # External function
>>> result, mask = array_grid_resampling(
... interp="cubic",
... array_in=updated_array,
... grid_row=grid_row,
... grid_col=grid_col,
... grid_resolution=grid_resolution,
... array_out=None,
... standalone=False
... )
"""
ret = None
ret_mask = None
# First perform some checks
assert array_in.flags.c_contiguous is True
assert grid_row.flags.c_contiguous is True
assert grid_col.flags.c_contiguous is True
array_in_shape = array_in.shape
if len(array_in_shape) == 2:
array_in_shape = (1,) + array_in_shape
assert np.all(grid_row.shape == grid_col.shape)
assert len(grid_row.shape) == 2
grid_shape = grid_row.shape
# Manage optional input mask
# array_in_mask_dtype = np.dtype("uint8")
if array_in_mask is not None:
# array_in_mask_dtype = array_in_mask.dtype
# check shape
assert array_in_mask.dtype == np.dtype("uint8")
assert array_in_mask.shape[0] == array_in_shape[1]
assert array_in_mask.shape[1] == array_in_shape[2]
# Getting the interpolator object from its identifier
interp_kwargs = interp_kwargs if interp_kwargs is not None else {}
interp = get_interpolator(interp, **interp_kwargs)
if standalone:
if array_in_origin is not None and np.any(np.asarray(array_in_origin) != 0.0):
raise ValueError("Shifting the array origin is not available for standalone mode")
# Initialize the interpolator - required for B-spline for instance
interp.initialize()
# Compute require source extent in order to compute resampling from the
# grid.
(
_,
array_src_win_marged,
pad,
) = calculate_source_extent(
interp=interp,
array_in=array_in,
grid_row=grid_row,
grid_col=grid_col,
grid_resolution=grid_resolution,
grid_nodata=grid_nodata,
grid_mask=grid_mask,
grid_mask_valid_value=grid_mask_valid_value,
win=win,
safecheck_src_boundaries=STANDALONE_SAFECHECK_SOURCE_BOUNDARIES,
logger_msg_prefix=logger_msg_prefix,
logger=logger,
)
# Check if input array is sufficient
if np.any(pad != 0):
# TODO : not optimal if there is no padding required at an edge,
# we may not need all input data from the input array.
# The source_extent_pad method does only perfom padding and do not
# consider cropping.
array_padded_fill = None
mask_padded_fill = None
mask_padded = None
if boundary_condition is None:
array_padded_fill = 0
mask_padded_fill = Validity.INVALID
array_padded = source_extent_pad(
array_src=array_in,
pad=pad,
boundary_condition=boundary_condition,
fill=array_padded_fill,
)
# # Manage input mask if any is given
# if array_in_mask is not None:
# mask_padded = source_extent_pad(
# array_src=array_in_mask,
# pad=pad,
# boundary_condition=boundary_condition,
# fill=mask_padded_fill,
# )
# elif boundary_condition is None:
# # Mask handling:
# # - A default mask must be created if:
# # * No mask is provided AND
# # * No boundary condition is specified
# # - No mask creation is needed if:
# # * A boundary condition is provided and applied to the input array
# # * In this case, all points are considered valid by default
# mask_profile = ArrayProfile(
# shape=(array_in_shape[1], array_in_shape[2]),
# ndim=2,
# dtype=np.uint8,
# )
# (mask_padded_shape, mask_source_window) = get_array_padded_shape(mask_profile, pad)
# # Allocate buffer for padded mask and fill it with INVALID value
# mask_padded = np.full(
# mask_padded_shape, Validity.INVALID, dtype=np.uint8, order="C"
# )
# # Mark original region as VALID
# mask_padded[mask_source_window] = Validity.VALID
# Mask handling:
# 1. If a mask is provided if must be padded considering boundary condition
# 2. Otherwise :
# 2.1 A default mask must be created if:
# - No boundary condition is specified in order to mark domain extension as invalid.
# - A boundary condition is specified and the interpolator is a BSpline (validity
# for domain extension has to be set)
# 2.2 No mask creation is needed if:
# - A boundary condition is provided and interpolator is not BSpline : in this case,
# all points are considered valid by default
if array_in_mask is not None:
mask_padded = source_extent_pad(
array_src=array_in_mask,
pad=pad,
boundary_condition=boundary_condition,
fill=mask_padded_fill,
)
elif boundary_condition is None or is_bspline(interp):
mask_profile = ArrayProfile(
shape=(array_in_shape[1], array_in_shape[2]),
ndim=2,
dtype=np.uint8,
)
(mask_padded_shape, mask_source_window) = get_array_padded_shape(mask_profile, pad)
if boundary_condition is None:
# Allocate buffer for padded mask and fill it with INVALID value
mask_padded = np.full(
mask_padded_shape, Validity.INVALID, dtype=np.uint8, order="C"
)
# Mark original region as VALID
mask_padded[mask_source_window] = Validity.VALID
else:
# Interpolator is BSpline
# Here we allocate a full valid mask.
# TODO - Computing Improvement : we can still have mask as
# None and create the appropriate mask ater bspline
# prefiltering
mask_padded = np.full(
mask_padded_shape, Validity.VALID, dtype=np.uint8, order="C"
)
# substitute array_in with array_padded
array_in = array_padded
# Update array padded shape to match the effective shape
array_padded_shape = array_padded.shape
if array_padded.ndim == 2:
array_padded_shape = tuple(np.insert(array_padded_shape, 0, array_in_shape[0]))
array_in_shape = array_padded_shape
# subsitute array_in_mask with mask_padded
array_in_mask = mask_padded
# We've applied padding to `array_in` and optional associated mask,
# so we must account for the implied shift in coordinates.
# Note: Standalone mode is incompatible with `array_in_origin` as
# input, but we can use it here for the coming core Rust function
# call.
array_in_origin = (pad[0][0], pad[1][0])
# Note : If we want to apply low-pass filtering for antialiasing this is
# the right place. But first we would have to integrate the required
# margin for antialiasing into the total margins requirement.
# Manage interpolator preprocessings
if is_bspline(interp):
# Prefiltering is performed in-place on the available data.
array_bspline_prefiltering(
array_in=array_in, # thats the previously read buffer
array_in_mask=array_in_mask,
interp=interp, # The interpolator
)
array_in = array_in.reshape(-1)
grid_row = grid_row.reshape(-1)
grid_col = grid_col.reshape(-1)
py_grid_win = None
if win is not None:
py_grid_win = PyArrayWindow2(
start_row=win[0][0], end_row=win[0][1], start_col=win[1][0], end_col=win[1][1]
)
# Allocate array_out if not given
if array_out is None:
if array_out_win is not None:
# Ignore it
array_out_win = None
array_out_shape = None
if win is not None:
# Take the output shape from the window defined at full resolution
array_out_shape = (win[0, 1] - win[0, 0] + 1, win[1, 1] - win[1, 0] + 1)
else:
# Take the output shape from the grid at full resolution
array_out_shape = (
(grid_shape[0] - 1) * grid_resolution[0] + 1,
(grid_shape[1] - 1) * grid_resolution[1] + 1,
)
# Init the array
array_out_shape = (array_in_shape[0],) + array_out_shape
array_out = np.empty(array_out_shape, dtype=np.float64, order="C")
ret = array_out
assert array_out.flags.c_contiguous is True
array_out_shape = array_out.shape
if len(array_out_shape) == 2:
array_out_shape = (1,) + array_out_shape
# check same number of variables in array (first dim)
assert array_out_shape[0] == array_in_shape[0]
array_out = array_out.reshape(-1)
py_array_out_win = None
if array_out_win is not None:
py_array_out_win = PyArrayWindow2(
start_row=array_out_win[0][0],
end_row=array_out_win[0][1],
start_col=array_out_win[1][0],
end_col=array_out_win[1][1],
)
# Manage optional input mask
# array_in_mask_dtype = np.dtype("uint8")
if array_in_mask is not None:
# reshape
array_in_mask = array_in_mask.reshape(-1)
# Manage optional output mask
if array_out_mask is not None:
try:
assert array_out_mask.dtype == np.dtype("uint8")
assert array_out_mask.shape[0] == array_out_shape[1]
assert array_out_mask.shape[1] == array_out_shape[2]
array_out_mask = array_out_mask.reshape(-1)
except AttributeError:
# Not None and not a numpy array due to exception
# Test if True
if array_out_mask is True:
array_out_mask = np.zeros(array_out_shape[1:], dtype=np.uint8, order="C").reshape(
-1
)
ret_mask = array_out_mask
else:
array_out_mask = None
func_types = (array_in.dtype, array_out.dtype, grid_row.dtype)
nodata_out = array_out.dtype.type(nodata_out)
# Manage grid_mask
if grid_mask is not None:
# grid mask must be c-contiguous
assert grid_mask.flags.c_contiguous is True
# grid mask must be encoded as unsigned 8 bits integer
assert grid_mask.dtype == np.dtype("uint8")
# grid mask shape must be the same has the grids
assert np.all(grid_mask.shape == grid_shape)
# Lets flat the grid mask view
grid_mask = grid_mask.reshape(-1)
try:
func = PY_ARRAY_GRID_RESAMPLING_FUNC[func_types]
except KeyError as err:
raise Exception(
f"py_array_grid_resampling_ function not available for types {func_types}"
) from err
else:
func(
interp=interp,
array_in=array_in,
array_in_shape=array_in_shape,
grid_row=grid_row,
grid_col=grid_col,
grid_shape=grid_shape,
grid_resolution=grid_resolution,
array_out=array_out,
array_out_shape=array_out_shape,
nodata_out=nodata_out,
array_in_origin=array_in_origin,
array_in_mask=array_in_mask,
grid_mask=grid_mask,
grid_mask_valid_value=grid_mask_valid_value,
grid_nodata=grid_nodata,
array_out_mask=array_out_mask,
grid_win=py_grid_win,
out_win=py_array_out_win,
check_boundaries=check_boundaries,
)
if ret is not None:
ret = ret.reshape(array_out_shape).squeeze()
if ret_mask is not None:
ret_mask = ret_mask.reshape(array_out_shape[1:])
return ret, ret_mask