Source code for xrspatial.terrain

# std lib
from functools import partial
from typing import List, Optional, Tuple, Union

import datashader as ds
# 3rd-party
import numpy as np
import pandas as pd
import xarray as xr

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

import dask.array as da

# local modules
from xrspatial.utils import (ArrayTypeFunctionMapping, cuda_args, get_dataarray_resolution,
                             not_implemented_func)

from .perlin import _perlin, _perlin_gpu


def _scale(value, old_range, new_range):
    d = (value - old_range[0]) / (old_range[1] - old_range[0])
    return d * (new_range[1] - new_range[0]) + new_range[0]


def _gen_terrain(height_map, seed, x_range=(0, 1), y_range=(0, 1)):
    height, width = height_map.shape

    # multiplier, (xfreq, yfreq)
    NOISE_LAYERS = ((1 / 2**i, (2**i, 2**i)) for i in range(16))

    linx = np.linspace(
        x_range[0], x_range[1], width, endpoint=False, dtype=np.float32
    )
    liny = np.linspace(
        y_range[0], y_range[1], height, endpoint=False, dtype=np.float32
    )
    x, y = np.meshgrid(linx, liny)
    nrange = np.arange(2**20, dtype=np.int32)
    for i, (m, (xfreq, yfreq)) in enumerate(NOISE_LAYERS):
        np.random.seed(seed+i)
        p = np.random.permutation(nrange)
        p = np.append(p, p)

        noise = _perlin(p, x * xfreq, y * yfreq) * m

        height_map += noise

    height_map /= (1.00 + 0.50 + 0.25 + 0.13 + 0.06 + 0.03)
    height_map = height_map ** 3
    return height_map


def _terrain_numpy(data: np.ndarray,
                   seed: int,
                   x_range_scaled: tuple,
                   y_range_scaled: tuple,
                   zfactor: int) -> np.ndarray:

    # data.fill(0)
    data = data * 0
    data[:] = _gen_terrain(
        data, seed, x_range=x_range_scaled, y_range=y_range_scaled
    )

    data = (data - np.min(data))/np.ptp(data)
    data[data < 0.3] = 0  # create water
    data *= zfactor

    return data


def _terrain_dask_numpy(data: da.Array,
                        seed: int,
                        x_range_scaled: tuple,
                        y_range_scaled: tuple,
                        zfactor: int) -> da.Array:
    data = data * 0

    height, width = data.shape
    linx = da.linspace(
        x_range_scaled[0], x_range_scaled[1], width, endpoint=False,
        dtype=np.float32
    )
    liny = da.linspace(
        y_range_scaled[0], y_range_scaled[1], height, endpoint=False,
        dtype=np.float32
    )
    x, y = da.meshgrid(linx, liny)

    nrange = np.arange(2 ** 20, dtype=np.int32)

    # multiplier, (xfreq, yfreq)
    NOISE_LAYERS = ((1 / 2 ** i, (2 ** i, 2 ** i)) for i in range(16))
    for i, (m, (xfreq, yfreq)) in enumerate(NOISE_LAYERS):
        np.random.seed(seed + i)
        p = np.random.permutation(nrange)
        p = np.append(p, p)

        _func = partial(_perlin, p)
        noise = da.map_blocks(
            _func,
            x * xfreq,
            y * yfreq,
            meta=np.array((), dtype=np.float32)
        )

        data += noise * m

    data /= (1.00 + 0.50 + 0.25 + 0.13 + 0.06 + 0.03)
    data = data ** 3

    data = (data - np.min(data)) / np.ptp(data)
    data[data < 0.3] = 0  # create water
    data *= zfactor

    return data


