


Builder and Runner for serial and parallel SDE integrator runs.

This module implements facades for the convenient execution of SDE runs. This is particularly useful for parallel runs based on MPI. The ParallelRunner automatically takes care of all the steps necessary to integrate a large ensemble of trajectories in parallel.

kwargs for the Runners

The IntegratorBuilder offers a flexible mechanism for passing all necessary arguments to the SDE integrator via **kwargs. These arguments are dynamically matched against the parameters of the respective components. The integration scheme is excluded from this matching, as it is assumed to be initialized from drift, diffusion, and random increment only. Furthermore, the IntegratorBuilder requires all arguments to be specified as kwargs, even the optional ones.
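The matching described above could be sketched with the standard library's `inspect` module. This is only an illustration under stated assumptions, not the PySDE implementation: the helper `init_from_kwargs`, the stand-in classes `DemoIncrement` and `DemoStorage`, and the parameter names `seed`, `stride`, and `save_directory` are all hypothetical.

```python
import inspect
from typing import Any


def init_from_kwargs(component_type: type, **kwargs: Any) -> Any:
    """Instantiate component_type from the kwargs matching its constructor parameters."""
    signature = inspect.signature(component_type)
    # Collect the named constructor parameters, skipping *args and **kwargs
    names = [
        name
        for name, param in signature.parameters.items()
        if param.kind
        in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    ]
    # Parameters with defaults are also required, mirroring the rule stated above
    missing = [name for name in names if name not in kwargs]
    if missing:
        raise KeyError(f"{component_type.__name__} is missing arguments: {missing}")
    # Pass only the matching subset; unrelated kwargs are ignored
    return component_type(**{name: kwargs[name] for name in names})


# Hypothetical stand-in components, not PySDE classes
class DemoIncrement:
    def __init__(self, seed: int = 0) -> None:
        self.seed = seed


class DemoStorage:
    def __init__(self, stride: int, save_directory: str) -> None:
        self.stride = stride
        self.save_directory = save_directory


# One shared kwargs dict serves several components
shared_kwargs = {"seed": 1, "stride": 10, "save_directory": "results"}
increment = init_from_kwargs(DemoIncrement, **shared_kwargs)
storage = init_from_kwargs(DemoStorage, **shared_kwargs)
```

Each component picks out only the entries it needs from the shared dictionary, which is what allows a single **kwargs collection to configure several objects at once.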


Name Description
IntegratorBuilder Builder for the SDE integrator.
ParallelRunner Runner for hybrid parallelism with MPI.

pysde.runner.MPI_LOADED module-attribute


Check if mpi4py can be loaded for parallel execution.
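A flag of this kind is commonly set with a guarded import. The following is a minimal sketch of that pattern, assuming nothing about how PySDE actually defines the attribute; the helper `require_mpi` and its error type are hypothetical.

```python
try:
    # Importing the top-level package does not initialize MPI;
    # initialization happens when the mpi4py.MPI submodule is imported.
    import mpi4py  # noqa: F401

    MPI_LOADED = True
except ImportError:
    MPI_LOADED = False


def require_mpi() -> None:
    """Raise if mpi4py is unavailable (hypothetical guard for parallel code paths)."""
    if not MPI_LOADED:
        raise RuntimeError("mpi4py is required for parallel execution")
```

Code that depends on MPI can call such a guard up front, so a missing installation fails early with a clear message.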


Builder for the SDE integrator.

The IntegratorBuilder is a simple wrapper that plugs together the parameters and components of an SDEIntegrator object.

build_integrator classmethod

build_integrator(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> integrator.SDEIntegrator

Assemble an SDEIntegrator object.

The builder automatically checks that the provided kwargs contain all necessary parameters to initialize the scheme, increment, and storage objects.


Name Type Description Default
drift_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Drift function of the SDE (cf. schemes)

diffusion_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Diffusion function of the SDE (cf. schemes)

scheme_type type[schemes.BaseScheme]

Type of the integration scheme

increment_type type[increments.BaseRandomIncrement]

Type of the random increment

storage_type type[storages.BaseStorage]

Type of the storage object

**kwargs dict[str, Any]

Arguments for initializing scheme, increment, and storage


_init_from_kwargs classmethod

_init_from_kwargs(component_type: cotype, **kwargs: Any) -> cotype

Initialize a component from matching parameters in **kwargs dict.


Runner for parallel integration with MPI.

The ParallelRunner object initiates process-parallel integration based on MPI. It is only available if PySDE has been installed with the mpi option, and an MPI executable is available on the system path. The runner utilizes two-level hybrid parallelism. On the process level, it partitions the ensemble of trajectories across the available processes. On the thread level, it executes thread-parallel for loops over the locally assigned trajectories. This makes scaling to large-scale compute clusters easy.

Internally, the parallel runner adjusts the parameters of the integrator run depending on the invoking MPI rank. It then uses the IntegratorBuilder to construct an SDE integrator from the adjusted parameters. The adjustments are the following:

  1. Partition ensemble of initial states equally across processes.
  2. Adjust the random seed for the random increment object (add the rank number).
  3. Adjust the save directory for the storage object (add the rank number).
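The three adjustments above can be sketched in plain Python. This is an illustration under assumptions, not the runner's actual code: the function names, the kwargs keys `seed` and `save_directory`, the `_p<rank>` directory suffix, and the handling of ensembles that do not divide evenly (extra trajectories go to the lowest ranks) are all hypothetical.

```python
from pathlib import Path
from typing import Any


def partition_bounds(num_trajectories: int, rank: int, size: int) -> tuple[int, int]:
    """Return the half-open column range [start, stop) assigned to the given rank."""
    base, remainder = divmod(num_trajectories, size)
    # Assumption: the first `remainder` ranks each take one extra trajectory
    start = rank * base + min(rank, remainder)
    stop = start + base + (1 if rank < remainder else 0)
    return start, stop


def adjust_for_rank(rank: int, **kwargs: Any) -> dict[str, Any]:
    """Return a copy of kwargs with rank-dependent seed and save directory."""
    adjusted = dict(kwargs)
    # Offset the seed by the rank so every process draws a different random stream
    adjusted["seed"] = kwargs["seed"] + rank
    # Append the rank to the directory name so processes write to separate locations
    directory = Path(kwargs["save_directory"])
    adjusted["save_directory"] = directory.with_name(f"{directory.name}_p{rank}")
    return adjusted


# Example: 10 trajectories on 4 processes, viewed from rank 2
start, stop = partition_bounds(10, rank=2, size=4)
local_kwargs = adjust_for_rank(2, seed=5, save_directory="out/run")
```

With an initial state of shape \(d_X \times N\), the local subset would then be the column slice `initial_state[:, start:stop]`.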

Execution from the command line

Thread-parallel numba loops do not automatically detect the number of threads available in an MPI environment. The available threads therefore have to be mapped to the MPI processes explicitly in the invoking mpirun command:

mpirun -n <NUM_PROCS> --map-by slot:PE=<NUM_THREADS_PER_PROC> python -m mpi4py

Superfluous threads

The MPI scheduler might assign extra threads to the MPI process of rank 0. The NUM_THREADS_PER_PROC parameter should be adjusted accordingly.


Name Description
run Run the SDE integration process, return storage object on given MPI rank.


__init__(drift_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], diffusion_function: Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]], scheme_type: type[schemes.BaseScheme], increment_type: type[increments.BaseRandomIncrement], storage_type: type[storages.BaseStorage], **kwargs: Any) -> None

Initialize the parallel runner.


Name Type Description Default
drift_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Drift function of the SDE (cf. schemes)

diffusion_function Callable[[npt.NDArray[np.floating], Real], npt.NDArray[np.floating]]

Diffusion function of the SDE (cf. schemes)

scheme_type type[schemes.BaseScheme]

Type of the integration scheme

increment_type type[increments.BaseRandomIncrement]

Type of the random increment

storage_type type[storages.BaseStorage]

Type of the storage object

**kwargs dict[str, Any]

Arguments for initializing scheme, increment, and storage



Type Description

Exception indicating that MPI is required for parallel execution


run(initial_state: Real | npt.NDArray[np.floating], initial_time: Real, step_size: Real, num_steps: Annotated[int, Is[lambda x: x > 0]], progress_bar: bool = False) -> storages.BaseStorage

Run the SDE integration process, return storage object on given MPI rank.


Name Type Description Default
initial_state Real | npt.NDArray[np.floating]

Initial state of the system, given for all trajectories with shape \(d_X \times N\)

initial_time Real

Initial time \(t_0\) of the stochastic process

step_size Real

Discrete step size \(\Delta t\)

num_steps int

Number of steps to integrate

progress_bar bool

Whether to display a progress bar



Type Description

storages.BaseStorage: Storage object containing the SDE trajectory data for the ensemble subset assigned to the local MPI rank


_adjust_process_dependent_parameters(**kwargs: Any) -> dict[str, Any]

Adjust parameters of the integrator run depending on the MPI rank.


_partition_initial_state(initial_state: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]

Partition trajectory ensemble equally among MPI processes.