Source code for pydeltasnow.utils

"""
This module contains utility functions for missing value handling and daterange
continuity validation.
"""

import pandas as pd
import numpy as np
from numba import njit

from pydeltasnow import __version__

__author__ = "Johannes Aschauer"
__copyright__ = "Johannes Aschauer"
__license__ = "GPL-2.0-or-later"

ONE_HOUR = np.timedelta64(1, 'h')

[docs]@njit def continuous_timedeltas(dr): """ Check for continuity on dates Parameters ---------- dr : 1D :class:`numpy.ndarray` with :class:`numpy.datetime64` dtype Input date range. Returns ------- continuous : bool Whether the datetime array is evenly spaced. resolution : float The time resolution in hours. """ if len(dr) <= 1: continuous = True else: # np.gradient not working in numba tdeltas = np.zeros(len(dr)-1, dtype='timedelta64[ns]') for i in range(len(dr)-1): tdeltas[i] = dr[i+1]-dr[i] # check if all deltas are equal continuous = np.all(tdeltas == tdeltas[0]) # get time resolution in hours. # one hour timedelta has to be moved outside numba, see this issue for # reference: https://github.com/numba/numba/issues/1750 resolution = tdeltas[0] / ONE_HOUR return continuous, resolution
[docs]@njit def continuous_timedeltas_in_nonzero_chunks( dr, start_idxs, stop_idxs ): """ Check that every non-zero HS chunk has continuous dates and same resolution. Parameters ---------- dr : 1D :class:`numpy.ndarray` with :class:`numpy.datetime64` dtype Input date range. start_idxs: 1D :class:`numpy.ndarray` Indices where a nonzero chunk begins. stop_idxs: 1D :class:`numpy.ndarray` Indices where a nonzero chunk ends. Returns ------- continuous : bool Whether the chunks are evenly spaced with equal resolution. resolution : float The time resolution in hours. """ cont = np.zeros(len(start_idxs), dtype='bool') # is chunk continuous res = np.zeros(len(start_idxs)) # chunk time resolutions for i, (start, stop) in enumerate(zip(start_idxs, stop_idxs)): cont[i], res[i] = continuous_timedeltas(dr[start:stop]) continuous = np.all(np.array([np.all(cont), np.all(res==res[0])])) resolution = res[0] return continuous, resolution
[docs]@njit def get_nonzero_chunk_idxs(Hobs): """ Return start and stop indices of consecutive nonzero chunks in Hobs. A nonzero chunk is a slice in Hobs of values not zero. Nans are treated as nonzeros. The slices which are retured from this function include one leading and one trailing zero since the main deltasnow loop wants an input HS array to start and end with zero. Exceptions are when Hobs does not start and/or end with zero (then the first start and/or last stop index is not pointing to a zero). Note that when you slice `Hobs` with `Hobs[start:stop]` the stop value is non-inclusive (https://stackoverflow.com/a/509295). Parameters ---------- Hobs : 1D :class:`numpy.ndarray` of floats input HS data Returns ------- start_idxs: 1D :class:`numpy.ndarray` Indices where a nonzero chunk begins. stop_idxs: 1D :class:`numpy.ndarray` Indices where a nonzero chunk ends. """ start_idxs = [] stop_idxs = [] # if first value of Hobs is not zero, set 0 as first start_idx if Hobs[0] != 0: start_idxs.append(0) for i in range(len(Hobs)): if i < len(Hobs)-1: if Hobs[i] == 0. and Hobs[i+1]!=0: start_idxs.append(i) if i > 0: if Hobs[i] == 0. and Hobs[i-1]!=0: stop_idxs.append(i) # if last value not zero, set last idx of Hobs as last stop_idx if len(stop_idxs) < len(start_idxs): stop_idxs.append(len(Hobs)) return np.array(start_idxs), np.array(stop_idxs)
[docs]@njit def get_zeropadded_gap_idxs( Hobs, require_leading_zero, ): """ Get indices of Nan data-gaps in Hobs that are surrounded or followed by zeros. Parameters ---------- Hobs : 1D :class:`numpy.ndarray` of floats input HS data require_leading_zero : bool Whether to include gaps that do not have a leading zero but have a trailing zero. Returns ------- zeropadded_gap_idxs : 1D :class:`numpy.ndarray` of bools """ zeropadded_gap_idxs = np.zeros(len(Hobs), dtype='bool') start_idxs = [] stop_idxs = [] gap = False start = len(Hobs) for i in range(len(Hobs)): if i == 0 and np.isnan(Hobs[i]): start = i gap = True if i > 0: if require_leading_zero: if np.isnan(Hobs[i]) and Hobs[i-1] == 0: start = i gap = True else: if np.isnan(Hobs[i]) and not np.isnan(Hobs[i-1]): start = i gap = True if i < len(Hobs)-1: if np.isnan(Hobs[i]) and Hobs[i+1] == 0: if gap: start_idxs.append(start) stop_idxs.append(i+1) if not np.isnan(Hobs[i]): gap=False # if last value also nan, set last idx of Hobs as last stop_idx if gap: if len(start_idxs)>0 and start != start_idxs[-1]: start_idxs.append(start) stop_idxs.append(len(Hobs)) elif start < len(Hobs): start_idxs.append(start) stop_idxs.append(len(Hobs)) if len(start_idxs)>0: for start_i, stop_i in zip(start_idxs, stop_idxs): zeropadded_gap_idxs[start_i:stop_i] = True return zeropadded_gap_idxs
[docs]@njit def get_small_gap_idxs( Hobs, dates, max_gap_length, ): """ Create a boolean mask for valid small gaps. Gap needs to be surrounded by values Dates need to be continuous between day before gap and day after gap Parameters ---------- Hobs : 1D :class:`numpy.ndarray` of floats input HS data dates : 1D :class:`numpy.ndarray` of :class:`numpy.datetime64` dtype timestamps of the snow depth observations. max_gap_length : int Only gaps shorter or equal max_gap_length are valid. Returns ------- small_gap_idxs : 1D :class:`numpy.ndarray` of bools """ small_gap_idxs = np.zeros(len(Hobs), dtype='bool') start_idxs = [] stop_idxs = [] gapl = 0 # counter of active gap length active_gap = False valid_gap = False start = len(Hobs) for i in range(len(Hobs)): if active_gap: gapl = gapl+1 if gapl > max_gap_length: valid_gap = False if i > 0: if not np.isnan(Hobs[i-1]) and np.isnan(Hobs[i]): start = i valid_gap=True active_gap = True if i < len(Hobs): if np.isnan(Hobs[i-1]) and not np.isnan(Hobs[i]): if valid_gap and continuous_timedeltas(dates[start-1:i+1])[0]: start_idxs.append(start) stop_idxs.append(i) gapl = 0 active_gap = False valid_gap=False if len(start_idxs)>0: for start_i, stop_i in zip(start_idxs, stop_idxs): small_gap_idxs[start_i:stop_i] = True return small_gap_idxs
[docs]def fill_small_gaps( Hobs, dates, max_gap_length, method='linear' ): """ Interpolate small gaps in Hobs. No extrapolation: data points before and after gap needed. Date continuity in the filled gaps + leading and trailing data point is ensured. Parameters ---------- Hobs : 1D :class:`numpy.ndarray` Snow depth data. dates : 1D :class:`numpy.ndarray` of :class:`numpy.datetime64` dtype timestamps of the snow depth observations. max_gap_length : int Only gaps shorter or equal max_gap_length are interpolated. method : str, optional Interpolation method which will be passed to :func:`pandas.Series.interpolate`. The default is 'linear'. Returns ------- Hobs_intepolated : 1D :class:`numpy.ndarray` Snow depth data with filled gaps. """ valid_gap_mask = get_small_gap_idxs(Hobs, dates, max_gap_length) interpolated = pd.Series(Hobs).interpolate(method=method).to_numpy() Hobs_interpolated = np.where(valid_gap_mask, interpolated, Hobs) return Hobs_interpolated