"""
Core of the DeltaSNOW model. This module provides the public function
:func:`deltasnow_snowpack_evolution` which implements the algorithm of the
deltaSNOW model with numba but does not validate input data. Users are
discouraged to use this function and should rather use the
:func:`pydeltasnow.main.swe_deltasnow` function instead.
Significant parts of the code in this module are based on the work of Manuel
Theurl: https://github.com/manueltheurl/snow_to_swe
"""
import numpy as np
from numba import njit
from pydeltasnow import __version__
__author__ = "Johannes Aschauer"
__copyright__ = "Johannes Aschauer"
__license__ = "GPL-2.0-or-later"
G = 9.81 # gravitational acceleration on earth
PRECISION = 10e-10 # floating point precision
@njit
def _assign_H(
h_dd,
swe_dd,
age_dd,
h,
swe,
age,
H,
SWE,
t,
day_tot,
):
"""
Assign snowpack properties of the next timestep.
"""
if t < day_tot:
h[:, t + 1] = h_dd
swe[:, t + 1] = swe_dd
age[:, t + 1] = age_dd
H[t + 1] = np.sum(h[:, t + 1])
SWE[t + 1] = np.sum(swe[:, t + 1])
return h, swe, age, H, SWE
@njit
def _compact_H(
h,
swe,
swe_hat,
age,
ts,
eta_null,
k,
rho_max,
):
"""
Compaction of individual snow layers without additional load
Values of today are compacted to values of tomorrow.
"""
age_d = 0 if h == 0 else age
h_dd = h / (1 + (swe_hat * G * ts) / eta_null * np.exp(-k * swe / h))
h_dd = swe / rho_max if swe / h_dd > rho_max else h_dd
h_dd = 0 if h == 0 else h_dd
swe_dd = swe
age_dd = 0 if h == 0 else age_d + 1
rho_dd = 0 if h == 0 else swe_dd / h_dd
rho_dd = rho_max if rho_max - rho_dd < PRECISION else rho_dd
return h_dd, swe_dd, age_dd, rho_dd
@njit
def _drench_H(
t,
ly,
ly_tot,
day_tot,
Hobs,
h,
swe,
age,
H,
SWE,
ts,
eta_null,
k,
rho_max
):
"""
Drenching of the snowpack if the observed snowdepth decreased significantly.
"""
Hobs_d = Hobs[t]
h_d = h[:, t]
swe_d = swe[:, t]
age_d = age[:, t]
# distribute mass top-down
# reversed not working in numba
for i in range(ly - 1, -1, -1):
# for i in reversed(range(ly)):
if np.sum(np.array([element for j, element in enumerate(h_d) if j != i])) + swe_d[
i] / rho_max - Hobs_d >= PRECISION:
# layers is densified to rho_max
h_d[i] = swe_d[i] / rho_max
else:
# layer is densified as far as possible
# but doesnt reach rho_max
h_d[i] = swe_d[i] / rho_max + np.abs(
np.sum(np.array([element for j, element in enumerate(h_d) if j != i])) + swe_d[i] / rho_max - Hobs_d)
break
true_false_list = np.ones(ly, dtype="bool")
for i, (swe_d_val, h_d_val) in enumerate(zip(swe_d[:ly], h_d[:ly])):
true_false_list[i] = rho_max - swe_d_val / h_d_val <= PRECISION
if np.all(true_false_list):
# no further compaction, runoff
scale = Hobs_d / np.sum(h_d)
h_d = h_d * scale # all layers are compressed (and have rho_max) [m]
swe_d = swe_d * scale
h[:, t] = h_d
swe[:, t] = swe_d
age[:, t] = age_d
H[t] = np.sum(h[:, t])
SWE[t] = np.sum(swe[:, t])
# no further compaction possible
h_dd, swe_dd, age_dd, rho_dd = _dry_metamorphism(
h[:, t],
swe[:, t],
age[:, t],
ly_tot,
ly,
ts,
eta_null,
k,
rho_max)
# set values for next day
h, swe, age, H, SWE = _assign_H(h_dd, swe_dd, age_dd, h, swe, age, H, SWE, t, day_tot)
return h, swe, age, H, SWE, rho_dd
@njit
def _dry_metamorphism(
h_d,
swe_d,
age_d,
ly_tot,
ly,
ts,
eta_null,
k,
rho_max
):
"""
Compaction of dry snowpack.
"""
# overburden of current day (swe_hat_d)
swe_hat_d = np.zeros(ly_tot)
for i in range(ly_tot):
swe_hat_d[i] = np.sum(swe_d[i:ly_tot])
h_dd = h_d.copy()
swe_dd = swe_d.copy()
age_dd = age_d.copy()
rho_dd = np.zeros(ly_tot)
for i, (h, swe, swe_hat, age) in enumerate(zip(h_d[:ly], swe_d[:ly], swe_hat_d[:ly], age_d[:ly])):
h_dd[i], swe_dd[i], age_dd[i], rho_dd[i] = _compact_H(
h,
swe,
swe_hat,
age,
ts,
eta_null,
k,
rho_max
)
return h_dd, swe_dd, age_dd, rho_dd
@njit
def _scale_H(
t,
ly,
ly_tot,
day_tot,
deltaH,
Hobs,
h,
swe,
age,
H,
SWE,
ts,
eta_null,
k,
rho_max,
):
"""
Scale snowpack if the settling was assumed a bit too strong or too weak.
"""
Hobs_d = Hobs[t - 1]
Hobs_dd = Hobs[t]
h_d = h[:, t - 1]
swe_d = swe[:, t - 1]
age_d = age[:, t]
# todays overburden
swe_hat_d = np.zeros(ly_tot)
for i in range(ly_tot):
swe_hat_d[i] = np.sum(swe_d[i:ly_tot])
# analytical solution for layerwise adapted viskosity eta
# assumption: recompaction ~ linear height change of yesterdays layers (see paper)
eta_cor = np.zeros(ly_tot)
for i in range(ly_tot):
if swe_d[i] == 0. or h_d[i] == 0:
eta_cor[i] = 0.
else:
rho_d = swe_d[i] / h_d[i]
x = ts * G * swe_hat_d[i] * np.exp(-k * rho_d) # yesterday
P = h_d[i] / Hobs_d # yesterday
eta_cor[i] = Hobs_dd * x * P / (h_d[i] - Hobs_dd * P) if (h_d[i] - Hobs_dd * P) != 0 else np.inf
h_dd_cor = np.zeros(ly_tot)
for i in range(ly_tot):
if h_d[i] == 0 or eta_cor[i] == 0:
h_dd_cor[i] = 0.
else:
h_dd_cor[i] = h_d[i] / (1 + (swe_hat_d[i] * G * ts) / eta_cor[i] * np.exp(-k * swe_d[i] / h_d[i]))
h_dd_cor[np.isnan(h_dd_cor)] = 0
H_dd_cor = np.sum(h_dd_cor)
# and check, if Hd.cor is the same as Hobs.d
if np.abs(H_dd_cor - Hobs_dd) > PRECISION:
print("WARNING: error in exponential re-compaction: H.dd.cor-Hobs.dd")
# which layers exceed rho.max?
idx_max_arr = np.zeros(ly_tot, dtype='bool')
for i, (swe_e_val, h_dd_cor_val) in enumerate(zip(swe_d, h_dd_cor)):
try:
idx_max_arr[i] = np.divide(swe_e_val, h_dd_cor_val) - rho_max > PRECISION
except:
idx_max_arr[i] = False
idx_max_arr[np.isnan(idx_max_arr)] = False
if np.any(idx_max_arr):
if np.count_nonzero(idx_max_arr) < ly:
# collect excess swe in those layers
swe_excess = swe_d[idx_max_arr] - h_dd_cor[idx_max_arr] * rho_max
# set affected layer(s) to rho.max
swe_d[idx_max_arr] = swe_d[idx_max_arr] - swe_excess
# distribute excess swe to other layers top-down
lys = range(ly)
lys_non_max = []
for e in lys:
if e not in np.nonzero(idx_max_arr)[0]:
lys_non_max.append(e)
i = lys_non_max[-1]
swe_excess_all = np.sum(swe_excess)
while swe_excess_all > 0:
# layer tolerates this swe amount to reach rho.max
swe_res = h_dd_cor[i] * rho_max - swe_d[i]
if swe_res > swe_excess_all:
swe_res = swe_excess_all
swe_d[i] = swe_d[i] + swe_res
swe_excess_all = swe_excess_all - swe_res
i = i - 1
if i < 0 < swe_excess_all:
# runoff
break
else:
# if all layers have density > rho.max
# remove swe.excess from all layers (-> runoff)
# (this sets density to rho.max)
swe_excess = swe_d[idx_max_arr] - h_dd_cor[idx_max_arr] * rho_max
swe_d[idx_max_arr] = swe_d[idx_max_arr] - swe_excess
h[:, t] = h_dd_cor
swe[:, t] = swe_d
age[:, t] = age_d
H[t] = np.sum(h[:, t])
SWE[t] = np.sum(swe[:, t])
# compact actual day
h_dd, swe_dd, age_dd, rho_dd = _dry_metamorphism(
h[:, t],
swe[:, t],
age[:, t],
ly_tot,
ly,
ts,
eta_null,
k,
rho_max,
)
# set values for next day
h, swe, age, H, SWE = _assign_H(
h_dd,
swe_dd,
age_dd,
h,
swe,
age,
H,
SWE,
t,
day_tot,
)
return h, swe, age, H, SWE, rho_dd
[docs]@njit
def deltasnow_snowpack_evolution(
Hobs,
rho_max,
rho_null,
c_ov,
k_ov,
k,
tau,
eta_null,
resolution,
):
"""
This is the main loop in the delta snow model. Should be called on HS chunks
of consecutive nonzeros.
Parameters
----------
Hobs : 1D :class:`numpy.ndarray` of floats
Measured snow height. Needs to be in [m].
Must comply to the following constraints:
- no nans
- continuous entries (no missing dates)
rho_max : float, optional
Maximum density of an individual snow layer produced by the deltasnow
model in [kg/m3], rho_max needs to be positive. The default is 401.2588.
rho_null : float, optional
Fresh snow density for a newly created layer [kg/m3], rho_null needs to
be positive. The default is 81.19417.
c_ov : float, optional
Overburden factor due to fresh snow [-], c_ov needs to be positive. The
default is 0.0005104722.
k_ov : float, optional
Defines the impact of the individual layer density on the compaction due
to overburden [-], k_ov need to be in the range [0,1].
The default is 0.37856737.
k : float, optional
Exponent of the exponential-law compaction [m3/kg], k needs to be
positive. The default is 0.02993175.
tau : float, optional
Uncertainty bound [m], tau needs to be positive.
The default is 0.02362476.
eta_null : float, optional
Effective compactive viscosity of snow for "zero-density" [Pa s].
The default is 8523356.
resolution : float
Timedelta in hours between snow observations.
Raises
------
RuntimeError
If the snowpack evolution has somehow gone wrong.
Returns
-------
SWE : 1D :class:`numpy.ndarray` of floats
Calculated SWE in [mm]. Same shape as Hobs.
"""
ly_tot = np.count_nonzero(Hobs) # maximum number of layers [-]
day_tot = len(Hobs) # total days from first to last snowfall [-]
# preallocate output arrays
H = np.zeros(day_tot) # modeled total height of snow at any day [m]
SWE = np.zeros(day_tot) # modeled total SWE at any day [kg/m2]
# preallocate matrix as days X layers
h = np.zeros((ly_tot, day_tot)) # modeled height of snow in all layers [m]
swe = np.zeros((ly_tot, day_tot)) # modeled swe in all layers [kg/m2]
age = np.zeros((ly_tot, day_tot)) # age in all layers
ly = 1 # layer number [-]
ts = resolution * 3600
for t in range(day_tot):
# snowdepth = 0, no snow cover
if Hobs[t] == 0:
H[t] = 0
SWE[t] = 0
h[:, t] = 0
swe[:, t] = 0
# there is snow
elif Hobs[t] > 0: # redundant if, cause can snow height be negative?
# first snow in/during season
if Hobs[t - 1] == 0:
ly = 1
age[ly - 1, t] = 1
h[ly - 1, t] = Hobs[t]
H[t] = Hobs[t]
swe[ly - 1, t] = rho_null * Hobs[t]
SWE[t] = swe[ly - 1, t]
# compact actual day
h_dd, swe_dd, age_dd, rho_dd = _dry_metamorphism(
h[:, t],
swe[:, t],
age[:, t],
ly_tot,
ly,
ts,
eta_null,
k,
rho_max,
)
h, swe, age, H, SWE = _assign_H(
h_dd,
swe_dd,
age_dd,
h,
swe,
age,
H,
SWE,
t,
day_tot,
)
elif Hobs[t - 1] > 0:
deltaH = Hobs[t] - H[t]
if deltaH > tau:
sigma_null = deltaH * rho_null * G
epsilon = c_ov * sigma_null * np.exp(-k_ov * rho_dd / (rho_max - rho_dd))
h[:, t] = (1 - epsilon) * h[:, t]
swe[:, t] = swe[:, t - 1]
age[:ly, t] = age[:ly, t - 1] + 1
H[t] = np.sum(h[:, t])
SWE[t] = np.sum(swe[:, t])
# only for new layer
ly = ly + 1
h[ly - 1, t] = Hobs[t] - H[t]
swe[ly - 1, t] = rho_null * h[ly - 1, t]
age[ly - 1, t] = 1
# recompute
H[t] = np.sum(h[:, t])
SWE[t] = np.sum(swe[:, t])
# compact actual day
h_dd, swe_dd, age_dd, rho_dd = _dry_metamorphism(
h[:, t],
swe[:, t],
age[:, t],
ly_tot,
ly,
ts,
eta_null,
k,
rho_max,
)
# set values for next day
h, swe, age, H, SWE = _assign_H(
h_dd,
swe_dd,
age_dd,
h,
swe,
age,
H,
SWE,
t,
day_tot,
)
# no mass gain or loss, but scaling
elif -tau <= deltaH <= tau:
h, swe, age, H, SWE, rho_dd = _scale_H(
t,
ly,
ly_tot,
day_tot,
deltaH,
Hobs,
h,
swe,
age,
H,
SWE,
ts,
eta_null,
k,
rho_max,
)
elif deltaH < -tau:
h, swe, age, H, SWE, rho_dd = _drench_H(
t,
ly,
ly_tot,
day_tot,
Hobs,
h,
swe,
age,
H,
SWE,
ts,
eta_null,
k,
rho_max,
)
else:
raise RuntimeError("no valid calculated HS deviation.")
return SWE