Module riskmodels.adequacy.acg_models
This module implements models for available conventional generation (ACG). The implementation of the non-sequential models assumes generating units can be either fully available or fully unavailable at any given time. The sequential model represents each generating unit as a Markov chain whose transition probabilities are based on their overall availability and mean time to repair values. Both models assume statistical independence between generating units.
Expand source code
"""
This module implements models for available conventional generation (ACG). The implementation of the non-sequential models assumes generating units can be either fully available or fully unavailable at any given time. The sequential model represents each generating unit as a Markov chain whose transition probabilities are based on their overall availability and mean time to repair values. Both models assume statistical independence between generating units.
"""
from __future__ import annotations
import warnings
import typing as t
from riskmodels.univariate import Binned
import numpy as np
import pandas as pd
from c_sequential_models_api import ffi, lib as C_API
class NonSequential(Binned):
"""Available conventional generation model in which generators are assumed statistically independent of each other, and each one can be either 100% or 0% available at any given time with a certain probability. No serial correlation between states is assumed, i.e. this is a non-sequential or time-collapsed model. See class methods `from_generator_df` and `from_generator_csv_file` to instantiate this class."""
@classmethod
def from_generator_df(cls, df: pd.DataFrame) -> NonSequential:
"""Takes a dataframe object and builds the generation model from it.
Args:
df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator
Returns:
NonSequential: fitted model
"""
warnings.warn("Coercing capacity values to integers")
capacity_values = np.array(df["capacity"], dtype=np.int32)
availability_values = np.array(df["availability"])
if np.any(availability_values < 0) or np.any(availability_values > 1):
raise Exception(
f"Availabilities must be in the interval [0,1]; found interval[{min(availability_values)},{max(availability_values)}]"
)
max_gen = int(np.sum(capacity_values[capacity_values >= 0]))
min_gen = int(np.sum(capacity_values[capacity_values < 0]))
zero_idx = np.abs(
min_gen
) # this is in case there are generators with negative generation
pdf_length = max_gen + 1 - min_gen
pdf = np.zeros((pdf_length,), dtype=np.float64) # initialise pdf values
pdf[zero_idx] = 1.0
i = 0
for c, p in zip(capacity_values, availability_values):
if c >= 0:
suffix = pdf[0 : pdf_length - c]
preffix = np.zeros((c,))
else:
preffix = pdf[np.abs(c) : pdf_length]
suffix = np.zeros((np.abs(c),))
pdf = (1 - p) * pdf + p * np.concatenate([preffix, suffix])
i += 1
support = np.arange(min_gen, max_gen + 1)
return Binned(support=support, pdf_values=pdf, data=None)
@classmethod
def from_generator_csv_file(cls, file_path: str, **kwargs) -> NonSequential:
"""Takes a csv file and builds the generation model
Args:
file_path (str): Path to csv file. It must have colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator
**kwargs: additional arguments passed to pandas.read_csv
Returns:
NonSequential: fitted model
"""
df = pd.read_csv(file_path, **kwargs)
return cls.from_generator_df(df)
class Sequential(NonSequential):
"""Available conventional generation model in which generators are modelled as Markov chains and are assumed to be independent of each other. The methods `from_generator_df` and `from_generator_file` can be used to instantiate this class when 2-state Markov chains are used (on-off availability for each generating unit without de-rated states), see the cited methods for details.
To simulate Markov chain models with a different set of statess (e.g. de-rated states), the class can be instantiated through its constructor by passing named parameters for the transition matrices and chain states. See the argument specification below.
Args:
transition_matrices (np.ndarray): three-dimensional array where axis 0 represents generation units, and axis 1 and 2 represent transition matrix dimensions
chain_states (np.ndarray): two-dimensional array where axis 0 represents generation units and axis 1 represents the array of the unit's states
"""
transition_matrices: np.ndarray
chain_states: np.ndarray
def __str__(self):
max_cap = np.sum([s[0] for s in self.chain_states])
return f"Markov chain generation model with {len(self.transition_matrices)} generators and {max_cap} maximum capacity"
@property
def stationary_distributions(self):
return np.array(
[self.get_stationary_dist(mat) for mat in self.transition_matrices]
)
@classmethod
def build_chains(cls, df: pd.DataFrame) -> t.Tuple[np.ndarray, np.ndarray]:
"""Takes a dataframe with generator data (availability and mean time to repair) and returns the corresponding Markov chain states and transition matrices. Here, the availability probability determines the stationary distribution
Args:
df (pd.DataFrame): Generator data with columns 'availability' and 'mttr'
Returns:
t.Tuple[np.ndarray, np.ndarray]
"""
def get_transition_matrix(generator_data):
prob, mttr = generator_data
alpha = 1 - 1 / mttr
a11 = 1 - (1 - prob) * (1 - alpha) / prob
mat = np.array([[a11, 1 - a11], [1 - alpha, alpha]], dtype=np.float32)
mat = mat / np.sum(mat, axis=1)
return mat
states = np.array([np.array([x, 0.0]) for x in df["capacity"]], dtype=np.int32)
transition_matrices = np.apply_along_axis(
get_transition_matrix, 1, np.array(df[["availability", "mttr"]])
)
return states, transition_matrices
@classmethod
def sample_stationary_dists(
cls, transition_matrices: np.ndarray, chain_states: np.ndarray
) -> np.ndarray:
"""Sample states from the stationary distribution of transition probability matrices
Args:
transition_matrices (np.ndarray): array of transition probability matrices
chain_states (np.ndarray): two-dimensional array with state vectors for each transition matrix
Returns:
np.ndarray: array with sampled state for each transition matrix
"""
sample = []
for mat, states in zip(transition_matrices, chain_states):
stationary_dist = cls.get_stationary_dist(mat)
s = np.random.choice(states, size=1, p=stationary_dist)
sample.append(s)
return np.array(sample)
@classmethod
def get_stationary_dist(cls, mat: np.ndarray) -> np.ndarray:
"""Compute stationary probability distribution over states for a Markov chain
Args:
mat (np.ndarray): transition probability matrix
Returns:
np.ndarray: vector with stationary probability values for each state
"""
# from somewhere in stackoverflow
evals, evecs = np.linalg.eig(mat.T)
evec1 = evecs[:, np.isclose(evals, 1)]
# Since np.isclose will return an array, we've indexed with an array
# so we still have our 2nd axis. Get rid of it, since it's only size 1.
if evec1.shape[1] == 0:
raise Exception("Some generators might not have a stationary distribution")
evec1 = evec1[:, 0]
stationary = evec1 / evec1.sum()
# eigs finds complex eigenvalues and eigenvectors, so you'll want the real part.
stationary = stationary.real
return stationary
@classmethod
def from_generator_df(cls, df: pd.DataFrame) -> Sequential:
"""Takes a dataframe object and builds the generation model from it.
Args:
df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability that the generating unit is available (i.e. stationary availability probability) and the latter is the unit's nameplate capacity; in additon, an 'mttr' column with estimated mean times to repair per unit should be present. Each row represents an individual generator.
Returns:
Sequential: fitted model
"""
time_collapsed = super().from_generator_df(df)
df["capacity"] = df["capacity"].astype(
np.int32
) # for consistency between time-collapsed and time-dependent logic
states, matrices = cls.build_chains(df)
return cls(
pdf_values=time_collapsed.pdf_values,
support=time_collapsed.support,
data=time_collapsed.data,
transition_matrices=matrices,
chain_states=states,
)
@classmethod
def simulate_chains(
cls,
size: int,
trace_length: int,
transition_matrices: np.ndarray,
chain_states: np.ndarray,
initial_state: np.ndarray = None,
simulate_escape_time: bool = True,
output_array: np.ndarray = None,
seed: int = None,
) -> np.Optional[np.ndarray]:
"""Simulate multiple traces in which each one represents the aggregate of multiple Markov chains. This method samples a single large sequential trace and then split it into multiple subtraces; trace endpoints are therefore dependent.
Args:
size (int): Number of traces to simulate
trace_length (int): length of individual traces
transition_matrices (np.ndarray): three-dimensional array containing transition probability matrices for all chains where the first dimension corresponds to generating units and the last two dimensions correspond to transition matrices.
chain_states (np.ndarray): two-dimensional array with vectors of chain states for all chains where the first dimension corresponds to generating units and the second one to the state set. Note that every generating unit must have the same number of states.
initial_state (np.ndarray, optional): one-dimensional array with the initial state indices for all chains. The indices must correspond to a row in the transition matrices. If None, initial states are sampled from the stationary distributions of each chain.
simulate_escape_time (bool, optional): If True, simulate chains through time-of-escape simulations. If false, simulate each timestep individually.
output_array (np.ndarray, optional): Array where results are to be stored. If not provided, one is created.
seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
No Longer Returned:
np.Optional[np.ndarray]: two-dimensional array in which each row represent an individual simulated trace. If an output array is passed as input, None is returned.
No Longer Raises:
ValueError: Description
"""
n_chains = len(chain_states)
n_states = len(chain_states[0])
# set output array
if output_array is not None:
if (
not isinstance(output_array, np.ndarray)
or len(output_array.shape) != 2
or output_array.shape != (size, trace_length)
or output_array.dtype != np.float32
):
raise ValueError(
"output_array must be a two-dimensional numpy array with shape (size, trace_length) of type numpy.float32"
)
return_output = False
else:
output_array = np.ascontiguousarray(
np.zeros((size, trace_length)), dtype=np.float32
)
return_output = True
# if seed is None:
# seed = np.random.randint(low=0,high=2**31-1)
# else:
# np.random.seed(seed) #both numpy and C seeds are the same if provided; this is needed for the initial state which is computed in Python
seed = np.random.randint(low=0, high=2**20 - 1) if seed is None else seed
# print(f"C seed: {seed}")
# set initial state array
if initial_state is None:
initial_state = cls.sample_stationary_dists(
transition_matrices, chain_states
).reshape(-1)
else:
if (
not isinstance(initial_state, np.ndarray)
or len(initial_state.shape) != 1
or len(initial_state) != n_chains
):
# print(initial_state)
# print(initial_state.shape)
raise ValueError(
"Initial state vector must be a one-dimensional numpy array with as many entries as transition_matrices."
)
if np.any(np.array([len(s) for s in chain_states]) != n_states):
raise ValueError("Number of states must be the same for all chains")
if np.any(
np.array([s.shape != (n_states, n_states) for s in transition_matrices])
):
raise ValueError("Matrices must be square and of the same dimensions")
# call C program
if trace_length <= 1:
raise ValueError("Trace length must be an integer larger than 1")
# cast as float
initial_state = np.ascontiguousarray(initial_state).astype(np.float32)
float_chain_states = chain_states.astype(np.float32)
C_API.simulate_mc_power_grid_py_interface(
ffi.cast("float *", output_array.ctypes.data),
ffi.cast("float *", transition_matrices.ctypes.data),
ffi.cast("float *", float_chain_states.ctypes.data),
ffi.cast("float *", initial_state.ctypes.data),
np.int32(n_chains),
np.int32(size),
np.int32(
trace_length - 1
), # initial state is accounted for in trace length
np.int32(n_states),
np.int32(seed),
np.int32(simulate_escape_time),
)
if return_output:
return output_array
def simulate(self, size: int, seed: int = None) -> np.ndarray:
"""Simulate a single trace of sequential observations
Args:
size (int): Number of samples, or equivalently, trace length.
seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
Returns:
np.ndarray
"""
return self.simulate_chains(
size=1,
trace_length=size,
transition_matrices=self.transition_matrices,
chain_states=self.chain_states,
initial_state=None,
simulate_escape_time=True,
seed=seed,
).reshape(-1)
def simulate_seasons(
self,
size: int,
season_length: int,
seasons_per_trace: int = 1,
burn_in: int = 100,
seed: int = None,
) -> np.ndarray:
"""Simulate multiple traces of available conventional generation; each trace can have one or more peak seasons in it, depending on whether streaks of multiple years need to be sampled.
Args:
size (int): number of traces to sample
season_length (int): peak season length
seasons_per_trace (int, optional): Number of seasons per trace. The default is 1.
burn_in (int, optional): burn-in period between individual peak season traces; this is needed because in order to sample them, a large sequence is generated and subsequently subdivided, thus making trace endpoints correlated if a burn-in period is not allowed.
seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
No Longer Returned:
np.ndarray: two-dimensional array where each row represent a sampled peak season of available conventional generation.
"""
total_seasons = size * seasons_per_trace
augmented_season_length = season_length + burn_in
output_array = self.simulate_chains(
size=total_seasons,
trace_length=augmented_season_length,
transition_matrices=self.transition_matrices,
chain_states=self.chain_states,
initial_state=None,
simulate_escape_time=True,
seed=seed,
)
# drop burn in periods and reshape
return output_array[:, 0:season_length].reshape(
(size, season_length * seasons_per_trace)
)
Classes
class NonSequential (**data: Any)
-
Available conventional generation model in which generators are assumed statistically independent of each other, and each one can be either 100% or 0% available at any given time with a certain probability. No serial correlation between states is assumed, i.e. this is a non-sequential or time-collapsed model. See class methods
from_generator_df
andfrom_generator_csv_file
to instantiate this class.Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class NonSequential(Binned): """Available conventional generation model in which generators are assumed statistically independent of each other, and each one can be either 100% or 0% available at any given time with a certain probability. No serial correlation between states is assumed, i.e. this is a non-sequential or time-collapsed model. See class methods `from_generator_df` and `from_generator_csv_file` to instantiate this class.""" @classmethod def from_generator_df(cls, df: pd.DataFrame) -> NonSequential: """Takes a dataframe object and builds the generation model from it. Args: df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator Returns: NonSequential: fitted model """ warnings.warn("Coercing capacity values to integers") capacity_values = np.array(df["capacity"], dtype=np.int32) availability_values = np.array(df["availability"]) if np.any(availability_values < 0) or np.any(availability_values > 1): raise Exception( f"Availabilities must be in the interval [0,1]; found interval[{min(availability_values)},{max(availability_values)}]" ) max_gen = int(np.sum(capacity_values[capacity_values >= 0])) min_gen = int(np.sum(capacity_values[capacity_values < 0])) zero_idx = np.abs( min_gen ) # this is in case there are generators with negative generation pdf_length = max_gen + 1 - min_gen pdf = np.zeros((pdf_length,), dtype=np.float64) # initialise pdf values pdf[zero_idx] = 1.0 i = 0 for c, p in zip(capacity_values, availability_values): if c >= 0: suffix = pdf[0 : pdf_length - c] preffix = np.zeros((c,)) else: preffix = pdf[np.abs(c) : pdf_length] suffix = np.zeros((np.abs(c),)) pdf = (1 - p) * pdf + p * np.concatenate([preffix, suffix]) i += 1 support = np.arange(min_gen, max_gen + 1) return Binned(support=support, pdf_values=pdf, data=None) @classmethod def from_generator_csv_file(cls, file_path: str, **kwargs) -> NonSequential: """Takes a csv file and builds the generation model Args: file_path (str): Path to csv file. It must have colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator **kwargs: additional arguments passed to pandas.read_csv Returns: NonSequential: fitted model """ df = pd.read_csv(file_path, **kwargs) return cls.from_generator_df(df)
Ancestors
- Binned
- Empirical
- BaseDistribution
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var data : Optional[numpy.ndarray]
var pdf_values : numpy.ndarray
var support : numpy.ndarray
Static methods
def from_generator_csv_file(file_path: str, **kwargs) ‑> NonSequential
-
Takes a csv file and builds the generation model
Args
file_path
:str
- Path to csv file. It must have colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator
**kwargs
- additional arguments passed to pandas.read_csv
Returns
NonSequential
- fitted model
Expand source code
@classmethod def from_generator_csv_file(cls, file_path: str, **kwargs) -> NonSequential: """Takes a csv file and builds the generation model Args: file_path (str): Path to csv file. It must have colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator **kwargs: additional arguments passed to pandas.read_csv Returns: NonSequential: fitted model """ df = pd.read_csv(file_path, **kwargs) return cls.from_generator_df(df)
def from_generator_df(df: pd.DataFrame) ‑> NonSequential
-
Takes a dataframe object and builds the generation model from it.
Args
df
:pd.DataFrame
- dataframe with colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator
Returns
NonSequential
- fitted model
Expand source code
@classmethod def from_generator_df(cls, df: pd.DataFrame) -> NonSequential: """Takes a dataframe object and builds the generation model from it. Args: df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability of the generating unit being available and the latter the nameplate capacity; each row represents an individual generator Returns: NonSequential: fitted model """ warnings.warn("Coercing capacity values to integers") capacity_values = np.array(df["capacity"], dtype=np.int32) availability_values = np.array(df["availability"]) if np.any(availability_values < 0) or np.any(availability_values > 1): raise Exception( f"Availabilities must be in the interval [0,1]; found interval[{min(availability_values)},{max(availability_values)}]" ) max_gen = int(np.sum(capacity_values[capacity_values >= 0])) min_gen = int(np.sum(capacity_values[capacity_values < 0])) zero_idx = np.abs( min_gen ) # this is in case there are generators with negative generation pdf_length = max_gen + 1 - min_gen pdf = np.zeros((pdf_length,), dtype=np.float64) # initialise pdf values pdf[zero_idx] = 1.0 i = 0 for c, p in zip(capacity_values, availability_values): if c >= 0: suffix = pdf[0 : pdf_length - c] preffix = np.zeros((c,)) else: preffix = pdf[np.abs(c) : pdf_length] suffix = np.zeros((np.abs(c),)) pdf = (1 - p) * pdf + p * np.concatenate([preffix, suffix]) i += 1 support = np.arange(min_gen, max_gen + 1) return Binned(support=support, pdf_values=pdf, data=None)
Inherited members
class Sequential (**data: Any)
-
Available conventional generation model in which generators are modelled as Markov chains and are assumed to be independent of each other. The methods
from_generator_df
andfrom_generator_file
can be used to instantiate this class when 2-state Markov chains are used (on-off availability for each generating unit without de-rated states), see the cited methods for details. To simulate Markov chain models with a different set of statess (e.g. de-rated states), the class can be instantiated through its constructor by passing named parameters for the transition matrices and chain states. See the argument specification below.Args
transition_matrices
:np.ndarray
- three-dimensional array where axis 0 represents generation units, and axis 1 and 2 represent transition matrix dimensions
chain_states
:np.ndarray
- two-dimensional array where axis 0 represents generation units and axis 1 represents the array of the unit's states
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class Sequential(NonSequential): """Available conventional generation model in which generators are modelled as Markov chains and are assumed to be independent of each other. The methods `from_generator_df` and `from_generator_file` can be used to instantiate this class when 2-state Markov chains are used (on-off availability for each generating unit without de-rated states), see the cited methods for details. To simulate Markov chain models with a different set of statess (e.g. de-rated states), the class can be instantiated through its constructor by passing named parameters for the transition matrices and chain states. See the argument specification below. Args: transition_matrices (np.ndarray): three-dimensional array where axis 0 represents generation units, and axis 1 and 2 represent transition matrix dimensions chain_states (np.ndarray): two-dimensional array where axis 0 represents generation units and axis 1 represents the array of the unit's states """ transition_matrices: np.ndarray chain_states: np.ndarray def __str__(self): max_cap = np.sum([s[0] for s in self.chain_states]) return f"Markov chain generation model with {len(self.transition_matrices)} generators and {max_cap} maximum capacity" @property def stationary_distributions(self): return np.array( [self.get_stationary_dist(mat) for mat in self.transition_matrices] ) @classmethod def build_chains(cls, df: pd.DataFrame) -> t.Tuple[np.ndarray, np.ndarray]: """Takes a dataframe with generator data (availability and mean time to repair) and returns the corresponding Markov chain states and transition matrices. Here, the availability probability determines the stationary distribution Args: df (pd.DataFrame): Generator data with columns 'availability' and 'mttr' Returns: t.Tuple[np.ndarray, np.ndarray] """ def get_transition_matrix(generator_data): prob, mttr = generator_data alpha = 1 - 1 / mttr a11 = 1 - (1 - prob) * (1 - alpha) / prob mat = np.array([[a11, 1 - a11], [1 - alpha, alpha]], dtype=np.float32) mat = mat / np.sum(mat, axis=1) return mat states = np.array([np.array([x, 0.0]) for x in df["capacity"]], dtype=np.int32) transition_matrices = np.apply_along_axis( get_transition_matrix, 1, np.array(df[["availability", "mttr"]]) ) return states, transition_matrices @classmethod def sample_stationary_dists( cls, transition_matrices: np.ndarray, chain_states: np.ndarray ) -> np.ndarray: """Sample states from the stationary distribution of transition probability matrices Args: transition_matrices (np.ndarray): array of transition probability matrices chain_states (np.ndarray): two-dimensional array with state vectors for each transition matrix Returns: np.ndarray: array with sampled state for each transition matrix """ sample = [] for mat, states in zip(transition_matrices, chain_states): stationary_dist = cls.get_stationary_dist(mat) s = np.random.choice(states, size=1, p=stationary_dist) sample.append(s) return np.array(sample) @classmethod def get_stationary_dist(cls, mat: np.ndarray) -> np.ndarray: """Compute stationary probability distribution over states for a Markov chain Args: mat (np.ndarray): transition probability matrix Returns: np.ndarray: vector with stationary probability values for each state """ # from somewhere in stackoverflow evals, evecs = np.linalg.eig(mat.T) evec1 = evecs[:, np.isclose(evals, 1)] # Since np.isclose will return an array, we've indexed with an array # so we still have our 2nd axis. Get rid of it, since it's only size 1. if evec1.shape[1] == 0: raise Exception("Some generators might not have a stationary distribution") evec1 = evec1[:, 0] stationary = evec1 / evec1.sum() # eigs finds complex eigenvalues and eigenvectors, so you'll want the real part. stationary = stationary.real return stationary @classmethod def from_generator_df(cls, df: pd.DataFrame) -> Sequential: """Takes a dataframe object and builds the generation model from it. Args: df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability that the generating unit is available (i.e. stationary availability probability) and the latter is the unit's nameplate capacity; in additon, an 'mttr' column with estimated mean times to repair per unit should be present. Each row represents an individual generator. Returns: Sequential: fitted model """ time_collapsed = super().from_generator_df(df) df["capacity"] = df["capacity"].astype( np.int32 ) # for consistency between time-collapsed and time-dependent logic states, matrices = cls.build_chains(df) return cls( pdf_values=time_collapsed.pdf_values, support=time_collapsed.support, data=time_collapsed.data, transition_matrices=matrices, chain_states=states, ) @classmethod def simulate_chains( cls, size: int, trace_length: int, transition_matrices: np.ndarray, chain_states: np.ndarray, initial_state: np.ndarray = None, simulate_escape_time: bool = True, output_array: np.ndarray = None, seed: int = None, ) -> np.Optional[np.ndarray]: """Simulate multiple traces in which each one represents the aggregate of multiple Markov chains. This method samples a single large sequential trace and then split it into multiple subtraces; trace endpoints are therefore dependent. Args: size (int): Number of traces to simulate trace_length (int): length of individual traces transition_matrices (np.ndarray): three-dimensional array containing transition probability matrices for all chains where the first dimension corresponds to generating units and the last two dimensions correspond to transition matrices. chain_states (np.ndarray): two-dimensional array with vectors of chain states for all chains where the first dimension corresponds to generating units and the second one to the state set. Note that every generating unit must have the same number of states. initial_state (np.ndarray, optional): one-dimensional array with the initial state indices for all chains. The indices must correspond to a row in the transition matrices. If None, initial states are sampled from the stationary distributions of each chain. simulate_escape_time (bool, optional): If True, simulate chains through time-of-escape simulations. If false, simulate each timestep individually. output_array (np.ndarray, optional): Array where results are to be stored. If not provided, one is created. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. No Longer Returned: np.Optional[np.ndarray]: two-dimensional array in which each row represent an individual simulated trace. If an output array is passed as input, None is returned. No Longer Raises: ValueError: Description """ n_chains = len(chain_states) n_states = len(chain_states[0]) # set output array if output_array is not None: if ( not isinstance(output_array, np.ndarray) or len(output_array.shape) != 2 or output_array.shape != (size, trace_length) or output_array.dtype != np.float32 ): raise ValueError( "output_array must be a two-dimensional numpy array with shape (size, trace_length) of type numpy.float32" ) return_output = False else: output_array = np.ascontiguousarray( np.zeros((size, trace_length)), dtype=np.float32 ) return_output = True # if seed is None: # seed = np.random.randint(low=0,high=2**31-1) # else: # np.random.seed(seed) #both numpy and C seeds are the same if provided; this is needed for the initial state which is computed in Python seed = np.random.randint(low=0, high=2**20 - 1) if seed is None else seed # print(f"C seed: {seed}") # set initial state array if initial_state is None: initial_state = cls.sample_stationary_dists( transition_matrices, chain_states ).reshape(-1) else: if ( not isinstance(initial_state, np.ndarray) or len(initial_state.shape) != 1 or len(initial_state) != n_chains ): # print(initial_state) # print(initial_state.shape) raise ValueError( "Initial state vector must be a one-dimensional numpy array with as many entries as transition_matrices." ) if np.any(np.array([len(s) for s in chain_states]) != n_states): raise ValueError("Number of states must be the same for all chains") if np.any( np.array([s.shape != (n_states, n_states) for s in transition_matrices]) ): raise ValueError("Matrices must be square and of the same dimensions") # call C program if trace_length <= 1: raise ValueError("Trace length must be an integer larger than 1") # cast as float initial_state = np.ascontiguousarray(initial_state).astype(np.float32) float_chain_states = chain_states.astype(np.float32) C_API.simulate_mc_power_grid_py_interface( ffi.cast("float *", output_array.ctypes.data), ffi.cast("float *", transition_matrices.ctypes.data), ffi.cast("float *", float_chain_states.ctypes.data), ffi.cast("float *", initial_state.ctypes.data), np.int32(n_chains), np.int32(size), np.int32( trace_length - 1 ), # initial state is accounted for in trace length np.int32(n_states), np.int32(seed), np.int32(simulate_escape_time), ) if return_output: return output_array def simulate(self, size: int, seed: int = None) -> np.ndarray: """Simulate a single trace of sequential observations Args: size (int): Number of samples, or equivalently, trace length. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. Returns: np.ndarray """ return self.simulate_chains( size=1, trace_length=size, transition_matrices=self.transition_matrices, chain_states=self.chain_states, initial_state=None, simulate_escape_time=True, seed=seed, ).reshape(-1) def simulate_seasons( self, size: int, season_length: int, seasons_per_trace: int = 1, burn_in: int = 100, seed: int = None, ) -> np.ndarray: """Simulate multiple traces of available conventional generation; each trace can have one or more peak seasons in it, depending on whether streaks of multiple years need to be sampled. Args: size (int): number of traces to sample season_length (int): peak season length seasons_per_trace (int, optional): Number of seasons per trace. The default is 1. burn_in (int, optional): burn-in period between individual peak season traces; this is needed because in order to sample them, a large sequence is generated and subsequently subdivided, thus making trace endpoints correlated if a burn-in period is not allowed. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. No Longer Returned: np.ndarray: two-dimensional array where each row represent a sampled peak season of available conventional generation. """ total_seasons = size * seasons_per_trace augmented_season_length = season_length + burn_in output_array = self.simulate_chains( size=total_seasons, trace_length=augmented_season_length, transition_matrices=self.transition_matrices, chain_states=self.chain_states, initial_state=None, simulate_escape_time=True, seed=seed, ) # drop burn in periods and reshape return output_array[:, 0:season_length].reshape( (size, season_length * seasons_per_trace) )
Ancestors
- NonSequential
- Binned
- Empirical
- BaseDistribution
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var chain_states : numpy.ndarray
var transition_matrices : numpy.ndarray
Static methods
def build_chains(df: pd.DataFrame) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Takes a dataframe with generator data (availability and mean time to repair) and returns the corresponding Markov chain states and transition matrices. Here, the availability probability determines the stationary distribution
Args
df
:pd.DataFrame
- Generator data with columns 'availability' and 'mttr'
Returns
t.Tuple[np.ndarray, np.ndarray]
Expand source code
@classmethod def build_chains(cls, df: pd.DataFrame) -> t.Tuple[np.ndarray, np.ndarray]: """Takes a dataframe with generator data (availability and mean time to repair) and returns the corresponding Markov chain states and transition matrices. Here, the availability probability determines the stationary distribution Args: df (pd.DataFrame): Generator data with columns 'availability' and 'mttr' Returns: t.Tuple[np.ndarray, np.ndarray] """ def get_transition_matrix(generator_data): prob, mttr = generator_data alpha = 1 - 1 / mttr a11 = 1 - (1 - prob) * (1 - alpha) / prob mat = np.array([[a11, 1 - a11], [1 - alpha, alpha]], dtype=np.float32) mat = mat / np.sum(mat, axis=1) return mat states = np.array([np.array([x, 0.0]) for x in df["capacity"]], dtype=np.int32) transition_matrices = np.apply_along_axis( get_transition_matrix, 1, np.array(df[["availability", "mttr"]]) ) return states, transition_matrices
def from_generator_df(df: pd.DataFrame) ‑> Sequential
-
Takes a dataframe object and builds the generation model from it.
Args
df
:pd.DataFrame
- dataframe with colums 'availability' and 'capacity', where the former is the probability that the generating unit is available (i.e. stationary availability probability) and the latter is the unit's nameplate capacity; in additon, an 'mttr' column with estimated mean times to repair per unit should be present. Each row represents an individual generator.
Returns
Sequential
- fitted model
Expand source code
@classmethod def from_generator_df(cls, df: pd.DataFrame) -> Sequential: """Takes a dataframe object and builds the generation model from it. Args: df (pd.DataFrame): dataframe with colums 'availability' and 'capacity', where the former is the probability that the generating unit is available (i.e. stationary availability probability) and the latter is the unit's nameplate capacity; in additon, an 'mttr' column with estimated mean times to repair per unit should be present. Each row represents an individual generator. Returns: Sequential: fitted model """ time_collapsed = super().from_generator_df(df) df["capacity"] = df["capacity"].astype( np.int32 ) # for consistency between time-collapsed and time-dependent logic states, matrices = cls.build_chains(df) return cls( pdf_values=time_collapsed.pdf_values, support=time_collapsed.support, data=time_collapsed.data, transition_matrices=matrices, chain_states=states, )
def get_stationary_dist(mat: np.ndarray) ‑> numpy.ndarray
-
Compute stationary probability distribution over states for a Markov chain
Args
mat
:np.ndarray
- transition probability matrix
Returns
np.ndarray
- vector with stationary probability values for each state
Expand source code
@classmethod def get_stationary_dist(cls, mat: np.ndarray) -> np.ndarray: """Compute stationary probability distribution over states for a Markov chain Args: mat (np.ndarray): transition probability matrix Returns: np.ndarray: vector with stationary probability values for each state """ # from somewhere in stackoverflow evals, evecs = np.linalg.eig(mat.T) evec1 = evecs[:, np.isclose(evals, 1)] # Since np.isclose will return an array, we've indexed with an array # so we still have our 2nd axis. Get rid of it, since it's only size 1. if evec1.shape[1] == 0: raise Exception("Some generators might not have a stationary distribution") evec1 = evec1[:, 0] stationary = evec1 / evec1.sum() # eigs finds complex eigenvalues and eigenvectors, so you'll want the real part. stationary = stationary.real return stationary
def sample_stationary_dists(transition_matrices: np.ndarray, chain_states: np.ndarray) ‑> numpy.ndarray
-
Sample states from the stationary distribution of transition probability matrices
Args
transition_matrices
:np.ndarray
- array of transition probability matrices
chain_states
:np.ndarray
- two-dimensional array with state vectors for each transition matrix
Returns
np.ndarray
- array with sampled state for each transition matrix
Expand source code
@classmethod def sample_stationary_dists( cls, transition_matrices: np.ndarray, chain_states: np.ndarray ) -> np.ndarray: """Sample states from the stationary distribution of transition probability matrices Args: transition_matrices (np.ndarray): array of transition probability matrices chain_states (np.ndarray): two-dimensional array with state vectors for each transition matrix Returns: np.ndarray: array with sampled state for each transition matrix """ sample = [] for mat, states in zip(transition_matrices, chain_states): stationary_dist = cls.get_stationary_dist(mat) s = np.random.choice(states, size=1, p=stationary_dist) sample.append(s) return np.array(sample)
def simulate_chains(size: int, trace_length: int, transition_matrices: np.ndarray, chain_states: np.ndarray, initial_state: np.ndarray = None, simulate_escape_time: bool = True, output_array: np.ndarray = None, seed: int = None) ‑> np.Optional[np.ndarray]
-
Simulate multiple traces in which each one represents the aggregate of multiple Markov chains. This method samples a single large sequential trace and then split it into multiple subtraces; trace endpoints are therefore dependent.
Args
size
:int
- Number of traces to simulate
trace_length
:int
- length of individual traces
transition_matrices
:np.ndarray
- three-dimensional array containing transition probability matrices for all chains where the first dimension corresponds to generating units and the last two dimensions correspond to transition matrices.
chain_states
:np.ndarray
- two-dimensional array with vectors of chain states for all chains where the first dimension corresponds to generating units and the second one to the state set. Note that every generating unit must have the same number of states.
initial_state
:np.ndarray
, optional- one-dimensional array with the initial state indices for all chains. The indices must correspond to a row in the transition matrices. If None, initial states are sampled from the stationary distributions of each chain.
simulate_escape_time
:bool
, optional- If True, simulate chains through time-of-escape simulations. If false, simulate each timestep individually.
output_array
:np.ndarray
, optional- Array where results are to be stored. If not provided, one is created.
seed
:int
, optional- Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
No Longer Returned: np.Optional[np.ndarray]: two-dimensional array in which each row represent an individual simulated trace. If an output array is passed as input, None is returned.
No Longer Raises: ValueError: Description
Expand source code
@classmethod def simulate_chains( cls, size: int, trace_length: int, transition_matrices: np.ndarray, chain_states: np.ndarray, initial_state: np.ndarray = None, simulate_escape_time: bool = True, output_array: np.ndarray = None, seed: int = None, ) -> np.Optional[np.ndarray]: """Simulate multiple traces in which each one represents the aggregate of multiple Markov chains. This method samples a single large sequential trace and then split it into multiple subtraces; trace endpoints are therefore dependent. Args: size (int): Number of traces to simulate trace_length (int): length of individual traces transition_matrices (np.ndarray): three-dimensional array containing transition probability matrices for all chains where the first dimension corresponds to generating units and the last two dimensions correspond to transition matrices. chain_states (np.ndarray): two-dimensional array with vectors of chain states for all chains where the first dimension corresponds to generating units and the second one to the state set. Note that every generating unit must have the same number of states. initial_state (np.ndarray, optional): one-dimensional array with the initial state indices for all chains. The indices must correspond to a row in the transition matrices. If None, initial states are sampled from the stationary distributions of each chain. simulate_escape_time (bool, optional): If True, simulate chains through time-of-escape simulations. If false, simulate each timestep individually. output_array (np.ndarray, optional): Array where results are to be stored. If not provided, one is created. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. No Longer Returned: np.Optional[np.ndarray]: two-dimensional array in which each row represent an individual simulated trace. If an output array is passed as input, None is returned. No Longer Raises: ValueError: Description """ n_chains = len(chain_states) n_states = len(chain_states[0]) # set output array if output_array is not None: if ( not isinstance(output_array, np.ndarray) or len(output_array.shape) != 2 or output_array.shape != (size, trace_length) or output_array.dtype != np.float32 ): raise ValueError( "output_array must be a two-dimensional numpy array with shape (size, trace_length) of type numpy.float32" ) return_output = False else: output_array = np.ascontiguousarray( np.zeros((size, trace_length)), dtype=np.float32 ) return_output = True # if seed is None: # seed = np.random.randint(low=0,high=2**31-1) # else: # np.random.seed(seed) #both numpy and C seeds are the same if provided; this is needed for the initial state which is computed in Python seed = np.random.randint(low=0, high=2**20 - 1) if seed is None else seed # print(f"C seed: {seed}") # set initial state array if initial_state is None: initial_state = cls.sample_stationary_dists( transition_matrices, chain_states ).reshape(-1) else: if ( not isinstance(initial_state, np.ndarray) or len(initial_state.shape) != 1 or len(initial_state) != n_chains ): # print(initial_state) # print(initial_state.shape) raise ValueError( "Initial state vector must be a one-dimensional numpy array with as many entries as transition_matrices." ) if np.any(np.array([len(s) for s in chain_states]) != n_states): raise ValueError("Number of states must be the same for all chains") if np.any( np.array([s.shape != (n_states, n_states) for s in transition_matrices]) ): raise ValueError("Matrices must be square and of the same dimensions") # call C program if trace_length <= 1: raise ValueError("Trace length must be an integer larger than 1") # cast as float initial_state = np.ascontiguousarray(initial_state).astype(np.float32) float_chain_states = chain_states.astype(np.float32) C_API.simulate_mc_power_grid_py_interface( ffi.cast("float *", output_array.ctypes.data), ffi.cast("float *", transition_matrices.ctypes.data), ffi.cast("float *", float_chain_states.ctypes.data), ffi.cast("float *", initial_state.ctypes.data), np.int32(n_chains), np.int32(size), np.int32( trace_length - 1 ), # initial state is accounted for in trace length np.int32(n_states), np.int32(seed), np.int32(simulate_escape_time), ) if return_output: return output_array
Instance variables
var stationary_distributions
-
Expand source code
@property def stationary_distributions(self): return np.array( [self.get_stationary_dist(mat) for mat in self.transition_matrices] )
Methods
def simulate(self, size: int, seed: int = None) ‑> numpy.ndarray
-
Simulate a single trace of sequential observations
Args
size
:int
- Number of samples, or equivalently, trace length.
seed
:int
, optional- Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
Returns
np.ndarray
Expand source code
def simulate(self, size: int, seed: int = None) -> np.ndarray: """Simulate a single trace of sequential observations Args: size (int): Number of samples, or equivalently, trace length. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. Returns: np.ndarray """ return self.simulate_chains( size=1, trace_length=size, transition_matrices=self.transition_matrices, chain_states=self.chain_states, initial_state=None, simulate_escape_time=True, seed=seed, ).reshape(-1)
def simulate_seasons(self, size: int, season_length: int, seasons_per_trace: int = 1, burn_in: int = 100, seed: int = None) ‑> numpy.ndarray
-
Simulate multiple traces of available conventional generation; each trace can have one or more peak seasons in it, depending on whether streaks of multiple years need to be sampled.
Args
size
:int
- number of traces to sample
season_length
:int
- peak season length
seasons_per_trace
:int
, optional- Number of seasons per trace. The default is 1.
burn_in
:int
, optional- burn-in period between individual peak season traces; this is needed because in order to sample them, a large sequence is generated and subsequently subdivided, thus making trace endpoints correlated if a burn-in period is not allowed.
seed
:int
, optional- Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it.
No Longer Returned: np.ndarray: two-dimensional array where each row represent a sampled peak season of available conventional generation.
Expand source code
def simulate_seasons( self, size: int, season_length: int, seasons_per_trace: int = 1, burn_in: int = 100, seed: int = None, ) -> np.ndarray: """Simulate multiple traces of available conventional generation; each trace can have one or more peak seasons in it, depending on whether streaks of multiple years need to be sampled. Args: size (int): number of traces to sample season_length (int): peak season length seasons_per_trace (int, optional): Number of seasons per trace. The default is 1. burn_in (int, optional): burn-in period between individual peak season traces; this is needed because in order to sample them, a large sequence is generated and subsequently subdivided, thus making trace endpoints correlated if a burn-in period is not allowed. seed (int, optional): Random seed passed to C backend. If not given, numpy's random numbers are used to initialise it. No Longer Returned: np.ndarray: two-dimensional array where each row represent a sampled peak season of available conventional generation. """ total_seasons = size * seasons_per_trace augmented_season_length = season_length + burn_in output_array = self.simulate_chains( size=total_seasons, trace_length=augmented_season_length, transition_matrices=self.transition_matrices, chain_states=self.chain_states, initial_state=None, simulate_escape_time=True, seed=seed, ) # drop burn in periods and reshape return output_array[:, 0:season_length].reshape( (size, season_length * seasons_per_trace) )
Inherited members