"""
Sequence stats + related.
"""
from abc import ABC
import numpy as np
from scipy.ndimage import gaussian_filter1d
[docs]def filter_nan_gaussian(arr, sigma, noise=None):
"""
Gaussian convolution. (Allows intensity to leak into the NaN area.)
If `noise` magnitude given, adds uniform noise.
Implementation from stackoverflow answer:
https://stackoverflow.com/a/36307291/7128154
"""
gauss = arr.copy()
gauss[np.isnan(gauss)] = 0
gauss = gaussian_filter1d(gauss, sigma=sigma, mode='constant', cval=0)
norm = np.ones(shape=arr.shape)
norm[np.isnan(arr)] = 0
norm = gaussian_filter1d(norm, sigma=sigma, mode='constant', cval=0)
# avoid RuntimeWarning: invalid value encountered in true_divide
norm = np.where(norm==0, 1, norm)
gauss = gauss / norm
# Add uniform noise, restricted positive
if noise is not None:
gauss += np.random.uniform(-noise, noise, size=gauss.size)
gauss[gauss < 0.] = 0
# Restore NaN
gauss[np.isnan(arr)] = np.nan
return gauss
[docs]class SequenceStatsMixin(ABC):
"""
Defines the plot/viz interface for `BedSequence`.
"""
@property
def interfaces(self):
"""
Get all pairs of adjacent Beds, ignoring any pairs with either Bed 'missing'
"""
non_missing = lambda t: 'missing' not in [t[0].lithology, t[1].lithology]
return filter(non_missing, zip(self[:-1], self[1:]))
@property
def net_to_gross(self):
"""
Returns (total thickness of 'sand' & 'gravel' Beds) / (total thickness of all Beds)
"""
is_sand = lambda bed: bed.lithology == 'sand'
sand_th = sum([bed.thickness for bed in filter(is_sand, self)])
is_gravel = lambda bed: bed.lithology == 'gravel'
gravel_th = sum([bed.thickness for bed in filter(is_gravel, self)])
not_missing = lambda bed: bed.lithology != 'missing'
total_th = sum([bed.thickness for bed in filter(not_missing, self)])
return (sand_th + gravel_th) / total_th
@property
def amalgamation_ratio(self):
"""
1. dont count mud on mud contacts
2. find sand on sand contacts
3. divide sand-on-sand contacts by total number of contacts
NOTE: 'gravel' counts as a 'sand'
"""
sand_like = ('sand', 'gravel')
total_contacts, sand_contacts = 0, 0
for upper, lower in self.interfaces:
if all(litho in sand_like for litho in (upper.lithology, lower.lithology)):
sand_contacts += 1
elif upper.lithology == lower.lithology == 'mud':
continue
elif upper.lithology == 'missing' or lower.lithology == 'missing':
continue
total_contacts += 1
try:
return sand_contacts / total_contacts
except ZeroDivisionError:
return -1.
@staticmethod
def _hurst_K(x, take_log=True, safe=True):
"""
Computes Hurst K ``log(R(n)/S(n)) / log(x.size / 2)`` for 1D array ``x``
Used below in public ``hurst_K`` and ``hurst_D`` methods.
"""
x = np.array(x).squeeze()
assert x.ndim == 1, 'Can only compute _rescaled_range on 1D series `x`'
if safe and x.size < 20:
raise UserWarning(f'Cannot use field of size {x.size} with ``safe=True``')
if take_log:
x = np.log10(x)
y = x - x.mean()
z = np.cumsum(y)
Rn = z.max() - z.min()
Sn = np.std(y)
return np.log10(Rn / Sn) / np.log10(x.size / 2.)
[docs] def hurst_K(self, field, lithology, safe=True):
"""
Hurst K value for data from a sequence ``field``.
If ``safe``, will only accept fields with at least 20 values.
"""
values = self.get_field(field, lithology)
return self._hurst_K(values, safe=safe)
[docs] def hurst_D(self, field, lithology, take_log=True, safe=True, nsamples=1000, return_K=True):
"""
Returns (D, p, K) if ``return_K``, else (D, p)
where:
D : Bootstrapped Hurst value from ``nsamples`` resamples
p : p-value of ``D``
K : Hurst K value with original values
"""
values = self.get_field(field, lithology)
K = self._hurst_K(values, take_log=take_log, safe=safe)
ks = np.zeros(nsamples)
for i in range(nsamples):
_sample = np.random.choice(values, size=values.size, replace=True)
ks[i] = self._hurst_K(_sample, take_log=take_log, safe=safe)
D = (K - ks.mean()) / ks.std()
p = np.sum(ks >= K) / nsamples
return (D, p, K) if return_K else (D, p)
[docs] def pseudo_gamma_simple(self,
gs_field='grain_size_mm',
depth_field='depth_m',
resolution=0.2,
gs_cutoff=0.0625,
gamma_range=(30, 180),
sigma=0.1,
noise=10.):
"""
Compute a 'pseudo' gamma ray log by thresholding `gs_field` + Gaussian convolution.
Parameters
----------
gs_field: str
Which field to use for grainsize
depth_field: str
Which field to use for depth
resolution: float
Scale at which to resample (in `depth_field` units)
gs_cutoff: float
Cutoff for `gs_field` thresholding. Values above/below get mapped to `gamma_range`.
gamma_range: tuple or list
(low, high) sample values for `gs_field` values (above, below) `gs_cuttoff`.
sigma: float
Width of Gaussian, in depth units.
noise: float or None
Magnitude of uniform noise to add, or None to add no noise.
"""
ds, gs = self.get_field(depth_field), self.get_field(gs_field)
# Make sure depth field is monotonic
step_pos = np.diff(ds) > 0
assert np.unique(step_pos).size == 1, '{depth_field} data is non-monotonic'
# Must be increasing to use `np.interp`
if not step_pos.all():
ds, gs = ds[::-1], gs[::-1]
# Resample `gs_field` at `resolution`
nsamples = (np.abs(self.start.z-self.stop.z) // resolution) + 1
sampled_ds = np.linspace(self.start.z, self.stop.z, num=int(nsamples), endpoint=True)
sampled_gs = np.interp(sampled_ds, ds, gs)
# Threshold `sampled_gs` to `gamma_range`
nan_idxs = np.argwhere(np.isnan(sampled_gs))
sampled_gs[nan_idxs] = gamma_range[1]
pseudo_gr = np.where(sampled_gs < gs_cutoff, gamma_range[1], gamma_range[0])
# Convolution
pseudo_gr = gaussian_filter1d(pseudo_gr, sigma / resolution).astype('float64')
if noise is not None:
pseudo_gr += np.random.uniform(-noise, noise, size=pseudo_gr.size)
pseudo_gr[pseudo_gr < 0.] = 0
pseudo_gr[nan_idxs] = np.nan
return sampled_ds, pseudo_gr