"""
A module for simulating population protocols.
The main class :any:`Simulation` is created with a description of the protocol and the initial condition,
and is responsible for running the simulation.
The general syntax is
.. code-block:: python
a, b, u = 'A', 'B', 'U'
approx_majority = {
(a,b): (u,u),
(a,u): (a,a),
(b,u): (b,b),
}
n = 10 ** 5
init_config = {a: 0.51 * n, b: 0.49 * n}
sim = Simulation(init_config=init_config, rule=approx_majority)
sim.run()
sim.history.plot()
More examples are given in https://github.com/UC-Davis-molecular-computing/ppsim/tree/main/examples
:py:meth:`time_trials` is a convenience function used for gathering data about the
convergence time of a protocol.
"""
import dataclasses
from copy import deepcopy
from dataclasses import dataclass
from datetime import timedelta
import math
from time import perf_counter
from typing import Union, Hashable, Dict, Tuple, Callable, Optional, List, Iterable, Set, Any
from natsort import natsorted
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from . import simulator
from ppsim.crn import Reaction, reactions_to_dict
from ppsim.snapshot import Snapshot, TimeUpdate
# TODO: these names are not showing up in the mouseover information
State = Hashable
Output = Union[Tuple[State, State], Dict[Tuple[State, State], float]]
TransitionFunction = Callable[[State, State], Output]
Rule = Union[TransitionFunction, Dict[Tuple[State, State], Output], Iterable[Reaction]]
"""Type representing transition rule for protocol. Is one of three types: TODO"""
ConvergenceDetector = Callable[[Dict[State, int]], bool]
# TODO: give other option for when the number of reachable states is large or unbounded
def state_enumeration(init_dist: Dict[State, int], rule: Callable[[State, State], Output]) -> Set[State]:
    """Finds all reachable states by breadth-first search.

    Args:
        init_dist: dictionary mapping states to counts
            (states are any hashable type, commonly NamedTuple or String)
        rule: function mapping a pair of states to either a pair of states
            or to a dictionary mapping pairs of states to probabilities

    Returns:
        a set of all reachable states
    """
    checked_states: Set[State] = set()
    unchecked_states: Set[State] = set(init_dist.keys())
    while len(unchecked_states) > 0:
        unchecked_state = unchecked_states.pop()
        if unchecked_state in checked_states:
            continue
        checked_states.add(unchecked_state)
        # Pair the new state with every checked state (including itself), in both orders.
        for checked_state in checked_states:
            for new_states in [rule(checked_state, unchecked_state),
                               rule(unchecked_state, checked_state)]:
                if new_states is None:
                    # None represents a null transition; no new states produced.
                    continue
                if isinstance(new_states, dict):
                    # The output is a distribution over output pairs; flatten all
                    # output pairs into one sequence of states.
                    # (A comprehension avoids the quadratic cost of repeated
                    # tuple concatenation via sum(keys, ()).)
                    new_states = [s for pair in new_states.keys() for s in pair]
                for new_state in new_states:
                    if new_state not in checked_states and new_state not in unchecked_states:
                        unchecked_states.add(new_state)
    return checked_states
