__all__ = ['mcsolve', "MCSolver"]
import numpy as np
from ..core import QobjEvo, spre, spost, Qobj, unstack_columns
from .multitraj import MultiTrajSolver, _MultiTrajRHS
from .solver_base import Solver, Integrator, _solver_deprecation
from .result import McResult, McTrajectoryResult, McResultImprovedSampling
from .mesolve import mesolve, MESolver
from ._feedback import _QobjFeedback, _DataFeedback, _CollapseFeedback
import qutip.core.data as _data
from time import time
[docs]def mcsolve(H, state, tlist, c_ops=(), e_ops=None, ntraj=500, *,
args=None, options=None, seeds=None, target_tol=None, timeout=None,
**kwargs):
r"""
Monte Carlo evolution of a state vector :math:`|\psi \rangle` for a
given Hamiltonian and sets of collapse operators. Options for the
underlying ODE solver are given by the Options class.
Parameters
----------
H : :class:`.Qobj`, :class:`.QobjEvo`, ``list``, callable.
System Hamiltonian as a Qobj, QobjEvo. It can also be any input type
that QobjEvo accepts (see :class:`.QobjEvo`'s documentation).
``H`` can also be a superoperator (liouvillian) if some collapse
operators are to be treated deterministically.
state : :class:`.Qobj`
Initial state vector.
tlist : array_like
Times at which results are recorded.
c_ops : list
A ``list`` of collapse operators in any input type that QobjEvo accepts
(see :class:`.QobjEvo`'s documentation). They must be operators
even if ``H`` is a superoperator. If none are given, the solver will
defer to ``sesolve`` or ``mesolve``.
e_ops : list, optional
A ``list`` of operator as Qobj, QobjEvo or callable with signature of
(t, state: Qobj) for calculating expectation values. When no ``e_ops``
are given, the solver will default to save the states.
ntraj : int, default: 500
Maximum number of trajectories to run. Can be cut short if a time limit
is passed with the ``timeout`` keyword or if the target tolerance is
reached, see ``target_tol``.
args : dict, optional
Arguments for time-dependent Hamiltonian and collapse operator terms.
options : dict, optional
Dictionary of options for the solver.
- | store_final_state : bool
| Whether or not to store the final state of the evolution in the
result class.
- | store_states : bool, None
| Whether or not to store the state vectors or density matrices.
On `None` the states will be saved if no expectation operators are
given.
- | progress_bar : str {'text', 'enhanced', 'tqdm', ''}
| How to present the solver progress.
'tqdm' uses the python module of the same name and raise an error
if not installed. Empty string or False will disable the bar.
- | progress_kwargs : dict
| kwargs to pass to the progress_bar. Qutip's bars use `chunk_size`.
- | method : str ["adams", "bdf", "lsoda", "dop853", "vern9", etc.]
| Which differential equation integration method to use.
- | atol, rtol : float
| Absolute and relative tolerance of the ODE integrator.
- | nsteps : int
| Maximum number of (internally defined) steps allowed in one
``tlist`` step.
- | max_step : float
| Maximum length of one internal step. When using pulses, it should
be less than half the width of the thinnest pulse.
- | keep_runs_results : bool, [False]
| Whether to store results from all trajectories or just store the
averages.
- | map : str {"serial", "parallel", "loky", "mpi"}
| How to run the trajectories. "parallel" uses the multiprocessing
module to run in parallel while "loky" and "mpi" use the "loky" and
"mpi4py" modules to do so.
- | num_cpus : int
| Number of cpus to use when running in parallel. ``None`` detect the
number of available cpus.
- | norm_t_tol, norm_tol, norm_steps : float, float, int
| Parameters used to find the collapse location. ``norm_t_tol`` and
``norm_tol`` are the tolerance in time and norm respectively.
An error will be raised if the collapse could not be found within
``norm_steps`` tries.
- | mc_corr_eps : float
| Small number used to detect non-physical collapse caused by
numerical imprecision.
- | improved_sampling : Bool
| Whether to use the improved sampling algorithm from Abdelhafez et
al. PRA (2019)
Additional options are listed under
`options <./classes.html#qutip.solver.mcsolve.MCSolver.options>`__.
More options may be available depending on the selected
differential equation integration method, see
`Integrator <./classes.html#classes-ode>`_.
seeds : int, SeedSequence, list, optional
Seed for the random number generator. It can be a single seed used to
spawn seeds for each trajectory or a list of seeds, one for each
trajectory. Seeds are saved in the result and they can be reused with::
seeds=prev_result.seeds
target_tol : float, tuple, list, optional
Target tolerance of the evolution. The evolution will compute
trajectories until the error on the expectation values is lower than
this tolerance. The maximum number of trajectories employed is
given by ``ntraj``. The error is computed using jackknife resampling.
``target_tol`` can be an absolute tolerance or a pair of absolute and
relative tolerance, in that order. Lastly, it can be a list of pairs of
(atol, rtol) for each e_ops.
timeout : float, optional
Maximum time for the evolution in second. When reached, no more
trajectories will be computed.
Returns
-------
results : :class:`.McResult`
Object storing all results from the simulation. Which results is saved
depends on the presence of ``e_ops`` and the options used. ``collapse``
and ``photocurrent`` is available to Monte Carlo simulation results.
Notes
-----
The simulation will end when the first end condition is reached between
``ntraj``, ``timeout`` and ``target_tol``.
"""
options = _solver_deprecation(kwargs, options, "mc")
H = QobjEvo(H, args=args, tlist=tlist)
if not isinstance(c_ops, (list, tuple)):
c_ops = [c_ops]
c_ops = [QobjEvo(c_op, args=args, tlist=tlist) for c_op in c_ops]
if len(c_ops) == 0:
if options is None:
options = {}
options = {
key: options[key]
for key in options
if key in MESolver.solver_options
}
return mesolve(H, state, tlist, e_ops=e_ops, args=args,
options=options)
if isinstance(ntraj, (list, tuple)):
raise TypeError(
"ntraj must be an integer. "
"A list of numbers is not longer supported."
)
mc = MCSolver(H, c_ops, options=options)
result = mc.run(state, tlist=tlist, ntraj=ntraj, e_ops=e_ops,
seeds=seeds, target_tol=target_tol, timeout=timeout)
return result
class _MCRHS(_MultiTrajRHS):
"""
Container for the operators of the solver.
"""
def __init__(self, H, c_ops, n_ops):
self.rhs = H
self.c_ops = c_ops
self.n_ops = n_ops
def __call__(self):
return self.rhs
def arguments(self, args):
self.rhs.arguments(args)
for c_op in self.c_ops:
c_op.arguments(args)
for n_op in self.n_ops:
n_op.arguments(args)
def _register_feedback(self, key, val):
self.rhs._register_feedback({key: val}, solver="McSolver")
for c_op in self.c_ops:
c_op._register_feedback({key: val}, solver="McSolver")
for n_op in self.n_ops:
n_op._register_feedback({key: val}, solver="McSolver")
class MCIntegrator:
"""
Integrator like object for mcsolve trajectory.
"""
name = "mcsolve"
def __init__(self, integrator, system, options=None):
self._integrator = integrator
self.system = system
self._c_ops = system.c_ops
self._n_ops = system.n_ops
self.options = options
self._generator = None
self.method = f"{self.name} {self._integrator.method}"
self._is_set = False
self.issuper = self._c_ops[0].issuper
def set_state(self, t, state0, generator,
no_jump=False, jump_prob_floor=0.0):
"""
Set the state of the ODE solver.
Parameters
----------
t : float
Initial time
state0 : qutip.Data
Initial state.
generator : numpy.random.generator
Random number generator.
no_jump: Bool
whether or not to sample the no-jump trajectory.
If so, the "random number" should be set to zero
jump_prob_floor: float
if no_jump == False, this is set to the no-jump
probability. This setting ensures that we sample
a trajectory with jumps
"""
self.collapses = []
self.system._register_feedback("CollapseFeedback", self.collapses)
self._generator = generator
if no_jump:
self.target_norm = 0.0
else:
self.target_norm = (
self._generator.random() * (1 - jump_prob_floor)
+ jump_prob_floor
)
self._integrator.set_state(t, state0)
self._is_set = True
def get_state(self, copy=True):
return *self._integrator.get_state(copy), self._generator
def integrate(self, t, copy=False):
t_old, y_old = self._integrator.get_state(copy=False)
norm_old = self._prob_func(y_old)
while t_old < t:
t_step, state = self._integrator.mcstep(t, copy=False)
norm = self._prob_func(state)
if norm <= self.target_norm:
t_col, state = self._find_collapse_time(norm_old, norm,
t_old, t_step)
self._do_collapse(t_col, state)
t_old, y_old = self._integrator.get_state(copy=False)
norm_old = 1.
else:
t_old, y_old = t_step, state
norm_old = norm
return t_old, _data.mul(y_old, 1 / self._norm_func(y_old))
def run(self, tlist):
for t in tlist[1:]:
yield self.integrate(t, False)
def reset(self, hard=False):
self._integrator.reset(hard)
def _prob_func(self, state):
if self.issuper:
return _data.trace_oper_ket(state).real
return _data.norm.l2(state)**2
def _norm_func(self, state):
if self.issuper:
return _data.trace_oper_ket(state).real
return _data.norm.l2(state)
def _find_collapse_time(self, norm_old, norm, t_prev, t_final):
"""Find the time of the collapse and state just before it."""
tries = 0
while tries < self.options['norm_steps']:
tries += 1
if (t_final - t_prev) < self.options['norm_t_tol']:
t_guess = t_final
_, state = self._integrator.get_state()
break
t_guess = (
t_prev
+ ((t_final - t_prev)
* np.log(norm_old / self.target_norm)
/ np.log(norm_old / norm))
)
if (t_guess - t_prev) < self.options['norm_t_tol']:
t_guess = t_prev + self.options['norm_t_tol']
_, state = self._integrator.mcstep(t_guess, copy=False)
norm2_guess = self._prob_func(state)
if (
np.abs(self.target_norm - norm2_guess) <
self.options['norm_tol'] * self.target_norm
):
break
elif (norm2_guess < self.target_norm):
# t_guess is still > t_jump
t_final = t_guess
norm = norm2_guess
else:
# t_guess < t_jump
t_prev = t_guess
norm_old = norm2_guess
if tries >= self.options['norm_steps']:
raise RuntimeError(
"Could not find the collapse time within desired tolerance. "
"Increase accuracy of the ODE solver or lower the tolerance "
"with the options 'norm_steps', 'norm_tol', 'norm_t_tol'.")
return t_guess, state
def _do_collapse(self, collapse_time, state):
"""
Do the collapse:
- Find which operator did the collapse.
- Update the state and Integrator.
- Next collapse norm location
- Store collapse info.
"""
# collapse_time, state is at the collapse
if len(self._n_ops) == 1:
which = 0
else:
probs = np.zeros(len(self._n_ops))
for i, n_op in enumerate(self._n_ops):
probs[i] = n_op.expect_data(collapse_time, state).real
probs = np.cumsum(probs)
which = np.searchsorted(probs,
probs[-1] * self._generator.random())
state_new = self._c_ops[which].matmul_data(collapse_time, state)
new_norm = self._norm_func(state_new)
if new_norm < self.options['mc_corr_eps']:
# This happen when the collapse is caused by numerical error
state_new = _data.mul(state, 1 / self._norm_func(state))
else:
state_new = _data.mul(state_new, 1 / new_norm)
self.collapses.append((collapse_time, which))
# this does not need to be modified for improved sampling:
# as noted in Abdelhafez PRA (2019),
# after a jump we reset to the full range [0, 1)
self.target_norm = self._generator.random()
self._integrator.set_state(collapse_time, state_new)
def arguments(self, args):
if args:
self._integrator.arguments(args)
for c_op in self._c_ops:
c_op.arguments(args)
for n_op in self._n_ops:
n_op.arguments(args)
@property
def integrator_options(self):
return self._integrator.integrator_options
# -----------------------------------------------------------------------------
# MONTE CARLO CLASS
# -----------------------------------------------------------------------------
[docs]class MCSolver(MultiTrajSolver):
r"""
Monte Carlo Solver of a state vector :math:`|\psi \rangle` for a
given Hamiltonian and sets of collapse operators. Options for the
underlying ODE solver are given by the Options class.
Parameters
----------
H : :class:`.Qobj`, :class:`.QobjEvo`, list, callable.
System Hamiltonian as a Qobj, QobjEvo. It can also be any input type
that QobjEvo accepts (see :class:`.QobjEvo`'s documentation).
``H`` can also be a superoperator (liouvillian) if some collapse
operators are to be treated deterministically.
c_ops : list
A ``list`` of collapse operators in any input type that QobjEvo accepts
(see :class:`.QobjEvo`'s documentation). They must be operators
even if ``H`` is a superoperator.
options : dict, [optional]
Options for the evolution.
"""
name = "mcsolve"
_trajectory_resultclass = McTrajectoryResult
_mc_integrator_class = MCIntegrator
solver_options = {
"progress_bar": "text",
"progress_kwargs": {"chunk_size": 10},
"store_final_state": False,
"store_states": None,
"keep_runs_results": False,
"map": "serial",
"mpi_options": {},
"num_cpus": None,
"bitgenerator": None,
"method": "adams",
"mc_corr_eps": 1e-10,
"norm_steps": 5,
"norm_t_tol": 1e-6,
"norm_tol": 1e-4,
"improved_sampling": False,
}
def __init__(self, H, c_ops, *, options=None):
_time_start = time()
if isinstance(c_ops, (Qobj, QobjEvo)):
c_ops = [c_ops]
c_ops = [QobjEvo(c_op) for c_op in c_ops]
if H.issuper:
self._c_ops = [
spre(c_op) * spost(c_op.dag()) if c_op.isoper else c_op
for c_op in c_ops
]
self._n_ops = self._c_ops
rhs = QobjEvo(H)
for c_op in c_ops:
cdc = c_op.dag() @ c_op
rhs -= 0.5 * (spre(cdc) + spost(cdc))
else:
self._c_ops = c_ops
self._n_ops = [c_op.dag() * c_op for c_op in c_ops]
rhs = -1j * QobjEvo(H)
for n_op in self._n_ops:
rhs -= 0.5 * n_op
self._num_collapse = len(self._c_ops)
self.options = options
system = _MCRHS(rhs, self._c_ops, self._n_ops)
super().__init__(system, options=options)
def _restore_state(self, data, *, copy=True):
"""
Retore the Qobj state from its data.
"""
# Duplicated from the Solver class, but removed the check for the
# normalize_output option, since MCSolver doesn't have that option.
if self._state_metadata['dims'] == self.rhs._dims[1]:
state = Qobj(unstack_columns(data),
**self._state_metadata, copy=False)
else:
state = Qobj(data, **self._state_metadata, copy=copy)
return state
def _initialize_stats(self):
stats = super()._initialize_stats()
stats.update({
"method": self.options["method"],
"solver": "Master Equation Evolution",
"num_collapse": self._num_collapse,
})
return stats
def _run_one_traj(self, seed, state, tlist, e_ops, **integrator_kwargs):
"""
Run one trajectory and return the result.
"""
seed, result = super()._run_one_traj(seed, state, tlist, e_ops,
**integrator_kwargs)
result.collapse = self._integrator.collapses
return seed, result
[docs] def run(self, state, tlist, ntraj=1, *,
args=None, e_ops=(), timeout=None, target_tol=None, seeds=None):
# Overridden to sample the no-jump trajectory first. Then, the no-jump
# probability is used as a lower-bound for random numbers in future
# monte carlo runs
if not self.options.get("improved_sampling", False):
return super().run(state, tlist, ntraj=ntraj, args=args,
e_ops=e_ops, timeout=timeout,
target_tol=target_tol, seeds=seeds)
stats, seeds, result, map_func, map_kw, state0 = self._initialize_run(
state,
ntraj,
args=args,
e_ops=e_ops,
timeout=timeout,
target_tol=target_tol,
seeds=seeds,
)
# first run the no-jump trajectory
start_time = time()
seed0, no_jump_result = self._run_one_traj(seeds[0], state0, tlist,
e_ops, no_jump=True)
_, state, _ = self._integrator.get_state(copy=False)
no_jump_prob = self._integrator._prob_func(state)
result.no_jump_prob = no_jump_prob
result.add((seed0, no_jump_result))
result.stats['no jump run time'] = time() - start_time
# run the remaining trajectories with the random number floor
# set to the no jump probability such that we only sample
# trajectories with jumps
start_time = time()
map_func(
self._run_one_traj, seeds[1:],
task_args=(state0, tlist, e_ops),
task_kwargs={'no_jump': False, 'jump_prob_floor': no_jump_prob},
reduce_func=result.add, map_kw=map_kw,
progress_bar=self.options["progress_bar"],
progress_bar_kwargs=self.options["progress_kwargs"]
)
result.stats['run time'] = time() - start_time
return result
def _get_integrator(self):
_time_start = time()
method = self.options["method"]
if method in self.avail_integrators():
integrator = self.avail_integrators()[method]
elif issubclass(method, Integrator):
integrator = method
else:
raise ValueError("Integrator method not supported.")
integrator_instance = integrator(self.rhs(), self.options)
mc_integrator = self._mc_integrator_class(
integrator_instance, self.rhs, self.options
)
self._init_integrator_time = time() - _time_start
return mc_integrator
@property
def _resultclass(self):
if self.options.get("improved_sampling", False):
return McResultImprovedSampling
else:
return McResult
@property
def options(self):
"""
Options for monte carlo solver:
store_final_state: bool, default: False
Whether or not to store the final state of the evolution in the
result class.
store_states: bool, default: None
Whether or not to store the state vectors or density matrices.
On `None` the states will be saved if no expectation operators are
given.
progress_bar: str {'text', 'enhanced', 'tqdm', ''}, default: "text"
How to present the solver progress.
'tqdm' uses the python module of the same name and raise an error
if not installed. Empty string or False will disable the bar.
progress_kwargs: dict, default: {"chunk_size":10}
Arguments to pass to the progress_bar. Qutip's bars use
``chunk_size``.
keep_runs_results: bool, default: False
Whether to store results from all trajectories or just store the
averages.
method: str, default: "adams"
Which differential equation integration method to use.
map: str {"serial", "parallel", "loky", "mpi"}, default: "serial"
How to run the trajectories. "parallel" uses the multiprocessing
module to run in parallel while "loky" and "mpi" use the "loky" and
"mpi4py" modules to do so.
mpi_options: dict, default: {}
Only applies if map is "mpi". This dictionary will be passed as
keyword arguments to the `mpi4py.futures.MPIPoolExecutor`
constructor. Note that the `max_workers` argument is provided
separately through the `num_cpus` option.
num_cpus: None, int
Number of cpus to use when running in parallel. ``None`` detect the
number of available cpus.
bitgenerator: {None, "MT19937", "PCG64", "PCG64DXSM", ...}
Which of numpy.random's bitgenerator to use. With ``None``, your
numpy version's default is used.
mc_corr_eps: float, default: 1e-10
Small number used to detect non-physical collapse caused by
numerical imprecision.
norm_t_tol: float, default: 1e-6
Tolerance in time used when finding the collapse.
norm_tol: float, default: 1e-4
Tolerance in norm used when finding the collapse.
norm_steps: int, default: 5
Maximum number of tries to find the collapse.
improved_sampling: Bool, default: False
Whether to use the improved sampling algorithm
of Abdelhafez et al. PRA (2019)
"""
return self._options
@options.setter
def options(self, new_options):
MultiTrajSolver.options.fset(self, new_options)
@classmethod
def avail_integrators(cls):
if cls is Solver:
return cls._avail_integrators.copy()
return {
**Solver.avail_integrators(),
**cls._avail_integrators,
}
[docs] @classmethod
def CollapseFeedback(cls, default=None):
"""
Collapse of the trajectory argument for time dependent systems.
When used as an args:
``QobjEvo([op, func], args={"cols": MCSolver.CollapseFeedback()})``
The ``func`` will receive a list of ``(time, operator number)`` for
each collapses of the trajectory as ``cols``.
.. note::
CollapseFeedback can't be added to a running solver when updating
arguments between steps: ``solver.step(..., args={})``.
Parameters
----------
default : callable, default : []
Default function used outside the solver.
"""
return _CollapseFeedback(default)
[docs] @classmethod
def StateFeedback(cls, default=None, raw_data=False, open=False):
"""
State of the evolution to be used in a time-dependent operator.
When used as an args:
``QobjEvo([op, func], args={"state": MCSolver.StateFeedback()})``
The ``func`` will receive the density matrix as ``state`` during the
evolution.
Parameters
----------
default : Qobj or qutip.core.data.Data, default : None
Initial value to be used at setup of the system.
open : bool, default False
Set to ``True`` when using the monte carlo solver for open systems.
raw_data : bool, default : False
If True, the raw matrix will be passed instead of a Qobj.
For density matrices, the matrices can be column stacked or square
depending on the integration method.
"""
if raw_data:
return _DataFeedback(default, open=open)
return _QobjFeedback(default, open=open)