Runner
runner
Builder and Runner for serial and parallel SDE integrator runs.
This module implements facades for the convenient execution of SDE runs. This is particularly useful for parallel runs based on MPI. The ParallelRunner automatically takes care of all the steps necessary to integrate a large ensemble of trajectories in parallel.
kwargs for the Runners
The IntegratorBuilder offers a flexible mechanism for passing all necessary arguments for the SDE integrator via **kwargs. These arguments are dynamically scanned for matching patterns within the respective components. This matching excludes the integration schemes, as it is assumed that the scheme is initialized from drift, diffusion, and random increment only. Furthermore, the IntegratorBuilder requires all arguments to be specified as kwargs, even the optional ones.
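The matching described above can be illustrated with a minimal sketch. It assumes that "matching" means comparing the keys of a shared kwargs dict against the parameter names of a component's constructor; the helper `init_from_kwargs` and the classes `DemoIncrement` and `DemoStorage` are hypothetical stand-ins and not part of PySDE.

```python
import inspect
from typing import Any


def init_from_kwargs(component_type: type, **kwargs: Any) -> Any:
    """Hypothetical helper: build a component from the matching entries of kwargs."""
    parameters = inspect.signature(component_type.__init__).parameters
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    # Collect the constructor's named parameters, skipping "self" and *args/**kwargs.
    names = [
        name
        for name, param in parameters.items()
        if name != "self" and param.kind not in variadic
    ]
    # Parameters with defaults are treated as required too, mirroring the note above.
    missing = [name for name in names if name not in kwargs]
    if missing:
        raise TypeError(f"{component_type.__name__} is missing arguments: {missing}")
    # Entries of kwargs that belong to other components are ignored here.
    return component_type(**{name: kwargs[name] for name in names})


class DemoIncrement:
    """Stand-in for a random increment type (assumption for illustration)."""

    def __init__(self, seed: int, scale: float = 1.0) -> None:
        self.seed = seed
        self.scale = scale


class DemoStorage:
    """Stand-in for a storage type (assumption for illustration)."""

    def __init__(self, save_directory: str, stride: int = 1) -> None:
        self.save_directory = save_directory
        self.stride = stride


# One shared dict serves both components; each picks out its own arguments.
shared_kwargs = {"seed": 0, "scale": 1.0, "save_directory": "results", "stride": 10}
increment = init_from_kwargs(DemoIncrement, **shared_kwargs)
storage = init_from_kwargs(DemoStorage, **shared_kwargs)
```

In this sketch, omitting `scale` or `stride` raises a `TypeError` even though both have defaults, which is one way to enforce that optional arguments are also spelled out.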
Classes:
Name | Description |
---|---|
IntegratorBuilder | Builder for the SDE integrator. |
ParallelRunner | Runner for hybrid parallelism with MPI. |
pysde.runner.MPI_LOADED
module-attribute
Check if mpi4py can be loaded for parallel execution.
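A flag of this kind is commonly set by probing for the optional dependency at import time. The following is a minimal sketch of that pattern, not PySDE's actual implementation: it assumes the check is done with `importlib.util.find_spec`, and the guard function `require_mpi` is a hypothetical name.

```python
import importlib.util

# Assumption: availability is probed without importing mpi4py.MPI itself,
# so that no MPI environment is initialized as a side effect of the check.
MPI_LOADED = importlib.util.find_spec("mpi4py") is not None


def require_mpi() -> None:
    """Hypothetical guard: fail early if parallel execution is requested without MPI."""
    if not MPI_LOADED:
        raise ImportError("mpi4py is required for parallel execution")
```

A runner could call such a guard in its constructor, which matches the `ImportError` documented for `ParallelRunner.__init__`.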
pysde.runner.IntegratorBuilder
Builder for the SDE integrator.
The IntegratorBuilder is a simple wrapper that plugs together the parameters and components of an SDEIntegrator object.
build_integrator
classmethod
build_integrator(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> integrator.SDEIntegrator
Assemble an SDEIntegrator object.
The builder automatically checks that the provided kwargs contain all necessary parameters to initialize the scheme, increment, and storage objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
drift_function | Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]] | Drift function of the SDE (c.f. | required |
diffusion_function | Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]] | Diffusion function of the SDE (c.f. | required |
scheme_type | type[schemes.BaseScheme] | Type of the integration scheme | required |
increment_type | type[increments.BaseRandomIncrement] | Type of the random increment | required |
storage_type | type[storages.BaseStorage] | Type of the storage object | required |
**kwargs | dict[str, Any] | Arguments for initializing scheme, increment, and storage | {} |
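To make the callable signature in the table concrete, here is a minimal sketch of a drift and a diffusion function that take a state array and a time and return an array. It uses only NumPy. The chosen model (linear mean-reverting drift, constant noise amplitude) and the output shapes are assumptions for illustration; the shape the library expects from the diffusion function is not stated in this section.

```python
from numbers import Real

import numpy as np
import numpy.typing as npt


def drift(x: npt.NDArray[np.floating], t: Real) -> npt.NDArray[np.floating]:
    """Linear mean-reverting drift, applied to every trajectory at once."""
    return -x


def diffusion(x: npt.NDArray[np.floating], t: Real) -> npt.NDArray[np.floating]:
    """Constant noise amplitude with the same shape as the state (assumed shape)."""
    return 0.5 * np.ones_like(x)


# Ensemble of N = 3 trajectories of a d_X = 2 dimensional state.
state = np.ones((2, 3))
drift_value = drift(state, 0.0)
diffusion_value = diffusion(state, 0.0)
```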
_init_from_kwargs
classmethod
Initialize a component from the matching parameters in the **kwargs dict.
pysde.runner.ParallelRunner
Runner for parallel integration with MPI.
The ParallelRunner object initiates process-parallel integration based on MPI. It is only available if PySDE has been installed with the mpi option and an MPI executable is available on the system path. The runner uses two-level hybrid parallelism. On the process level, it partitions the ensemble of trajectories across the available processes. On the thread level, it executes thread-parallel for loops over the locally assigned trajectories. This makes it easy to scale runs to large compute clusters.
Internally, the parallel runner adjusts the parameters of the integrator run depending on the invoking MPI rank. It then uses the IntegratorBuilder component to construct an SDE integrator from the adjusted parameters. The adjustments are the following:
- Partition ensemble of initial states equally across processes.
- Adjust the random seed for the random increment object (add the rank number).
- Adjust the save directory for the storage object (add the rank number).
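The three adjustments listed above can be sketched without an MPI installation by passing the rank and process count as plain integers. This is an illustrative sketch, not PySDE's code: the function `adjust_for_rank`, the use of `np.array_split` along the trajectory axis, and the `_rank<k>` directory suffix are all assumptions.

```python
from pathlib import Path

import numpy as np
import numpy.typing as npt


def adjust_for_rank(
    initial_state: npt.NDArray[np.floating],
    seed: int,
    save_directory: str,
    rank: int,
    num_procs: int,
) -> tuple[npt.NDArray[np.floating], int, Path]:
    """Hypothetical helper: derive rank-local run parameters."""
    # Split the ensemble along the trajectory axis (columns of a d_X x N array).
    local_state = np.array_split(initial_state, num_procs, axis=1)[rank]
    # Offset the seed so that every process draws a different random stream.
    local_seed = seed + rank
    # Give every process its own output location (suffix format is an assumption).
    base = Path(save_directory)
    local_directory = base.with_name(f"{base.name}_rank{rank}")
    return local_state, local_seed, local_directory


# Emulate four processes working on an ensemble of eight 2D trajectories.
ensemble = np.arange(16, dtype=float).reshape(2, 8)
parts = [adjust_for_rank(ensemble, 42, "output/results", r, 4) for r in range(4)]
```

With a real MPI launch, `rank` and `num_procs` would come from the communicator rather than from a loop.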
Execution from the command line
Thread-parallel numba loops do not automatically detect the number of available threads in an MPI environment. The available threads therefore have to be mapped to the MPI processes explicitly in the invoking mpirun command:
Superfluous threads
The MPI scheduler might assign extra threads to the MPI process of rank 0. The NUM_THREADS_PER_PROC parameter should be adjusted accordingly.
Methods:
Name | Description |
---|---|
run | Run the SDE integration process, return storage object on given MPI rank. |
__init__
__init__(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> None
Initialize the parallel runner.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
drift_function | Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]] | Drift function of the SDE (c.f. | required |
diffusion_function | Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]] | Diffusion function of the SDE (c.f. | required |
scheme_type | type[schemes.BaseScheme] | Type of the integration scheme | required |
increment_type | type[increments.BaseRandomIncrement] | Type of the random increment | required |
storage_type | type[storages.BaseStorage] | Type of the storage object | required |
**kwargs | dict[str, Any] | Arguments for initializing scheme, increment, and storage | {} |
Raises:
Type | Description |
---|---|
ImportError | Exception indicating that MPI is required for parallel execution |
run
run(initial_state: Real | npt.NDArray[np.floating], initial_time: Real, step_size: Real, num_steps: Annotated[int, Is[lambda x: x > 0]], progress_bar: bool = False) -> storages.BaseStorage
Run the SDE integration process, return storage object on given MPI rank.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
initial_state | Real \| npt.NDArray[np.floating] | Initial state of the system, given for all trajectories with shape \(d_X \times N\) | required |
initial_time | Real | Initial time \(t_0\) of the stochastic process | required |
step_size | Real | Discrete step size \(\Delta t\) | required |
num_steps | int | Number of steps to integrate | required |
progress_bar | bool | Whether to display a progress bar | False |
Returns:
Type | Description |
---|---|
storages.BaseStorage | Storage object containing the SDE trajectory data for the ensemble subset assigned to the local MPI rank |
_adjust_process_dependent_parameters
Adjust parameters of the integrator run depending on the MPI rank.
_partition_initial_state
Partition trajectory ensemble equally among MPI processes.