#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Create a priori plume mask for hyperspectral satellite data."""
import logging
import numpy as np
import pandas as pd
import tobac
import xarray as xr
from pyresample.geometry import AreaDefinition
from scipy.stats.mstats import trimmed_std, trimmed_mean
LOG = logging.getLogger(__name__)
[docs]
class Mask():
"""Create a priori plume mask from trace gas enhancement field."""
def __init__(self, scn, varname, n_min_threshold=5, sigma_threshold=1):
'''Initialize Mask.
Args:
scn (:class:`~satpy.Scene`):
Satpy Scene including one variable named ``segmentation``,
which is generated by :class:`hypergas.landmask.Land_mask`.
``segmentation`` is a :class:`~xarray.DataArray`: 0 = ocean, >0 = land.
varname (str):
The variable used to create plume mask (recommend: ``<gas>_comb_denoise``, which is the denoised data).
n_min_threshold (int):
The minimum number of pixels per threshold for detecting features. Default is 5.
sigma_threshold (int):
Gaussian filter sigma for smoothing field.
Default is 1. Because the ``<gas>_comb_denoise`` field is already smoothed, 1 should be high enough.
'''
# read variable, pixel resolution, and segmentation
self.data = scn[varname]
self.dxy = abs(self.data.y.diff('y')[0])
self.n_min_threshold = n_min_threshold
self.sigma_threshold = sigma_threshold
# remove attrs for iris
self.data.attrs['standard_name'] = ''
# add time dim for tobac
time_da = pd.date_range(start=self.data.attrs['start_time'], periods=1, freq='D').to_numpy()
self.data = self.data.squeeze()
self.data = self.data.expand_dims(time=time_da)
# get the segmentation values
self.segmentation = scn['segmentation']
self.segmentation = self.segmentation.squeeze()
self.segmentation_unique = pd.unique(self.segmentation.data.flatten())
self.segmentation_unique = self.segmentation_unique[~np.isnan(self.segmentation_unique)]
[docs]
def get_feature_mask(self):
'''
Calculate max features and apply segmentation by `tobac <https://tobac.readthedocs.io/>`_.
Returns
-------
thresholds : list
Thresholds for creating `feature <https://tobac.readthedocs.io/en/latest/feature_detection_overview.html>`_
and `mask <https://tobac.readthedocs.io/en/latest/segmentation.html>`_.
features : :class:`~pandas.DataFrame`
Detected features.
masks : :class:`~xarray.DataArray`
Segmentations based on features.
'''
feature_list = []
mask_list = []
mask_max = 0
LOG.info('Creating plume masks ...')
for seg in self.segmentation_unique:
LOG.debug(f'Creating features for {seg}...')
# loop segmentation (0: ocean, >=1: land)
seg_mask = self.segmentation == seg
gas_mask = self.data.where(seg_mask)
# calculate trimmed mean and std values
trim_mean = trimmed_mean(gas_mask.stack(z=('y', 'x')).dropna('z'), (1e-3, 1e-3))
trim_std = trimmed_std(gas_mask.stack(z=('y', 'x')).dropna('z'), (1e-3, 1e-3))
thresholds = [trim_mean+2*trim_std, trim_mean+3*trim_std]
# detect features
data_mask = self.data.where(seg_mask)
features = tobac.feature_detection_multithreshold(
data_mask,
self.dxy, thresholds,
position_threshold='extreme',
n_min_threshold=self.n_min_threshold,
sigma_threshold=self.sigma_threshold,
)
LOG.debug(f'Creating segmentation for {seg}...')
if features is None:
continue
else:
masks, features_mask = tobac.segmentation_2D(features, data_mask, self.dxy, threshold=[thresholds[0]])
# increase value based on previous largest label
masks = xr.where(masks == 0, masks, masks + mask_max)
mask_max = masks.max().values
mask_list.append(masks)
feature_list.append(features)
if len(feature_list) > 0:
# concat features and reset values
features = pd.concat(feature_list, ignore_index=True)
features['feature'] = np.arange(1, len(features)+1, 1)
# sum masks together
masks = sum(mask_list)
if 'dim_0' in masks.dims:
masks = masks.rename({'dim_0': 'y', 'dim_1': 'x'})
# keep masks of more than n_min_threshold pixels
masks = masks.astype('int')
mask_count = masks.groupby(masks).count()
mask_count = mask_count.where(mask_count > self.n_min_threshold).dropna(masks.name).astype('int')
mask_unique = mask_count.coords['segmentation_mask'].values
masks = masks.where(np.isin(masks, mask_unique, 0))
masks = masks.drop_vars(['time'])
else:
LOG.info('No a priori plume mask has been detected.')
mask_unique = [0]
masks = xr.full_like(self.data, 0).astype('int')
masks.attrs = ''
# Replace values in the DataArray with sequential numbers
for index, value in enumerate(mask_unique):
masks = xr.where(masks == value, index, masks)
# set nan values according to data
masks = masks.where(self.data.notnull().squeeze().values)
# squeeze data and update attrs
masks = masks.rename('plume_mask')
masks.attrs['description'] = 'A priori plume mask (0: background, >0: plumes)'
masks.attrs['area'] = self.data.attrs['area']
# assign coords from AreaDefinition
if isinstance(masks.attrs['area'], AreaDefinition):
masks.coords['y'] = masks.attrs['area'].projection_y_coords
masks.coords['x'] = masks.attrs['area'].projection_x_coords
masks.coords['y'].attrs['units'] = 'm'
masks.coords['x'].attrs['units'] = 'm'
masks.coords['y'].attrs['standard_name'] = 'projection_y_coordinate'
masks.coords['y'].attrs['standard_name'] = 'projection_x_coordinate'
masks.coords['y'].attrs['long_name'] = 'y coordinate of projection'
masks.coords['x'].attrs['long_name'] = 'x coordinate of projection'
return thresholds, features, masks