Source code for hypergas.emiss

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Calculate trace gas emission rate."""

import logging
import os
import random

import pandas as pd
import xarray as xr
from geopy.geocoders import Nominatim

import hypergas
from hypergas.ime_csf import IME_CSF
from hypergas.plume_utils import a_priori_mask_data, cm_mask_data

# set the logger level
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S')
LOG = logging.getLogger(__name__)


[docs] class Emiss(): """The Emiss class.""" def __init__(self, ds, gas, plume_name): """Initialize Denoise. Parameters ---------- ds : :class:`~xarray.Dataset` The level2 or level3 product. gas : str Gas name (lowercase, e.g., “ch4”). plume_name : str The plume index name ("plume0", "plume1", ....). """ self.ds = ds self.gas = gas self.plume_name = plume_name self.filename = ds.encoding['source'] self.l1_filename = ds.attrs['filename'] # get the crs if self.ds.rio.crs: self.crs = self.ds.rio.crs else: self.crs = None # get the L3 plume NetCDF filename basename = os.path.basename(self.filename) if 'L2' in basename: self.plume_nc_filename = self.filename.replace('.nc', f'_{self.plume_name}.nc').replace('L2', 'L3') elif 'L3' in basename: self.plume_nc_filename = self.filename # we need to close the file because it will be read again in IME_CSF class self.ds.close() else: raise ValueError(f'{self.filename} is not supported. Please input the L2 or L3 nc filename.') if 'EMIT' in basename: self.sensor = 'EMIT' elif 'ENMAP' in basename: self.sensor = 'EnMAP' elif 'PRS' in basename: self.sensor = 'PRISMA' else: raise ValueError(f'{self.filename} is not supported.')
[docs] def mask_data(self, longitude, latitude, wind_source='ERA5', land_only=True, land_mask_source='OSM', only_plume=True, azimuth_diff_max=30, dist_max=180, ): """ Create plume mask using :class:`hypergas.plume_utils.a_priori_mask_data`. """ l2b_html_filename = self.filename.replace('.nc', '.html') self.longitude = longitude self.latitude = latitude self.wind_source = wind_source self.land_only = land_only self.land_mask_source = land_mask_source self.azimuth_diff_max = azimuth_diff_max self.dist_max = dist_max # select connected plume masks self.mask, lon_mask, lat_mask, \ self.longitude, self.latitude, self.plume_html_filename = a_priori_mask_data(self.ds, self.gas, self.longitude, self.latitude, self.plume_name, self.wind_source, only_plume, self.azimuth_diff_max, self.dist_max, l2b_html_filename ) self.cm_mask, self.cm_threshold = cm_mask_data(self.ds, self.gas, self.longitude, self.latitude)
[docs] def export_plume_nc(self,): """ Export plume data to L3 NetCDF file with header attributes. """ # mask data gas_mask = self.ds[self.gas].where(self.mask) with xr.set_options(keep_attrs=True): gas_cm_mask = self.ds[self.gas].where(self.cm_mask).rename(f'{self.gas}_cm') # remove background # gas_cm_mask -= self.cm_threshold gas_cm_mask.attrs['description'] = gas_cm_mask.attrs['description'] + ' masked by the Carbon Mapper v2 method' # calculate mean wind and surface pressure in the plume if they are existed if all(key in self.ds.keys() for key in ['u10', 'v10', 'sp']): u10 = self.ds['u10'].where(self.mask).mean(dim=['y', 'x']) v10 = self.ds['v10'].where(self.mask).mean(dim=['y', 'x']) sp = self.ds['sp'].where(self.mask).mean(dim=['y', 'x']) # keep attrs u10.attrs = self.ds['u10'].attrs v10.attrs = self.ds['v10'].attrs sp.attrs = self.ds['sp'].attrs array_list = [gas_mask, gas_cm_mask, u10, v10, sp] else: array_list = [gas_mask, gas_cm_mask] # save useful number for attrs sza = self.ds[self.gas].attrs['sza'] vza = self.ds[self.gas].attrs['vza'] start_time = self.ds[self.gas].attrs['start_time'] # export masked data (plume) # merge data ds_merge = xr.merge(array_list) # add crs info if self.crs is not None: ds_merge.rio.write_crs(self.crs, inplace=True) # clear attrs ds_merge.attrs = '' # set global attributes header_attrs = {'version': hypergas.__name__+'_'+hypergas.__version__, 'filename': self.l1_filename, 'start_time': start_time, 'sza': sza, 'vza': vza, 'plume_longitude': self.longitude, 'plume_latitude': self.latitude, } ds_merge.attrs = header_attrs LOG.info(f'Exported to {self.plume_nc_filename}') ds_merge.to_netcdf(self.plume_nc_filename)
[docs] def estimate(self, ipcc_sector, sp_manual=None, wspd_manual=None, land_only=True, name=None): """ Calculate the gas emission rate using :class:`hypergas.ime_csf.IME_CSF`. """ # init IME_CSF class ime_csf = IME_CSF(sensor=self.sensor, longitude_source=self.longitude, latitude_source=self.latitude, plume_nc_filename=self.plume_nc_filename, plume_name=self.plume_name, ipcc_sector=ipcc_sector, gas=self.gas, wind_source=self.wind_source, sp_manual=sp_manual, wspd_manual=wspd_manual, land_only=self.land_only, land_mask_source=self.land_mask_source) # calculate emission rates surface_pressure, wind_speed, wdir, wind_speed_all, wdir_all, wind_source_all, \ l_ime, l_eff, u_eff, IME, Q, Q_err, err_random, err_wind, err_calib, \ Q_fetch, Q_fetch_err, err_ime_fetch, err_wind_fetch, \ IME_cm, l_cm, Q_cm, \ ds_csf, n_csf, l_csf, u_eff_csf, Q_csf, Q_csf_err, err_random_csf, err_wind_csf, err_calib_csf = ime_csf.calc_emiss() # export csf data if ds_csf is not None: csf_filename = self.plume_nc_filename.replace('.nc', '_csf.nc') LOG.info(f'Exported CSF lines to {csf_filename}') ds_csf.to_netcdf(csf_filename) # get info info = ime_csf.sensor_info alpha = ime_csf.alpha beta = ime_csf.beta # calculate plume bounds with xr.open_dataset(self.plume_nc_filename) as ds: plume_mask = ~ds[self.gas].isnull() lon_mask = ds['longitude'].where(plume_mask, drop=True) lat_mask = ds['latitude'].where(plume_mask, drop=True) t_overpass = pd.to_datetime(ds[self.gas].attrs['start_time']) bounds = [lon_mask.min().item(), lat_mask.min().item(), lon_mask.max().item(), lat_mask.max().item()] # get the location attrs try: geolocator = Nominatim(user_agent='hypergas'+str(random.randint(1, 100))) location = geolocator.reverse( f'{self.latitude}, {self.longitude}', exactly_one=True, language='en') address = location.raw['address'] except Exception as e: LOG.info('Can not access openstreetmap. Leave location info to empty.') address = {} # set name by folder name if it is not specified if name is None: name = os.path.basename(os.path.dirname(self.filename)).replace('_', ' ') # save results results = {'plume_id': f"{info['instrument']}-{t_overpass.strftime('%Y%m%dt%H%M%S')}-{self.plume_name}", 'plume_latitude': self.latitude, 'plume_longitude': self.longitude, 'datetime': t_overpass.strftime('%Y-%m-%dT%H:%M:%S%z'), 'country': address.get('country', ''), 'state': address.get('state', ''), 'city': address.get('city', ''), 'name': name, 'ipcc_sector': ipcc_sector, 'gas': self.gas.upper(), 'plume_bounds': [bounds], 'instrument': info['instrument'], 'platform': info['platform'], 'provider': info['provider'], 'emission': Q, 'emission_uncertainty': Q_err, 'emission_uncertainty_random': err_random, 'emission_uncertainty_wind': err_wind, 'emission_uncertainty_calibration': err_calib, 'emission_cm': Q_cm, 'emission_fetch': Q_fetch, 'emission_fetch_uncertainty': Q_fetch_err, 'emission_fetch_uncertainty_ime': err_ime_fetch, 'emission_fetch_uncertainty_wind': err_wind_fetch, 'emission_csf': Q_csf, 'emission_csf_uncertainty': Q_csf_err, 'emission_csf_uncertainty_random': err_random_csf, 'emission_csf_uncertainty_wind': err_wind_csf, 'emission_csf_uncertainty_calibration': err_calib_csf, 'surface_pressure': surface_pressure, 'wind_source': self.wind_source, 'wind_speed': wind_speed, 'wind_direction': wdir, 'ime': IME, 'l_ime': l_ime, 'ueff_ime': u_eff, 'leff_ime': l_eff, 'ime_cm': IME_cm, 'l_cm': l_cm, 'ueff_csf': u_eff_csf, 'n_csf': n_csf, 'l_csf': l_csf, 'alpha1': alpha['alpha1'], 'alpha2': alpha['alpha2'], 'alpha3': alpha['alpha3'], 'beta1': beta['beta1'], 'beta2': beta['beta2'], 'wind_speed_all': [wind_speed_all], 'wind_direction_all': [wdir_all], 'wind_source_all': [wind_source_all], 'azimuth_diff_max': self.azimuth_diff_max, 'dist_max': self.dist_max, 'land_only': self.land_only, 'land_mask_source': self.land_mask_source, 'version': hypergas.__name__+'_'+hypergas.__version__, } # convert to DataFrame and export data as csv file df = pd.DataFrame(data=results, index=[0]) savename = self.plume_nc_filename.replace('.nc', '.csv') LOG.info(f'Exported estimates to {savename}') df.to_csv(savename, index=False)