Source code for hypergas.landmask

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Create 2D landmask for hyperspectral satellite data."""

import logging
import os
import gc

import cartopy.feature as cfeature
import geopandas as gpd
import numpy as np
import xarray as xr
import yaml
from pyresample import kd_tree
from pyresample.geometry import GridDefinition, SwathDefinition

LOG = logging.getLogger(__name__)


[docs] def find_tiles(lat_min, lat_max, lon_min, lon_max): """ Generate the list of OSM filenames for the tiles that cover the given bounding box. Parameters ---------- lat_min : float minimum latitude of the bounding box. lat_max : float maximum latitude of the bounding box. lon_min : float minimum longitude of the bounding box. lon_max : float maximum longitude of the bounding box. Returns ------- tiles : list The list of tif filenames. """ lat_range = range(int(lat_min // 5 * 5), int((lat_max // 5 + 1) * 5), 5) lon_range = range(int(lon_min // 5 * 5), int((lon_max // 5 + 1) * 5), 5) tiles = [] for lat in lat_range: for lon in lon_range: lat_prefix = 'n' if lat >= 0 else 's' lon_prefix = 'e' if lon >= 0 else 'w' tile_filename = f"{lat_prefix}{abs(lat):02d}{lon_prefix}{abs(lon):03d}.tif" tiles.append(tile_filename) return tiles
[docs] def Land_mask(lons, lats, source='OSM'): """Create the segmentation for land and ocean/lake types. Parameters ---------- lons : :class:`numpy.ndarray` 2D longitude of pixels. lats : :class:`numpy.ndarray` 2D latitude of pixels. source : str The data source of land mask (“OSM”, “GSHHS” or “Natural Earth”), Default: “OSM”. Returns ------- Land_mask : :class:`~xarray.DataArray` 2D array, 0: ocean/lake, 1: land """ # load land data LOG.info(f'Creating land mask using {source} data') if source == 'OSM': # get the OSM data path _dirname = os.path.dirname(__file__) with open(os.path.join(_dirname, 'config.yaml')) as f: settings = yaml.safe_load(f) osm_dir = os.path.join(_dirname, settings['data']['osm_dir']) # read OSM+ESA_WorldCover Watermask lat_min, lat_max = lats.min(), lats.max() lon_min, lon_max = lons.min(), lons.max() osm_filenames = find_tiles(lat_min, lat_max, lon_min, lon_max) osm_paths = [os.path.join(osm_dir, fname) for fname in osm_filenames] # Load and crop the OSM data with xr.open_mfdataset(osm_paths) as ds_osm: da_osm = ds_osm['band_data'].isel(band=0) osm_crop = da_osm.sel(y=slice(lat_max, lat_min), x=slice(lon_min, lon_max)) osm_crop.load() # set the resample grids lon_grid, lat_grid = np.meshgrid(osm_crop.x, osm_crop.y) swath_def = SwathDefinition(lons=lons, lats=lats) # resample data by nearest (10 m) grid_def = GridDefinition(lons=lon_grid, lats=lat_grid) landmask = kd_tree.resample_nearest(grid_def, osm_crop.data, swath_def, radius_of_influence=10).astype('int') # landmask: 0->1, 1->0 landmask = np.where((landmask == 0) | (landmask == 1), landmask ^ 1, landmask).astype(float) del da_osm, osm_crop, lon_grid, lat_grid gc.collect() elif source in ['Natural Earth', 'GSHHS']: if source == 'Natural Earth': land_data = cfeature.NaturalEarthFeature('physical', 'land', '10m') elif source == 'GSHHS': land_data = cfeature.GSHHSFeature(scale='full') # load data into GeoDataFrame land_polygons = list(land_data.geometries()) land_gdf = gpd.GeoDataFrame(crs='epsg:4326', geometry=land_polygons) # create Point GeoDataFrame points = gpd.GeoSeries(gpd.points_from_xy(lons.ravel(), lats.ravel())) points_gdf = gpd.GeoDataFrame(geometry=points, crs="EPSG:4326") # Spatially join the points with the land polygons joined = gpd.sjoin(points_gdf, land_gdf, how='left', predicate='within') # Check if each point is within a land polygon is_within_land = joined['index_right'].notnull() # create the mask landmask = is_within_land.values.reshape(lons.shape).astype(float) else: raise ValueError( "Please input the correct land data source ('GSHHS' or 'Natural Earth'). {land_data} is not supported") LOG.info(f'Creating land mask using {source} data (Done)') # save to DataArray segmentation = xr.DataArray(landmask, dims=['y', 'x']) segmentation.attrs['description'] = f'{source} land mask (0: ocean/lake, 1: land)' return segmentation