import h5py
import subprocess
import shutil
import numpy as np
import tempfile
import os
[docs]
def load_complete_file(f):
    """
    Read all the information in an HDF5 file or group into
    a nested dictionary.

    Using this on large files will quickly run out of memory!
    Only use it on small test data.

    Every group below ``f`` becomes a dictionary keyed by the names of its
    members.  If a group has any attributes they are stored in that
    dictionary under the extra key ``"attrs"``.  Every other object is
    treated as a dataset and read in full; attributes attached to
    datasets are not read.

    Parameters
    ----------
    f: h5py.File or h5py.Group
        The file or group to be walked through

    Returns
    -------
    output: dict
        Nested dictionary with all file content.
    """
    output = {}

    # Callback handed to visititems; ``name`` is the "/"-separated path of
    # the visited object and ``value`` is the object itself.
    def visit(name, value):
        *parents, leaf = name.split("/")

        # Walk down to the dictionary of the enclosing group.  This relies
        # on a group being visited before any of its members, so that its
        # dictionary already exists.
        node = output
        for part in parents:
            node = node[part]

        if isinstance(value, h5py.Group):
            entry = {}
            attrs = dict(value.attrs)
            if attrs:
                entry["attrs"] = attrs
            node[leaf] = entry
        else:
            # Index with an empty tuple rather than ``[:]``: it reads the
            # whole dataset whatever its shape, whereas a slice is rejected
            # for scalar (zero-dimensional) datasets.
            node[leaf] = value[()]

    f.visititems(visit)
    return output
[docs]
def repack(filename):
    """
    In-place HDF5 repack operation on file.

    The external ``h5repack`` command writes a repacked copy of the file
    into a temporary directory, and that copy is then moved over the
    original path.

    Parameters
    ----------
    filename: str or os.PathLike
        Path of the HDF5 file to repack.

    Raises
    ------
    subprocess.CalledProcessError
        If ``h5repack`` exits with a non-zero status.  The move is not
        attempted in that case, so the original file is left in place.
    """
    filename = os.fspath(filename)
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_name = os.path.join(tmpdir, os.path.basename(filename))
        # Pass an argument list and do not go through a shell, so that
        # paths containing spaces or shell metacharacters reach h5repack
        # unchanged instead of being split or interpreted.
        subprocess.check_call(["h5repack", filename, tmp_name])
        shutil.move(tmp_name, filename)
[docs]
def create_dataset_early_allocated(group, name, size, dtype):
    """
    Create a 1D HDF5 dataset whose storage is allocated up front.

    The dataset is created through the low-level h5py API with the
    allocation time set to "early" and the fill time set to "never", so the
    full space is reserved at creation and is not pre-filled with values.
    This can make it faster to write data incrementally from multiple
    processes.

    Parameters
    ----------
    group: h5py.Group
        the parent for the dataset
    name: str
        name for the new dataset (it is encoded as ASCII)
    size: int
        The size of the new data set (which must be 1D)
    dtype: str
        Data type, One of f4, f8, i4, i8

    Returns
    -------
    data_set: h5py.Dataset
        The newly created dataset.

    Raises
    ------
    KeyError
        If ``dtype`` is not one of the four supported codes.
    """
    # One-dimensional data-space of the requested length
    dataspace = h5py.h5s.create_simple((size,))

    # Dataset-creation property list: never fill, allocate immediately
    creation_props = h5py.h5p.create(h5py.h5p.DATASET_CREATE)
    creation_props.set_fill_time(h5py.h5d.FILL_TIME_NEVER)
    creation_props.set_alloc_time(h5py.h5d.ALLOC_TIME_EARLY)

    # Translate the short type code into the matching native HDF5 type
    native_types = {
        "f8": h5py.h5t.NATIVE_DOUBLE,
        "f4": h5py.h5t.NATIVE_FLOAT,
        "i4": h5py.h5t.NATIVE_INT32,
        "i8": h5py.h5t.NATIVE_INT64,
    }
    h5_type = native_types[dtype]

    dataset_id = h5py.h5d.create(
        group.id, name.encode("ascii"), h5_type, dataspace, creation_props
    )
    # Wrap the low-level identifier in the usual high-level object
    return h5py.Dataset(dataset_id)
