Skip to content

Runner

runner

Builder and Runner for serial and parallel SDE integrator runs.

This modules implements fassades for the convenient execution of SDE runs. This is particularly useful for parallel runs based on MPI. The ParallelRunner automatically takes care of all the necessary steps to integrate a large ensemble of trajectories in parallel.

kwargs for the Runners

The IntegratorBuilder provides a very flexible mechanism to provide all necessary arguments for the SDE integrator via **kwargs. These arguments are dynamically scanned for matching patterns within the respective components. This matching excludes the integration schemes, as it is assumed that the scheme is initialized from drift, diffusion, and random increment only. Furthermore, the IntegratorBuilder requires all arguments to be specified, as kwargs, even the optional ones.

Classes:

Name Description
IntegratorBuilder

Builder for the SDE integrator.

ParallelRunner

Runner for hybrid parallelism with MPI.

pysde.runner.MPI_LOADED module-attribute

MPI_LOADED = True

Check if mpi4py can be loaded for parallel execution.

pysde.runner.IntegratorBuilder

Builder for the SDE integrator.

The IntegratorBuilder is a simple wrapper plugging together parameter and components for an SDEIntegrator object.

build_integrator classmethod

build_integrator(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> integrator.SDEIntegrator

Assemble an SDEIntegrator object.

The builder automatically checks that the provided kwargs contain all necessary parameters to initialize the scheme, increment, and storage objects.

Parameters:

Name Type Description Default
drift_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Drift function of the SDE (c.f. schemes)

required
diffusion_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Diffusion function of the SDE (c.f. schemes)

required
scheme_type type[schemes.BaseScheme]

Type of the integration scheme

required
increment_type type[increments.BaseRandomIncrement]

Type of the random increment

required
storage_type type[storages.BaseStorage]

Type of the storage object

required
**kwargs dict[str, Any]

Arguments for initializing scheme, increment, and storage

{}

_init_from_kwargs classmethod

_init_from_kwargs(component_type: cotype, **kwargs: Any) -> cotype

Initialize a component from matching parameters in **kwargs dict.

pysde.runner.ParallelRunner

Runner for parallel integration with MPI.

The ParallelRunner object initiates process-parallel integration based on MPI. It is only available if PySDE has been installed with the mpi option, and an MPI executable is available on the system path. The runner utilizes two-level hybrid parallelism. On the process level, it partitions the ensemble of trajectories across the available processes. On the thread level, it executes thread-parallel for loops over the locally assigned trajectories. This makes scaling to large-scale compute clusters easy.

Internally, the parallel runner adjusts the parameter of the integrator run depending on the invoking MPI rank. It then constructs an SDE integrator with the IntegratorBuilder component from the adjusted parameters. The adjustments are the following:

  1. Partition ensemble of initial states equally across processes.
  2. Adjust the random seed for the random increment object (add the rank number).
  3. Adjust the save directory for the storage object (add the rank number).

Execution from the command line

Thread-parallel numba loops do not intelligently recognize the number of available threads in an MPI environment. Available threads have therefore by mapped to the MPI processes explicitly in the invoking mpirun command:

mpirun -n <NUM_RPOCS> --map-by slot:PE=<NUM_THREADS_PER_PROC> python -m mpi4py parallel_runner.py

Superfluous threads

The MPI schedular might assign to extra threads to the MPI process of rank 0. The NUM_THREADS_PER_PROC parameter should be adjusted accordingly.

Methods:

Name Description
run

Run the SDE integration process, return storage object on given MPI rank.

__init__

__init__(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> None

Initialize the parallel runner.

Parameters:

Name Type Description Default
drift_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Drift function of the SDE (c.f. schemes)

required
diffusion_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Diffusion function of the SDE (c.f. schemes)

required
scheme_type type[schemes.BaseScheme]

Type of the integration scheme

required
increment_type type[increments.BaseRandomIncrement]

Type of the random increment

required
storage_type type[storages.BaseStorage]

Type of the storage object

required
**kwargs dict[str, Any]

Arguments for initializing scheme, increment, and storage

{}

Raises:

Type Description
ImportError

Exception indicating that MPI is required for parallel execution

run

run(initial_state: Real | npt.NDArray[np.floating], initial_time: Real, step_size: Real, num_steps: Annotated[int, Is[lambda x: x > 0]], progress_bar: bool = False) -> storages.BaseStorage

Run the SDE integration process, return storage object on given MPI rank.

Parameters:

Name Type Description Default
initial_state Real | npt.NDArray[np.floating]

Initial state of the system, given for all trajectories with shape \(d_X \times N\)

required
initial_time Real

Initial time \(t_0\) of the stochastic process

required
step_size Real

Discrete step size \(\Delta t\)

required
num_steps int

Number of steps to integrate

required
progress_bar bool

Whether to display a progress bar

False

Returns:

Type Description
storages.BaseStorage

storages.BaseStorage: Storage object containing the SDE trajectory data for the ensemble subset assigned to the local MPI rank

_adjust_process_dependent_parameters

_adjust_process_dependent_parameters(**kwargs: Any) -> dict[str, Any]

Adjust parameters of the integrator run depending on the MPI rank.

_partition_initial_state

_partition_initial_state(initial_state: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]

Partition trajectory ensemble equally among MPI processes.