"""
Module for expression population protocols using CRN notation. Ideas and much code taken from
https://github.com/enricozb/python-crn.
The general syntax is
.. code-block:: python
a, b, u = species('A B U')
approx_majority = [
a + b >> 2 * u,
a + u >> 2 * a,
b + u >> 2 * b,
]
n = 10 ** 5
init_config = {a: 0.51 * n, b: 0.49 * n}
sim = Simulation(init_config=init_config, rule=approx_majority)
In other words, a list of reactions is treated by the ppsim library just like the other ways of specifying
population protocol transitions (the `rule` parameter in the constructor for :any:`Simulation`, which also
accepts a dict or a Python function).
More examples given in https://github.com/UC-Davis-molecular-computing/ppsim/tree/main/examples
"""
from __future__ import annotations # needed for forward references in type hints
from collections import defaultdict
import copy
from enum import Enum
from typing import Union, Dict, Tuple, Set, Iterable, DefaultDict, List, Any
from dataclasses import dataclass
from xml.dom import minidom
[docs]def species(sp: Union[str, Iterable[str]]) -> Tuple[Specie, ...]:
"""
Create a list of :any:`Specie` (Single species :any:`Expression`'s),
or a single one.
args:
sp:
An string or Iterable of strings representing the names of the species being created.
If a single string, species names are interpreted as space-separated.
Examples:
.. code-block:: python
w, x, y, z = species('W X Y Z')
rxn = x + y >> z + w
.. code-block:: python
w, x, y, z = species(['W', 'X', 'Y', 'Z'])
rxn = x + y >> z + w
"""
species_list: List[str]
if isinstance(sp, str):
species_list = sp.split()
else:
species_list = [specie.strip() for specie in sp]
# if len(species_list) == 1:
# return Specie(species_list[0])
if len(species_list) != len(set(species_list)):
raise ValueError(f'species_list {species_list} cannot contain duplicates.')
return tuple(map(Specie, species_list))
SpeciePair = Tuple['Specie', 'Specie'] # forward annotations don't seem to work here
Output = Union[SpeciePair, Dict[SpeciePair, float]]
[docs]def replace_reversible_rxns(rxns: Iterable[Reaction]) -> List[Reaction]:
"""
Args:
rxns: list of :any:`Reaction`'s
Returns: list of :any:`Reaction`'s, where every reversible reaction in `rxns` has been replaced by
two irreversible reactions, and all others have been left as they are
"""
new_rxns: List[Reaction] = []
for rxn in rxns:
if not rxn.reversible:
new_rxn = copy.deepcopy(rxn)
new_rxns.append(new_rxn)
else:
forward_rxn = Reaction(reactants=rxn.reactants, products=rxn.products, k=rxn.rate_constant,
rate_constant_units=rxn.rate_constant_units, reversible=False)
reverse_rxn = Reaction(reactants=rxn.products, products=rxn.reactants,
k=rxn.rate_constant_reverse,
rate_constant_units=rxn.rate_constant_reverse_units, reversible=False)
new_rxns.extend([forward_rxn, reverse_rxn])
return new_rxns
# XXX: This algorithm currently uses the reactant *ordered* pair.
# We should think about the logic of that and see if it makes sense to collapse
# two reversed ordered pairs to a single unordered pair at this step,
# or whether that should be done explicitly by the user specifying transition_order='symmetric'.
[docs]def reactions_to_dict(reactions: Iterable[Reaction], n: int, volume: float) \
-> Tuple[Dict[SpeciePair, Output], float]:
"""
Returns dict representation of `reactions`, transforming unimolecular reactions to bimolecular,
and converting rates to probabilities, also returning the max rate so the :any:`Simulation` knows
how to scale time.
Args:
reactions: list of :any:`Reaction`'s
n: the population size, necessary for rate conversion
volume: parameter as defined in Gillespie algorithm
Returns:
(transitions_dict, max_rate), where `transitions_dict` is the dict representation of the transitions,
and `max_rate` is the maximum rate for any pair of reactants,
i.e., if we have reactions (a + b >> c + d).k(2) and (a + b >> x + y).k(3),
then the ordered pair (a,b) has rate 2+3 = 5
"""
reactions = replace_reversible_rxns(reactions)
# Make a copy of reactions because this conversion will mutate the reactions
reactions = convert_unimolecular_to_bimolecular_and_flip_reactant_order(reactions, n, volume)
# for each ordered pair of reactants, calculate sum of rate constants across all reactions with those reactants
# get in stochastic units no matter how they're encoded in the reactions
reactant_pair_rates: DefaultDict[SpeciePair, float] = defaultdict(int)
for reaction in reactions:
if not reaction.is_bimolecular():
raise ValueError(f'all reactions must have exactly two reactants, violated by {reaction}')
if not reaction.num_products() == 2:
raise ValueError(f'all reactions must have exactly two products, violated by {reaction}')
reactants = reaction.reactants_if_bimolecular()
reactant_pair_rates[reactants] += reaction.rate_constant_stochastic
# divide all rate constants by the max rate (per reactant pair) to help turn them into probabilities
max_rate = max(reactant_pair_rates.values())
# add one randomized transition per reaction
transitions: Dict[SpeciePair, Output] = {}
for reaction in reactions:
prob = reaction.rate_constant_stochastic / max_rate
reactants = reaction.reactants_if_bimolecular()
products = reaction.products_if_exactly_two()
if reactants not in transitions:
# should we be worried about floating-point error here?
# I hope not since dividing a float by itself should always result in 1.0.
transitions[reactants] = products if prob == 1.0 else {products: prob}
else:
# if we calculated probabilities correctly above, if we assigned a dict entry to be non-randomized
# (since prob == 1.0 above), we should not encounter another reaction with same reactants
output = transitions[reactants]
assert isinstance(output, dict)
output[products] = prob
# assert that each possible input for transitions has output probabilities summing to 1
for reactants, outputs in transitions.items():
if isinstance(outputs, dict):
sum_probs = sum(prob for prob in outputs.values())
assert sum_probs <= 1 + 2 ** -20, f'sum_probs exceeds 1: {sum_probs}'
return transitions, max_rate
[docs]def convert_unimolecular_to_bimolecular_and_flip_reactant_order(reactions: Iterable[Reaction], n: int,
volume: float) -> List[Reaction]:
"""Process all reactions before being added to the dictionary.
bimolecular reactions have their rates multiplied by the corrective factor (n-1) / (2 * volume).
Bimolecular reactions with two different reactants are added twice, with their reactants in both orders.
"""
# gather set of all species together
all_species: Set[Specie] = set()
for reaction in reactions:
all_species.update(reaction.get_species())
converted_reactions = []
for reaction in reactions:
if reaction.num_reactants() != reaction.num_products():
raise ValueError(
f'each reaction must have same number of reactants and products, violated by {reaction}')
if reaction.is_bimolecular():
# Corrective factor to reaction rate
reaction.rate_constant *= (n - 1) / (2 * volume)
converted_reactions.append(reaction)
# Add a flipped copy if the reaction has two different reactants
reactants = reaction.reactants_if_bimolecular()
if len(set(reactants)) > 1:
flipped_reaction = copy.copy(reaction)
flipped_reaction.reactants = Expression([reactants[1], reactants[0]])
assert flipped_reaction.reactants_if_bimolecular != reactants
converted_reactions.append(flipped_reaction)
elif reaction.is_unimolecular():
# for each unimolecular reaction R -->(k) P and each species S,
# add a bimolecular reaction R+S -->(k) P+S
reactant = reaction.reactant_if_unimolecular()
product = reaction.product_if_unique()
bimolecular_implementing_reactions = [(reactant + s >> product + s).k(reaction.rate_constant)
for s in all_species]
converted_reactions.extend(bimolecular_implementing_reactions)
else:
raise ValueError(f'each reaction must have exactly one or two reactants, violated by {reaction}')
return converted_reactions
[docs]@dataclass(frozen=True)
class Specie:
name: str
def __add__(self, other: Union[Specie, Expression]) -> Expression:
if isinstance(other, Expression):
return other + Expression([self])
elif isinstance(other, Specie):
return Expression([self]) + Expression([other])
raise NotImplementedError()
__radd__ = __add__
def __rshift__(self, other: Union[Specie, Expression]) -> Reaction:
return Reaction(self, other)
def __rrshift__(self, other: Union[Specie, Expression]) -> Reaction:
return Reaction(other, self)
def __or__(self, other: Union[Specie, Expression]) -> Reaction:
return Reaction(self, other, reversible=True)
def __mul__(self, other: int) -> Expression:
if isinstance(other, int):
return other * Expression([self])
else:
raise NotImplementedError()
def __rmul__(self, other: int) -> Expression:
if isinstance(other, int):
return other * Expression([self])
else:
raise NotImplementedError()
def __str__(self) -> str:
return self.name
def __lt__(self, other: object) -> bool:
if not isinstance(other, Specie):
return NotImplemented
return self.name < other.name
def __eq__(self, other: object) -> bool:
if not isinstance(other, Specie):
return NotImplemented
return self.name == other.name
__req__ = __eq__
[docs]@dataclass(frozen=True)
class Expression:
"""
Class used for very basic symbolic manipulation of left/right hand
side of stoichiometric equations. Not very user friendly; users should
just use the `species` functions and manipulate those to get their
reactions.
"""
species: List[Specie]
"""ordered list of species in expression, e.g, A+A+B is [A,A,B]"""
def __add__(self, other: Expression) -> Expression:
if isinstance(other, Expression):
species_copy = list(self.species)
species_copy.extend(other.species)
return Expression(species_copy)
else:
raise NotImplementedError()
def __rmul__(self, coeff: int) -> Expression:
if isinstance(coeff, int):
species_copy = []
for _ in range(coeff):
species_copy.extend(self.species)
return Expression(species_copy)
else:
raise NotImplementedError()
__mul__ = __rmul__
def __rshift__(self, expr: Union[Specie, Expression]) -> Reaction:
return Reaction(self, expr)
def __or__(self, other: Union[Specie, Expression]) -> Reaction:
return Reaction(self, other, reversible=True)
def __str__(self) -> str:
return ' + '.join(s.name for s in self.species)
def __len__(self) -> int:
return len(self.species)
[docs] def get_species(self) -> Set[Specie]:
"""
Returns the set of species in this expression, not their
coefficients.
"""
return set(self.species)
avogadro = 6.02214076e23
[docs]def concentration_to_count(concentration: float, volume: float) -> int:
"""
Args:
concentration: units of M (molar) = moles / liter
volume: units of liter
Returns:
count of molecule with `concentration` in `volume`
"""
return round(avogadro * concentration * volume)
[docs]class RateConstantUnits(Enum):
stochastic = 'stochastic'
"""Units of L/s. Multiple by Avogadro's number to convert to mass-action units."""
mass_action = 'mass_action'
"""Units of /M/s. Divide by Avogadro's number to convert to stochastic units."""
[docs]@dataclass
class Reaction:
"""
Representation of a stoichiometric reaction using a pair of Expressions,
one for the reactants and one for the products.
"""
reactants: Expression
"""The left side of species in the reaction."""
products: Expression
"""The right side of species in the reaction."""
rate_constant: float = 1
"""Rate constant of forward reaction."""
rate_constant_reverse: float = 1
"""Rate constant of reverse reaction (only used if :py:data:`Reaction.reversible` is true)."""
rate_constant_units: RateConstantUnits = RateConstantUnits.stochastic
"""Units of forward rate constant."""
rate_constant_reverse_units: RateConstantUnits = RateConstantUnits.stochastic
"""Units of reverse rate constant."""
reversible: bool = False
"""Whether reaction is reversible, i.e. products → reactants is a reaction also."""
def __init__(self, reactants: Union[Specie, Expression], products: Union[Specie, Expression],
k: float = 1, r: float = 1,
rate_constant_units: RateConstantUnits = RateConstantUnits.stochastic,
rate_constant_reverse_units: RateConstantUnits = RateConstantUnits.stochastic,
reversible: bool = False) -> None:
"""
Args:
reactants: left side of species in the reaction
products: right side of species in the reaction
k: Rate constant of forward reaction
r: Rate constant of reverse reaction (only used if :py:data:`Reaction.reversible` is true
rate_constant_units: Units of forward rate constant
rate_constant_reverse_units: Units of reverse rate constant
reversible: Whether reaction is reversible
"""
if not (isinstance(reactants, Specie) or isinstance(reactants, Expression)):
raise ValueError(
"Attempted construction of reaction with type of reactants "
f"as {type(reactants)}. Type of reactants must be Species "
"or Expression")
if not (isinstance(products, Specie) or isinstance(products, Expression)):
raise ValueError(
"Attempted construction of products with type of products "
f"as {type(products)}. Type of products must be Species "
"or Expression")
if isinstance(reactants, Specie):
reactants = Expression([reactants])
if isinstance(products, Specie):
products = Expression([products])
if len(reactants) == 0:
raise ValueError('reactants cannot be empty')
if len(products) == 0:
raise ValueError('products cannot be empty')
self.reactants = reactants
self.products = products
self.rate_constant = float(k)
self.rate_constant_reverse = float(r)
self.rate_constant_units = rate_constant_units
self.rate_constant_reverse_units = rate_constant_reverse_units
self.reversible = reversible
[docs] def is_unimolecular(self) -> bool:
"""
Returns: true if there is one reactant
"""
return self.num_reactants() == 1
[docs] def is_bimolecular(self) -> bool:
"""
Returns: true if there are two reactants
"""
return self.num_reactants() == 2
[docs] def symmetric(self) -> bool:
"""
Returns: true if there are two reactants that are the same species
"""
return self.num_reactants() == 2 and self.reactants.species[0] == self.reactants.species[1]
[docs] def symmetric_products(self) -> bool:
"""
Returns: true if there are two products that are the same species
"""
return self.num_products() == 2 and self.products.species[0] == self.products.species[1]
[docs] def num_reactants(self) -> int:
"""
Returns: number of reactants
"""
return len(self.reactants)
[docs] def num_products(self) -> int:
"""
Returns: number of products
"""
return len(self.products)
[docs] def is_conservative(self) -> bool:
"""
Returns: true if number of reactants equals number of products
"""
return self.num_reactants() == self.num_products()
[docs] def reactant_if_unimolecular(self) -> Specie:
"""
Returns: unique reactant if there is only one
Raises: ValueError if there are multiple reactants
"""
if self.is_unimolecular():
return self.reactants.species[0]
else:
raise ValueError(f'reaction {self} is not unimolecular')
[docs] def product_if_unique(self) -> Specie:
"""
Returns: unique product if there is only one
Raises: ValueError if there are multiple products
"""
if self.num_products() == 1:
return self.products.species[0]
else:
raise ValueError(f'reaction {self} does not have exactly one product')
[docs] def reactants_if_bimolecular(self) -> Tuple[Specie, Specie]:
"""
Returns: pair of reactants if there are exactly two
Raises: ValueError if there are not exactly two reactants
"""
if self.is_bimolecular():
return self.reactants.species[0], self.reactants.species[1]
else:
raise ValueError(f'reaction {self} is not bimolecular')
[docs] def reactant_names_if_bimolecular(self) -> Tuple[str, str]:
"""
Returns: pair of reactant names if there are exactly two
Raises: ValueError if there are not exactly two reactants
"""
r1, r2 = self.reactants_if_bimolecular()
return r1.name, r2.name
[docs] def products_if_exactly_two(self) -> Tuple[Specie, Specie]:
"""
Returns: pair of products if there are exactly two
Raises: ValueError if there are not exactly two products
"""
if self.num_products() == 2:
return self.products.species[0], self.products.species[1]
else:
raise ValueError(f'reaction {self} does not have exactly two products')
[docs] def product_names_if_exactly_two(self) -> Tuple[str, str]:
"""
Returns: pair of product names if there are exactly two
Raises: ValueError if there are not exactly two products
"""
p1, p2 = self.products_if_exactly_two()
return p1.name, p2.name
def __str__(self) -> str:
rev_rate_str = f'({self.rate_constant_reverse})<' if self.reversible else ''
return f"{self.reactants} {rev_rate_str}-->({self.rate_constant}) {self.products}"
def __repr__(self) -> str:
return (f"Reaction({repr(self.reactants)}, {repr(self.products)}, "
f"{self.rate_constant})")
@property
def rate_constant_stochastic(self) -> float:
"""
Returns: forward rate constant in stochastic units (converts from mass-action if necessary)
"""
return self.rate_constant \
if self.rate_constant_units == RateConstantUnits.stochastic \
else self.rate_constant / avogadro
@property
def rate_constant_reverse_stochastic(self) -> float:
"""
Returns: reverse rate constant in stochastic units (converts from mass-action if necessary)
"""
return self.rate_constant_reverse \
if self.rate_constant_reverse_units == RateConstantUnits.stochastic \
else self.rate_constant_reverse / avogadro
[docs] def k(self, coeff: float, units: RateConstantUnits = RateConstantUnits.stochastic) -> Reaction:
"""
Changes the reaction coefficient to `coeff` and returns `self`.
This is useful for including the rate constant during the construction
of a reaction. For example
.. code-block:: python
x, y, z = species("X Y Z")
rxns = [
(x + y >> z).k(2.5),
(z >> x).k(1.5),
(z >> y).k(0.5)),
]
args:
coeff: float
The new reaction coefficient
units: float
units of rate constant (default stochastic)
"""
if self.is_unimolecular() and units == RateConstantUnits.mass_action:
raise ValueError('cannot use mass-action rate constants on a unimolecular reaction')
self.rate_constant_units = units
self.rate_constant = coeff
return self
[docs] def r(self, coeff: float, units: RateConstantUnits = RateConstantUnits.stochastic) -> Reaction:
"""
Changes the reverse reactionn reaction rate constant to `coeff` and returns `self`.
This is useful for including the rate constant during the construction
of a reaction. For example
.. code-block:: python
x, y, z = species("X Y Z")
rxns = [
(x + y >> z).k(2.5),
(z >> x).k(1.5),
(z >> y).k(0.5)),
]
args:
coeff: float
The new reverse reaction rate constant
units: float
units of rate constant (default stochastic)
"""
if self.num_products() == 1 and units == RateConstantUnits.mass_action:
raise ValueError('cannot use mass-action rate constants on a unimolecular reaction; '
'this reaction has only one product, so its reverse is unimolecular')
if not self.reversible:
raise ValueError('cannot set r on an irreversible reaction')
self.rate_constant_reverse_units = units
self.rate_constant_reverse = coeff
return self
[docs] def get_species(self) -> Set[Specie]:
"""
Return: the set of species present in the products and reactants.
"""
return {
*self.reactants.get_species(),
*self.products.get_species()
}
# example of StochKit format:
'''
<Model>
<Description>Epidemic</Description>
<NumberOfReactions>1</NumberOfReactions>
<NumberOfSpecies>2</NumberOfSpecies>
<ParametersList>
<Parameter>
<Id>c1</Id>
<Expression>1.0</Expression>
</Parameter>
</ParametersList>
<ReactionsList>
<Reaction>
<Id>R2</Id>
<Description> A+B -> 2B </Description>
<Type>mass-action</Type>
<Rate>c1</Rate>
<Reactants>
<SpeciesReference id="A" stoichiometry="1"/>
<SpeciesReference id="B" stoichiometry="1"/>
</Reactants>
<Products>
<SpeciesReference id="B" stoichiometry="2"/>
</Products>
</Reaction>
</ReactionsList>
<SpeciesList>
<Species>
<Id>A</Id>
<Description>Species #1</Description>
<InitialPopulation>10000</InitialPopulation>
</Species>
<Species>
<Id>B</Id>
<Description>Species #2</Description>
<InitialPopulation>1</InitialPopulation>
</Species>
</SpeciesList>
</Model>
'''
[docs]def species_in_rxns(rxns: Iterable[Reaction]) -> List[Specie]:
"""
Args:
rxns: iterable of :any:`Reaction`'s
Returns: list of species (without repetitions) in :any:`Reaction`'s in `rxns`
"""
species_set: Set[Specie] = set()
species_list: List[Specie] = []
for rxn in rxns:
for sp in rxn.reactants.species + rxn.products.species:
if sp not in species_set:
species_set.add(sp)
species_list.append(sp)
return species_list
[docs]def write_stochkit_file(filename: str, rxns: Iterable[Reaction], init_config: Dict[Specie, int],
volume: float = 1.0, name: str = 'CRN') -> None:
"""
Write stochkit file
Args:
filename: name of file to write
rxns: reactions to translate to StochKit format
init_config: dict mapping each :any:`Specie` to its initial count
volume: volume in liters
name: name of the CRN
"""
xml = stochkit_format(rxns, init_config, volume, name)
with open(filename, 'w') as f:
f.write(xml)