[docs]@dataclass
class Simulation:
"""Class to simulate a population protocol."""
state_list: List[State]
"""A sorted list of all reachable states."""
state_dict: Dict[State, int]
"""Maps states to their integer index to be used
in array representations."""
simulator: simulator.Simulator
"""An internal :any:`Simulator` object, whose methods actually
perform the steps of the simulation."""
configs: List[np.ndarray]
"""A list of all configurations that have been
recorded during the simulation, as integer arrays."""
time: float
"""The current time."""
times: List[Union[float, timedelta]]
"""A list of all the corresponding times for configs."""
steps_per_time_unit: float
"""Number of simulated interactions per time unit."""
time_units: Optional[str]
"""The units that time is in."""
continuous_time: bool
"""Whether continuous time is used. The regular discrete
time model considers :any:`steps_per_time_unit` steps
to be 1 unit of time.
The continuous time model is a poisson process, with expected
:any:`steps_per_time_unit` steps per 1 unit of time."""
column_names: Union[pd.MultiIndex, List[str]]
"""Columns representing all states for pandas dataframe.
If the State is a tuple, NamedTuple, or dataclass, this will be a
pandas MultiIndex based on the various fields.
Otherwise it is list of str(State) for each State."""
snapshots: List[Snapshot]
"""A list of :any:`Snapshot` objects, which get
periodically called during the running of the simulation to give live
updates."""
rng: np.random.Generator
"""A numpy random generator used to sample random variables outside the
cython code."""
seed: Optional[int]
"""The optional integer seed used for rng and inside cython code."""
def __init__(self, init_config: Dict[State, int], rule: Rule, simulator_method: str = "MultiBatch",
transition_order: str = "symmetric", seed: Optional[int] = None,
volume: Optional[float] = None, continuous_time: bool = False, time_units: Optional[str] = None,
**kwargs):
"""Initialize a Simulation.
Args:
init_config: The starting configuration, as a
dictionary mapping states to counts.
rule (:any:`Rule`): A representation of the transition rule. The first two options are
a dictionary, whose keys are tuples of 2 states and values are their
outputs, or a function which takes pairs of states as input. For a
deterministic transition function, the output is a tuple of 2 states.
For a probabilistic transition function, the output is a dictionary
mapping tuples of states to probabilities. Inputs that are not present
in the dictionary, or return None from the function, are interpreted as
null transitions that return the same pair of states as the output.
The third option is a list of :any:`Reaction` objects describing a CRN,
which will be parsed into an equivalent population protocol.
simulator_method: Which Simulator method to use, either ``'MultiBatch'``
or ``'Sequential'``.
``'MultiBatch'``:
:any:`SimulatorMultiBatch` does O(sqrt(n)) interaction steps in parallel
using batching, and is much faster for large population sizes and
relatively small state sets.
``'Sequential'``:
:any:`SimulatorSequentialArray` represents the population as an array of
agents, and simulates each interaction step by choosing a pair of agents
to update. Defaults to 'MultiBatch'.
transition_order: Should the rule be interpreted as being symmetric,
either ``'asymmetric'``, ``'symmetric'``, or ``'symmetric_enforced'``.
Defaults to 'symmetric'.
``'asymmetric'``:
Ordering of the inputs matters, and all inputs not
explicitly given as assumed to be null interactions.
``'symmetric'``:
The input pairs are interpreted as unordered. If rule(a, b)
returns None, while rule(b, a) has a non-null output, then the
output of rule(a, b) is assumed to be the same as rule(b, a).
If rule(a, b) and rule(b, a) are each given, there is no effect.
Asymmetric interactions can be explicitly included this way.
``'symmetric_enforced'``:
The same as symmetric, except that if rule(a, b)
and rule(b, a) are non-null and do not give the same set of outputs,
a ValueError is raised.
seed: An optional integer used as the seed for all pseudorandom number
generation. Defaults to None.
volume: If a list of :any:`Reaction` objects is given for a CRN, then
the parameter volume can be passed in here. Defaults to None.
If None, the volume will be assumed to be the population size n.
continuous_time: Whether continuous time is used. Defaults to False.
If a CRN as a list of reactions is passed in, this will be set to True.
time_units: An optional string given the units that time is in. Defaults to None.
This must be a valid string to pass as unit to pandas.to_timedelta.
**kwargs: If `rule` is a function, any extra function parameters are passed in here,
beyond the first two arguments representing the two agents. For example, if `rule` is
defined:
.. code-block:: python
def rule(sender: int, receiver: int, threshold: int) -> Tuple[int, int]:
if sender + receiver > threshold:
return 0, 0
else:
return sender, receiver+1
To use a threshold of 20 in each interaction, in the :any:`Simulation` constructor, use
.. code-block:: python
sim = Simulation(init_config, rule, threshold=20)
"""
self.seed = seed
self.rng = np.random.default_rng(seed)
self.n = sum(init_config.values())
self.steps_per_time_unit = self.n
self.time_units = time_units
self.continuous_time = continuous_time
# if rule is iterable of Reactions from the crn module, then convert to dict
rule_is_reaction_iterable = True
try:
for reaction in rule:
if not isinstance(reaction, Reaction):
rule_is_reaction_iterable = False
break
except:
# might end up here if rule is not even iterable, e.g., is a function
rule_is_reaction_iterable = False
if rule_is_reaction_iterable:
if volume is None:
volume = self.n
rule, rate_max = reactions_to_dict(rule, self.n, volume)
transition_order = 'asymmetric'
self.steps_per_time_unit *= rate_max
# Default to continuous time for lists of reactions
self.continuous_time = True
self._rule = rule
self._rule_kwargs = kwargs
# Get a list of all reachable states, use the natsort library to put in a nice order.
if type(self._rule) == dict:
# If the rule is a dict, we can loop over the entries to get all states
states = []
for input, output in self._rule.items():
states.extend(input)
if type(output) == dict:
for pair in output.keys():
states.extend(pair)
else:
states.extend(output)
state_list = list(set(states))
else:
# Otherwise, we use breadth-first search to find all reachable states
state_list = list(state_enumeration(init_config, self.rule))
# We use the natsorted library to put state_list in a reasonable order
self.state_list = natsorted(state_list, key=lambda x: repr(x))
self.state_dict = {state: i for i, state in enumerate(self.state_list)}
if simulator_method.lower() == 'multibatch':
self._method = simulator.SimulatorMultiBatch
elif simulator_method.lower() == 'sequential':
self._method = simulator.SimulatorSequentialArray
else:
raise ValueError('simulator_method must be multibatch or sequential')
self._transition_order = transition_order
self.initialize_simulator(self.array_from_dict(init_config))
# Check an arbitrary state to see if it has fields.
# This will be true for either a tuple, NamedTuple, or dataclass.
state = next(iter(init_config.keys()))
# Check for dataclass.
if dataclasses.is_dataclass(state):
field_names = [field.name for field in dataclasses.fields(state)]
tuples = [dataclasses.astuple(state) for state in self.state_list]
else:
# Check for NamedTuple.
field_names = getattr(state, '_fields', None)
tuples = self.state_list
# Check also for tuple.
if (field_names and len(field_names) > 1)\
or (isinstance(state, tuple) and len(state) > 1):
# Make a MultiIndex only if there are multiple fields
self.column_names = pd.MultiIndex.from_tuples(tuples, names=field_names)
else:
self.column_names = [str(i) for i in self.state_list]
self.configs = []
self.times = []
self.time = 0
self.add_config()
# private history dataframe is initially empty, updated by the getter of property self.history
self._history = pd.DataFrame(data=self.configs, index=pd.Index(self.times_in_units(self.times)),
columns=self.column_names)
self.snapshots = []
[docs] def rule(self, a, b):
"""The rule, as a function of two input states."""
# If the input rule was a dict
if type(self._rule) == dict:
if (a, b) in self._rule:
return self._rule[(a, b)]
# If the input rule was a function, with possible kwargs
elif callable(self._rule):
# Make a fresh copy in the case of a dataclass in case the function mutates a, b
if dataclasses.is_dataclass(a):
a, b = dataclasses.replace(a), dataclasses.replace(b)
# TODO: this doesn't help if the dataclass has more classes as fields
# using deepcopy fixes this problem, but is significantly slower
# a, b = deepcopy(a), deepcopy(b)
output = self._rule(a, b, **self._rule_kwargs)
# If function just mutates a, b but doesn't return, then return new a, b values
return (a, b) if output is None else output
else:
raise TypeError("rule must be either a dict or a callable.")
    def initialize_simulator(self, config):
        """Build the data structures necessary to instantiate the :any:`Simulator` class.

        Builds, for every ordered pair of state indices (i, j):
        the deterministic output table ``delta``, the ``null_transitions`` mask,
        and (for probabilistic outputs) ``random_transitions`` pointing into the
        flat ``random_outputs`` / ``transition_probabilities`` arrays.

        Args:
            config: The config array to instantiate :any:`Simulator`.
        """
        q = len(self.state_list)
        # delta[i, j] holds the pair of output state indices for input pair (i, j).
        delta = np.zeros((q, q, 2), dtype=np.intp)
        null_transitions = np.zeros((q, q), dtype=bool)
        # random_transitions[i, j] = (number of outputs, start index into random_outputs);
        # (0, 0) means the transition is not random.
        random_transitions = np.zeros((q, q, 2), dtype=np.intp)
        random_outputs = []
        transition_probabilities = []
        for i, a in enumerate(self.state_list):
            for j, b in enumerate(self.state_list):
                output = self.rule(a, b)
                # when output is a distribution
                if type(output) == dict:
                    s = sum(output.values())
                    assert s <= 1 + 2 ** -20, "The sum of output probabilities must be <= 1."
                    # ensure probabilities sum to 1
                    if 1 - s:
                        # any missing probability mass is assigned to the null output (a, b)
                        if (a, b) in output.keys():
                            output[(a, b)] += 1 - s
                        else:
                            output[(a, b)] = 1 - s
                    if len(output) == 1:
                        # distribution only had 1 output, not actually random
                        output = next(iter(output.keys()))
                    else:
                        # add (number of outputs, index to outputs)
                        random_transitions[i, j] = (len(output), len(random_outputs))
                        for (x, y) in output.keys():
                            random_outputs.append((self.state_dict[x], self.state_dict[y]))
                        transition_probabilities.extend(list(output.values()))
                # An output equal (as a set) to the inputs leaves the counts unchanged,
                # so a swap (a, b) -> (b, a) is also treated as null.
                if output is None or set(output) == {a, b}:
                    null_transitions[i, j] = True
                    delta[i, j] = (i, j)
                elif type(output) == tuple:
                    delta[i, j] = (self.state_dict[output[0]], self.state_dict[output[1]])
        random_outputs = np.array(random_outputs, dtype=np.intp)
        transition_probabilities = np.array(transition_probabilities, dtype=float)
        if self._transition_order.lower() in ['symmetric', 'symmetric_enforced']:
            for i in range(q):
                for j in range(q):
                    # Set the output for i, j to be equal to j, i if null
                    if null_transitions[i, j]:
                        null_transitions[i, j] = null_transitions[j, i]
                        delta[i, j] = delta[j, i]
                        random_transitions[i, j] = random_transitions[j, i]
                    # If i, j and j, i are both non-null, with symmetric_enforced, check outputs are equal
                    elif self._transition_order.lower() == 'symmetric_enforced' \
                            and not null_transitions[j, i]:
                        # Compare outputs as unordered pairs, plus the number of random outputs.
                        if sorted(delta[i, j]) != sorted(delta[j, i]) or \
                                random_transitions[i, j, 0] != random_transitions[j, i, 0]:
                            a, b = self.state_list[i], self.state_list[j]
                            raise ValueError(f'''Asymmetric interaction:
{a, b} -> {self.rule(a, b)}
{b, a} -> {self.rule(b, a)}''')
        self.simulator = self._method(config, delta, null_transitions,
                                      random_transitions, random_outputs, transition_probabilities, self.seed)
