Source code for qutip.solver.integrator.scipy_integrator

"""ODE integrator from scipy."""

__all__ = [
    'IntegratorScipyAdams',
    'IntegratorScipyBDF',
    'IntegratorScipyDop853',
    'IntegratorScipylsoda',
]

import numpy as np
from scipy.integrate import ode
from scipy.integrate._ode import zvode
from qutip.core import data as _data
from qutip.core.data.reshape import column_unstack_dense, column_stack_dense
from ..integrator import IntegratorException, Integrator
from ..solver_base import Solver
import warnings


[docs]class IntegratorScipyAdams(Integrator): """ Integrator using Scipy `ode` with zvode integrator using adams method. Ordinary Differential Equation solver by netlib (https://www.netlib.org/odepack). Usable with ``method="adams"`` """ integrator_options = { 'atol': 1e-8, 'rtol': 1e-6, 'nsteps': 2500, 'order': 12, 'first_step': 0, 'max_step': 0, 'min_step': 0, } support_time_dependant = True supports_blackbox = True method = 'adams' class _zvode(zvode): """Overwrite the scipy's zvode to advance to max to ``t`` with step.""" def step(self, *args): itask = self.call_args[2] self.rwork[0] = args[4] self.call_args[2] = 5 r = self.run(*args) self.call_args[2] = itask return r def _prepare(self): """ Initialize the solver """ self._ode_solver = ode(self._mul_np_vec) self._ode_solver.set_integrator('zvode') self._ode_solver._integrator = self._zvode( method=self.method, **self.options, ) self.name = "scipy zvode " + self.method def _mul_np_vec(self, t, vec): """ Interface between scipy which use numpy and the driver, which use data. """ state = _data.dense.fast_from_numpy(vec) column_unstack_dense(state, self._size, inplace=True) out = self.system.matmul_data(t, state) column_stack_dense(out, inplace=True) return out.as_ndarray().ravel() def set_state(self, t, state0): self._is_set = True self._back = t self._front = t self._mat_state = state0.shape[1] > 1 self._size = state0.shape[0] if self._mat_state: state0 = _data.column_stack(state0) self._ode_solver.set_initial_value(state0.to_array().ravel(), t) def get_state(self, copy=True): if not self._is_set: raise IntegratorException("The state is not initialted") self._check_failed_integration() t = self._ode_solver.t if self._mat_state: state = _data.column_unstack_dense( _data.dense.Dense(self._ode_solver._y, copy=copy), self._size, inplace=True) else: state = _data.dense.Dense(self._ode_solver._y, copy=copy) return t, state def _check_handle(self): """ Do the check for concurrent use of the integrator and reset if used elsewhere. """ integrator = self._ode_solver._integrator if integrator.initialized: if integrator.handle != integrator.__class__.active_global_handle: integrator.reset(len(self._ode_solver._y), False) def integrate(self, t, copy=True): self._check_handle() if t != self._ode_solver.t: self._ode_solver.integrate(t) return self.get_state(copy) def mcstep(self, t, copy=True): # When working with mcstep, we use the dense output feature: # a range in which the state at any time can be computed with # minimal work. We keep track of the range with _back and _front. self._check_handle() if self._ode_solver.t == t: # Exact same `t` as the last call, nothing to do. pass elif self._back > t: # `t` before the range: not supported. raise IntegratorException( f"`t`={t} is behind the integration limit: " f"{t} < {self._back}." ) elif t <= self._front: # In the range, ask for the new state. self._ode_solver.integrate(t) elif self._ode_solver.t < self._front: # `t` after the range but last call (`t_prev`) not at the front. # Advancing the range would make the interval `t_prev`..`_front` # unreachable. Thus advance to _front. self._ode_solver.integrate(self._front) else: # Advance the range. self._back = self._front self._ode_solver.integrate(t, step=True) self._front = self._ode_solver._integrator.rwork[12] return self.get_state(copy) def _check_failed_integration(self): if self._ode_solver.successful(): return messages = { -1: 'Excess work done on this call. Try to increasing ' 'the nsteps parameter in the Options class', -2: 'Excess accuracy requested. (Tolerances too small.)', -3: 'Illegal input detected.', -4: 'Repeated error test failures. (Check all input.)', -5: 'Repeated convergence failures. (Perhaps bad' ' Jacobian supplied or wrong choice of MF or tolerances.)', -6: 'Error weight became zero during problem. (Solution' ' component i vanished, and ATOL or ATOL(i) = 0.)' } raise IntegratorException( messages[self._ode_solver._integrator.istate] ) @property def options(self): """ Supported options by zvode integrator: atol : float, default: 1e-8 Absolute tolerance. rtol : float, default: 1e-6 Relative tolerance. order : int, default: 12, 'adams' or 5, 'bdf' Order of integrator <=12 'adams', <=5 'bdf' nsteps : int, default: 2500 Max. number of internal steps/call. first_step : float, default: 0 Size of initial step (0 = automatic). min_step : float, default: 0 Minimum step size (0 = automatic). max_step : float, default: 0 Maximum step size (0 = automatic) When using pulses, change to half the thinest pulse otherwise it may be skipped. """ return self._options @options.setter def options(self, new_options): Integrator.options.fset(self, new_options)
[docs]class IntegratorScipyBDF(IntegratorScipyAdams): """ Integrator using Scipy `ode` with zvode integrator using bdf method. Ordinary Differential Equation solver by netlib (https://www.netlib.org/odepack). Usable with ``method="bdf"`` """ method = 'bdf' integrator_options = { 'atol': 1e-8, 'rtol': 1e-6, 'nsteps': 2500, 'order': 5, 'first_step': 0, 'max_step': 0, 'min_step': 0, }
[docs]class IntegratorScipyDop853(Integrator): """ Integrator using Scipy `ode` with dop853 integrator. Eight order runge-kutta method by Dormand & Prince. Use fortran implementation from [E. Hairer, S.P. Norsett and G. Wanner, Solving Ordinary Differential Equations i. Nonstiff Problems. 2nd edition. Springer Series in Computational Mathematics, Springer-Verlag (1993)]. Usable with ``method="dop853"`` """ integrator_options = { 'atol': 1e-8, 'rtol': 1e-6, 'nsteps': 2500, 'first_step': 0, 'max_step': 0, 'ifactor': 6.0, 'dfactor': 0.3, 'beta': 0.0, } support_time_dependant = True supports_blackbox = True method = 'dop853' def _prepare(self): """ Initialize the solver """ self._ode_solver = ode(self._mul_np_vec) self._ode_solver.set_integrator('dop853', **self.options) self.name = "scipy ode dop853" def _mul_np_vec(self, t, vec): """ Interface between scipy which use numpy and the driver, which use data. """ state = _data.dense.fast_from_numpy(vec.view(np.complex128)) column_unstack_dense(state, self._size, inplace=True) out = self.system.matmul_data(t, state) column_stack_dense(out, inplace=True) return out.as_ndarray().ravel().view(np.float64) def integrate(self, t, copy=True): if t != self._ode_solver.t: self._ode_solver.integrate(t) return self.get_state(copy) def mcstep(self, t, copy=True): if self._ode_solver.t <= t: # Scipy's DOP853 does not have a step function. # It has a safe step length, but can be 0 if unknown. dt = self._ode_solver._integrator.work[6] if dt: t = min(self._ode_solver.t + dt, t) self._ode_solver.integrate(t) else: # While DOP853 support changing the direction of the integration, # it does not do so efficiently. We do it manually. self._ode_solver._integrator.work[6] *= -1 self._ode_solver.integrate(t) self._ode_solver._integrator.work[6] *= -1 return self.get_state(copy) def get_state(self, copy=True): if not self._is_set: raise IntegratorException("The state is not initialted") self._check_failed_integration() t = self._ode_solver.t if self._mat_state: state = _data.column_unstack_dense( _data.dense.Dense(self._ode_solver._y.view(np.complex128), copy=copy), self._size, inplace=True) else: state = _data.dense.Dense(self._ode_solver._y.view(np.complex128), copy=copy) return t, state def set_state(self, t, state0): self._is_set = True self._mat_state = state0.shape[1] > 1 self._size = state0.shape[0] if self._mat_state: state0 = _data.column_stack(state0) self._ode_solver.set_initial_value( state0.to_array().ravel().view(np.float64), t ) def _check_failed_integration(self): if self._ode_solver.successful(): return messages = { -1: 'input is not consistent', -2: 'larger nsteps is needed, Try to increase the nsteps ' 'parameter in the Options class.', -3: 'step size becomes too small. Try increasing tolerance', -4: 'problem is probably stiff (interrupted), try "bdf" ' 'method instead', } raise IntegratorException( messages[self._ode_solver._integrator.istate] ) @property def options(self): """ Supported options by dop853 integrator: atol : float, default: 1e-8 Absolute tolerance. rtol : float, default: 1e-6 Relative tolerance. nsteps : int, default: 2500 Max. number of internal steps/call. first_step : float, default: 0 Size of initial step (0 = automatic). max_step : float, default: 0 Maximum step size (0 = automatic) ifactor, dfactor : float, default: 6., 0.3 Maximum factor to increase/decrease step size by in one step beta : float, default: 0 Beta parameter for stabilised step size control. See scipy.integrate.ode ode for more detail """ return self._options @options.setter def options(self, new_options): Integrator.options.fset(self, new_options)
[docs]class IntegratorScipylsoda(IntegratorScipyDop853): """ Integrator using Scipy `ode` with lsoda integrator. ODE solver by netlib (https://www.netlib.org/odepack) Automatically choose between 'Adams' and 'BDF' methods to solve both stiff and non-stiff systems. Usable with ``method="lsoda"`` """ integrator_options = { 'atol': 1e-8, 'rtol': 1e-6, 'nsteps': 2500, 'max_order_ns': 12, 'max_order_s': 5, 'first_step': 0.0, 'max_step': 0.0, 'min_step': 0.0, } support_time_dependant = True supports_blackbox = True method = 'lsoda' def _prepare(self): """ Initialize the solver """ self._ode_solver = ode(self._mul_np_vec) self._ode_solver.set_integrator('lsoda', **self.options) self.name = "scipy lsoda" def _check_handle(self): """ Do the check for concurrent use of the integrator and reset if used elsewhere. """ integrator = self._ode_solver._integrator if integrator.initialized: if integrator.handle != integrator.__class__.active_global_handle: integrator.reset(len(self._ode_solver._y), False) def integrate(self, t, copy=True): self._check_handle() return super().integrate(t, copy) def set_state(self, t, state0): self._front = t super().set_state(t, state0) self._back = self.get_state() def mcstep(self, t, copy=True): # This solver support dense output: # a range in which the state at any time can be computed with # minimal work. We keep track of the range with _back[0] and _front. self._check_handle() if self._ode_solver.t == t: # Exact same `t` as the last call, nothing to do. pass elif self._back[0] <= t <= self._front: # In the range, ask for the new state. self._backstep(t) elif self._front < t: # Advance the range. self._one_step(t) else: raise IntegratorException( f"`t`={t} is behind the integration limit: " f"{t} < {self._back[0]}." ) return self.get_state(copy) def _one_step(self, t): """ Advance up to t by one internal solver step. Should be able to retreive the state at any time between the present time and the resulting time using `backstep`. """ # Here we want to advance up to t doing maximum one step. # lsoda officially support step, but sometime it does more work than # needed, so we ask it to advance a fraction of the last step, where it # will advance one internal step of length allowed by the tolerance and # interpolate back to the asked time, effictively getting the single # integration step we want. The first step and abrupt changes in the # `rhs` can cause exceptions to this, but _backstep catch those cases. safe_delta = self._ode_solver._integrator.rwork[11]/100 + 1e-15 t_ode = self._ode_solver.t if t > self._front and t_ode >= self._front: # The state is at self._front, do a step self._back = self.get_state() self._ode_solver.integrate(min(self._front + safe_delta, t)) self._front = self._ode_solver._integrator.rwork[12] # We asked for a fraction of a step, now complete it. self._ode_solver.integrate(min(self._front, t)) elif t > self._front: # The state is at a time before t_front, advance to t_front self._ode_solver.integrate(self._front) def _backstep(self, t): """ Retreive the state at time t, with t smaller than present ODE time. The time t will always be between the last calls of `one_step`. return the pair t, state. """ delta = self._ode_solver._integrator.rwork[11] if t == self._ode_solver.t: pass elif self._front - delta <= t: self._ode_solver.integrate(t) else: self.set_state(*self._back) self._ode_solver.integrate(t) def _check_failed_integration(self): if self._ode_solver.successful(): return messages = { -1: "Excess work done on this call." "Try to increase the nsteps parameter in the Options class.", -2: "Excess accuracy requested (tolerances too small).", -3: "Illegal input detected (internal error).", -4: "Repeated error test failures (internal error).", -5: "Repeated convergence failures " "(perhaps bad Jacobian or tolerances).", -6: "Error weight became zero during problem.", -7: "Internal workspace insufficient to finish (internal error)." } raise IntegratorException( messages[self._ode_solver._integrator.istate] ) @property def options(self): """ Supported options by lsoda integrator: atol : float, default: 1e-8 Absolute tolerance. rtol : float, default: 1e-6 Relative tolerance. nsteps : int, default: 2500 Max. number of internal steps/call. max_order_ns : int, default: 12 Maximum order used in the nonstiff case (<= 12). max_order_s : int, default: 5 Maximum order used in the stiff case (<= 5). first_step : float, default: 0 Size of initial step (0 = automatic). max_step : float, default: 0 Maximum step size (0 = automatic) When using pulses, change to half the thinest pulse otherwise it may be skipped. min_step : float, default: 0 Minimum step size (0 = automatic) """ return self._options @options.setter def options(self, new_options): Integrator.options.fset(self, new_options)
Solver.add_integrator(IntegratorScipyBDF, 'bdf') Solver.add_integrator(IntegratorScipyAdams, 'adams') Solver.add_integrator(IntegratorScipyDop853, 'dop853') Solver.add_integrator(IntegratorScipylsoda, 'lsoda')