def _terrain_gpu(height_map, seed, x_range=(0, 1), y_range=(0, 1)):

    NOISE_LAYERS = ((1 / 2**i, (2**i, 2**i)) for i in range(16))

    noise = cupy.empty_like(height_map, dtype=np.float32)

    griddim, blockdim = cuda_args(height_map.shape)

    for i, (m, (xfreq, yfreq)) in enumerate(NOISE_LAYERS):

        # cupy.random.seed(seed+i)
        # p = cupy.random.permutation(2**20)

        # use numpy.random then transfer data to GPU to ensure the same result
        # when running numpy backed and cupy backed data array.
        np.random.seed(seed+i)
        p = cupy.asarray(np.random.permutation(2**20))
        p = cupy.append(p, p)

        _perlin_gpu[griddim, blockdim](
            p, x_range[0] * xfreq, x_range[1] * xfreq,
            y_range[0] * yfreq, y_range[1] * yfreq,
            m, noise
        )

        height_map += noise

    height_map /= (1.00 + 0.50 + 0.25 + 0.13 + 0.06 + 0.03)
    height_map = height_map ** 3
    return height_map


def _terrain_cupy(data: cupy.ndarray,
                  seed: int,
                  x_range_scaled: tuple,
                  y_range_scaled: tuple,
                  zfactor: int) -> cupy.ndarray:

    data = data * 0

    data[:] = _terrain_gpu(data, seed,
                           x_range=x_range_scaled,
                           y_range=y_range_scaled)
    minimum = cupy.amin(data)
    maximum = cupy.amax(data)

    data[:] = (data - minimum)/(maximum - minimum)
    data[data < 0.3] = 0  # create water
    data *= zfactor

    return data


[docs]def generate_terrain(agg: xr.DataArray, x_range: tuple = (0, 500), y_range: tuple = (0, 500), seed: int = 10, zfactor: int = 4000, full_extent: Optional[Union[Tuple, List]] = None, name: str = 'terrain' ) -> xr.DataArray: """ Generates a pseudo-random terrain which can be helpful for testing raster functions. Parameters ---------- x_range : tuple, default=(0, 500) Range of x values. x_range : tuple, default=(0, 500) Range of y values. seed : int, default=10 Seed for random number generator. zfactor : int, default=4000 Multipler for z values. full_extent : str, default=None bbox<xmin, ymin, xmax, ymax>. Full extent of coordinate system. Returns ------- terrain : xr.DataArray 2D array of generated terrain values. References ---------- - Michael McHugh: https://www.youtube.com/watch?v=O33YV4ooHSo - Red Blob Games: https://www.redblobgames.com/maps/terrain-from-noise/ Examples -------- .. plot:: :include-source: >>> import numpy as np >>> import xarray as xr >>> from xrspatial import generate_terrain >>> W = 400 >>> H = 300 >>> data = np.zeros((H, W), dtype=np.float32) >>> raster = xr.DataArray(data, dims=['y', 'x']) >>> xrange = (-20e6, 20e6) >>> yrange = (-20e6, 20e6) >>> seed = 2 >>> zfactor = 10 >>> terrain = generate_terrain(raster, xrange, yrange, seed, zfactor) >>> terrain.plot.imshow() """ height, width = agg.shape if full_extent is None: full_extent = (x_range[0], y_range[0], x_range[1], y_range[1]) elif not isinstance(full_extent, (list, tuple)) and len(full_extent) != 4: raise TypeError('full_extent must be tuple(4)') full_xrange = (full_extent[0], full_extent[2]) full_yrange = (full_extent[1], full_extent[3]) x_range_scaled = (_scale(x_range[0], full_xrange, (0.0, 1.0)), _scale(x_range[1], full_xrange, (0.0, 1.0))) y_range_scaled = (_scale(y_range[0], full_yrange, (0.0, 1.0)), _scale(y_range[1], full_yrange, (0.0, 1.0))) mapper = ArrayTypeFunctionMapping( numpy_func=_terrain_numpy, cupy_func=_terrain_cupy, dask_func=_terrain_dask_numpy, dask_cupy_func=lambda *args: not_implemented_func( *args, messages='generate_terrain() does not support dask with cupy backed DataArray' # noqa ) ) out = mapper(agg)(agg.data, seed, x_range_scaled, y_range_scaled, zfactor) canvas = ds.Canvas( plot_width=width, plot_height=height, x_range=x_range, y_range=y_range ) # DataArray coords were coming back different from cvs.points... hack_agg = canvas.points(pd.DataFrame({'x': [], 'y': []}), 'x', 'y') res = get_dataarray_resolution(hack_agg) result = xr.DataArray(out, name=name, coords=hack_agg.coords, dims=hack_agg.dims, attrs={'res': res}) return result