[docs]
class BatchWriter:
    """
    This class is designed to batch together writes to
    an HDF5 file to minimize the contention when many
    processes are writing to a file at the same time
    using MPI

    Rows passed to :meth:`write` are accumulated in in-memory buffers of
    ``max_size`` rows per column.  Whenever the buffers fill up they are
    written to ``group[name]`` for every column, at consecutive positions
    starting from ``offset``.  Call :meth:`finish` at the end to write out
    whatever is left in the buffers.

    Parameters
    ----------
    group: h5py.Group
        Container holding one already-created dataset per column name.
        Anything supporting ``group[name][start:end] = values`` works.
    col_dtypes: dict
        Maps column name to the dtype used for its buffer.
    offset: int
        Index in the output datasets at which this writer's first row goes.
    max_size: int
        Number of rows buffered before writing out.  Must be at least 1.
    """

    def __init__(self, group, col_dtypes, offset, max_size=1_000_000):
        if max_size < 1:
            # A zero-sized buffer could never accept a row, so every
            # write would silently be dropped.
            raise ValueError("BatchWriter max_size must be at least 1")
        self.group = group
        # number of rows currently waiting in the buffers
        self.index = 0
        # number of rows already written out to the group
        self.written_index = 0
        self.offset = offset
        self.max_size = max_size
        self.cols = list(col_dtypes.keys())
        self.data = {
            name: np.empty(max_size, dtype=dtype)
            for name, dtype in col_dtypes.items()
        }

    def write(self, **data):
        """
        Add rows to the buffers, writing out each time they become full.

        Parameters
        ----------
        **data: array-like
            One equal-length sequence per column, keyed by column name.

        Raises
        ------
        ValueError
            If the supplied columns do not all have the same length.
        KeyError
            If a column given at construction is missing from ``data``.
        """
        n = None
        # check all the lengths are the same
        for values in data.values():
            n1 = len(values)
            if (n is not None) and (n1 != n):
                raise ValueError("Different length cols passed to Batchwriter.write")
            n = n1

        # Nothing to do for empty columns, or when no columns were passed
        # at all (n is still None in that case).
        if not n:
            return

        # Feed the input through in pieces no larger than the free space
        # left in the buffers.  The buffers are emptied as soon as they
        # fill, so there is always room for at least one row.
        start = 0
        while start < n:
            count = min(n - start, self.max_size - self.index)
            end = start + count
            chunk = {name: col[start:end] for name, col in data.items()}
            self._batch_chunk(chunk, count)
            start = end

    def _batch_chunk(self, data, n):
        """Copy ``n`` rows that fit in the free buffer space; flush if full."""
        s = self.index
        e = s + n
        for name, out_col in self.data.items():
            out_col[s:e] = data[name][:n]
        self.index = e
        if e == self.max_size:
            self._write()

    def _write(self):
        """Write the buffered rows to the group and mark the buffers empty."""
        # number to write in this block
        n = self.index
        s_out = self.written_index + self.offset
        e_out = s_out + n
        for name, col in self.data.items():
            self.group[name][s_out:e_out] = col[:n]
        self.written_index += n
        # Reset here rather than in the caller so that the same rows can
        # never be written twice, e.g. by a repeated finish().
        self.index = 0

    def finish(self):
        """Write out any rows still held in the buffers."""
        self._write()
[docs]
def h5py_shorten(group, name, n):
    """
    Cut the column ``name`` in ``group`` down to its first ``n`` entries.

    The leading ``n`` values are stored under a temporary name, the
    original entry is deleted, and the stored copy is then attached under
    the original name before the temporary name is removed.

    Parameters
    ----------
    group: h5py.Group
        Container holding the column.
    name: str
        Name of the column to trim.
    n: int
        Number of leading entries to keep.
    """
    # Arbitrary suffix, only meant to avoid clashing with a real column
    suffix = "_tmp_5kb04scgllj6khsd3286ij"
    scratch_name = name + suffix

    # Stash the part we want to keep under the scratch name
    kept = group[name][:n]
    group[scratch_name] = kept

    # Swap it in for the original, then drop the scratch name
    del group[name]
    shortened = group[scratch_name]
    group[name] = shortened
    del group[scratch_name]