[docs] def array_from_dict(self, d: Dict) -> np.ndarray:
"""Convert a configuration dictionary to an array.
Args:
d: A dictionary mapping states to counts.
Returns:
An array giving counts of all states, in the order of
self.state_list.
"""
a = np.zeros(len(self.state_list), dtype=np.int64)
for k in d.keys():
a[self.state_dict[k]] += d[k]
return a
    def run(self, run_until: Union[float, ConvergenceDetector] = None,
            history_interval: Union[float, Callable[[float], float]] = 1.,
            stopping_interval: float = 1., timer: bool = True) -> None:
        """Runs the simulation.

        Can give a fixed amount of time to run the simulation, or a function that checks
        the configuration for convergence.

        Args:
            run_until: The stop condition. To run for a fixed amount of time, give
                a numerical value. To run until a convergence criterion, give a function
                mapping a configuration (as a dictionary mapping states to counts) to a
                boolean. The run will stop when the function returns True.
                Defaults to None. If None, the simulation will run until the configuration
                is silent (all transitions are null). This only works with the multibatch
                simulator method; if another simulator method is given, then using None will
                raise a ValueError.
            history_interval: The length to run the simulator before recording data,
                in current time units. Defaults to 1.
                This can either be a float, or a function that takes the current time
                and returns a float.
            stopping_interval: The length to run the simulator before checking for the stop
                condition.
            timer: If True, and there are no other snapshot objects, a default :any:`TimeUpdate`
                snapshot will be created to print updates with the current time.
                Defaults to True.

        Raises:
            ValueError: If run_until is None with a non-multibatch simulator, or
                history_interval / stopping_interval is not strictly positive.
            TypeError: If run_until is not a float, int, function, or None.
        """
        if len(self.snapshots) == 0 and timer is True:
            # Give the default timer snapshot a progress bound when the run length is known.
            if type(run_until) is float or type(run_until) is int:
                self.add_snapshot(TimeUpdate(time_bound=run_until))
            else:
                self.add_snapshot(TimeUpdate())
        end_time = None
        # stop_condition() returns True when it is time to stop
        if run_until is None:
            if type(self.simulator) != simulator.SimulatorMultiBatch:
                raise ValueError('Running until silence only works with multibatch simulator.')

            def stop_condition():
                return self.simulator.silent
        elif type(run_until) is float or type(run_until) is int:
            end_time = self.time + run_until

            def stop_condition():
                return self.time >= end_time
        elif callable(run_until):
            def stop_condition():
                return run_until(self.config_dict)
        else:
            raise TypeError('run_until must be a float, int, function, or None.')
        # Stop if stop_condition is already met
        if stop_condition():
            return

        def get_next_history_time():
            # Get the next time that will be recorded to self.times and self.history
            if callable(history_interval):
                length = history_interval(self.time)
            else:
                length = history_interval
            if length <= 0:
                raise ValueError('history_interval must always be strictly positive.')
            return self.time + length

        if stopping_interval <= 0:
            raise ValueError('stopping_interval must always be strictly positive.')
        next_history_time = get_next_history_time()

        def get_next_time():
            # Get the next time simulator will run until: the nearest of the next
            # history record, the next stop-condition check, and (if any) end_time.
            t = min(next_history_time, self.time + stopping_interval)
            if end_time is not None:
                t = min(t, end_time)
            return t

        next_time = get_next_time()
        # The next step that the simulator will be run until, which corresponds to parallel time next_time
        next_step = self.time_to_steps(next_time)
        for snapshot in self.snapshots:
            snapshot.next_snapshot_time = perf_counter() + snapshot.update_time
        # add max_wall_clock to be the minimum snapshot update time, to put a time bound on calls to simulator.run
        max_wallclock_time = [min([s.update_time for s in self.snapshots])] if len(self.snapshots) > 0 else []
        while stop_condition() is False:
            if self.time >= next_time:
                next_time = get_next_time()
                next_step = self.time_to_steps(next_time)
            current_step = self.simulator.t
            self.simulator.run(next_step, *max_wallclock_time)
            if self.simulator.t == next_step:
                self.time = next_time
            elif self.simulator.t < next_step:
                # simulator exited early from hitting max_wallclock_time
                # add a fraction of the time until next_time equal to the fractional progress made by simulator
                self.time += (next_time - self.time) * (self.simulator.t - current_step) / (next_step - current_step)
            else:
                raise RuntimeError(f'The simulator ran to step {self.simulator.t} past the next step {next_step}.')
            if self.time >= next_history_time:
                assert self.time == next_history_time, \
                    f'self.time = {self.time} overshot next_history_time = {next_history_time}'
                self.add_config()
                next_history_time = get_next_history_time()
            for snapshot in self.snapshots:
                if perf_counter() >= snapshot.next_snapshot_time:
                    snapshot.update()
                    snapshot.next_snapshot_time = perf_counter() + snapshot.update_time
        # add the final configuration if it wasn't already recorded
        if self.time > self.times[-1]:
            self.add_config()
        # final updates for all snapshots
        for snapshot in self.snapshots:
            snapshot.update()
        # Remove the default timer snapshot (if it is the only one) so that a later
        # run() call can create a fresh one.
        if len(self.snapshots) == 1 and type(self.snapshots[0]) is TimeUpdate:
            self.snapshots[0].pbar.close()
            self.snapshots.pop()
