Storage
storages
Storage objects for large data processing.
For SDE simulations with large numbers of trajectories and time steps, the resulting data often
does not fit into RAM. Possible solutions are to store only statistics or to apply a stride. In
PySDE, we also provide a simple option for actually storing all the data and processing it a posteriori.
This is realized by data containers that automatically flush data from memory to disk. The data
storage backend we employ is Zarr, but any other backend can easily be implemented by deriving
from the BaseStorage class.
The base class constructor can also ensure MPI thread-safety via the avoid_race_condition flag.
If this option is set, only the root process will create the save directory, and a barrier
synchronizes all processes before proceeding.
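To make the memory argument concrete, the following back-of-the-envelope sketch estimates the size of the stored trajectory data. The helper name, the example numbers, and the assumption that every stride-th iteration is stored as 8-byte floats are illustrative and not taken from PySDE.

```python
import math


def estimate_storage_bytes(n_steps: int, stride: int, dim: int, n_trajectories: int,
                           bytes_per_value: int = 8) -> int:
    """Rough size of the stored trajectory data (hypothetical helper, not part of PySDE)."""
    # Assumption: every stride-th iteration is stored, starting with iteration 0.
    n_snapshots = math.ceil(n_steps / stride)
    return dim * n_trajectories * n_snapshots * bytes_per_value


# Illustrative numbers only: 3-dimensional process, 10,000 trajectories, 100,000 steps.
full = estimate_storage_bytes(n_steps=100_000, stride=1, dim=3, n_trajectories=10_000)
strided = estimate_storage_bytes(n_steps=100_000, stride=10, dim=3, n_trajectories=10_000)
print(f"every step: {full / 1e9:.1f} GB, every 10th step: {strided / 1e9:.1f} GB")
```

Under these assumptions, storing every step needs about 24 GB, while a stride of 10 reduces this to about 2.4 GB. A stride trades temporal resolution for memory, whereas flushing to disk keeps all snapshots.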
Classes:
Name | Description |
---|---|
BaseStorage | ABC for data storage objects. |
NumpyStorage | Storage object with simple Numpy backend. |
ZarrStorage | Storage object with Zarr backend and chunkwise saving. |
pysde.storages.MPI_LOADED
module-attribute
Check if mpi4py can be loaded for parallel execution.
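A flag of this kind is commonly set with a guarded import. The snippet below is a minimal sketch of that pattern and not PySDE's actual source; only the name MPI_LOADED is taken from the documentation above.

```python
# Guarded import: record whether mpi4py is available instead of failing at import time.
try:
    from mpi4py import MPI  # noqa: F401  (imported only to probe availability)

    MPI_LOADED = True
except ImportError:
    MPI_LOADED = False

print(f"mpi4py available: {MPI_LOADED}")
```

Code that depends on MPI can then branch on the flag and fall back to serial behavior when mpi4py is missing.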
pysde.storages.BaseStorage
Bases: ABC
ABC for data storage objects.
This class provides the minimal interface for storage objects. This includes a stride for sample
saves and a path to save data on disk to. Automatic flushing to disk is not enforced on the
ABC level, but only included in backend-specific implementations. The internal structure of
storage objects is very simple. It consists of a list of scalars (for time) and a list of
numpy arrays (for trajectory data). These lists are appended to when the store method is called.
Further processing of the data is backend-specific.
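The structure described above can be sketched as follows. This is a simplified stand-in, not the PySDE implementation: the class names SketchStorage and InMemoryStorage, the private attribute names, and the rule that a snapshot is kept when the iteration number is a multiple of the stride are assumptions made for illustration.

```python
from abc import ABC, abstractmethod

import numpy as np


class SketchStorage(ABC):
    """Hypothetical stand-in for the interface described above (not PySDE code)."""

    def __init__(self, stride: int) -> None:
        self._stride = stride
        self._time_list: list[float] = []       # one scalar per stored snapshot
        self._data_list: list[np.ndarray] = []  # one (d_X, N) array per stored snapshot

    def store(self, time: float, data: np.ndarray, iteration_number: int) -> None:
        # Assumption: a snapshot is kept when the iteration is a multiple of the stride.
        if iteration_number % self._stride == 0:
            self._time_list.append(time)
            self._data_list.append(data.copy())

    @abstractmethod
    def save(self) -> None:
        """Write the in-memory data to disk (backend-specific)."""

    @property
    @abstractmethod
    def values(self) -> tuple[np.ndarray, np.ndarray]:
        """Return array-like handles to time and trajectory data."""


class InMemoryStorage(SketchStorage):
    """Smallest possible backend: nothing is written to disk."""

    def save(self) -> None:
        pass

    @property
    def values(self) -> tuple[np.ndarray, np.ndarray]:
        return np.array(self._time_list), np.stack(self._data_list, axis=-1)


storage = InMemoryStorage(stride=2)
for i in range(6):
    storage.store(0.1 * i, np.full((3, 4), float(i)), i)
times, data = storage.values
print(times.shape, data.shape)
```

A new backend only has to decide how the two lists are turned into arrays and where they are written. The stride logic lives in the base class.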
Methods:
Name | Description |
---|---|
store | Store current time and trajectory ensemble in memory. |
save | Manual call to save in-memory data to disk. |
values | Return numpy-like handles to time and trajectory data. |
values
abstractmethod
property
Return numpy-like handles to time and trajectory data.
Raises:
Type | Description |
---|---|
NotImplementedError | Needs to be implemented by subclasses. |
__init__
__init__(stride: Annotated[int, Is[lambda x: x > 0]], save_directory: pathlib.Path | None = None, avoid_race_condition: bool = False) -> None
Initialize storage object with a stride and a save directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stride | int | Stride for saving every \(n\)-th sample | required |
save_directory | pathlib.Path \| None | Path to save data to, on manual call or potentially automatically. Defaults to None. | None |
avoid_race_condition | bool | Ensure MPI thread safety. Makes only process 0 create a common directory to store results in. Defaults to False. | False |
store
store(time: Real, data: npt.NDArray[np.floating], iteration_number: Annotated[int, Is[lambda x: x >= 0]]) -> None
Store current time and trajectory ensemble in memory, if stride is reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time | Real | Current value of time variable of the stochastic process | required |
data | npt.NDArray[np.floating] | Trajectory data of size \(d_X \times N\) | required |
iteration_number | int | Iteration number of the calling integrator. | required |
_create_thread_safe_directory
_create_thread_safe_directory(save_directory: pathlib.Path | None, avoid_race_condition: bool) -> None
Create directory for saving data, potentially in an MPI-safe manner.
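The pattern of letting only the root process create the directory and synchronizing afterwards can be sketched as below. This is an illustrative helper, not the PySDE method: the function name and the comm argument are assumptions. The comm object is expected to behave like an mpi4py communicator (for example MPI.COMM_WORLD), of which only Get_rank and Barrier are used.

```python
import pathlib
import tempfile


def create_directory_safely(save_directory, comm=None):
    """Create save_directory once, then synchronize (hypothetical helper, not PySDE code).

    comm is assumed to be an mpi4py communicator such as MPI.COMM_WORLD,
    or None for a serial run.
    """
    if save_directory is None:
        return
    if comm is None:
        # Serial run: no other process can race with us.
        save_directory.mkdir(parents=True, exist_ok=True)
        return
    if comm.Get_rank() == 0:
        # Only the root process touches the file system.
        save_directory.mkdir(parents=True, exist_ok=True)
    # All processes wait here until the root has created the directory.
    comm.Barrier()


# Serial usage example in a temporary location.
target = pathlib.Path(tempfile.mkdtemp()) / "results" / "run_0"
create_directory_safely(target)
print(target.is_dir())
```

Without the barrier, a non-root process could try to write into the directory before the root process has created it.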
save
abstractmethod
Manual call to save in-memory data to disk.
Raises:
Type | Description |
---|---|
NotImplementedError | Needs to be implemented by subclasses. |
pysde.storages.NumpyStorage
Bases: BaseStorage
Storage object with simple Numpy backend.
Data in this storage object is not automatically flushed to disk, but can be stored manually
in npz format using the save method.
Methods:
Name | Description |
---|---|
save | Stack internal lists to numpy arrays and save to disk in compressed npz format. |
values | Stack internal lists to numpy arrays and return time and trajectory arrays. |
values
property
Stack internal lists to numpy arrays and return time and trajectory arrays.
For \(M\) saved snapshots, return time array of shape \((M,)\) and data array of shape \((d_X, N, M)\).
save
Stack internal lists to numpy arrays and save to disk in compressed npz format.
For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).
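The stacking and the resulting shapes can be illustrated with plain NumPy. The snippet is a sketch of the described behavior rather than the NumpyStorage source. The file name and the archive keys times and data are assumptions.

```python
import pathlib
import tempfile

import numpy as np

# Three snapshots of an ensemble with d_X = 2 components and N = 5 trajectories.
time_list = [0.0, 0.1, 0.2]
data_list = [np.full((2, 5), t) for t in time_list]

# Stack along a new last axis so that the snapshot index comes last.
time_array = np.array(time_list)           # shape (M,)
data_array = np.stack(data_list, axis=-1)  # shape (d_X, N, M)

# File name and key names are illustrative assumptions.
save_path = pathlib.Path(tempfile.mkdtemp()) / "trajectories.npz"
np.savez_compressed(save_path, times=time_array, data=data_array)

# Load the archive again and check the shapes.
with np.load(save_path) as archive:
    loaded_times = archive["times"]
    loaded_data = archive["data"]

print(loaded_times.shape, loaded_data.shape)
```

With this layout, `loaded_data[..., m]` is the full ensemble at snapshot `m`, and `loaded_data[:, k, :]` is the path of trajectory `k` over all stored times.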
pysde.storages.ZarrStorage
Bases: BaseStorage
Storage object with Zarr backend and chunkwise saving.
Zarr is a powerful storage backend inspired by the HDF5 format. It provides a numpy-like API while storing data on disk in a compressed and chunked format. This storage object saves data automatically to disk in regular intervals, making it suitable for SDE runs with large ensembles and/or long integration times. The data is stored in a Zarr group.
values
property
Return Zarr handles to time and trajectory data.
The handles are numpy-like. For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).
Raises:
Type | Description |
---|---|
ValueError | Raised if the internal storage has not been initialized. Initialization happens automatically when the first chunk of data is flushed to disk. |
__init__
__init__(stride: Annotated[int, Is[lambda x: x > 0]], chunk_size: Annotated[int, Is[lambda x: x > 0]], save_directory: pathlib.Path) -> None
Initialize Zarr storage object with a stride, chunk size, and save directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stride | int | Stride for saving every \(n\)-th sample | required |
chunk_size | int | Number of snapshots to store in memory before flushing to disk | required |
save_directory | pathlib.Path | Path to save data to | required |
store
store(time: Real, data: npt.NDArray[np.floating], iteration_number: Annotated[int, Is[lambda x: x >= 0]]) -> None
Store current time and trajectory ensemble in memory, if stride is reached.
If the local buffer is full, as defined by chunk_size, the data is flushed to disk.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time | Real | Current value of time variable of the stochastic process | required |
data | npt.NDArray[np.floating] | Trajectory data of size \(d_X \times N\) | required |
iteration_number | int | Iteration number of the calling integrator. | required |
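The buffer-and-flush logic can be sketched independently of Zarr. The class below is a hypothetical illustration that writes each chunk to a compressed NumPy file as a stand-in for the Zarr group. It is not the ZarrStorage implementation, and the class name, file naming scheme, and stride rule are assumptions.

```python
import pathlib
import tempfile

import numpy as np


class ChunkedBuffer:
    """Hypothetical buffer that flushes every chunk_size snapshots (NumPy stand-in for Zarr)."""

    def __init__(self, stride: int, chunk_size: int, save_directory: pathlib.Path) -> None:
        self._stride = stride
        self._chunk_size = chunk_size
        self._save_directory = save_directory
        self._save_directory.mkdir(parents=True, exist_ok=True)
        self._time_list = []
        self._data_list = []
        self.num_chunks_on_disk = 0

    def store(self, time, data, iteration_number):
        # Assumption: a snapshot is kept when the iteration is a multiple of the stride.
        if iteration_number % self._stride == 0:
            self._time_list.append(time)
            self._data_list.append(data.copy())
        # Flush as soon as the in-memory buffer holds chunk_size snapshots.
        if len(self._time_list) >= self._chunk_size:
            self.save()

    def save(self):
        # Write whatever is buffered as one chunk file, then clear the buffer.
        if not self._time_list:
            return
        chunk_path = self._save_directory / f"chunk_{self.num_chunks_on_disk:04d}.npz"
        np.savez_compressed(
            chunk_path,
            times=np.array(self._time_list),
            data=np.stack(self._data_list, axis=-1),
        )
        self.num_chunks_on_disk += 1
        self._time_list.clear()
        self._data_list.clear()


buffer = ChunkedBuffer(stride=1, chunk_size=4, save_directory=pathlib.Path(tempfile.mkdtemp()))
for i in range(10):
    buffer.store(0.01 * i, np.zeros((2, 3)), i)
buffer.save()  # flush the snapshots left over after the last full chunk
print(buffer.num_chunks_on_disk)
```

At most chunk_size snapshots are held in memory at any time, so peak memory is governed by the chunk size rather than by the length of the run. The final manual save call is needed because the last chunk is usually only partially filled.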
save
Flush remaining data to disk and clear the local buffer.
For \(M\) saved snapshots, the time array has shape \((M,)\) and the data array has shape \((d_X, N, M)\).
_save_to_disk
Stack internal lists to numpy arrays and save to Zarr storages on disk.
_init_storage_and_fill
_init_storage_and_fill(time_array: npt.NDArray[np.floating], data_array: npt.NDArray[np.floating]) -> None
Initialize Zarr storage and fill it with initial time and trajectory data.