Source code for hypergas.unit_spectrum

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Calculate unit spectrum for matched filter."""

import logging
import os
from datetime import datetime
from math import cos, pi, radians

import h5py
import numpy as np
import pandas as pd
import xarray as xr
import yaml

from .lut_interp import spline_5deg_lookup

LOG = logging.getLogger(__name__)

MODE = {0: 'tropical', 1: 'midlatitudesummer', 2: 'midlatitudewinter',
        3: 'subarcticsummer', 4: 'subarcticwinter', 5: 'standard'}


[docs] class Unit_spec(): """Calculate the unit spectrum.""" def __init__(self, radiance, wvl_sensor, wvl_min, wvl_max, species='ch4', rad_source='model'): """Initialize unit_spec class. Parameters ---------- radiance : :class:`~xarray.DataArray` 3D radiance dataarray (bands, y, x). wvl_sensor : :class:`~xarray.DataArray` 1D ['bands'] or 2D['bands','x']. The central wavelengths (nm) of sensor. wvl_min : float The lower limit of wavelength (nm) for matched filter. wvl_max : float The upper limit of wavelength (nm) for matched filter. species : str The species to be retrieved: 'ch4' or 'co2'. Default: 'ch4'. rad_source : str The data ('model' or 'lut') used for calculating rads or transmissions. Default: 'model'. References ---------- - rad_source (model): `Gloudemans et al. (2008) <https://doi.org/10.5194/acp-8-3999-2008>`_. - rad_source (lut): only supporting ch4 and co2; `Foote et al. (2021) <https://hive.utah.edu/concern/datasets/9w0323039>`_. """ # load settings _dirname = os.path.dirname(__file__) with open(os.path.join(_dirname, 'config.yaml')) as f: settings = yaml.safe_load(f) data_setting = settings['data'] species_setting = settings['species'] self.absorption_dir = os.path.join(_dirname, data_setting['absorption_dir']) self.irradiance_dir = os.path.join(_dirname, data_setting['irradiance_dir']) self.modtran_dir = os.path.join(_dirname, data_setting['modtran_dir']) self.rad_source = rad_source # load variables from the "radiance" DataArray self.radiance = radiance self.wvl_sensor = wvl_sensor self.fwhm_sensor = radiance['fwhm'] self.sza = radiance.attrs['sza'] self.vza = radiance.attrs['vza'] self.wvl_min = wvl_min self.wvl_max = wvl_max # create an array of concentrations # you can modify it, but please keep the first one as zero # the unit should be ppm for rad_source=='model' or 'ppm m' for rad_source=='lut' self.conc = np.array(species_setting[species]['concentrations']) self.species = species.upper() # read ref data date_time = radiance.attrs['start_time'] self.doy = (date_time - datetime(date_time.year, 1, 1)).days + 1 self.lat = radiance.attrs['area'].get_lonlats()[1].mean() self._read_refdata() def _model(self): """ Determine atmospheric model 0 - Tropical 1 - Mid-Latitude Summer 2 - Mid-Latitude Winter 3 - Sub-Arctic Summer 4 - Sub-Arctic Winter 5 - US Standard Atmosphere """ # Determine season if self.doy < 121 or self.doy > 274: if self.lat < 0: summer = True else: summer = False else: if self.lat < 0: summer = False else: summer = True # Determine model if abs(self.lat) <= 15: model = 0 elif abs(self.lat) >= 60: if summer: model = 3 else: model = 4 else: if summer: model = 1 else: model = 2 # fixme: NO2 only supports US Standard Atmosphere if self.species == 'NO2': model = 5 self.model = model def _read_refdata(self): '''Read reference data''' # determine the model name self._model() # read atmosphere profile atm_filename = f'atmosphere_{MODE[self.model]}.dat' LOG.info(f'Read atm file: {atm_filename}') df_atm = pd.read_csv(os.path.join(self.absorption_dir, atm_filename), comment='#', header=None, sep='\t') if len(df_atm.columns) == 10: col_names = ['thickness', 'pressure', 'temperature', 'H2O', 'CO2', 'O3', 'N2O', 'CO', 'CH4', 'O2'] if len(df_atm.columns) == 11: # US standard atmosphere profile col_names = ['thickness', 'pressure', 'temperature', 'H2O', 'CO2', 'O3', 'N2O', 'CO', 'CH4', 'O2', 'NO2'] # !!! test # if len(df_atm.columns) == 12: # col_names = ['thickness', 'pressure', 'temperature', 'H2O', 'CO2', 'O3', 'N2O', 'CO', 'CH4', 'O2', 'NO2', 'O4'] df_atm.columns = col_names # fixme: set NO2 to 0 if the column is not existed. if 'NO2' not in col_names: df_atm['NO2'] = 0 # !!! test # df_atm['O4'] = 0 # read solar irradiance data E_filename = 'solar_irradiance_0400-2600nm_highres_sparse.nc' Edata = xr.open_dataset(os.path.join(self.irradiance_dir, E_filename))['irradiance'] # convert units to W m-2 um-1 Edata *= 1e3 # read abs data abs_filename = f'absorption_cs_ALL_{MODE[self.model]}.nc' LOG.debug(f'Reading the absorption file: {abs_filename}') ds_abs = xr.open_dataset(os.path.join(self.absorption_dir, abs_filename)) # save into class self.atm = df_atm self.solar_irradiance = Edata self.abs = ds_abs def _radianceCalc(self, del_omega, albedo=0.15, return_type='transmission'): """Function to calculate spectral radiance over selected band range based on trace gas del_omega (mol/m2) added to the first layer of atmosphere Parameters ---------- del_omega : float Trace gas column enhancement [mol/m2]. albedo : float Albedo. This is cancelled out in the matched filter. return_type : str Returned data type: 'transmission' or 'radiance'. Returns ------- Wavelength : array The wavelength range (nm) in the solar_irradiance data. Transmission or spectral radiance: array If return_type is 'transmission', Transmission is returned. If return_type is 'radiance', spectral radiance [1/(s*cm^2*sr*nm) is returned. """ # column number density [cm^-2] # we need to assign the copied data, otherwise it will be overwrited in each loop nH2O = self.atm['H2O'].copy().values nCO2 = self.atm['CO2'].copy().values nO3 = self.atm['O3'].copy().values nN2O = self.atm['N2O'].copy().values nCO = self.atm['CO'].copy().values nCH4 = self.atm['CH4'].copy().values nNO2 = self.atm['NO2'].copy().values # nO4 = self.atm['O4'].copy().values # add species by "d_omega" [mol m-2] to the first layer nH2O[0] = nH2O[0] + del_omega['H2O'] * 6.023e+23 / 10000 nCO2[0] = nCO2[0] + del_omega['CO2'] * 6.023e+23 / 10000 nO3[0] = nO3[0] + del_omega['O3'] * 6.023e+23 / 10000 nN2O[0] = nN2O[0] + del_omega['N2O'] * 6.023e+23 / 10000 nCO[0] = nCO[0] + del_omega['CO'] * 6.023e+23 / 10000 nCH4[0] = nCH4[0] + del_omega['CH4'] * 6.023e+23 / 10000 nNO2[0] = nNO2[0] + del_omega['NO2'] * 6.023e+23 / 10000 LOG.debug(f"RadianceCalc omega2: {nCH4[0]}") # crop solar irradiance and absorption data to the same wavelength range # the numeric issue could lead to one or two offset # so it is better to use the nearest method to get the same data with small tolerance wavelength = self.abs['abs_H2O'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).coords['wavelength'].data Edata = self.solar_irradiance Edata_subset = Edata.sel(wavelength=wavelength, tolerance=1e-5, method='nearest') Edata_subset = Edata_subset.data sigma_H2O_subset = self.abs['abs_H2O'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data sigma_CO2_subset = self.abs['abs_CO2'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data sigma_N2O_subset = self.abs['abs_N2O'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data sigma_CO_subset = self.abs['abs_CO'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data sigma_CH4_subset = self.abs['abs_CH4'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data optd_H2O = np.matmul(sigma_H2O_subset, nH2O) optd_CO2 = np.matmul(sigma_CO2_subset, nCO2) optd_N2O = np.matmul(sigma_N2O_subset, nN2O) optd_CO = np.matmul(sigma_CO_subset, nCO) optd_CH4 = np.matmul(sigma_CH4_subset, nCH4) if 'NO2' in list(self.abs.keys()): # fixme: only calculate optd when the NO2 profile is available (standard model) sigma_NO2_subset = self.abs['abs_NO2'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data optd_NO2 = np.matmul(sigma_NO2_subset, nNO2) sigma_O3_subset = self.abs['abs_O3'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data optd_O3 = np.matmul(sigma_O3_subset, nO3) # !!! test # sigma_O4_subset = self.abs['abs_O4'].sel(wavelength=slice(self.wvl_min, self.wvl_max)).data # optd_O4 = np.matmul(sigma_O4_subset, nO4) else: optd_NO2 = 0 optd_O3 = 0 # !!! test # optd_O4 = 0 tau_vert = optd_H2O + optd_CO2 + optd_O3 + optd_N2O + optd_CO + optd_CH4 + optd_NO2 # + optd_O4 def f_young(za): za = radians(za) # air mass defined in Young (1994): https://encyclopedia.pub/entry/28536 f = (1.002432*(cos(za))**2 + 0.148386*cos(za) + 0.0096467)\ / ((cos(za))**3 + 0.149864*(cos(za))**2 + 0.0102963*cos(za) + 0.000303978) return f # amf: air mass factor amf = f_young(self.sza) + f_young(self.vza) # transmission = exp(-tau*amf), Ref: Eq.2 of Jongaramrungruang (2021) tau_lambda = amf * tau_vert transmission = np.exp(-tau_lambda) if return_type == 'transmission': return wavelength, transmission elif return_type == 'radiance': # irradiance to radiance self.albedo = albedo consTerm = albedo * cos(radians(self.sza)) / pi L_lambda = consTerm * np.exp(-tau_lambda) * Edata_subset return wavelength, L_lambda else: raise ValueError(f"Unrecognized return_type: {return_type}. It should be 'transmission' or 'radiance'") def _convolve(self, wvl_sensor, fwhm_sensor, wvl_lut, rad_lut): '''Convert high-resolution spectral radiance to low-resolution signal and create the unit methane absorption spectrum References: https://github.com/markusfoote/mag1c/blob/8b9ceae186f4e125bc9f628db82f41bce4c6011f/mag1c/mag1c.py#L229-L243 https://github.com/Prikaziuk/retrieval_rtmo/blob/master/src/%2Bhelpers/create_sensor_from_fwhm.m ''' # convert to numpy array if not isinstance(wvl_sensor, np.ndarray): wvl_sensor = wvl_sensor.data if not isinstance(fwhm_sensor, np.ndarray): fwhm_sensor = fwhm_sensor.data sigma = fwhm_sensor / (2.0 * np.sqrt(2.0 * np.log(2.0))) # Evaluate normal distribution explicitly var = sigma ** 2 denom = (2 * np.pi * var) ** 0.5 numer = np.exp(-(wvl_lut[:, None] - wvl_sensor[None, :])**2 / (2*var)) response = numer / denom # Normalize each gaussian response to sum to 1. response = np.divide(response, response.sum( axis=0), where=response.sum(axis=0) > 0, out=response) # implement resampling as matrix multiply resampled = rad_lut.dot(response) return resampled
[docs] def convolve_rads(self): """Calculate the convolved sensor-reaching rads or transmissions. Returns ------- convolved rads : :class:`~xarray.DataArray` 2D (conc*wvl) or 3D (bands*conc*wvl) radiances or transmissions for ``conc`` defined in the ``config.yaml`` file. """ # set the enhancement of multiple gases # xch4 is converted from ppb to mol/m2 by divideing by 2900 delta_omega = {'H2O': 0, 'CO2': 0, 'O3': 0, 'N2O': 0, 'CO': 0, 'CH4': 0, 'NO2': 0} delta_omega.update({self.species: self.conc*1000/2900}) # calculate the transmission or sensor-reaching radiance with these gases # reference self.wvl_lut, self.rad_lut = self._radianceCalc({x: 0 for x in delta_omega}, return_type='radiance') # create array for saving radiance data rads_omega = np.zeros((len(self.conc), len(self.wvl_lut))) # iterate each conc and calculate the tau for i, omega in enumerate(delta_omega[self.species]): tmp_omega = delta_omega.copy() tmp_omega.update({self.species: omega}) _, rad = self._radianceCalc(tmp_omega) rads_omega[i, :] = rad if len(self.wvl_sensor.dims) == 1: # dims: conc, bands resampled = self._convolve(self.wvl_sensor, self.fwhm_sensor, self.wvl_lut, rads_omega) elif len(self.wvl_sensor.dims) == 2: # dims: x, conc, bands self.wvl_sensor.load() resampled = xr.apply_ufunc(self._convolve, self.wvl_sensor, kwargs={'fwhm_sensor': self.fwhm_sensor, 'wvl_lut': self.wvl_lut, 'rad_lut': rads_omega}, exclude_dims=set(('bands',)), input_core_dims=[['bands', ]], output_core_dims=[['conc', 'bands']], vectorize=True, dask='parallelized', output_dtypes=['float64'], dask_gufunc_kwargs=dict(output_sizes={'x': self.wvl_sensor.sizes['x'], 'conc': len(self.conc), 'bands': self.wvl_sensor.sizes['bands'] }) ) else: raise ValueError(f'self.wvl_sensor should be 1D or 2D. Your input is {self.wvl_sensor.sizes}') return resampled
[docs] def convolve_rads_lut(self): """Calculate the convolved sensor-reaching rads. Returns ------- convolved rads : :class:`~xarray.DataArray` 2D (conc*wvl) radiances for ``conc`` defined in the ``config.yaml`` file. """ # set params for LUT sensor_altitude = 100 ground_elevation = 0 order = 1 # Spline interpolation degree water_vapor = 1.3 param = {'zenith': self.sza, # Model uses sensor height above ground 'sensor': sensor_altitude - ground_elevation, 'ground': ground_elevation, 'water': water_vapor, 'gas': self.species.lower(), 'order': order } # read LUT lut_file = os.path.join(self.modtran_dir, f'dataset_{self.species.lower()}_full.hdf5') lut = h5py.File(lut_file, 'r', rdcc_nbytes=4194304) self.wvl_lut = lut['wave'][:] grid_data = lut['modtran_data'] # calculate the rads with 1 ppm m CH4 for `unit_spec` # rad_unit = spline_5deg_lookup(grid_data, conc=1, **param) # array for saving rads rads_omega = np.empty((len(self.conc), grid_data.shape[-1])) # iterate each conc and calculate the rads for i, ppmm in enumerate(self.conc): rads_omega[i, :] = spline_5deg_lookup(grid_data, conc=ppmm, **param) if len(self.wvl_sensor.dims) == 1: resampled = self._convolve(self.wvl_sensor, self.fwhm_sensor, self.wvl_lut, rads_omega) else: raise ValueError(f'self.wvl_sensor should be 1D for LUT. Your input is {self.wvl_sensor.sizes}') return resampled
def _unit_fit(self, rads): # https://github.com/markusfoote/mag1c/blob/8b9ceae186f4e125bc9f628db82f41bce4c6011f/mag1c/mag1c.py#L241 lograd = np.log(rads, out=np.zeros_like(rads), where=rads > 0) # calculate slope [ln(Δradiance)/ Δc]: ln(xm) = ln(xr) - kΔc # Ref: Schaum (2021) and Pei (2023) slope, residuals, _, _ = np.linalg.lstsq( np.stack((np.ones_like(self.conc), self.conc)).T, lograd, rcond=None) K = slope[1, :] return K
[docs] def fit_slope(self, scaling=None): """Fit the slope for conc and rads. Parameters ---------- scaling : float The scaling factor to ensure numerical stability. Default: ``None``. """ # calculate rads based on conc LOG.info(f'Convolving rads ({self.wvl_min}~{self.wvl_max} nm) ...') if self.rad_source == 'model': rads = self.convolve_rads() elif self.rad_source == 'lut': rads = self.convolve_rads_lut() else: raise ValueError(f"rad_source only supports 'model' or 'lut'). {self.rad_source} is not supported.") LOG.info('Convolving rads (Done)') if len(rads.shape) == 2: K = self._unit_fit(rads) elif len(rads.shape) == 3: K_list = [] for x in range(rads.shape[0]): K = self._unit_fit(rads[x, ...]) K_list.append(K) K = np.stack(K_list).T # dims: bands, x if scaling is None: # auto calculation of scaling factor K_negative_max = K[K < 0].max() if K_negative_max > -1: scaling = round(-1/K_negative_max, 1) else: scaling = round(-K_negative_max, 1) return K * scaling, scaling