@property
def reactions(self) -> str:
"""
A string showing all non-null transitions in reaction notation.
Each reaction is separated by newlines, so that ``print(self.reactions)`` will display all reactions.
Only works with simulator method multibatch, otherwise will raise a ValueError.
"""
if type(self.simulator) != simulator.SimulatorMultiBatch:
raise ValueError('reactions must be defined by multibatch simulator.')
w = max([len(str(state)) for state in self.state_list])
reactions = [self._reaction_string(r, p, w) for (r, p) in
zip(self.simulator.reactions, self.simulator.reaction_probabilities)]
return '\n'.join(reactions)
@property
def enabled_reactions(self) -> str:
"""
A string showing all non-null transitions that are currently enabled.
This can only check the current configuration in self.simulator.
Each reaction is separated by newlines, so that ``print(self.enabled_reactions)``
will display all enabled reactions.
"""
if type(self.simulator) != simulator.SimulatorMultiBatch:
raise ValueError('reactions must be defined by multibatch simulator.')
w = max([len(str(state)) for state in self.state_list])
self.simulator.get_enabled_reactions()
reactions = []
for i in range(self.simulator.num_enabled_reactions):
r = self.simulator.reactions[self.simulator.enabled_reactions[i]]
p = self.simulator.reaction_probabilities[self.simulator.enabled_reactions[i]]
reactions.append(self._reaction_string(r, p, w))
return '\n'.join(reactions)
def _reaction_string(self, reaction, p: float = 1, w: int = 1) -> str:
"""A string representation of a reaction."""
reactants = [self.state_list[i] for i in sorted(reaction[0:2])]
products = [self.state_list[i] for i in sorted(reaction[2:])]
s = '{0}, {1} --> {2}, {3}'.format(*[str(x).rjust(w) for x in reactants + products])
if p < 1:
s += f' with probability {p}'
return s
# TODO: If this changes n, then the timescale must change
[docs] def reset(self, init_config: Optional[Dict[State, int]] = None) -> None:
"""Reset the simulation.
Args:
init_config: The configuration to reset to. Defaults to None.
If None, will use the old initial configuration.
"""
if init_config is None:
config = self.configs[0]
else:
config = np.zeros(len(self.state_list), dtype=np.int64)
for k in init_config.keys():
config[self.state_dict[k]] += init_config[k]
self.configs = [config]
self.times = [0]
self.time = 0
self._history = pd.DataFrame(data=self.configs, index=pd.Index(self.times, name='time'),
columns=self._history.columns)
self.simulator.reset(config)
# TODO: If this changes n, then the timescale must change
[docs] def set_config(self, config: Union[Dict[State, int], np.ndarray]) -> None:
"""Change the current configuration.
Args:
config: The configuration to change to. This can be a dictionary,
mapping states to counts, or an array giving counts in the order
of :any:`state_list`.
"""
if type(config) is dict:
config_array = self.array_from_dict(config)
else:
config_array = np.array(config, dtype=np.int64)
self.simulator.reset(config_array, self.simulator.t)
self.add_config()
[docs] def time_to_steps(self, time: float) -> int:
"""Convert simulated time into number of simulated interaction steps.
Args:
time: The amount of time to convert.
"""
if self.continuous_time:
# In continuous time the number of interactions to simulate is a Poisson random variable
# The last recorded simulated time was self.time, and at this point we had simulated self.simulator.t
# total interactions. We first compute the expected number of additional steps to simulate.
expected_steps = (time - self.time) * self.steps_per_time_unit
return self.simulator.t + self.rng.poisson(expected_steps)
else:
# In discrete time we multiply to convert
return math.floor(time * self.steps_per_time_unit)
@property
def config_dict(self) -> Dict[State, int]:
"""The current configuration, as a dictionary mapping states to counts."""
return {self.state_list[i]: self.simulator.config[i] for i in np.nonzero(self.simulator.config)[0]}
@property
def config_array(self) -> np.ndarray:
"""The current configuration in the simulator, as an array of counts.
The array is given in the same order as self.state_list. The index of state s
is self.state_dict[s].
"""
return np.asarray(self.simulator.config)
@property
def history(self) -> pd.DataFrame:
"""A pandas dataframe containing the history of all recorded configurations."""
h = len(self._history)
if h < len(self.configs):
new_history = pd.DataFrame(data=self.configs[h:], index=pd.Index(self.times_in_units(self.times[h:])),
columns=self._history.columns)
self._history = pd.concat([self._history, new_history])
if self.time_units is None:
if self.continuous_time:
self._history.index.name = 'time (continuous units)'
else:
n = "n" if self.n == self.steps_per_time_unit else str(self.steps_per_time_unit)
self._history.index.name = f'time ({n} interactions)'
return self._history
@property
def null_probability(self) -> float:
"""The probability the next interaction is null."""
if type(self.simulator) != simulator.SimulatorMultiBatch:
raise ValueError('null probability requires by multibatch simulator.')
self.simulator.get_enabled_reactions()
n = self.simulator.n
return 1 - self.simulator.get_total_propensity() / (n * (n-1) / 2)
[docs] def times_in_units(self, times: Iterable[float]) -> Iterable[Any]:
"""If :any:`time_units` is defined, convert time list to appropriate units."""
if self.time_units:
return pd.to_timedelta(times, unit=self.time_units)
else:
return times
[docs] def add_config(self) -> None:
"""Appends the current simulator configuration and time."""
self.configs.append(np.array(self.simulator.config))
self.times.append(self.time)
[docs] def set_snapshot_time(self, time: float) -> None:
"""Updates all snapshots to the nearest recorded configuration to a specified time.
Args:
time: The parallel time to update the snapshots to.
"""
index = np.searchsorted(self.times, time)
for snapshot in self.snapshots:
snapshot.update(index=index)
[docs] def set_snapshot_index(self, index: int) -> None:
"""Updates all snapshots to the configuration :any:`configs` ``[index]``.
Args:
index: The index of the configuration.
"""
for snapshot in self.snapshots:
snapshot.update(index=index)
[docs] def add_snapshot(self, snap: "Snapshot") -> None:
"""Add a new :any:`Snapshot` to :any:`snapshots`.
Args:
snap: The :any:`Snapshot` object to be added.
"""
snap.simulation = self
snap.initialize()
snap.update()
self.snapshots.append(snap)
[docs] def snapshot_slider(self, var: str = 'index') -> "widgets.interactive":
"""Returns a slider that updates all :any:`Snapshot` objects.
Returns a slider from the ipywidgets library.
Args:
var: What variable the slider uses, either ``'index'`` or ``'time'``.
"""
import ipywidgets as widgets
if var.lower() == 'index':
return widgets.interactive(self.set_snapshot_index,
index=widgets.IntSlider(min=0,
max=len(self.times) - 1,
layout=widgets.Layout(width='100%'),
step=1))
elif var.lower() == 'time':
return widgets.interactive(self.set_snapshot_time,
time=widgets.FloatSlider(min=self.times[0],
max=self.times[-1],
layout=widgets.Layout(width='100%'),
step=0.01))
else:
raise ValueError("var must be either 'index' or 'time'.")
    def sample_silence_time(self) -> float:
        """Starts a new trial from the initial distribution and return time until silence.

        Raises:
            ValueError: If the simulator method is not multibatch.
        """
        if type(self.simulator) != simulator.SimulatorMultiBatch:
            raise ValueError('silence time can only be found by multibatch simulator.')
        self.simulator.run_until_silent(np.array(self.configs[0]))
        # NOTE(review): self.time is returned here, but nothing in this method
        # visibly updates it after run_until_silent -- confirm the simulator call
        # (or shared state) updates self.time, otherwise this returns the
        # pre-run time.
        return self.time
    def sample_future_configuration(self, time: float, num_samples: int = 100) -> pd.DataFrame:
        """Repeatedly samples the configuration at a fixed future time.

        Each sample resets the simulator to the last recorded configuration and
        simulates ``time`` additional units of time.

        Args:
            time: The amount of time ahead to sample the configuration.
            num_samples: The number of samples to get.

        Returns:
            A dataframe whose rows are the sampled configuration.
        """
        samples = []
        t = self.simulator.t
        for _ in tqdm(range(num_samples)):
            # Restart each sample from the last recorded configuration at step t.
            self.simulator.reset(np.array(self.configs[-1]), t)
            # NOTE(review): in continuous time, time_to_steps already returns an
            # absolute step count based on self.simulator.t, so adding t again here
            # may double-count -- confirm against the discrete-time path, where
            # time_to_steps returns a relative count.
            end_step = t + self.time_to_steps(time)
            self.simulator.run(end_step)
            samples.append(np.array(self.simulator.config))
        return pd.DataFrame(data=samples, index=pd.Index(range(num_samples), name='trial #'),
                            columns=self._history.columns)
