#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Hyper object to hold hyperspectral satellite data."""
import logging
from datetime import timedelta
import os
import yaml
import numpy as np
import xarray as xr
from satpy import DataQuery, Scene
from .angles import Angle2D, compute_raa, compute_sga
from .denoise import Denoise
from .hsi2rgb import Hsi2rgb
from .orthorectification import Ortho
from .retrieve import MatchedFilter
from .tle import TLE
from .wind import Wind
from .a_priori_mask import Mask
AVAILABLE_READERS = ['hsi_l1b', 'emit_l1b', 'hyc_l1']
LOG = logging.getLogger(__name__)
[docs]
class Hyper():
"""The Hyper Class.
Example usage::
from hypergas import Hyper
# create Hyper and open files
hyp = Hyper(filenames='/path/to/file/*')
# load datasets from input files
hyp.load()
# retrieve ch4
hyp.retrieve(self, wvl_intervals=[2110, 2450], species='ch4')
# orthorectification
hyp.terrain_corr(varname='rgb')
# export to NetCDF file
hyp.scene.save_datasets(datasets=['u10', 'v10', 'rgb', 'ch4'], filename='output.nc', writer='cf')
"""
def __init__(self, filename=None, reader=None):
"""Initialize Hyper.
To load Hyper data, ``filename`` and ``reader`` must be specified::
hyp = Hyper(filenames=glob('/path/to/hyper/files/*'), reader='hsi_l1b')
Parameters
----------
filename : list
The files to be loaded.
reader : str
The name of the reader to use for loading the data.
"""
self.filename = filename
self.reader = reader
self.available_dataset_names = self._get_dataset_names()
# load settings
_dirname = os.path.dirname(__file__)
with open(os.path.join(_dirname, 'config.yaml')) as f:
settings = yaml.safe_load(f)
self.species_setting = settings['species']
def _get_dataset_names(self):
"""Get the necessary dataset_names for retrieval."""
if self.reader == 'hsi_l1b':
# EnMAP L1B
swir_rad_id = DataQuery(name='swir', calibration='radiance')
vnir_rad_id = DataQuery(name='vnir', calibration='radiance')
dataset_names = [swir_rad_id, vnir_rad_id, 'deadpixelmap',
'rpc_coef_vnir', 'rpc_coef_swir']
elif self.reader == 'emit_l1b':
# EMIT L1B
dataset_names = ['glt_x', 'glt_y', 'radiance', 'sza', 'vza', 'saa', 'vaa']
elif self.reader == 'hyc_l1':
swir_rad_id = DataQuery(name='swir', calibration='radiance')
vnir_rad_id = DataQuery(name='vnir', calibration='radiance')
dataset_names = [swir_rad_id, vnir_rad_id]
# uncomment the line if you wanna use 2D CW
# note that 2D CW sometimes lead to wrong retrieval and you need to correct the smile effect
# dataset_names = [swir_rad_id, vnir_rad_id, 'cw_vnir', 'cw_swir']
else:
raise ValueError(f"'reader' must be a list of available readers: {AVAILABLE_READERS}")
return dataset_names
def _rgb_composite(self):
"""Create RGB composite"""
try:
# --- HSI2RGB method ---
LOG.info('Use HSI2RGB method for RGB image')
# slice data to VIS range
da_vis = self.scene['radiance'].sel(bands=slice(380, 750))
data = da_vis.stack(z=['y', 'x']).transpose(..., 'bands')
# generate RGB img
rgb = Hsi2rgb(data.coords['bands'], data.data,
da_vis.sizes['y'], da_vis.sizes['x'],
65, 1e-3)
# save to DataArray with correct dim order
rgb = xr.DataArray(rgb, dims=['y', 'x', 'bands'], coords={'bands': np.array([650, 560, 470])})
rgb = rgb.transpose('bands', ...)
# copy attrs
rgb.attrs = self.scene['radiance'].attrs
rgb.attrs['units'] = '1'
rgb.attrs['standard_name'] = 'true_color'
rgb.attrs['long_name'] = 'true color (RGB)'
rgb = rgb.rename('rgb')
self.scene['rgb'] = rgb
except:
# --- nearest method ---
LOG.info('HSI2RGB failed. Using Nearest method for RGB image now.')
def gamma_norm(band):
"""Apply gamma_norm to create RGB composite"""
gamma = 50
band_gamma = np.power(band, 1/gamma)
band_min, band_max = (np.nanmin(band_gamma), np.nanmax(band_gamma))
return ((band_gamma-band_min)/((band_max - band_min)))
rgb = self.scene['radiance'].sel(bands=[650, 560, 470], method='nearest')
if rgb.chunks is not None:
rgb.load()
rgb = xr.apply_ufunc(gamma_norm,
rgb.transpose(..., 'bands'),
exclude_dims=set(('y', 'x')),
input_core_dims=[['y', 'x']],
output_core_dims=[['y', 'x']],
vectorize=True,
)
# copy attrs
rgb.attrs = self.scene['radiance'].attrs
rgb.attrs['units'] = '1'
rgb.attrs['standard_name'] = 'true_color'
# remove useless attrs
if 'calibration' in rgb.attrs:
del rgb.attrs['calibration']
rgb = rgb.rename('rgb')
self.scene['rgb'] = rgb
def _calc_sensor_angle(self):
"""Calculate the VAA and VZA from TLE file"""
# get the TLE info
delta_day = timedelta(days=1)
closest_tle = TLE(self.platform_name).get_tle(self.start_time-delta_day,
self.start_time+delta_day,
overpass_time=self.start_time)
while closest_tle is None:
# in case tle file is not available for the initial time window
delta_day += timedelta(days=1)
closest_tle = TLE(self.platform_name).get_tle(self.start_time-delta_day,
self.start_time+delta_day,
overpass_time=self.start_time)
# calculate the lon and lat center
lons, lats = self.area.get_lonlats()
# Create angle calculator
angle_calc = Angle2D(
start_time=self.start_time,
end_time=self.end_time,
lons=lons,
lats=lats,
tle1=closest_tle[0],
tle2=closest_tle[1],
)
# Compute all angles as xarray.Dataset
angle_ds = angle_calc.compute_all()
# Copy the attrs
for name in angle_ds.data_vars:
attrs = self._extract_attrs(angle_ds[name])
angle_ds[name] = self._copy_attrs(angle_ds[name], attrs)
return angle_ds
# Copy the attrs
# for var_name in angle_ds.data_vars:
# print(f'Copy attrs for {var_name}')
# attrs_dict = {
# key: angle_ds[var_name].attrs[key]
# for key in ['standard_name', 'long_name', 'units', 'description']
# if key in angle_ds[var_name].attrs
# }
# angle_ds[var_name] = self._copy_attrs(angle_ds[var_name], attrs_dict)
# return angle_ds
def _copy_attrs(self, new_data, new_attrs_dict):
"""Copy the radiance attrs to new DataArray
Parameters
----------
new_data : DataArray
new_attrs_dict : dict
dict of new attributes
the dict should include these keys: 'standard_name', 'long_name', 'description', 'units'
"""
# copy attrs and add units
new_data.attrs = self.scene['radiance'].attrs
new_data = new_data.assign_attrs(new_attrs_dict)
# remove useless attrs
if 'calibration' in new_data.attrs:
del new_data.attrs['calibration']
return new_data
def _extract_attrs(self, da):
"""Extract only angle-related attrs from a DataArray."""
_ATTR_KEYS = ('standard_name', 'long_name', 'units', 'description')
return {
k: da.attrs[k]
for k in _ATTR_KEYS
if k in da.attrs
}
def _scale_units(self, units):
"""Scale units from ppm"""
if units == 'ppm':
scale = 1
elif units == 'ppm m':
scale = 1/1.25e-4
elif units == 'ppb':
scale = 1e3
elif units == 'umol m-2':
scale = 1000/2900*1e6 # ppm -> umol m-2
else:
raise ValueError(f'We do not support converting ppm to {units}.')
return scale
[docs]
def load(self, drop_waterbands=True):
"""Load data into xarray Dataset using Satpy.
Parameters
----------
drop_waterbands : bool
whether to drop bands affected by water. Default: True.
"""
# load available datasets
scn = Scene(self.filename, reader=self.reader)
scn.load(self.available_dataset_names)
# merge band dims into one "bands" dims if they are splited (e.g. EnMAP and PRISMA)
if 'radiance' not in self.available_dataset_names:
# Note that although we concat these DataArrays
# there are offsets between EnMAP VNIR and SWIR data
scn['radiance'] = xr.concat([scn['vnir'].rename({'bands_vnir': 'bands', 'fwhm_vnir': 'fwhm'}),
scn['swir'].rename({'bands_swir': 'bands', 'fwhm_swir': 'fwhm'})
],
'bands')
else:
scn['radiance'] = scn['radiance']
# merge 2D VNIR and SWIR central wavelength into one DataArray (PRISMA)
# output dims: (bands, x)
if all(x in self.available_dataset_names for x in ['cw_vnir', 'cw_swir']):
scn['central_wavelengths'] = xr.concat([scn['cw_vnir'].rename({'bands_vnir': 'bands', 'fwhm_vnir': 'fwhm'}),
scn['cw_swir'].rename({'bands_swir': 'bands', 'fwhm_swir': 'fwhm'})
],
'bands')
scn['central_wavelengths'] = scn['central_wavelengths'].drop_duplicates(dim='bands')
# sort bands and remove zero values
scn['central_wavelengths'] = scn['central_wavelengths'].sortby('bands').sel(bands=slice(1e-7, None))
# drop duplicated bands and sort it
# this is the case for PRISMA
scn['radiance'] = scn['radiance'].drop_duplicates(dim='bands')
scn['radiance'] = scn['radiance'].sortby('bands').sel(bands=slice(1e-7, None))
# get attrs
self.start_time = scn['radiance'].attrs['start_time']
self.end_time = scn['radiance'].attrs['end_time']
self.platform_name = scn['radiance'].attrs['platform_name']
self.area = scn['radiance'].attrs['area']
# get loaded variables
loaded_names = [x['name'] for x in scn.keys()]
if drop_waterbands:
# drop water vapor bands
bands = scn['radiance']['bands']
water_mask = ((1358 < bands) & (bands < 1453)) | ((1814 < bands) & (bands < 1961))
scn['radiance'] = scn['radiance'].where(~water_mask, drop=True)
if 'central_wavelengths' in loaded_names:
scn['central_wavelengths'] = scn['central_wavelengths'].where(~water_mask, drop=True)
# load wind data and set flag for determining whether it is loaded successfully
try:
wind = Wind(scn)
scn['u10'] = wind.u10
scn['v10'] = wind.v10
scn['sp'] = wind.sp
self.wind = True
except Exception as e:
LOG.warning(e)
LOG.warning("It seems we can't find any wind data for the date. Please check.")
self.wind = False
# get the radiance at 2100 nm which is useful to check albedo effects
scn['radiance_2100'] = scn['radiance'].sel(
bands=2100, method='nearest').rename('radiance_2100').expand_dims('bands')
scn['radiance_2100'].attrs['long_name'] = 'TOA radiance at 2100 nm'
scn['radiance_2100'].attrs['description'] = 'TOA radiance at 2100 nm'
# drop useless coords of radiance_2100
coords = list(scn['radiance_2100'].coords)
if len(coords) > 0:
scn['radiance_2100'] = scn['radiance_2100'].drop_vars(coords)
# save into scene
self.scene = scn
# calculate 2d angles if they are available from L1 data (e.g., EnMAP and PRISMA)
if 'sza' not in loaded_names:
ds_angles = self._calc_sensor_angle()
# Add each angle as a separate dataset to the Scene
scn['sza'] = ds_angles['sza']
scn['saa'] = ds_angles['saa']
scn['vza'] = ds_angles['vza']
scn['vaa'] = ds_angles['vaa']
scn['raa'] = ds_angles['raa']
scn['sga'] = ds_angles['sga']
# Compute only what is missing from L2 data (e.g., EMIT)
# raa
if 'raa' not in scn:
da = compute_raa(scn['saa'], scn['vaa'])
attrs = self._extract_attrs(da)
scn['raa'] = self._copy_attrs(da, attrs)
# sga
if 'sga' not in scn:
da = compute_sga(
scn['sza'], scn['saa'],
scn['vza'], scn['vaa'],
)
attrs = self._extract_attrs(da)
scn['sga'] = self._copy_attrs(da, attrs)
# make sure the mean "sza" and "vza" are set as attrs of `scn['radiance']`
# we need these for radianceCalc later
if 'sza' not in scn['radiance'].attrs:
scn['radiance'].attrs['sza'] = scn['sza'].mean().load().item()
if 'vza' not in scn['radiance'].attrs:
scn['radiance'].attrs['vza'] = scn['vza'].mean().load().item()
# generate RGB composite
LOG.info('Generating RGB')
self._rgb_composite()
[docs]
def retrieve(self, wvl_intervals=None, species='ch4',
algo='smf', mode='column', rad_dist='normal',
land_mask=True, land_mask_source='OSM',
cluster=False, plume_mask=None):
"""Retrieve trace gas enhancements.
Parameters
----------
wvl_intervals: list
The wavelength range (nm) used in matched filter. It can be one list or nested list.
e.g. ``[2110, 2450]`` or ``[[1600, 1750], [2110, 2450]]``.
Deafult: ``[2110, 2450]`` for ch4 and ``[1930, 2200]`` for co2.
species : str
The species ("ch4", "co2") to be retrieved.
Default: "ch4".
algo : str
The matched filter algorithm, currently supporting only one algorithm:
simple matched filter (smf). This is the original matched filter algorithm.
mode : str
The mode ("column" or "scene") to apply matched filter.
Default: "column". Be careful of noise if you apply the matched filter for the whole scene.
rad_dist : str
The assumed rads distribution ("normal" or "lognormal")
Default: "normal".
land_mask : bool
Whether apply the matched filter to continental and oceanic pixels seperately.
Default: True.
land_mask_source : str
The data source of land mask ("OSM", "GSHHS" or "Natural Earth").
Default: OSM.
cluster : bool
Whether apply the pixel classification.
Default: False.
plume_mask : :class:`numpy.ndarray`
2D manual mask. 0: neglected pixels, 1: valid pixels.
Default: ``None``.
"""
rad_source = self.species_setting[species]['rad_source']
if wvl_intervals is None:
wvl_intervals = self.species_setting[species]['wavelength']
if rad_source not in ['model', 'lut']:
raise ValueError(
f"The rad_source in the config.yaml file should be 'model' or 'lut'. {rad_source} is not supported.")
units = self.species_setting[species]['units']
unit_scale = self._scale_units(units)
if (rad_source == 'lut') and (species not in ['ch4', 'co2']):
raise ValueError(f"Please input a correct species name (ch4 or co2). {species} is not supported by LUT.")
mf = MatchedFilter(self.scene, wvl_intervals, species, mode, rad_dist,
rad_source, land_mask, land_mask_source, cluster, plume_mask)
segmentation = mf.segmentation
enhancement = getattr(mf, algo)()
# load the retrieval results
if enhancement.chunks is not None:
enhancement.load()
# set units
enhancement *= unit_scale
# copy attrs
segmentation_attrs = {'standard_name': 'segmentation',
'long_name': 'pixel segmentation',
'units': 1,
'description': segmentation.attrs['description'],
}
enhancement_attrs = {'standard_name': f"{self.species_setting[species]['name']}_enhancement",
'long_name': f"{self.species_setting[species]['name']}_enhancement",
'units': units,
'description': f"{self.species_setting[species]['name']} enhancement derived by the {wvl_intervals[0]}~{wvl_intervals[1]} nm window",
}
enhancement = self._copy_attrs(enhancement, enhancement_attrs)
segmentation = self._copy_attrs(segmentation, segmentation_attrs)
# copy to scene
self.scene['segmentation'] = segmentation.rename('segmentation')
self.scene[species] = enhancement.rename(species)
# add the MF info
if 'mf' in algo:
self.scene[species].attrs['matched_filter'] = f'{rad_dist} matched filter'
[docs]
def terrain_corr(self, varname='rgb', rpcs=None, gcps=None, gcp_crs=None):
"""Apply orthorectification using :class:`hypergas.orthorectification.Ortho`.
Parameters
----------
varname : str
The variable to be orthorectified.
rpcs:
The Ground Control Points (gcps) or Rational Polynomial Coefficients (rpcs).
If `rpcs` is None, we look for glt_x/glt_y data automatically.
Returns
-------
da_ortho : :class:`~xarray.DataArray`
The orthorectified data.
"""
da_ortho = Ortho(self.scene, varname, rpcs=rpcs, gcps=gcps, gcp_crs=gcp_crs).apply_ortho()
return da_ortho
[docs]
def denoise(self, varname='ch4', method='calibrated_tv_filter', weight=None):
"""Denoise the input data using :class:`hypergas.denoise.Denoise`.
Parameters
----------
varname : str
The variable to be denoised.
method : str
The denoising method: "tv_filter" and "calibrated_tv_filter" (default).
weight : int
The weight for denoise_tv_chambolle.
It would be neglected if method is "calibrated_tv_filter".
If ``weight`` is ``None`` (default) and ``method`` is "tv_filter",
the denoise_tv_chambolle will use the default value (0.1) which is too low for hyperspectral noisy gas field.
Returns
-------
da_denoise : :class:`~xarray.DataArray`
Denoised data.
"""
da_denoise = Denoise(self.scene, varname, method=method, weight=weight).smooth()
return da_denoise
[docs]
def plume_mask(self, varname='ch4_comb_denoise', n_min_threshold=5, sigma_threshold=1):
"""Create a priori plume masks using :class:`hypergas.a_priori_mask.Mask`.
Parameters
----------
scn : Satpy Scene
Scene including one variable named ``segmentation`` which is calculated by :meth:`hypergas.landmask.Land_mask`.
segmentation (:class:`~xarray.DataArray`): 0: ocean, >0: land
varname : str
The variable used to create plume mask. (Recommend: ``<gas>_comb_denoise``)
n_min_threshold : int
The minimum number of pixels per threshold for detecting features. (Default: 5).
sigma_threshold : int
Gaussian filter sigma for smoothing field.
Default: 1. Because the ``<gas>_comb_denoise`` field is already smoothed, 1 should be high enough.
"""
thresholds, features, da_plume_mask = Mask(
self.scene, varname, n_min_threshold=n_min_threshold, sigma_threshold=sigma_threshold).get_feature_mask()
return da_plume_mask