"""
Extensions to healpy that covers:
-> pixel, vector, angular transformations
-> drawing vectors uniformly in pixel
-> various probability density functions (exposure, fisher, dipole)
"""
import healpy as hp
# noinspection PyUnresolvedReferences
from healpy import * # keep namespace of healpy
import numpy as np
from astrotools import coord
[docs]def rand_pix_from_map(healpy_map, n=1):
"""
Draw n random pixels from a HEALpix map.
:param healpy_map: healpix map (not necessarily normalized)
:param n: number of pixels that are drawn from the map
:return: an array of pixels with size n, that are drawn from the map
"""
p = np.cumsum(healpy_map)
return p.searchsorted(np.random.rand(n) * p[-1])
[docs]def rand_vec_from_map(healpy_map, n=1, nest=False):
"""
Draw n random vectors from a HEALpix map.
:param healpy_map: healpix map (not necessarily normalized)
:param n: number of pixels that are drawn from the map
:param nest: set True in case you work with healpy's nested scheme
:return: an array of vectors with size n, that are drawn from the map
"""
pix = rand_pix_from_map(healpy_map, n)
nside = hp.get_nside(healpy_map)
return rand_vec_in_pix(nside, pix, nest)
[docs]def rand_vec_in_pix(nside, ipix, nest=False):
"""
Draw vectors from a uniform distribution within a HEALpixel.
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s)
:param nest: set True in case you work with healpy's nested scheme
:return: vectors containing events from the pixel(s) specified in ipix
"""
if not nest:
ipix = hp.ring2nest(nside, ipix=ipix)
n_order = nside2norder(nside)
n_up = 29 - n_order
i_up = ipix * 4 ** n_up
i_up += np.random.randint(0, 4 ** n_up, size=np.size(ipix))
v = pix2vec(nside=2 ** 29, ipix=i_up, nest=True)
return np.array(v)
[docs]def rand_exposure_vec_in_pix(nside, ipix, a0=-35.25, zmax=60, coord_system='gal', deviation=0.5, nest=False):
"""
Draw vectors from a distribution within a HEALpixel that follow the exposure
distribution within the pixel. It is much slower than rand_vec_in_pix() and
should therefore only be used for problematic pixels (close to zero exposure).
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s)
:param a0: latitude of detector (-90, 90) in degrees (default: Auger)
:param zmax: maximum acceptance zenith angle (0, 90) degrees
:param coord_system: choose between different coordinate systems - gal, eq, sgal, ecl
:param deviation: maximum relative deviation between exposure values in pixel corners
:param nest: set True in case you work with healpy's nested scheme
:return: vectors containing events from the pixel(s) specified in ipix
"""
ipix = np.atleast_1d(ipix)
vecs = np.zeros((3, ipix.size))
mask = check_problematic_pixel(nside, ipix, a0, zmax, deviation)
vecs[:, ~mask] = rand_vec_in_pix(nside, ipix[~mask], nest)
if not nest:
ipix = hp.ring2nest(nside, ipix=ipix)
for pix in np.unique(ipix[mask]):
n = np.sum(ipix == pix)
# increase resolution of healpy schemes cooresponding to number of crs per pixel
n_up = max(3, int(np.ceil(np.log10(10*n) / np.log10(4))))
pix_new = pix * 4 ** n_up + np.arange(4 ** n_up)
v = pix2vec(nside=nside * 2**n_up, ipix=pix_new, nest=True)
if coord_system != 'eq':
v = getattr(coord, '%s2eq' % coord_system)(v)
p = coord.exposure_equatorial(coord.vec2ang(v)[1], a0, zmax)
pixel = np.random.choice(pix_new, size=n, replace=False, p=p/np.sum(p))
vecs[:, ipix == pix] = pix2vec(nside=nside * 2**n_up, ipix=pixel, nest=True)
return np.array(vecs)
[docs]def check_problematic_pixel(nside, ipix, a0, zmax, deviation=0.5, coord_system='gal'):
"""
Checks input pixel for exposure deviation within the corner points from more than certain
threshold (default: 0.5).
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s)
:param a0: latitude of detector (-90, 90) in degrees (default: Auger)
:param zmax: maximum acceptance zenith angle (0, 90) degrees
:param deviation: maximum deviation between exposure values in pixel corners
:param coord_system: choose between different coordinate systems - gal, eq, sgal, ecl
"""
npix = hp.nside2npix(nside)
v = np.swapaxes(hp.boundaries(nside, np.arange(npix), step=1), 0, 1).reshape(3, -1)
if coord_system != 'eq':
v = getattr(coord, '%s2eq' % coord_system)(v)
# exposure values of corner points
exposure = coord.exposure_equatorial(coord.vec2ang(v)[1], a0, zmax).reshape((npix, 4))
# check for maximal deviation of corner points
_min, _max = np.min(exposure, axis=-1), np.max(exposure, axis=-1)
mask = _max > 0
eps = np.min(_min[_min > 0]) / 2.
_min[_min < eps] = eps
mask = mask * (_max / _min > (1 + deviation))
return mask[ipix]
[docs]def pix2map(nside, ipix):
"""
Converts healpy pixel to healpix map
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s) in nside scheme
:return: healpix map of size npix
"""
npix = hp.nside2npix(nside)
ipix = np.atleast_1d(ipix)
assert (ipix < npix).all(), "Nside does not match the given pixel(s)!"
return np.bincount(ipix, minlength=npix)
[docs]def pix2vec(nside, ipix, nest=False):
"""
Convert HEALpixel ipix to cartesian vector
Substitutes hp.pix2vec
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s)
:param nest: set True in case you work with healpy's nested scheme
:return: vector of the pixel center(s)
"""
v = hp.pix2vec(nside, ipix, nest=nest)
return np.asarray(v)
[docs]def pix2ang(nside, ipix, nest=False):
"""
Convert HEALpixel ipix to spherical angles (astrotools definition)
Substitutes hp.pix2ang
:param nside: nside of the healpy pixelization
:param ipix: pixel number(s)
:param nest: set True in case you work with healpy's nested scheme
:return: angles (phi, theta) in astrotools definition
"""
v = pix2vec(nside, ipix, nest=nest)
phi, theta = coord.vec2ang(v)
return phi, theta
[docs]def ang2vec(phi, theta):
"""
Substitutes healpy.ang2vec() to use our angle convention
:param phi: longitude, range (pi, -pi), 0 points in x-direction, pi/2 in y-direction
:param theta: latitude, range (pi/2, -pi/2), pi/2 points in z-direction
:return: vector of shape (3, n)
"""
return coord.ang2vec(phi, theta)
[docs]def ang2pix(nside, phi, theta, nest=False):
"""
Convert spherical angle (astrotools definition) to HEALpixel ipix
Substitutes hp.ang2pix
:param nside: nside of the healpy pixelization
:param phi: longitude in astrotools definition
:param theta: latitude in astrotools definition
:param nest: set True in case you work with healpy's nested scheme
:return: pixel number(s)
"""
v = ang2vec(phi, theta)
# mimic ValueError behavior from healpys function ang2pix()
if not np.all(hp.isnsideok(nside)):
raise ValueError("%s is not a valid nside parameter (must be a power of 2, less than 2**30)" % str(nside))
ipix = hp.vec2pix(nside, *v, nest=nest)
return ipix
[docs]def vec2ang(v):
"""
Substitutes healpy.vec2ang() to use our angle convention.
:param v: vector(s) of shape (3, n)
:return: tuple consisting of
- phi [range (pi, -pi), 0 points in x-direction, pi/2 in y-direction],
- theta [range (pi/2, -pi/2), pi/2 points in z-direction]
"""
return coord.vec2ang(v)
[docs]def vec2pix(nside, v, y=None, z=None, nest=False):
"""
Convert HEALpixel ipix to spherical angles (astrotools definition)
Substitutes hp.vec2pix
:param nside: nside of the healpy pixelization
:param v: either (x, y, z) vector of the pixel center(s) or only x-coordinate
:param y: y-coordinate(s) of the center
:param z: z-coordinate(s) of the center
:param nest: set True in case you work with healpy's nested scheme
:return: vector of the pixel center(s)
"""
if y is None and z is None:
v, y, z = v
ipix = hp.vec2pix(nside, v, y, z, nest=nest)
return ipix
[docs]def query_disc(nside, v, radius, nest=False, **kwargs):
"""
Substitutes hp.query_disc but supports also pixel number input
:param nside: nside of the healpy pixelization
:param v: either (x, y, z) vector of the pixel center or healpy pixel
:param radius: radius of disk in radians
:param nest: set True in case you work with healpy's nested scheme
:return: pixels thay lie within radius from center v
"""
if isinstance(v, (int, np.int32, np.int64)):
v = pix2vec(nside, v, nest=nest)
return hp.query_disc(nside, v, radius, nest=nest, **kwargs)
[docs]def angle(nside, ipix, jpix, nest=False, each2each=False):
"""
Give the angular distance between two pixel arrays.
:param nside: nside of the healpy pixelization
:param ipix: healpy pixel i (either int or array like int)
:param jpix: healpy pixel j (either int or array like int)
:param nest: use the nesting scheme of healpy
:param each2each: if true, calculates every combination of the two lists v1, v2
:return: angular distance in radians
"""
v1 = pix2vec(nside, ipix, nest)
v2 = pix2vec(nside, jpix, nest)
return coord.angle(v1, v2, each2each=each2each)
[docs]def rotate_map(healpy_map, rotation_axis, rotation_angle):
"""
Perform rotation of healpy map for given rotation axis and angle(s).
:param healpy_map: healpix map to be rotated
:param rotation_axis: rotation axis, either np.array([x, y, z]) or ndarray with shape (3, n)
:param rotation_angle: rotation angle in radians, either float or array size n
:return: rotated healpy map, same shape as input healpy map or shape (n, npix)
"""
rotation_axis = coord.atleast_kd(rotation_axis, 2)
nside, npix = hp.get_nside(healpy_map), len(healpy_map)
n_ang, n_ax = np.size(rotation_angle), np.shape(rotation_axis)[-1]
assert (n_ang == n_ax) or (np.min([n_ang, n_ax]) == 1), "Rotation axes and angles dimensions not compatible."
n = np.max([n_ang, n_ax])
rotation_vectors = pix2vec(nside, np.arange(npix))
if n > 1:
rotation_vectors = np.tile(rotation_vectors, n)
rotation_axis = np.repeat(rotation_axis, npix*((n_ax == 1) * n + (n_ax > 1)), axis=1)
rotation_angle = np.repeat(rotation_angle, npix*((n_ang == 1) * n + (n_ang > 1)))
_vecs = coord.rotate(rotation_vectors, rotation_axis, -rotation_angle)
_phi, _theta = vec2ang(np.squeeze(_vecs.reshape((3, -1, npix))))
return hp.get_interp_val(healpy_map, np.pi / 2. - _theta, _phi)
[docs]def norder2npix(norder):
"""
Give the number of pixel for the given HEALpix order.
:param norder: norder of the healpy pixelization
:return: npix, number of pixels of the healpy pixelization
"""
return 12 * 4 ** norder
[docs]def npix2norder(npix):
# pylint: disable = no-member
"""
Give the HEALpix order for the given number of pixel.
:param npix: number of pixels of the healpy pixelization
:return: norder, norder of the healpy pixelization
"""
norder = np.log(npix / 12) / np.log(4)
if not (norder.is_integer()):
raise ValueError('Wrong pixel number (it is not 12*4**norder)')
return int(norder)
[docs]def norder2nside(norder):
"""
Give the HEALpix nside parameter for the given HEALpix order.
:param norder: norder of the healpy pixelization
:return: nside, nside of the healpy pixelization
"""
return 2 ** norder
[docs]def nside2norder(nside):
# pylint: disable = no-member
"""
Give the HEALpix order for the given HEALpix nside parameter.
:param nside: nside of the healpy pixelization
:return: norder, norder of the healpy pixelization
"""
norder = np.log2(nside)
if not (norder.is_integer()):
raise ValueError('Wrong nside number (it is not 2**norder)')
return int(norder)
[docs]def statistic(nside, v, y=None, z=None, statistics='count', vals=None):
"""
Create HEALpix map of count, frequency or mean or rms value.
:param nside: nside of the healpy pixelization
:param v: either (x, y, z) vector of the pixel center(s) or only x-coordinate
:param y: y-coordinate(s) of the center
:param z: z-coordinate(s) of the center
:param statistics: keywords 'count', 'frequency', 'mean' or 'rms' possible
:param vals: values (array like) for which the mean or rms is calculated
:return: either count, frequency, mean or rms maps
"""
npix = hp.nside2npix(nside)
pix = vec2pix(nside, v, y, z)
n_map = np.bincount(pix, minlength=npix)
if statistics == 'count':
v_map = n_map.astype('float')
elif statistics == 'frequency':
v_map = n_map.astype('float')
v_map /= max(n_map) # frequency [0,1]
elif statistics == 'mean':
if vals is None:
raise ValueError
v_map = np.bincount(pix, weights=vals, minlength=npix)
v_map /= n_map # mean
elif statistics == 'rms':
if vals is None:
raise ValueError
v_map = np.bincount(pix, weights=vals ** 2, minlength=npix)
v_map = (v_map / n_map) ** .5 # rms
else:
raise NotImplementedError("Unknown keyword")
return v_map
[docs]def skymap_mean_quantile(skymap, quantile=0.68):
"""
Calculates mean direction and quantile from an arbitrary healpy skymap
:param skymap: healpy skymap of valid healpy size
:param quantile: quantile for which the scatter angle is calculated
:return: either count, frequency, mean or rms maps
"""
npix = len(skymap)
nside = hp.npix2nside(npix)
norm = np.sum(skymap)
assert norm > 0
skymap /= norm
# calculate mean vector
vecs = pix2vec(nside, range(npix))
vec_mean = np.sum(vecs * skymap[np.newaxis], axis=1)
vec_mean /= np.sqrt(np.sum(vec_mean**2))
# calculate alpha
alpha = np.arccos(np.sum(vecs * vec_mean[:, np.newaxis], axis=0))
srt = np.argsort(alpha)
idx = np.searchsorted(np.cumsum(skymap[srt]), quantile)
alpha_quantile = alpha[srt][idx]
return vec_mean, alpha_quantile
[docs]def exposure_pdf(nside=64, a0=-35.25, zmax=60, coord_system='gal', pdf=True):
"""
Exposure (type: density function) of an experiment located at equatorial
declination a0 and measuring events with zenith angles up to zmax degrees.
:param nside: nside of the output healpy map
:param a0: equatorial declination [deg] of the experiment (default: AUGER, a0=-35.25 deg)
:param zmax: maximum zenith angle [deg] for the events
:param coord_system: choose between different coordinate systems - gal, eq, sgal, ecl
:param pdf: if false, return relative exposure
:return: weights of the exposure map
"""
npix = hp.nside2npix(nside)
v = pix2vec(nside, range(npix))
if coord_system != 'eq':
v = getattr(coord, '%s2eq' % coord_system)(v)
_, theta = vec2ang(v)
exposure = coord.exposure_equatorial(theta, a0, zmax)
if pdf is True:
exposure /= exposure.sum()
return exposure
[docs]def fisher_pdf(nside, v, y=None, z=None, k=None, threshold=4, sparse=False, pdf=True):
"""
Probability density function of a fisher distribution of healpy pixels with mean direction (x, y, z) and
concentration parameter kappa; normalized to 1.
:param nside: nside of the healpy map
:param v: either (x, y, z) vector of the center or only x-coordinate
:param y: y-coordinate of the center
:param z: z-coordinate of the center
:param k: kappa for the fisher distribution, 1 / sigma**2
:param threshold: Threshold in sigma up to where the distribution should be calculated
:param sparse: returns the map in the form (pixels, weights); this may be meaningfull for small distributions
:param pdf: if false, return values with maximum 1
:return: pixels, weights at pixels if sparse else a full map with length nside2npix(nside)
"""
assert k is not None, "Concentration parameter 'k' for fisher_pdf() must be set!"
sigma = 1. / np.sqrt(k) # in radians
if (y is not None) and (z is not None):
v = np.array([v, y, z])
elif (y is None and z is not None) or (z is None and y is not None):
raise ValueError("Either 'y' and 'z' are set to None or both are not None. No mixture allowed.")
v = np.squeeze(v)
assert v.shape[0] == 3, "Input vector v does not have proper shape (3, ...)"
# if alpha_max is larger than a reasonable np.pi then query disk takes care of using only
# np.pi as maximum range.
if isinstance(k, (float, int)):
alpha_max = threshold * sigma
else: # type array
alpha_max = threshold * np.amax(sigma)
pixels = np.array(hp.query_disc(nside, v, alpha_max)) # shape(pixels, )
if len(pixels) == 0:
# If sigma is too small, the pixel sequence will be empty
pixels = np.array([vec2pix(nside, v)])
weights = np.array([1.])
else:
pvec = pix2vec(nside, pixels) # shape(3, pixels)
length = np.sum(v ** 2, axis=0) ** 0.5 # shape()
d = np.sum(v[:, np.newaxis] * pvec, axis=0) / length # shape(pixels,)
# for large values of kappa exp(k * d) goes to infinity which is meaningless. So we use the trick to write:
# exp(k * d) = exp(k * d + k - k) = exp(k) * exp(k * (d-1)). As we normalize the function to one in the end,
# we can leave out the first factor exp(k)
weights = np.exp(k * (d - 1)) if k > 30 else np.exp(k * d) #shape(pixels,)
weights = weights / np.sum(weights) if pdf is True else weights / np.max(weights)
if sparse:
return pixels, weights
fisher_map = np.zeros(hp.nside2npix(nside))
fisher_map[pixels] = weights
return fisher_map
[docs]def dipole_pdf(nside, a, v, y=None, z=None, pdf=True):
"""
Probability density function of a dipole. Returns 1 + a * cos(theta) for all pixels in hp.nside2npix(nside).
:param nside: nside of the healpy map
:param a: amplitude of the dipole, 0 <= a <= 1, automatically clipped
:param v: either (x, y, z) vector of the pixel center(s) or only x-coordinate
:param y: y-coordinate(s) of the center
:param z: z-coordinate(s) of the center
:param pdf: if false, return unnormalized map (modulation around 1)
:return: weights
"""
assert 0 <= a <= 1, "Dipole amplitude must be between 0 and 1"
a = np.clip(a, 0., 1.)
if y is not None and z is not None:
v = np.array([v, y, z], dtype=np.float)
# normalize to one
direction = v / np.sqrt(np.sum(v ** 2))
npix = hp.nside2npix(nside)
v = np.array(pix2vec(nside, np.arange(npix)))
cos_angle = np.sum(v.T * direction, axis=1)
dipole_map = 1 + a * cos_angle
if pdf is True:
dipole_map /= np.sum(dipole_map)
return dipole_map