def __getstate__(self):
"""Returns information to be pickled."""
# Clear _history such that it can be regenerated by self.history
d = dict(self.__dict__)
d['_history'] = pd.DataFrame(data=self.configs[0:1], index=pd.Index(self.times_in_units(self.times[0:1])),
columns=self._history.columns)
del d['simulator']
return d
def __setstate__(self, state) -> None:
"""Instantiates from the pickled state information."""
self.__dict__ = state
self.initialize_simulator(self.configs[-1])
def time_trials(rule: Rule, ns: List[int], initial_conditions: Union[Callable, List],
                convergence_condition: Optional[Callable] = None, convergence_check_interval: float = 0.1,
                num_trials: int = 100, max_wallclock_time: float = 60 * 60 * 24,
                **kwargs) -> pd.DataFrame:
    """Gathers data about the convergence time of a rule.

    Args:
        rule: The rule that is used to generate the :any:`Simulation` object.
        ns: A list of population sizes n to sample from.
            This should be in increasing order.
        initial_conditions: An initial condition is a dict mapping states to counts.
            This can either be a list of initial conditions, or a function mapping
            population size n to an initial condition of n agents.
        convergence_condition: A boolean function that takes a configuration dict
            as input and returns True if that configuration has converged.
            Defaults to None. If None, the simulation will run until silent
            (all transitions are null), and the data will be for silence time.
        convergence_check_interval: How often (in parallel time) the simulation will
            run between convergence checks. Defaults to 0.1.
            Smaller values give better resolution, but spend more time checking for
            convergence.
        num_trials: The maximum number of trials that will be done for each
            population size n, if there is sufficient time. Defaults to 100.
            If you want to ensure that you get the full num_trials samples for
            each value n, use a large value for max_wallclock_time.
        max_wallclock_time: A bound (in seconds) for how long this function will run.
            Each value n is given a time budget based on the time remaining, and
            will stop before doing num_trials runs when this time budget runs out.
            Defaults to 60 * 60 * 24 (one day).
        **kwargs: Other keyword arguments to pass into :any:`Simulation`.

    Returns:
        A pandas dataframe giving the data from each trial, with two columns
        ``'n'`` and ``'time'``. A good way to visualize this dataframe is using the
        seaborn library, calling ``sns.lineplot(x='n', y='time', data=df)``.
    """
    data = {'n': [], 'time': []}
    end_time = perf_counter() + max_wallclock_time
    if callable(initial_conditions):
        initial_conditions = [initial_conditions(n) for n in ns]
    # enumerate(tqdm(ns)) rather than indexing into range(len(ns)).
    for i, n in enumerate(tqdm(ns)):
        sim = Simulation(initial_conditions[i], rule, **kwargs)
        t = perf_counter()
        # Budget the remaining wallclock time evenly among the remaining values of n.
        time_limit = t + (end_time - t) / (len(ns) - i)
        trials_done = 0
        while trials_done < num_trials and perf_counter() < time_limit:
            trials_done += 1
            sim.reset(initial_conditions[i])
            sim.run(convergence_condition, stopping_interval=convergence_check_interval, timer=False)
            data['n'].append(n)
            data['time'].append(sim.time)
    return pd.DataFrame(data=data)