Source code for inundation_toolbox

import numpy as np


def arrival_indices(
    map_data: np.ndarray,
    arrival_threshold: float = 0.02,
) -> np.ndarray:
    """
    Computes the arrival index. This is the index that water_height >= threshold
    Works with classmaps and maps, as long as arrival_threshold corresponds to waterdepth (maps) and class (classmap).

    Args:
        map_data (np.ndarray): map data (time, nodes)
        arrival_threshold: value >= arrival_threshold there is inundation

    Returns:
        ix_arrival (np.ndarray): arrival indices
    """

    # Determine where waterheight >= threshold
    arrived = map_data >= arrival_threshold

    # Find first time that this is the case
    indices = np.argmax(arrived, axis=0).astype(float)

    # if never inundated, then nan
    nevers = np.invert(np.amax(arrived, axis=0))

    indices[nevers] = np.nan

    return indices


[docs]def arrival_times( map_data: np.ndarray, time_step: np.timedelta64, time_unit: str = "s", arrival_threshold: float = 3, event_start: np.timedelta64 = None, ) -> np.ndarray: """ Computes the arrival times. This is the time-index*time_step that water_height >= threshold Works with classmaps and maps, as long as arrival_threshold corresponds to waterdepth (maps) and class (classmap). Args: map_data (np.ndarray): map data (time, nodes) time_step (np.timedelta64): time step between two observations in classmap time_unit (str): desired output time format arrival_threshold: value >= arrival_threshold there is inundation Returns: t_arrival (np.ndarray): arrival times """ # Determine where waterheight >= threshold indices = arrival_indices(map_data=map_data, arrival_threshold=arrival_threshold) # Turn indices into time t_arrival = np.multiply(indices, time_step / np.timedelta64(1, time_unit)) if event_start is not None: t_arrival -= event_start / np.timedelta64(1, time_unit) return t_arrival
[docs]def rising_speeds( map_data: np.ndarray, time_step: np.timedelta64, time_unit: str = "s", ) -> np.ndarray: """ Computes the rising speeds. This is defined as dh/dt between two subsequent datapoints Args: map_data (np.ndarray): filled classmap data (time, nodes) time_step (np.timedelta64): time step between two observations in classmap Returns: dh_dt (np.ndarray): rising_speeds """ dh = map_data[1:, :] - map_data[:-1, :] dt = time_step / np.timedelta64(1, time_unit) dh_dt = dh / dt return dh_dt
def rising_speeds_ssm( map_data: np.ndarray, time_step: np.timedelta64, min_h: float = 0.02, max_h=1.5, ) -> np.ndarray: """ Computes the rising speeds. This is defined as dh/dt between two subsequent datapoints Args: map_data (np.ndarray): filled classmap data (time, nodes) time_step (np.timedelta64): time step between two observations in classmap min_h (float): minimum depth (m) at which depths are considered max_h (float): maximum depth (m) at which depths are considered Returns: dh/dt (np.ndarray): rising_speeds """ # TODO allow for more waterlevels than 1.5 arrival_ixs = arrival_indices(map_data=map_data, arrival_threshold=min_h) max_h_ixs = arrival_indices(map_data=map_data, arrival_threshold=max_h) low_h = np.take_along_axis(map_data, arrival_ixs[np.newaxis, :], axis=0) upp_h = np.take_along_axis(map_data, max_h_ixs[np.newaxis, :], axis=0) dh = upp_h - low_h d_ix = max_h_ixs - arrival_ixs dt = d_ix * time_step return dh / dt
[docs]def height_of_mrs(map_data: np.ndarray, dh_dt: np.ndarray) -> np.ndarray: """ Computes the water depth at maximum rising speed. Usage example of take_along_axis and argmax. The following prints zero: max_rs_ix = np.argmax(dh_dt, axis=0) max_rs = np.take_along_axis(dh_dt, max_rs_ix[np.newaxis, :], axis=0) print(np.sum(max_rs - np.amax(dh_dt, axis=0, keepdims=True))) Args: map_data (np.ndarray): filled classmap data (time, nodes) dh_dt (np.ndarray): rising_speeds Returns: h_mrs (np.ndarray): gevaarhoogte. water depth at maximum rising speed """ # find maximum rising speed per point in mesh max_rs_ix = np.argmax(dh_dt, axis=0) # apply these indices to waterdepth mesh to obtain waterdepths at moment of maximum rising speed h_mrs = np.take_along_axis(map_data[1:, :], max_rs_ix[np.newaxis, :], axis=0) # profit return h_mrs[0, :]