#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023-2024 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Plot orthorectified data and overlay images on folium maps."""
import base64
import gc
import logging
import os
from pathlib import Path
import cartopy.crs as ccrs
import folium
import geojson
import matplotlib.pyplot as plt
import numpy as np
from cartopy.crs import epsg as ccrs_from_epsg
from folium.features import DivIcon
from folium.plugins import (Draw, FeatureGroupSubGroup, Fullscreen, Geocoder,
MousePosition)
from pyresample.geometry import SwathDefinition
from scipy.spatial import ConvexHull
from shapely import geometry
LOG = logging.getLogger(__name__)
[docs]
class Map():
"""Plot data on folium maps."""
def __init__(self, dataset, varnames, center_map=None):
"""Initialize Map class.
Parameters
----------
dataset : :class:`~xarray.Dataset`
The xarray dataset which contains gas fields and geolocations (longitude and latitude).
varnames : list
The list of varnames to be plotted.
center_map : list
The map center: [latitude, longitude].
Examples
--------
Basic usage::
m = Map(ds, ['rgb', 'ch4'])
m.initialize()
m.plot()
m.export()
Full parameters::
m = Map(ds, ['rgb', 'ch4'])
m.initialize()
m.plot(show_layers=[False, True], opacities=[0.9, 0.7])
m.export('full_params.html')
Multiple datasets on one map::
m = Map(ds1, ['rgb', 'ch4'])
m.initialize()
m.plot()
m.ds = ds2
m.varnames = varnames2
m.plot()
m.export()
"""
self.ds = dataset
self.filename = dataset.attrs['filename']
self.varnames = varnames
# check all variabels are loaded
self._check_vars()
# get the swath info
self.swath = SwathDefinition(lons=self.ds.coords['longitude'], lats=self.ds.coords['latitude'])
# calculate the map center
if center_map is None:
self._calc_center()
else:
self.center_map = center_map
def _check_vars(self):
"""Check variables are already loaded."""
loaded_varnames = list(self.ds.data_vars)
if not all([var in loaded_varnames for var in self.varnames]):
raise ValueError(
f'{self.varnames} are not all loaded. Please make sure the name is correct and call terrain_corr() after loading it.')
def _calc_center(self):
"""Calculate center lon and lat based on geotransform info."""
# The mean value doesn't work well
# center_lon = self.ds.coords['longitude'].mean()
# center_lat = self.ds.coords['latitude'].mean()
corners = self.swath.corners
# Extract lon and lat separately
lons_corner = [c.lon for c in corners]
lats_corner = [c.lat for c in corners]
center_lon = np.rad2deg(np.mean(lons_corner))
center_lat = np.rad2deg(np.mean(lats_corner))
self.center_map = [center_lat, center_lon]
def _get_cartopy_crs_from_epsg(self, epsg_code):
if epsg_code:
try:
return ccrs_from_epsg(epsg_code)
except ValueError:
if epsg_code == 4326:
return ccrs.PlateCarree()
else:
raise NotImplementedError('The show_map() method currently does not support the given '
'projection.')
else:
raise ValueError(f'Expected a valid EPSG code. Got {epsg_code}.')
[docs]
def initialize(self):
"""Set the basic folium map background."""
m = folium.Map(location=self.center_map, zoom_start=12, tiles=None, control_scale=True)
openstreet_tile = folium.TileLayer('OpenStreetMap')
esri_tile = folium.TileLayer(
tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
attr='Esri',
name='Esri Satellite',
)
# add tile
m.add_child(esri_tile)
m.add_child(openstreet_tile)
# add full screen
Fullscreen(
position='topleft',
title='Expand me',
title_cancel='Exit me',
force_separate_button=True,
).add_to(m)
# add draw menu
output_geojson = str(
Path(os.path.basename(self.filename).replace('_RAD', '').replace('L1', 'L2')).with_suffix('.geojson'))
Draw(export=True, position='topleft', filename=output_geojson).add_to(m)
# add mouse position
MousePosition(position='bottomleft').add_to(m)
# add geocoder search
Geocoder(position='topleft', collapsed=True).add_to(m)
self.map = m
[docs]
def plot_png(self, out_epsg=3857, vmax=None, export_dir=None, pre_suffix=''):
"""Plot data and export to png files.
Parameters
----------
out_epsg : int
EPSG code of the output projection (3857 is the proj of folium Map).
vmax : float
The cmap vmax for plotting species (unit is as same as species variable).
export_dir : str
The directory to save plotted images (Default: the same path as filename attrs).
pre_suffix : str
The suffix added to the png and html filename (Default: "").
"""
for varname in self.varnames:
# load data
da_ortho = self.ds[varname]
# plot variables
if varname == 'rgb':
# transpose the bands
da_ortho = da_ortho.transpose(..., 'bands')
cmap = None
vmin = None
cmap_vmax = None
elif varname == 'radiance_2100':
cmap = 'viridis'
vmin = da_ortho.quantile(0.01)
cmap_vmax = da_ortho.quantile(0.99)
elif 'denoise' in varname:
# because denoised data is usually smaller than original values
# we set the maximum value as vmax for checking plumes
cmap = 'plasma'
vmin = da_ortho.mean()
cmap_vmax = da_ortho.quantile(0.99)
# crop cmap_max to max(vmax, vmin+1)
if vmax is not None:
if vmax <= cmap_vmax:
# set vmax as user defined value, if cmap_vmax is larger.
cmap_vmax = vmax
if cmap_vmax < vmin:
# if the maximum value is negative, hard-code to 1
cmap_vmax = vmin + 1
elif 'mask' in varname:
# plot a priori plume mask
cmap = 'tab20c'
vmin = 0
cmap_vmax = None
# pick mask with more than 5 pixels
da_count = da_ortho.groupby(da_ortho).count()
da_count = da_count.where(da_count > 5).dropna(varname)
count_unique = da_count.coords[varname].values
da_ortho = da_ortho.where(np.isin(da_ortho, count_unique, 0))
# remove background mask (0)
da_ortho = da_ortho.where(da_ortho > 0).squeeze()
else:
# set vmax for species automatically
# Note that vmax of radiance and denoised variables are set based on percentile values above
if vmax is None:
if 'ch4' in varname:
# hard-code vmax
if da_ortho.attrs['units'] == 'ppb':
cmap_vmax = 300
elif da_ortho.attrs['units'] == 'ppm':
cmap_vmax = 0.3
elif da_ortho.attrs['units'] == 'ppm m':
cmap_vmax = 0.3/1.25e-4
elif da_ortho.attrs['units'] == 'umol m-2':
cmap_vmax = 300/2900*1e6
elif 'co2' in varname:
# hard-code vmax
if da_ortho.attrs['units'] == 'ppb':
cmap_vmax = 1e4
elif da_ortho.attrs['units'] == 'ppm':
cmap_vmax = 10
elif da_ortho.attrs['units'] == 'ppm m':
cmap_vmax = 10/1.25e-4
elif da_ortho.attrs['units'] == 'umol m-2':
cmap_vmax = 1e4/2900*1e6
elif 'no2' in varname:
# hard-code vmax
if da_ortho.attrs['units'] == 'ppb':
cmap_vmax = 80*1e-6*2900
elif da_ortho.attrs['units'] == 'ppm':
cmap_vmax = 80*1e-6*2900/1000
elif da_ortho.attrs['units'] == 'ppm m':
cmap_vmax = 80*1e-6*2900/1000/1.25e-4
elif da_ortho.attrs['units'] == 'umol m-2':
cmap_vmax = 80
else:
raise ValueError(f"{varname} is not supported for auto colormap. Please check and add it here.")
else:
cmap_vmax = vmax
# set the vmax according to user's input
# the variable should be trace gas enhancement
cmap = 'plasma'
vmin = 0
# in case the maximum value is negative
if cmap_vmax < vmin:
cmap_vmax = vmin + 1
fig, ax = plt.subplots(subplot_kw=dict(projection=self._get_cartopy_crs_from_epsg(out_epsg)))
# because we use longitude and latitude, we need to specify the transform.
input_crs = self._get_cartopy_crs_from_epsg(4326)
ax.pcolormesh(self.ds.longitude, self.ds.latitude, da_ortho, vmin=vmin, vmax=cmap_vmax, cmap=cmap,
transform=input_crs, antialiased=True)
# turn off axis
ax.axis('off')
# set png filename
# hard code for renaming EMIT RAD filename
if export_dir is None:
export_dir = os.path.dirname(self.filename)
basename = os.path.basename(self.filename)
output_png = Path(os.path.join(export_dir, basename.replace('.', f'_{varname}.')).replace('_RAD', '').replace('L1', 'L2')).with_suffix('.png')
else:
output_png = Path(os.path.join(export_dir,
os.path.basename(self.filename).replace('.', f'_{varname}.')
.replace('_RAD', '').replace('L1', 'L2'))).with_suffix('.png')
# append pre-suffix
output_png = str(output_png).replace('.png', f'{pre_suffix}.png')
# delete pads and remove edges
fig.savefig(output_png, bbox_inches='tight', pad_inches=0.0, edgecolor=None, transparent=True, dpi=1000)
# calculate the bounds
# we need to use bounds for image overlay on folium map
extent_4326 = ax.get_extent(crs=ccrs.PlateCarree())
self.img_bounds = [[extent_4326[2], extent_4326[0]], [extent_4326[3], extent_4326[1]]]
# clean vars
del da_ortho, fig, ax
gc.collect()
[docs]
def plot(self, out_epsg=3857, vmax=None, show_layers=None, opacities=None,
marker=None, df_marker=None, export_dir=None, draw_polygon=True,
pre_suffix=''):
"""Plot data, export to png files, plot folium map, and export to html files.
Parameters
----------
out_epsg : int
EPSG code of the output projection (3857 is the proj of folium Map).
vmax : float
The cmap vmax for plotting species (unit is as same as species variable).
show_layers : bool list
Whether the layers will be shown on opening (the length should be as same as ``varnames``).
opacities : float list
The opacities of layer (the length should be as same as ``varnames``).
marker : list
The coords [lat, lon] for a yellow circle marker.
df_marker : DataFrame
The DataFrame (columns: latitude, longitude) for adding blue circle markers.
export_dir : str
The directory to save plotted images (Default: the same path as filename attrs).
draw_polygon : bool
Whether plot the scene boundary polygon (Default: True).
pre_suffix : str
The suffix added to the png and html filename (Default: "").
"""
# check the length of self.
if show_layers is None:
self.show_layers = [True]*len(self.varnames)
else:
self.show_layers = show_layers
if len(self.show_layers) != len(self.varnames):
raise ValueError(
f"self.'s length ({len(self.show_layers)}) should be as same as varnames's length ({len(self.varnames)})")
if opacities is None:
self.opacities = [0.8]*len(self.varnames)
else:
self.opacities = opacities
if len(self.opacities) != len(self.varnames):
raise ValueError(
f"opacities's length ({len(self.opacities)}) should be as same as varnames's length ({len(self.varnames)})")
# plot png images
self.plot_png(out_epsg=out_epsg, vmax=vmax, export_dir=export_dir, pre_suffix=pre_suffix)
# get the swath polygon from area boundary
hull = ConvexHull(self.swath.boundary().vertices)
lonlatPoly = geometry.Polygon(hull.points[hull.vertices])
self.gjs = geojson.Feature(geometry=lonlatPoly, properties={})
# overlay images on foilum map
self.marker = marker
self.df_marker = df_marker
self.export_dir = export_dir
self.draw_polygon = draw_polygon
self.plot_folium(pre_suffix)
# delete vars
del self.gjs
gc.collect()
[docs]
def plot_wind(self, source='ERA5', position='bottomright'):
"""Plot the wind as html element.
Parameters
----------
source : str
wind source: "ERA5" or "GEOS-FP".
position : str
the position to add the wind marker.
"""
# read data
u10 = self.ds['u10'].sel(source=source).mean().item()
v10 = self.ds['v10'].sel(source=source).mean().item()
# calculate wspd and wdir
wspd = np.sqrt(u10**2 + v10**2)
wdir = (270 - np.rad2deg(np.arctan2(v10, u10))) % 360
# read arrow image
_dirname = os.path.dirname(__file__)
arrow_img = os.path.join(_dirname, 'imgs', 'arrow.png')
with open(arrow_img, "rb") as f:
image_data = f.read()
image_base64 = base64.b64encode(image_data).decode("utf-8")
html = f"""
<img src="data:image/png;base64,{image_base64}" style="transform:rotate({wdir+180}deg);">
<h1 style="color:white;">{wspd:.1f} m/s</h1>
"""
# add marker with icon
icon = DivIcon(html=html)
gplot = FeatureGroupSubGroup(self.fg, f'{self.time_str} | {source} Wind', show=False)
self.map.add_child(gplot)
gplot.add_child(folium.Marker(self.center_map, icon=icon, draggable=True))
[docs]
def plot_folium(self, pre_suffix=''):
"""Overlay png images on folium map.
Parameters
----------
pre_suffix : str
The suffix added to the png and html filename.
"""
# get the time string
self.time_str = self.ds[self.varnames[0]].attrs['start_time']
sensor_name = self.ds[self.varnames[0]].attrs['sensor']
# add swath poly
if self.draw_polygon:
fg_poly = folium.FeatureGroup(name='Swath polygons')
self.map.add_child(fg_poly)
style = {'fillColor': '#00000000', 'color': 'dodgerblue'}
folium.GeoJson(self.gjs,
control=False,
tooltip=self.time_str,
zoom_on_click=False,
style_function=lambda x: style,
# highlight_function= lambda feat: {'fillColor': 'blue'},
).add_to(fg_poly)
# add the group which controls all subgroups (varnames)
self.fg = folium.FeatureGroup(name=f'{self.time_str} group ({sensor_name})')
self.map.add_child(self.fg)
for index, varname in enumerate(self.varnames):
layer_name = f'{self.time_str} | {varname}'
gplot = FeatureGroupSubGroup(self.fg, layer_name, show=self.show_layers[index])
self.map.add_child(gplot)
if self.export_dir is None:
export_dir = os.path.dirname(self.filename)
basename = os.path.basename(self.filename)
output_png = Path(os.path.join(export_dir, basename.replace('.', f'_{varname}.')).replace('_RAD', '').replace('L1', 'L2')).with_suffix('.png')
else:
output_png = Path(os.path.join(self.export_dir,
os.path.basename(self.filename)
.replace('.', f'_{varname}.')
.replace('_RAD', '').replace('L1', 'L2'))).with_suffix('.png')
# append pre-suffix
output_png = str(output_png).replace('.png', f'{pre_suffix}.png')
# plot 2D trace gas field
raster = folium.raster_layers.ImageOverlay(image=str(output_png),
opacity=self.opacities[index],
bounds=self.img_bounds,
name=layer_name,
)
raster.add_to(gplot)
# plot wind
try:
self.plot_wind(source='ERA5')
self.plot_wind(source='GEOS-FP')
except Exception as e:
LOG.warning(e)
LOG.warning('Wind data is not available in this file. Skip plotting wind arrows.')
if self.marker is not None:
# add a yellow circle marker (lat, lon)
folium.Circle([self.marker[0], self.marker[1]], radius=100, color='yellow').add_to(self.map)
if self.df_marker is not None:
# loop dataframe to add CircleMarker with popup table
for (index, row) in self.df_marker.iterrows():
html = self.df_marker.iloc[index]\
.to_frame().to_html(classes="table table-striped table-hover table-condensed table-responsive")
popup = folium.Popup(html, max_width=500)
folium.CircleMarker(location=[row.loc['latitude'], row.loc['longitude']], radius=6,
color='dodgerblue', fill_color='dodgerblue', fill_opacity=0.3,
popup=popup,
).add_to(self.map)
[docs]
def export(self, savename=None, pre_suffix=''):
"""Export plotted folium map to html file.
Parameters
----------
savename : str
The exported html filename.
pre_suffix : str
The suffix added to the html filename.
"""
layer_control = folium.LayerControl(collapsed=False, position='topleft', draggable=True)
self.map.add_child(layer_control)
if savename is None:
savename = str(Path(self.filename.replace('_RAD', '').replace('L1', 'L2')).with_suffix('.html'))
# append pre-suffix
savename = savename.replace('.html', f'{pre_suffix}.html')
LOG.info(
f'Export folium map to {savename}')
self.map.save(savename)