Source code for hypergas.retrieve

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Retrieve trace gas enhancements using hyperspectral satellite data."""

import logging
import numpy as np
import spectral.algorithms as algo
import xarray as xr
from spectral.algorithms.detectors import matched_filter

from .landmask import Land_mask
from .cluster import PCA_kmeans
from .unit_spectrum import Unit_spec

LOG = logging.getLogger(__name__)


[docs] class MatchedFilter(): """The MatchedFilter Class.""" def __init__(self, scn, wvl_intervals, species='ch4', mode='column', rad_dist='normal', rad_source='model', land_mask=True, land_mask_source='OSM', cluster=False, plume_mask=None, scaling=None, ): """Initialize MatchedFilter. Parameters ---------- scn : Satpy Scene including at least one variable named "radiance" (:class:`~xarray.Dataset` ["bands", "y", "x"], units: mW m^-2 sr^-1 nm^-1), and at least two coordinates: [1] "wavelength" (nm) and [2] "fwhm" (nm). wvl_intervals : list The wavelength range (nm) used in matched filter. It can be one list or nested list. e.g. ``[2110, 2450]`` or ``[[1600, 1750], [2110, 2450]]``. species : str The species to be retrieved. Only species defined in the ``config.yaml`` file. Default: "ch4". mode : str The mode ("column" or "scene") to apply matched filter. Default: "column". Be careful of noise if you apply the matched filter for the whole scene. rad_dist : str The assumed rads distribution ("normal" or "lognormal"). Default: "normal". rad_source : str The data ('model' or 'lut') used for calculating rads or transmissions. Default: "model". land_mask : bool Whether apply the matched filter to continental and oceanic pixels seperately. Default: True. land_mask_source : str The data source of land mask ("OSM", "GSHHS", or "Natural Earth"). Default: "OSM". cluster : bool Whether apply the pixel classification. Default: False. plume_mask : :class:`numpy.ndarray` 2D array, 0: neglected pixels, 1: plume pixels. Default: None. scaling : float The scaling factor for alpha to ensure numerical stability. """ # set the wavelength range for matched filter self.wvl_min = wvl_intervals[0] self.wvl_max = wvl_intervals[1] # subset data to selected wavelength range for matched filter radiance = scn['radiance'] wvl_mask = (radiance['bands'] >= self.wvl_min) & (radiance['bands'] <= self.wvl_max) radiance = radiance.where(wvl_mask, drop=True) # add to the class self.radiance = radiance self.mode = mode self.rad_dist = rad_dist self.rad_source = rad_source self.species = species # calculate unit spectrum loaded_names = [x['name'] for x in scn.keys()] if 'central_wavelengths' in loaded_names: # calculate K by column because we have central wavelength per column central_wavelengths = scn['central_wavelengths'].where(wvl_mask, drop=True) K, self.scaling = Unit_spec(self.radiance, central_wavelengths, self.wvl_min, self.wvl_max, self.species, self.rad_source).fit_slope(scaling=scaling) self.K = xr.DataArray(K, dims=['bands', 'x'], coords={'bands': self.radiance.coords['bands']}) else: # calculate K using the default dim named 'bands' K, self.scaling = Unit_spec(self.radiance, self.radiance.coords['bands'], self.wvl_min, self.wvl_max, self.species, self.rad_source).fit_slope(scaling=scaling) self.K = xr.DataArray(K, dims='bands', coords={'bands': self.radiance.coords['bands']}) # calculate the land/ocean segmentation self.land_mask = land_mask if cluster: segmentation = PCA_kmeans(self.radiance) elif land_mask: # get lon and lat from area attrs lons, lats = self.radiance.attrs['area'].get_lonlats() # create land mask from 10-m Natural Earth data segmentation = Land_mask(lons, lats, land_mask_source) else: # set all pixels as the same type segmentation = xr.DataArray(np.ones((self.radiance.sizes['y'], self.radiance.sizes['x'])), dims=['y', 'x'], ) segmentation.attrs['description'] = '' # save to class self.segmentation = segmentation # save plume mask if plume_mask is None: # set all pixels as non-plume self.plume_mask = xr.DataArray(np.ones((self.radiance.sizes['y'], self.radiance.sizes['x'])), dims=['y', 'x'], ) else: self.plume_mask = plume_mask # def _norm(self): # from sklearn.preprocessing import MinMaxScaler # data = data.reshape((-1, data.shape[-1])) # self.radiance = self.radiance.stack(z=('y', 'x')) # scaler = MinMaxScaler() # scaler.fit(self.radiance) # return scaler.transform(data)
[docs] def col_matched_filter(self, radiance, segmentation, plume_mask, K): """Apply the matched filter by column. Parameters ---------- radiance : :class:`~xarray.DataArray` The radiance DataArray for one column. segmentation : :class:`~xarray.DataArray`, same shape as ``radiance`` The segmentation of pixels (e.g., land and water mask). plume_mask : :class:`~xarray.DataArray`, same shape as ``radiance`` Since the matched filter assumes plume signals are sparse (i.e., present in only a small fraction of pixels), it is better to exclude pixels within identified plume masks, so that background statistics are estimated only from non-plume pixels and the sparsity assumption remains valid. K : :class:`numpy.ndarray` The Jacobian K (i.e, the change of the radiance (or its logarithm) for a +1 ppm methane concentration increase). Returns ------- The gas enhancement : :class:`~xarray.DataArray` Unit: ppm. """ if self.mode == 'column': # create empty alpha with shape: [nrows('y'), 1] alpha = np.full((radiance.shape[0], 1), fill_value=np.nan, dtype=float) # iterate unique label to apply the matched filter for label in np.unique(segmentation): # create nan*label mask segmentation_mask = segmentation == label mask = ~np.isnan(radiance).any(axis=-1) mask = mask * segmentation_mask # we need to create new mask with plume instead of overwrite # because we want to keep retrieval results over plume pixels mask_exclude_plume = mask * plume_mask.astype(bool) # calculate the background stats if there're many valid values if mask_exclude_plume.sum() > 1: if self.rad_dist == 'lognormal': # calculate lognormal rads lograds = np.log(radiance, out=np.zeros_like(radiance), where=radiance > 0) background = algo.calc_stats(lograds, mask=mask_exclude_plume, index=None, allow_nan=True) # apply the matched filter a = matched_filter(lograds, K, background) elif self.rad_dist == 'normal': # linearized MF if sum(mask_exclude_plume) > 1: background = algo.calc_stats(radiance, mask=mask_exclude_plume, index=None, allow_nan=True) else: # if all pixels are masked, we use the default mask background = algo.calc_stats(radiance, mask=mask, index=None, allow_nan=True) # get mean value mu = background.mean # calculate the target spectrum target = K * mu # apply the matched filter a = matched_filter(radiance, target, background) else: raise ValueError(f"{self.rad_dist} is not supported. Please use 'normal' or 'lognormal' as rad_dist.") # concat data alpha[:, 0][mask] = a[:, 0][mask] else: # assign 0 value if only one pixel is available # because denoising data with one nan pixel will cause a large nan area alpha[:, 0][mask] = 0 elif self.mode == 'scene': # this function is experimental LOG.warning('The scene MF is experimental!!!') background = self.background # get mean value mu = background.mean # calculate the target spectrum target = K * mu # apply matched filter alpha = matched_filter(radiance, target, background) else: raise ValueError(f'Wrong mode: {self.mode}. It should be "column" or "scene".') if self.rad_source == 'model': # ppm return alpha elif self.rad_source == 'lut': # ppm m -> ppm assuming a scale height of about 8km return alpha * 1.25e-4
[docs] def smf(self): """Standard/Robust matched filter. Compute mean and covariance of set of each column and then run standard matched filter. Returns ------- The gas enhancement : :class:`~xarray.DataArray` Unit: ppm. """ if self.mode == 'scene': # calculate the background of whole scene radiance_scene = self.radiance.transpose(..., 'bands').values mask_scene = ~np.isnan(radiance_scene).any(axis=-1) if self.land_mask: # only calculate background over land mask = mask_scene * self.segmentation.data else: mask = mask_scene self.background = algo.calc_stats(radiance_scene, mask=mask, index=None) LOG.info('Applying matched filter ...') alpha = xr.apply_ufunc(self.col_matched_filter, self.radiance.transpose(..., 'bands'), self.segmentation, self.plume_mask, self.K, exclude_dims=set(('y', 'bands')), input_core_dims=[['y', 'bands'], ['y'], ['y'], ['bands']], output_core_dims=[['y', 'bands']], vectorize=True, dask='parallelized', output_dtypes=[self.radiance.dtype], dask_gufunc_kwargs=dict(output_sizes={'y': self.radiance.sizes['y'], 'bands': 1, }, allow_rechunk=True, ) ) # set the dims order to the same as radiance alpha = alpha.transpose(*self.radiance.dims) # fill nan values by interpolation # this usually happens for pixels with only one segmentation labels in a specific column. # if data is not available for the whole row, then the row values should still be nan. alpha = alpha.interpolate_na(dim='y', method='linear') return alpha*self.scaling
# def ctmf(self, segmentation=None): # """Cluster-tuned matched filter # This method clusters similar pixels to improve the mean and cov calculation. # Kwargs: # seg_method (str): The mothod of segmentation which is useful for clustering pixels to calculate mean and cov. # Note this only works with the `CTMF` method. # Default: 'kmeans' # bkg_extent (str): The extent for calculating background. # Note this only works with the `CTMF` method. # Default: 'column' # """