Source code for gamale

"""
Module for handling galactic magnetic field lenses. The lenses can be created with the lens-factory:
https://git.rwth-aachen.de/astro/lens-factory
"""
import gzip
import os
from struct import pack, unpack

import numpy as np
from scipy import sparse

import astrotools.healpytools as hpt

# python 2/3 compatibility
try:
    basestring
except NameError:     # pragma: no cover
    basestring = str  # pylint: disable=W0622,C0103


def save_lens_part(mat, fname):
    """
    Save the lens part in scipy.sparse matrix format with shape (npix, npix).

    :param mat: lens part as scipy.sparse matrix
    :param fname: file name to save the lens part (either .npz or .mldat)
    """
    if fname.endswith(".npz"):
        if not isinstance(mat, sparse.csc_matrix):  # pragma: no cover
            try:
                # this works e.g. for scipy.sparse.lil_matrix
                mat = mat.tocsc()
            except AttributeError:
                raise AttributeError("Data cannot be converted into csc format")
        np.savez(fname, data=mat.data, indices=mat.indices, indptr=mat.indptr, shape=mat.shape)
    else:
        if not isinstance(mat, sparse.coo_matrix):  # pragma: no cover
            try:
                # this works e.g. for scipy.sparse.lil_matrix
                mat = mat.tocoo()
            except AttributeError:
                raise AttributeError("Data cannot be converted into coo format")
        fout = open(fname, 'wb')
        fout.write(pack('i', mat.nnz))
        fout.write(pack('i', mat.shape[0]))
        fout.write(pack('i', mat.shape[1]))
        data = np.zeros((mat.nnz,), dtype=np.dtype([('row', 'i4'), ('col', 'i4'), ('data', 'f8')]))
        data['row'] = mat.row
        data['col'] = mat.col
        data['data'] = mat.data
        data.tofile(fout)
        fout.close()
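

# Illustrative sketch: one way save_lens_part might be used for both supported formats.
# The toy identity matrix and the file names 'toy_part.npz' / 'toy_part.mldat' are hypothetical,
# chosen only for this example.
def _example_save_lens_part():
    npix = hpt.nside2npix(4)                       # tiny lens part with nside=4 (192 pixels)
    toy = sparse.identity(npix, format='csc')      # identity lens: no deflection at all
    save_lens_part(toy, 'toy_part.npz')            # csc arrays stored via np.savez
    save_lens_part(toy.tocoo(), 'toy_part.mldat')  # binary header (nnz, nrows, ncols) + records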


def load_lens_part(fname, transpose=False):
    """
    Load a lens part from the given filename (should be .npz or .mldat).

    :param fname: file name to load the lens part from (either .npz or .mldat)
    :param transpose: transposes the matrix (for use with the NY convention)
    :return: lens part as scipy.sparse matrix
    """
    if fname.endswith(".npz"):
        data = np.load(fname)
        mat = sparse.csc_matrix((data['data'], data['indices'], data['indptr']), shape=data['shape'])
        if transpose:
            mat = mat.transpose()
        return mat

    zipped = fname.endswith(".gz")
    if zipped:
        fin = gzip.open(fname, 'rb')
    else:
        fin = open(fname, 'rb')
    _ = unpack('i', fin.read(4))[0]  # Do not delete this line! (Pops the first 4 bytes)
    nrows = unpack('i', fin.read(4))[0]
    ncols = unpack('i', fin.read(4))[0]
    dtype = np.dtype([('row', 'i4'), ('col', 'i4'), ('data', 'f8')])
    if zipped:
        data = np.frombuffer(fin.read(), dtype=dtype)
    else:
        data = np.fromfile(fin, dtype=dtype)
    fin.close()
    if transpose:
        mat = sparse.coo_matrix((data['data'], (data['row'], data['col'])), shape=(nrows, ncols),
                                dtype='int32').transpose()
    else:
        mat = sparse.coo_matrix((data['data'], (data['row'], data['col'])), shape=(nrows, ncols),
                                dtype='float32')
    return mat.tocsc()
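

# Illustrative sketch: a round trip through save_lens_part / load_lens_part. The file name
# 'toy_part.npz' is hypothetical; any writable path would do.
def _example_load_lens_part():
    npix = hpt.nside2npix(4)
    original = sparse.identity(npix, format='csc')
    save_lens_part(original, 'toy_part.npz')
    restored = load_lens_part('toy_part.npz')  # returned as csc matrix of shape (npix, npix)
    assert restored.shape == (npix, npix)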


def mat2nside(mat):
    """
    Calculate the HEALpix nside from a given lens part matrix.

    :param mat: lens part as scipy.sparse matrix
    :return: HEALpix nside of the lens part
    """
    nrows, ncols = mat.shape
    assert nrows == ncols, "Matrix not square %i x %i" % (nrows, ncols)
    return hpt.npix2nside(nrows)


def extragalactic_vector(mat, i):
    """
    Return the HEALpix vector of extragalactic directions for a given matrix and observed pixel i.

    :param mat: lens part as scipy.sparse matrix
    :param i: pixel of the observed direction
    :return: extragalactic distribution for observed direction i (HEALpix vector of size npix)
    """
    row = mat.getrow(i)
    return np.array(row.todense())[0].astype(float)


def observed_vector(mat, j):
    """
    Return the HEALpix vector of observed directions for a given matrix and extragalactic pixel j.

    :param mat: lens part as scipy.sparse matrix
    :param j: pixel of the extragalactic direction
    :return: observed distribution for extragalactic direction j (HEALpix vector of size npix)
    """
    col = mat.getcol(j)
    return np.array(col.transpose().todense())[0].astype(float)
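

# Illustrative sketch: for a toy permutation lens part, extragalactic_vector and observed_vector
# pick out single rows and columns of the matrix. The shift by one pixel is an arbitrary choice
# made only to keep the example non-trivial.
def _example_direction_vectors():
    npix = hpt.nside2npix(4)
    rows = np.arange(npix)
    cols = np.roll(rows, 1)  # observed pixel i receives flux from extragalactic pixel i-1 (cyclic)
    lens_part = sparse.coo_matrix((np.ones(npix), (rows, cols)), shape=(npix, npix)).tocsc()
    eg = extragalactic_vector(lens_part, 0)  # row 0: where observed pixel 0 originates from
    obs = observed_vector(lens_part, 0)      # column 0: where extragalactic pixel 0 is observed
    assert eg.sum() == 1 and obs.sum() == 1  # the permutation maps one pixel to exactly one pixel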


def mean_deflection(mat, skymap=False):
    """
    Compute the mean deflection (in radians) of the given matrix.

    :param mat: lens part, scipy sparse matrix with shape (npix, npix)
    :param skymap: if True, return the mean deflection of each pixel as a HEALpix map of size npix
    :return: mean deflection in radians, or a skymap of mean deflections if skymap is True
    """
    if not isinstance(mat, sparse.csc_matrix):
        mat = mat.tocsc()
    npix = mat.shape[0]
    nside = hpt.npix2nside(npix)
    row, col = mat.nonzero()
    counts = np.squeeze(np.asarray(mat[row, col]))
    if skymap is False:
        ang = hpt.angle(nside, row, col)
        return np.sum(counts * ang) / np.sum(counts)
    ang = hpt.angle(nside, np.repeat(row, counts), np.repeat(col, counts))
    return np.mean(np.reshape(ang, (npix, -1)), -1)
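

# Illustrative sketch: the identity lens part maps every pixel onto itself, so its mean
# deflection is expected to be (numerically) zero. The toy nside=4 lens is an assumption.
def _example_mean_deflection():
    npix = hpt.nside2npix(4)
    identity_lens = sparse.identity(npix, format='csc')
    return mean_deflection(identity_lens)  # scalar deflection in radians, ~0 for the identity lens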


def flux_map(mat, observed_map=None):
    """
    Compute the flux (transparency) of the galactic magnetic field outside the Galaxy.

    :param mat: lens part, scipy sparse matrix with shape (npix, npix)
    :param observed_map: HEALpix map of the assumed observed map; if None, a uniform distribution is used
    :return: flux map as HEALpix vector of size npix
    """
    observed_map = observed_map if observed_map is not None else np.ones(mat.shape[0])
    return mat.transpose().dot(observed_map)
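

# Illustrative sketch: applied to the identity lens part, flux_map returns the uniform map
# unchanged, i.e. the toy field is fully transparent in every direction.
def _example_flux_map():
    npix = hpt.nside2npix(4)
    identity_lens = sparse.identity(npix, format='csc')
    flux = flux_map(identity_lens)  # default: uniform observed map of ones
    assert np.allclose(flux, np.ones(npix))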


class Lens:
    """
    Galactic magnetic field lens class. Lenses can be created with the lens-factory tool
    (requires CRPropa): https://git.rwth-aachen.de/astro/lens-factory

    We use the following conventions:

    - the lens maps directions at the galactic border (pointing outwards back to the source)
      to observed directions on Earth (pointing outwards)
    - the Galactic coordinate system is used
    - spherical coordinates are avoided
    - for each logarithmic energy bin there is a lens part represented by a matrix
    - energies are given in log10(energy[eV])
    - the matrices (lens_parts) are in compressed sparse column format (scipy.sparse.csc)
    - for each matrix M_ij, the row number i indexes the observed direction and the column
      number j the direction at the Galactic edge
    - indices are HEALpix pixels in ring scheme
    """

    def __init__(self, cfname=None):
        """
        Load and normalize a lens from the given configuration file; otherwise an empty lens
        is created. By default the lens parts are loaded on demand.

        :param cfname: path where the config file can be found
        """
        self.lens_parts = []   # list of matrices in order of ascending energy
        self.lens_paths = []   # list of paths in order of ascending energy
        self.log10r_mins = []  # lower rigidity bounds of the lens parts (log10(E/Z/[eV]))
        self.log10r_max = []   # upper rigidity bounds of the lens parts (log10(E/Z/[eV]))
        self.nside = None      # HEALpix nside parameter
        self.stat = None
        self.neutral_lens_part = None  # matrix for neutral particles
        self.max_column_sum = None     # maximum of the column sums of all matrices
        self.cfname = cfname
        try:
            self.ny = False
            self.load(cfname)
        except IndexError:
            self.ny = True
            self.load_ny(cfname)

    def load(self, cfname):
        """
        Load and configure the lens from a config file (columns: filename minR maxR ...),
        in order of ascending rigidity. For conventions see:
        https://git.rwth-aachen.de/astro/lens-factory

        :param cfname: path where the config file can be found
        """
        self.cfname = cfname
        # noinspection PyTypeChecker
        if not isinstance(cfname, basestring):
            return
        dirname = os.path.dirname(cfname)

        # read the .cfg header to find nside and stat
        with open(cfname) as f:
            for line in f:
                if 'nside' in line:
                    self.nside = int(line[1:].split()[2])
                    # sanity check
                    assert hpt.isnsideok(self.nside), "Healpy nside value from .cfg header not OK."
                if 'stat' in line:
                    self.stat = int(line[1:].split()[2])
                # stop once nside and stat are extracted or the header is finished
                _read = (self.nside is not None) and (self.stat is not None)
                _finished = ('.npz' in line) or ('.mldat' in line)
                if _read or _finished:
                    break

        try:
            dtype = [('fname', 'S1000'), ('lR0', float), ('lR1', float), ('tol', float), ('MCS', float)]
            data = np.atleast_1d(np.genfromtxt(cfname, dtype=dtype))
            self.max_column_sum = data["MCS"]
            self.tolerance = data["tol"]
        except ValueError:  # pragma: no cover
            # fall back to the old lens config format
            dtype = [('fname', 'S1000'), ('lR0', float), ('lR1', float)]
            data = np.genfromtxt(cfname, dtype=dtype)

        self.log10r_mins = data["lR0"]
        self.log10r_max = data["lR1"]
        self.dlog10r = (data["lR1"][0] - data["lR0"][0]) / 2.
        assert np.allclose(data["lR1"], data["lR0"] + 2 * self.dlog10r)
        self.lens_paths = [os.path.join(dirname, fname.decode('utf-8')) for fname in data["fname"]]
        self.lens_parts = self.lens_paths[:]  # filled with matrices only when they are needed
        self.neutral_lens_part = sparse.identity(hpt.nside2npix(self.nside), format='csc')

    def load_ny(self, cfname):
        """
        Load a lens which was not produced with the lens-factory
        (produced in NY by Glennys Farrar and Chen Ding).

        :param cfname: path where the config file can be found
        """
        self.cfname = '/net/scratch/auger/lenses/jf12-regular-turbulent-1-30pc-glennys/jf12-regular-turbulent-s1.cfg'
        # noinspection PyTypeChecker
        dirname = os.path.dirname(cfname)
        self.nside = 32
        self.stat = 1
        self.log10r_bins = np.array([17.4, 17.5, 17.6, 17.7, 17.8, 17.9, 18.0, 18.1, 18.2, 18.3,
                                     18.4, 18.5, 18.6, 18.7, 18.75, 18.8, 18.85, 18.9, 19.0, 19.2,
                                     19.4, 19.5, 19.6, 19.8, 20.0])
        rig_bins = np.array([20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 87, 90, 92,
                             95, 100, 110, 120, 125, 130, 140, 150])
        self.lens_paths = [dirname + '/jf12-regular-turbulent-s1-' + '%s' % rig + '.npz' for rig in rig_bins]
        self.lens_parts = self.lens_paths[:]  # filled with matrices only when they are needed
        self.neutral_lens_part = sparse.identity(hpt.nside2npix(self.nside), format='csc')

    def check_lens_part(self, lp):
        """
        Perform sanity checks and set the HEALpix nside parameter.

        :param lp: lens part as scipy.sparse matrix (alternative: float for log10(R/V))
        """
        if isinstance(lp, (int, float)):
            lp = self.get_lens_part(lp)
        nside = mat2nside(lp)
        if self.nside is None:
            self.nside = nside
        elif self.nside != nside:
            raise Exception("Matrix has a different HEALpix nside than stated in the .cfg header")
        stat = int(lp.sum(axis=1).max())
        if self.stat is None:
            self.stat = stat
        elif self.stat != stat:
            raise Exception("Matrix has a different stat than stated in the .cfg header")
        return True

    def get_lens_part(self, log10e, z=1, cache=True, force=False):
        """
        Return the matrix corresponding to a given energy log10e [log_10(energy[eV])] and charge number Z.

        :param log10e: energy of the lens part in units log_10(energy / eV)
        :param z: charge number z of the lens part
        :param cache: cache all loaded lens parts (increases speed, but may consume a lot of memory!)
        :param force: force taking the closest available bin, even if the rigidity is not directly covered
        :return: the specified lens part as scipy.sparse matrix
        """
        if z == 0:
            return self.neutral_lens_part
        if not self.lens_parts:
            raise Exception("Lens empty. Load a valid config file before usage!")
        assert isinstance(log10e, (float, int)), "Type of log10e not understood"
        log10r = log10e - np.log10(z)

        if not self.ny:
            if force:
                log10r_center = self.log10r_mins + self.dlog10r
                i = np.argmin(np.abs(log10r - log10r_center))
            else:
                log10r_bins = np.append(self.log10r_mins, np.max(self.log10r_max))
                i = np.digitize(log10r, log10r_bins) - 1
                is_i_in_limits = (i < 0) or (i < len(log10r_bins) - 1)
                if is_i_in_limits:
                    diff2bin = np.abs(self.log10r_mins[i] + self.dlog10r - log10r)
                    is_close = np.isclose(max(self.dlog10r, diff2bin), self.dlog10r)
                else:
                    is_close = False
                if not is_i_in_limits or not is_close:
                    raise ValueError("Rigidity 10^%.2f V (log10e=%.2f, Z=%i) not covered" % (log10r, log10e, z))
        else:
            i = np.argmin(np.abs(log10r - self.log10r_bins))
            if not force and log10r < 17.35:
                raise ValueError("Rigidity 10^%.2f V (log10e=%.2f, Z=%i) not covered" % (log10r, log10e, z))

        if isinstance(self.lens_parts[i], (sparse.csc_matrix, sparse.csr_matrix)):
            return self.lens_parts[i]
        lp = load_lens_part(self.lens_paths[i], transpose=self.ny)
        if not self.ny:
            self.check_lens_part(lp)
        if cache:
            self.lens_parts[i] = lp
        return lp
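

# Illustrative sketch: typical use of the Lens class. The config path 'lens.cfg' and the chosen
# energy/charge are hypothetical; a real lens configuration produced by the lens-factory is
# required for this to run.
def _example_lens_usage():
    lens = Lens('lens.cfg')                           # parses the .cfg header and the file list
    lens_part = lens.get_lens_part(log10e=19.0, z=1)  # lens part for the rigidity of 10^19 eV protons
    eg_map = extragalactic_vector(lens_part, i=0)     # back-tracked distribution for observed pixel 0
    total = eg_map.sum()
    return eg_map / total if total > 0 else eg_map    # normalize to a probability distribution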