Source code for hypergas.a_priori_mask

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Create a priori plume mask for hyperspectral satellite data."""

import logging

import numpy as np
import pandas as pd
import tobac
import xarray as xr
from pyresample.geometry import AreaDefinition
from scipy.stats.mstats import trimmed_std, trimmed_mean

LOG = logging.getLogger(__name__)


[docs] class Mask(): """Create a priori plume mask from trace gas enhancement field.""" def __init__(self, scn, varname, n_min_threshold=5, sigma_threshold=1): '''Initialize Mask. Args: scn (:class:`~satpy.Scene`): Satpy Scene including one variable named ``segmentation``, which is generated by :class:`hypergas.landmask.Land_mask`. ``segmentation`` is a :class:`~xarray.DataArray`: 0 = ocean, >0 = land. varname (str): The variable used to create plume mask (recommend: ``<gas>_comb_denoise``, which is the denoised data). n_min_threshold (int): The minimum number of pixels per threshold for detecting features. Default is 5. sigma_threshold (int): Gaussian filter sigma for smoothing field. Default is 1. Because the ``<gas>_comb_denoise`` field is already smoothed, 1 should be high enough. ''' # read variable, pixel resolution, and segmentation self.data = scn[varname] self.dxy = abs(self.data.y.diff('y')[0]) self.n_min_threshold = n_min_threshold self.sigma_threshold = sigma_threshold # remove attrs for iris self.data.attrs['standard_name'] = '' # add time dim for tobac time_da = pd.date_range(start=self.data.attrs['start_time'], periods=1, freq='D').to_numpy() self.data = self.data.squeeze() self.data = self.data.expand_dims(time=time_da) # get the segmentation values self.segmentation = scn['segmentation'] self.segmentation = self.segmentation.squeeze() self.segmentation_unique = pd.unique(self.segmentation.data.flatten()) self.segmentation_unique = self.segmentation_unique[~np.isnan(self.segmentation_unique)]
[docs] def get_feature_mask(self): ''' Calculate max features and apply segmentation by `tobac <https://tobac.readthedocs.io/>`_. Returns ------- thresholds : list Thresholds for creating `feature <https://tobac.readthedocs.io/en/latest/feature_detection_overview.html>`_ and `mask <https://tobac.readthedocs.io/en/latest/segmentation.html>`_. features : :class:`~pandas.DataFrame` Detected features. masks : :class:`~xarray.DataArray` Segmentations based on features. ''' feature_list = [] mask_list = [] mask_max = 0 LOG.info('Creating plume masks ...') for seg in self.segmentation_unique: LOG.debug(f'Creating features for {seg}...') # loop segmentation (0: ocean, >=1: land) seg_mask = self.segmentation == seg gas_mask = self.data.where(seg_mask) # calculate trimmed mean and std values trim_mean = trimmed_mean(gas_mask.stack(z=('y', 'x')).dropna('z'), (1e-3, 1e-3)) trim_std = trimmed_std(gas_mask.stack(z=('y', 'x')).dropna('z'), (1e-3, 1e-3)) thresholds = [trim_mean+2*trim_std, trim_mean+3*trim_std] # detect features data_mask = self.data.where(seg_mask) features = tobac.feature_detection_multithreshold( data_mask, self.dxy, thresholds, position_threshold='extreme', n_min_threshold=self.n_min_threshold, sigma_threshold=self.sigma_threshold, ) LOG.debug(f'Creating segmentation for {seg}...') if features is None: continue else: masks, features_mask = tobac.segmentation_2D(features, data_mask, self.dxy, threshold=[thresholds[0]]) # increase value based on previous largest label masks = xr.where(masks == 0, masks, masks + mask_max) mask_max = masks.max().values mask_list.append(masks) feature_list.append(features) if len(feature_list) > 0: # concat features and reset values features = pd.concat(feature_list, ignore_index=True) features['feature'] = np.arange(1, len(features)+1, 1) # sum masks together masks = sum(mask_list) if 'dim_0' in masks.dims: masks = masks.rename({'dim_0': 'y', 'dim_1': 'x'}) # keep masks of more than n_min_threshold pixels masks = masks.astype('int') mask_count = masks.groupby(masks).count() mask_count = mask_count.where(mask_count > self.n_min_threshold).dropna(masks.name).astype('int') mask_unique = mask_count.coords['segmentation_mask'].values masks = masks.where(np.isin(masks, mask_unique, 0)) masks = masks.drop_vars(['time']) else: LOG.info('No a priori plume mask has been detected.') mask_unique = [0] masks = xr.full_like(self.data, 0).astype('int') masks.attrs = '' # Replace values in the DataArray with sequential numbers for index, value in enumerate(mask_unique): masks = xr.where(masks == value, index, masks) # set nan values according to data masks = masks.where(self.data.notnull().squeeze().values) # squeeze data and update attrs masks = masks.rename('plume_mask') masks.attrs['description'] = 'A priori plume mask (0: background, >0: plumes)' masks.attrs['area'] = self.data.attrs['area'] # assign coords from AreaDefinition if isinstance(masks.attrs['area'], AreaDefinition): masks.coords['y'] = masks.attrs['area'].projection_y_coords masks.coords['x'] = masks.attrs['area'].projection_x_coords masks.coords['y'].attrs['units'] = 'm' masks.coords['x'].attrs['units'] = 'm' masks.coords['y'].attrs['standard_name'] = 'projection_y_coordinate' masks.coords['y'].attrs['standard_name'] = 'projection_x_coordinate' masks.coords['y'].attrs['long_name'] = 'y coordinate of projection' masks.coords['x'].attrs['long_name'] = 'x coordinate of projection' return thresholds, features, masks