import h5py
import sys
import numpy as np
"""
sl files use an lz4 compression filter for which a plugin needs to be installed
Mac:
sudo port install hdf5-lz4-plugin
Ubuntu: (http://gisaxs.com/index.php/HDF5)
sudo add-apt-repository ppa:eugenwintersberger/pni
sudo apt-get update
sudo apt-get install hdf5-plugin-lz4
note that the filter isn't actually registered until the first dataset is access so won't be reported by h5py until after then.
Remember to restart terminal running python
Whilst h5py claims to look in the default directory for filters I had to add the environment variable:
os.environ['HDF5_PLUGIN_PATH'] = "/opt/local/lib/hdf5"
"""
[docs]class slFile():
def __init__(self, input_filename, region_name=""):
self.region_name = region_name
self.load_file(input_filename)
def _check_file_version(self):
self.file_version = self.sl['Version'][0]
print 'sl file version: {}'.format(self.file_version)
if not self.file_version in range(16, 23):
raise ValueError('File version {} out of range.'.format(self.file_version))
def _get_spotlist(self):
### get root groups from input data
if self.region_name == "":
self.spotlist = range(self.sl['SpectraGroups']['InitialMeasurement']['images'].shape[1])
else:
print self.region_name
region_name = self.sl['Regions'].visit(self.find_name)
if region_name == None:
raise ValueError("Requested region {} not found".format(self.region_name))
self.spotlist = self.sl['Regions'][region_name]['SpotList']
self.spotlist = np.asarray(self.spotlist)
def _get_spectragroup(self):
self.initialMeasurement = self.sl['SpectraGroups']['InitialMeasurement']
self.Mzs = np.asarray(self.initialMeasurement['SamplePositions'][
'SamplePositions']) # we don't write this but will use it for peak detection
self.spectra = self.initialMeasurement['spectra']
def _get_coordinates(self):
### Get Coordinates for spotlist
self.coords = np.asarray(self.sl['Registrations']['0']['Coordinates'])
if np.shape(self.coords)[0] != 3:
self.coords = self.coords.T
if np.shape(self.coords)[0] != 3:
raise ValueError('coords second dimension should be 3 {}'.format(np.shape(self.coords)))
[docs] def load_file(self, input_filename):
# get a handle on the file
self.sl = h5py.File(input_filename, 'r') # Readonly, file must exist
self._check_file_version()
self._get_spotlist()
self._get_spectragroup()
self._get_coordinates()
[docs] def get_spectrum(self, index):
intensities = np.asarray(self.spectra[index, :])
return self.Mzs, intensities
[docs] def find_name(self, name):
if 'name' in self.sl['Regions'][name].attrs.keys():
if self.sl['Regions'][name].attrs['name'] == self.region_name:
assert isinstance(name, object)
return name
[docs]def centroid_imzml(input_filename, output_filename, step=[], apodization=False, w_size=10, min_intensity=1e-5,
region_name="", prevent_duplicate_pixels=False):
# write a file to imzml format (centroided)
"""
:type min_intensity: float
"""
from pyimzml.ImzMLWriter import ImzMLWriter
from pyMSpec.centroid_detection import gradient
sl = slFile(input_filename, region_name=region_name)
mz_dtype = sl.Mzs.dtype
int_dtype = sl.get_spectrum(0)[1].dtype
# Convert coords to index -> kinda hacky
coords = np.asarray(sl.coords.copy()).T.round(5)
coords -= np.amin(coords, axis=0)
if step == []: # have a guesss
step = np.array([np.median(np.diff(np.unique(coords[sl.spotlist, i]))) for i in range(3)])
step[np.isnan(step)] = 1
print 'estimated pixel size: {} x {}'.format(step[0], step[1])
coords = coords / np.reshape(step, (3,)).T
coords = coords.round().astype(int)
ncol, nrow, _ = np.amax(coords, axis=0) + 1
print 'new image size: {} x {}'.format(nrow, ncol)
if prevent_duplicate_pixels:
b = np.ascontiguousarray(coords).view(np.dtype((np.void, coords.dtype.itemsize * coords.shape[1])))
_, coord_idx = np.unique(b, return_index=True)
print np.shape(sl.spotlist), np.shape(coord_idx)
print "original number of spectra: {}".format(len(coords))
else:
coord_idx = range(len(coords))
n_total = len(coord_idx)
print 'spectra to write: {}'.format(n_total)
with ImzMLWriter(output_filename, mz_dtype=mz_dtype, intensity_dtype=int_dtype) as imzml:
done = 0
for key in sl.spotlist:
if all((prevent_duplicate_pixels, key not in coord_idx)):# skip duplicate pixels
#print 'skip {}'.format(key)
continue
mzs, intensities = sl.get_spectrum(key)
if apodization:
from pyMSpec import smoothing
# todo - add to processing list in imzml
mzs, intensities = smoothing.apodization(mzs, intensities)
mzs_c, intensities_c, _ = gradient(mzs, intensities, weighted_bins=5, min_intensity=min_intensity)
pos = coords[key]
pos = (pos[0], nrow - 1 - pos[1], pos[2])
imzml.addSpectrum(mzs_c, intensities_c, pos)
done += 1
if done % 1000 == 0:
print "[%s] progress: %.1f%%" % (input_filename, float(done) * 100.0 / n_total)
print "finished!"
[docs]def centroid_IMS(input_filename, output_filename, instrumentInfo={}, sharedDataInfo={}):
from pyMS.centroid_detection import gradient
# write out a IMS_centroid.hdf5 file
sl = slFile(input_filename)
n_total = np.shape(sl.spectra)[0]
with h5py.File(output_filename, 'w') as f_out:
### make root groups for output data
spectral_data = f_out.create_group('spectral_data')
spatial_data = f_out.create_group('spatial_data')
shared_data = f_out.create_group('shared_data')
### populate common variables - can hardcode as I know what these are for h5 data
# parameters
instrument_parameters_1 = shared_data.create_group('instrument_parameters/001')
if instrumentInfo != {}:
for tag in instrumentInfo:
instrument_parameters_1.attrs[tag] = instrumentInfo[tag]
# ROIs
# todo - determine and propagate all ROIs
roi_1 = shared_data.create_group('regions_of_interest/001')
roi_1.attrs['name'] = 'root region'
roi_1.attrs['parent'] = ''
# Sample
sample_1 = shared_data.create_group('samples/001')
if sharedDataInfo != {}:
for tag in sharedDataInfo:
sample_1.attrs[tag] = sharedDataInfo[tag]
done = 0
for key in range(0, n_total):
mzs, intensities = sl.get_spectrum(key)
mzs_c, intensities_c, _ = gradient(mzs, intensities)
this_spectrum = spectral_data.create_group(str(key))
_ = this_spectrum.create_dataset('centroid_mzs', data=np.float32(mzs_c), compression="gzip",
compression_opts=9)
# intensities
_ = this_spectrum.create_dataset('centroid_intensities', data=np.float32(intensities_c), compression="gzip",
compression_opts=9)
# coordinates
_ = this_spectrum.create_dataset('coordinates',
data=(sl.coords[0, key], sl.coords[1, key], sl.coords[2, key]))
## link to shared parameters
# ROI
this_spectrum['ROIs/001'] = h5py.SoftLink('/shared_data/regions_of_interest/001')
# Sample
this_spectrum['samples/001'] = h5py.SoftLink('/shared_data/samples/001')
# Instrument config
this_spectrum['instrument_parameters'] = h5py.SoftLink('/shared_data/instrument_parameters/001')
done += 1
if done % 1000 == 0:
print "[%s] progress: %.1f%%" % (input_filename, float(done) * 100.0 / n_total)
print "finished!"
if __name__ == '__main__':
centroid_imzml(sys.argv[1], sys.argv[1][:-3] + ".imzML")