
Storage

storages

Storage objects for large data processing.

For SDE simulations with large numbers of trajectories and time steps, the resulting data often does not fit into RAM. Possible remedies are to store only statistics or to save only every n-th step (striding). PySDE additionally provides a simple option for storing all the data and processing it a posteriori. This is realized by data containers that automatically flush data from memory to disk. The storage backend we employ is Zarr, but other backends can be implemented by deriving from the BaseStorage class. The base class constructor can also guard against race conditions between MPI processes via the avoid_race_condition flag. If this flag is set, only the root process creates the save directory, and a barrier synchronizes all processes before they proceed.

Classes:

Name Description
BaseStorage

ABC for data storage objects.

NumpyStorage

Storage object with simple Numpy backend.

ZarrStorage

Storage object with Zarr backend and chunkwise saving.

pysde.storages.MPI_LOADED module-attribute

MPI_LOADED: Final = True

Check if mpi4py can be loaded for parallel execution.
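A flag of this kind is typically derived from whether the optional dependency can be found. The following is a minimal sketch of that pattern, not necessarily how PySDE sets the attribute; the name MPI_LOADED_SKETCH is hypothetical and chosen to avoid confusion with the real module attribute.

```python
import importlib.util

# Hypothetical stand-in for the module attribute: True if mpi4py can be found.
# find_spec only locates the package; it does not import it or initialize MPI.
MPI_LOADED_SKETCH: bool = importlib.util.find_spec("mpi4py") is not None

print(f"mpi4py available: {MPI_LOADED_SKETCH}")
```

Code that depends on MPI can branch on such a flag and fall back to serial behavior when mpi4py is missing.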

pysde.storages.BaseStorage

Bases: ABC

ABC for data storage objects.

This class provides the minimal interface for storage objects. This includes a stride for sample saves and a path for saving data to disk. Automatic flushing to disk is not enforced on the ABC level, but only included in backend-specific implementations. The internal structure of storage objects is deliberately simple. It consists of a list of scalars (for time) and a list of numpy arrays (for trajectory data). Both lists are appended to when the store method is called. Further processing of the data is backend-specific.
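The two-list structure described above can be sketched as follows. This is an illustration under stated assumptions, not the PySDE implementation: the class name ListStorageSketch and its attribute names are hypothetical, and the rule that a snapshot is kept when the iteration number is a multiple of the stride is an assumption about how "stride is reached" is evaluated.

```python
from numbers import Real

import numpy as np


class ListStorageSketch:
    """Hypothetical in-memory storage with one list for times and one for snapshots."""

    def __init__(self, stride: int) -> None:
        if stride <= 0:
            raise ValueError("stride must be positive")
        self._stride = stride
        self._time_list: list[float] = []
        self._data_list: list[np.ndarray] = []

    def store(self, time: Real, data: np.ndarray, iteration_number: int) -> None:
        # Assumption: keep a snapshot when the iteration number is a multiple of the stride.
        if iteration_number % self._stride == 0:
            self._time_list.append(float(time))
            # Copy so that later in-place updates of the state do not alter the snapshot.
            self._data_list.append(data.copy())


storage = ListStorageSketch(stride=3)
state = np.zeros((2, 5))  # d_X = 2 components, N = 5 trajectories
for i in range(10):
    storage.store(0.1 * i, state, i)
    state += 1.0  # stand-in for an integrator step
```

With a stride of 3 and ten iterations, the sketch keeps the snapshots from iterations 0, 3, 6 and 9.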

Methods:

Name Description
store

Store current time and trajectory ensemble in memory.

save

Manual call to save in-memory data to disk.

values

Return numpy-like handles to time and trajectory data.

values abstractmethod property

values: Iterable

Return numpy-like handles to time and trajectory data.

Raises:

Type Description
NotImplementedError

Needs to be implemented by subclasses.

__init__

__init__(stride: Annotated[int, Is[lambda x: x > 0]], save_directory: pathlib.Path | None = None, avoid_race_condition: bool = False) -> None

Initialize storage object with a stride and a save directory.

Parameters:

Name Type Description Default
stride int

Stride for saving every \(n\)-th sample

required
save_directory pathlib.Path | None

Path to save data to, on manual call or potentially automatically. Defaults to None.

None
avoid_race_condition bool

Ensure MPI thread safety. Makes only process 0 create a common directory to store results in. Defaults to False.

False

store

store(time: Real, data: npt.NDArray[np.floating], iteration_number: Annotated[int, Is[lambda x: x >= 0]]) -> None

Store current time and trajectory ensemble in memory, if stride is reached.

Parameters:

Name Type Description Default
time Real

Current value of time variable of the stochastic process

required
data npt.NDArray[np.floating]

Trajectory data of size \(d_X \times N\)

required
iteration_number int

Iteration number of the calling integrator.

required

_create_thread_safe_directory

_create_thread_safe_directory(save_directory: pathlib.Path | None, avoid_race_condition: bool) -> None

Create directory for saving data, potentially in an MPI-safe manner.

save abstractmethod

save() -> None

Manual call to save in-memory data to disk.

Raises:

Type Description
NotImplementedError

Needs to be implemented by subclasses.

pysde.storages.NumpyStorage

Bases: BaseStorage

Storage object with simple Numpy backend.

Data in this storage object is not automatically flushed to disk, but can be stored manually in npz format using the save method.

Methods:

Name Description
save

Stack internal lists to numpy arrays and save to disk in compressed npz format.

values

Stack internal lists to numpy arrays and return time and trajectory arrays.

values property

values: tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]

Stack internal lists to numpy arrays and return time and trajectory arrays.

For \(M\) saved snapshots, return time array of shape \((M,)\) and data array of shape \((d_X, N, M)\).
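The shape convention can be reproduced with plain NumPy by stacking the snapshots along a new last axis. The variable names below are hypothetical, and the use of np.stack is an assumption about how the stacking could be done.

```python
import numpy as np

d_x, n_traj, n_snapshots = 2, 5, 4

# Stand-ins for the internal lists: one scalar time and one (d_X, N) array per snapshot.
time_list = [0.1 * m for m in range(n_snapshots)]
data_list = [np.full((d_x, n_traj), float(m)) for m in range(n_snapshots)]

time_array = np.asarray(time_list)         # shape (M,)
data_array = np.stack(data_list, axis=-1)  # shape (d_X, N, M)

print(time_array.shape, data_array.shape)
```

With this layout, data_array[..., m] recovers the ensemble at the m-th saved snapshot.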

save

save() -> None

Stack internal lists to numpy arrays and save to disk in compressed npz format.

For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).
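A round trip through the compressed npz format can be sketched with NumPy alone. The file name result.npz and the archive keys times and data are assumptions for illustration; the keys actually written by NumpyStorage are not stated here.

```python
import pathlib
import tempfile

import numpy as np

time_array = np.linspace(0.0, 1.0, 4)  # shape (M,)
data_array = np.zeros((2, 5, 4))       # shape (d_X, N, M)

with tempfile.TemporaryDirectory() as tmp:
    file_path = pathlib.Path(tmp) / "result.npz"
    # Write both arrays into one compressed archive under hypothetical keys.
    np.savez_compressed(file_path, times=time_array, data=data_array)
    # Read the arrays back before the temporary directory is removed.
    with np.load(file_path) as archive:
        loaded_times = archive["times"]
        loaded_data = archive["data"]
```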

pysde.storages.ZarrStorage

Bases: BaseStorage

Storage object with Zarr backend and chunkwise saving.

Zarr is a powerful storage backend inspired by the HDF5 format. It provides a numpy-like API while storing data on disk in a compressed and chunked format. This storage object saves data automatically to disk in regular intervals, making it suitable for SDE runs with large ensembles and/or long integration times. The data is stored in a Zarr group.

values property

values: tuple[zarr.Array, zarr.Array]

Return Zarr handles to time and trajectory data.

The handles are numpy-like. For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).

Raises:

Type Description
ValueError

Raised if the internal storage has not been initialized yet. Initialization happens automatically when the first chunk of data is flushed to disk.

__init__

__init__(stride: Annotated[int, Is[lambda x: x > 0]], chunk_size: Annotated[int, Is[lambda x: x > 0]], save_directory: pathlib.Path) -> None

Initialize Zarr storage object with a stride, chunk size, and save directory.

Parameters:

Name Type Description Default
stride int

Stride for saving every \(n\)-th sample

required
chunk_size int

Number of snapshots to store in memory before flushing to disk

required
save_directory pathlib.Path

Path to save data to

required

store

store(time: Real, data: npt.NDArray[np.floating], iteration_number: Annotated[int, Is[lambda x: x >= 0]]) -> None

Store current time and trajectory ensemble in memory, if stride is reached.

If the local buffer is full, as defined by chunk_size, the data is flushed to disk.
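The interplay of stride, chunk_size and the final save call can be sketched without any Zarr dependency. In this illustration a plain Python list stands in for the on-disk arrays; the class ChunkBufferSketch, its attributes, and the modulo-based stride check are hypothetical and not taken from the PySDE source.

```python
class ChunkBufferSketch:
    """Hypothetical buffer that flushes whenever chunk_size snapshots have accumulated."""

    def __init__(self, stride: int, chunk_size: int) -> None:
        self._stride = stride
        self._chunk_size = chunk_size
        self._buffer: list[tuple[float, list[float]]] = []
        self.flushed_chunks: list[list[tuple[float, list[float]]]] = []  # stand-in for disk

    def store(self, time: float, data: list[float], iteration_number: int) -> None:
        # Assumption: keep a snapshot when the iteration number is a multiple of the stride.
        if iteration_number % self._stride != 0:
            return
        self._buffer.append((time, data))
        if len(self._buffer) >= self._chunk_size:
            self._flush()

    def save(self) -> None:
        # Flush whatever is left, for example at the end of a run.
        if self._buffer:
            self._flush()

    def _flush(self) -> None:
        self.flushed_chunks.append(list(self._buffer))
        self._buffer.clear()


buffer = ChunkBufferSketch(stride=2, chunk_size=3)
for i in range(10):
    buffer.store(0.1 * i, [float(i)], i)
chunks_before_save = len(buffer.flushed_chunks)
buffer.save()
```

Here five snapshots are kept (iterations 0, 2, 4, 6, 8): the first three trigger an automatic flush, and the remaining two only reach the stand-in storage through the final save call. This is why a manual save at the end of a run is needed to avoid losing a partially filled buffer.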

Parameters:

Name Type Description Default
time Real

Current value of time variable of the stochastic process

required
data npt.NDArray[np.floating]

Trajectory data of size \(d_X \times N\)

required
iteration_number int

Iteration number of the calling integrator.

required

save

save() -> None

Flush remaining data to disk and clear the local buffer.

For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).

_save_to_disk

_save_to_disk() -> None

Stack internal lists to numpy arrays and save to Zarr storages on disk.

_init_storage_and_fill

_init_storage_and_fill(time_array: npt.NDArray[np.floating], data_array: npt.NDArray[np.floating]) -> None

Init Zarr storage and fill with initial time and trajectory data.