Source code for obs

"""
Cosmic ray observables
"""
import numpy as np

from astrotools import coord


[docs]def two_pt_auto(v, bins=180, **kwargs): """ Angular two-point auto correlation for a set of directions v. WARNING: Due to the vectorized calculation this function does not work for large numbers of events. :param v: directions, (3 x n) matrix with the rows holding x,y,z :param bins: number of angular bins or np.ndarray with bin in radians :param kwargs: additional named arguments - weights : weights for each event (optional) - cumulative : make cumulative (default=True) - normalized : normalize to 1 (default=False) :return: bincount of size bins (or len(bins)) """ if isinstance(bins, int): bins = np.linspace(0, np.pi, bins+1) n = np.shape(v)[1] idx = np.triu_indices(n, 1) # upper triangle indices without diagonal ang = coord.angle(v, v, each2each=True)[idx] # optional weights w = kwargs.get('weights', None) if w is not None: w = np.outer(w, w)[idx] dig = np.digitize(ang, bins) ac = np.bincount(dig, minlength=len(bins) + 1, weights=w) ac = ac.astype(float)[1:-1] # convert to float and remove overflow bins if kwargs.get("cumulative", True): ac = np.cumsum(ac) if kwargs.get("normalized", False): if w is not None: ac /= sum(w) else: ac /= (n ** 2 - n) / 2 return ac
[docs]def two_pt_cross(v1, v2, bins=180, **kwargs): """ Angular two-point cross correlation for two sets of directions v1, v2. WARNING: Due to the vectorized calculation this function does not work for large numbers of events. :param v1: directions, (3 x n1) matrix with the rows holding x,y,z :param v2: directions, (3 x n2) matrix with the rows holding x,y,z :param bins: number of angular bins or np.ndarray with bin in radians :param kwargs: additional named arguments - weights1, weights2: weights for each event (optional) - cumulative: make cumulative (default=True) - normalized: normalize to 1 (default=False) :return: bincount of size bins (or len(bins)) """ if isinstance(bins, int): bins = np.linspace(0, np.pi, bins+1) ang = coord.angle(v1, v2, each2each=True).flatten() dig = np.digitize(ang, bins) # optional weights w1 = kwargs.get('weights1', None) w2 = kwargs.get('weights2', None) if (w1 is not None) and (w2 is not None): w = np.outer(w1, w2).flatten() else: w = None cc = np.bincount(dig, minlength=len(bins) + 1, weights=w) cc = cc.astype(float)[1:-1] if kwargs.get("cumulative", True): cc = np.cumsum(cc) if kwargs.get("normalized", False): if w is not None: cc /= sum(w) else: n1 = np.shape(v1)[1] n2 = np.shape(v2)[1] cc /= n1 * n2 return cc
# noinspection PyTypeChecker
[docs]def thrust(p, weights=None, minimize_distances=None, ntry=1000): """ Thrust observable for an array (n x 3) of 3-momenta. Returns 3 values (thrust, thrust major, thrust minor) and the corresponding axes. :param p: 3-momenta, (3 x n) matrix with the columns holding px, py, pz :param weights: (optional) weights for each event, e.g. 1/exposure (1 x n) :param minimize_distances: Instead of maximizing the distances to n3, this will miniminze the distances to n2 axis :param ntry: number of samples for the brute force computation of thrust major :return: tuple consisting of the following values - thrust, thrust major, thrust minor (shape: (3)) - thrust axis, thrust major axis, thrust minor axis (shape: (3, 3)) """ # optional weights p = (p * weights) if weights is not None else p # thrust n1 = np.sum(p, axis=1) n1 /= np.linalg.norm(n1) t1 = np.sum(abs(np.dot(n1, p))) # thrust major, brute force calculation _, ep, et = coord.sph_unit_vectors(*coord.vec2ang(n1)) alpha = np.linspace(0, np.pi, ntry) n23_try = np.outer(np.cos(alpha), et) + np.outer(np.sin(alpha), ep) t23_try = np.sum(abs(np.dot(n23_try, p)), axis=1) if minimize_distances is None: i = np.argmax(t23_try) # maximize distances to n3 axis n2 = n23_try[i] n3 = np.cross(n1, n2) t2, t3 = t23_try[i], np.sum(abs(np.dot(n3, p))) else: i = np.argmin(t23_try) # minimize distances to n1 axis n3 = n23_try[i] n2 = np.cross(n3, n1) t2, t3 = t23_try[i], np.sum(abs(np.dot(n2, p))) # normalize sum_p = np.sum(np.sum(p ** 2, axis=0) ** .5) t = np.array((t1, t2, t3)) / sum_p n = np.array((n1, n2, n3)) return t, n
[docs]def energy_energy_correlation(vec, energy, vec_roi, alpha_max=0.25, nbins=10, **kwargs): """ Calculates the Energy-Energy-Correlation (EEC) of a given dataset for a given ROI. :param vec: arrival directions of CR events shape=(3, n) :param energy: energies of CR events in [EeV] :param vec_roi: position of ROI center shape=(3,) :param alpha_max: radial extend of ROI in radians :param nbins: number of angular bins in ROI :param kwargs: additional named arguments - bin_type: indicates if binning is linear in alpha ('lin') or with equal area covered per bin ('area') - e_ref: indicates if the 'mean' or the 'median' is taken for the average energy - ref_mode: indicates if 'e_ref' is taken within 'bin' or the whole 'roi' - verbose: if False, dont print warnings :return: omega_mean: mean values of EEC (size: nbins) :return: alpha_bins: angular binning (size: nbins) :return: ncr_bin: average number of CR in each angular bin (size: nbins) """ assert len(vec_roi) == 3 bins = np.arange(nbins+1).astype(np.float) if kwargs.get("bin_type", 'area') == 'lin': alpha_bins = alpha_max * bins / nbins else: alpha_bins = 2 * np.arcsin(np.sqrt(bins/nbins) * np.sin(alpha_max/2)) # angular distances to Center of each ROI dist_to_rois = coord.angle(vec_roi, vec, each2each=True) # instantiate output arrays omega_ij_list = [np.array([])] * nbins omega = np.zeros(nbins) ncr_roi_bin = np.zeros(nbins) # select CRs inside ROI mask_in_roi = dist_to_rois[0] < alpha_max # type: np.ndarray ncr = int(vec[:, mask_in_roi].shape[1]) e_cr = energy[mask_in_roi] # indices of angular bin for each CR alpha_cr = dist_to_rois[0, mask_in_roi] idx_cr = np.digitize(alpha_cr, alpha_bins) - 1 # energy reference mean roi-wise if kwargs.get("ref_mode", 'bin') == 'roi': e_ref_roi = getattr(np, kwargs.get("e_ref", 'mean'))(e_cr) e_ref = np.repeat(e_ref_roi, nbins) # energy reference mean bin-wise else: e_ref = np.zeros(nbins) for i in range(nbins): mask_bin = idx_cr == i # type: np.ndarray if np.sum(mask_bin) > 0: e_ref[i] = getattr(np, kwargs.get("e_ref", 'mean'))(e_cr[mask_bin]) # Omega_ij for each pair of CRs in whole ROI omega_matrix = (np.array([e_cr]) - np.array([e_ref[idx_cr]])) / np.array([e_cr]) omega_ij = omega_matrix * omega_matrix.T # sort Omega_ij into respective angular bins for i in range(nbins): ncr_roi_bin[i] = len(alpha_cr[idx_cr == i]) mask_bin = (np.repeat(idx_cr, ncr).reshape((ncr, ncr)) == i) * (np.identity(ncr) == 0) omega_ij_list[i] = np.append(omega_ij_list[i], omega_ij[mask_bin]) if (len(omega_ij_list[i]) == 0) and (kwargs.get('verbose', True)): print('Warning: Binning in dalpha is too small; no cosmic rays in bin %i.' % i) continue omega[i] = np.mean(omega_ij_list[i]) return omega, alpha_bins, ncr_roi_bin