gamcoach.counterfactuals
Counterfactuals Class.
This module implements the Counterfactuals class. We use it to represent the generated counterfactual explanations.
"""Counterfactuals Class.

This module implements the Counterfactuals class. We use it to represent the
generated counterfactual explanations.
"""

import numpy as np
import pandas as pd
import re
import pulp

from tqdm import tqdm
from interpret.glassbox import (
    ExplainableBoostingClassifier,
    ExplainableBoostingRegressor,
)
from time import time
from collections import Counter
from typing import Union

SEED = 922


class Counterfactuals:
    """Class to represent GAM counterfactual explanations."""

    def __init__(
        self,
        solutions: list,
        isSuccessful: bool,
        model: pulp.LpProblem,
        variables: dict,
        ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
        cur_example: np.ndarray,
        options: dict,
    ):
        """Initialize a Counterfactuals object.

        Args:
            solutions (list): List of generated `(active_variables, optimal value)`.
                If successful, it should have `total_cfs` items.
            isSuccessful (bool): True if the mixed-integer linear problem has
                `total_cfs` number of solutions under all constraints.
            model (LpProblem): Linear programming model
            variables (dict): Dictionary containing all MILP variables,
                `feature_name` -> [`variables`],
            ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
                The trained EBM model.
            cur_example (np.ndarray): The original data point. Only its first
                row (`cur_example[0]`) is kept.
            options (dict): Dictionary containing all eligible options for each
                selected features. `feature_name` -> `[[target, score_gain,
                distance, bin_id]]`. For interaction terms `bin_id` is a pair
                of bin indexes.
        """
        self.isSuccessful = isSuccessful
        """Boolean to indicate if the optimization is successful."""

        self.model = model
        """MILP program model."""

        self.variables = variables
        """MILP program variables."""

        self.ebm = ebm
        """The trained EBM model."""

        self.cur_example = cur_example[0]
        """The original data point."""

        self.options = options
        """All possible options."""

        self.solutions = solutions
        """Solutions for MILP."""

        self.data: np.ndarray
        """Generated CFs in the original data format."""

        self.target_ranges: list
        """New targets used in each row of `data`: an interval string such as
        `[start, end)` for continuous features, the target value otherwise."""

        self.values: list
        """Corresponding objective values (total distance) of each `data` row."""

        # Populates `data`, `values` and `target_ranges`.
        self.convert_cfs_to_data(solutions)

    def convert_cfs_to_data(self, solutions):
        """Convert optimal CF solutions to the original data format.

        Sets three attributes:

        - `self.data`: 2D array with one row per solution; each row is a copy
          of `self.cur_example` with the selected features replaced by their
          target values.
        - `self.values`: the objective value of each solution.
        - `self.target_ranges`: for each solution, the target of every changed
          feature (an interval string for continuous features, the target
          value otherwise).

        Args:
            solutions (list): List of `(active_variables, optimal value)`.
                An empty list yields an empty `(0, n_features)` array.

        Raises:
            ValueError: If a main-effect variable name is not of the form
                `feature_name:bin_index` or the feature is unknown to the EBM.
        """
        feature_names = self.ebm.feature_names
        feature_types = self.ebm.feature_types

        rows = []
        values = []
        target_ranges = []

        for active_variables, value in solutions:
            cur_cf = self.cur_example.copy()
            cur_target_ranges = []

            for var in active_variables:
                # Interaction variables do not change any feature value.
                # In EBM, interaction names are `f1 x f2`, pulp's variable name
                # is `f1_x_f2`
                if "_x_" in var.name:
                    continue

                parsed = re.fullmatch(r"(.+):(\d+)", var.name)
                if parsed is None:
                    raise ValueError(
                        "Unexpected variable name: {!r}".format(var.name)
                    )
                f_name = parsed.group(1)
                bin_i = int(parsed.group(2))

                f_index = feature_names.index(f_name)
                is_continuous = feature_types[f_index] == "continuous"

                if is_continuous:
                    # Drop the last label so only bin start points remain
                    bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                        :-1
                    ]
                    if bin_i + 1 < len(bin_starts):
                        upper = bin_starts[bin_i + 1]
                    else:
                        upper = "inf"
                    target_bin = "[{}, {})".format(bin_starts[bin_i], upper)

                # Apply the first option that targets the selected bin
                for option in self.options[f_name]:
                    if option[3] == bin_i:
                        cur_cf[f_index] = option[0]
                        cur_target_ranges.append(
                            target_bin if is_continuous else option[0]
                        )
                        break

            rows.append(cur_cf)
            values.append(value)
            target_ranges.append(cur_target_ranges)

        self.values = values
        self.target_ranges = target_ranges

        if rows:
            self.data = np.vstack(rows)
        else:
            # np.vstack raises on an empty sequence; keep a well-formed array
            template = np.asarray(self.cur_example)
            self.data = np.empty((0, template.size), dtype=template.dtype)

    def show(self):
        """Print the optimal solutions.

        For every solution a `## Strategy k ##` header is printed, followed by
        one entry per active variable (a feature change or a triggered
        interaction term) with its score gain and distance cost, and a blank
        line.

        Raises:
            ValueError: If a variable name does not follow the
                `feature_name:bin_index` (main effect) or
                `f1_x_f2:bin_index,bin_index` (interaction) format.
        """
        feature_names = self.ebm.feature_names
        feature_types = self.ebm.feature_types

        for count, (active_variables, _) in enumerate(self.solutions, start=1):
            print("## Strategy {} ##".format(count))

            for var in active_variables:
                if "_x_" in var.name:
                    # Interaction term: pulp's `f1_x_f2` is EBM's `f1 x f2`
                    parsed = re.fullmatch(r"(.+):(\d+),(\d+)", var.name)
                    if parsed is None:
                        raise ValueError(
                            "Unexpected variable name: {!r}".format(var.name)
                        )
                    f_name = parsed.group(1).replace("_x_", " x ")
                    bin_0 = int(parsed.group(2))
                    bin_1 = int(parsed.group(3))

                    for option in self.options[f_name]:
                        if option[3][0] == bin_0 and option[3][1] == bin_1:
                            print("Trigger interaction term: <{}>".format(f_name))
                            print(
                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                                    option[1], 0
                                )
                            )
                            break
                    continue

                parsed = re.fullmatch(r"(.+):(\d+)", var.name)
                if parsed is None:
                    raise ValueError(
                        "Unexpected variable name: {!r}".format(var.name)
                    )
                f_name = parsed.group(1)
                bin_i = int(parsed.group(2))

                f_index = feature_names.index(f_name)
                is_continuous = feature_types[f_index] == "continuous"
                org_value = self.cur_example[f_index]

                if is_continuous:
                    # Drop the last label so only bin start points remain
                    bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                        :-1
                    ]
                    if bin_i + 1 < len(bin_starts):
                        upper = bin_starts[bin_i + 1]
                    else:
                        upper = "inf"
                    target_bin = "[{}, {})".format(bin_starts[bin_i], upper)
                else:
                    # Non-continuous values are shown quoted, without a range
                    target_bin = ""
                    org_value = '"{}"'.format(org_value)

                for option in self.options[f_name]:
                    if option[3] == bin_i:
                        if is_continuous:
                            new_value = option[0]
                        else:
                            new_value = '"{}"'.format(option[0])

                        print(
                            "Change <{}> from {} to {} {}".format(
                                f_name, org_value, new_value, target_bin
                            )
                        )
                        print(
                            "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                                option[1], option[2]
                            )
                        )
                        break

            print()

    def model_summary(self, verbose=True):
        """Summarize the generated counterfactuals as a DataFrame.

        Args:
            verbose (bool): If True, first print the number of solutions and
                the number of variables and constraints of the MILP model.

        Returns:
            pd.DataFrame: One row per counterfactual, one column per
                non-interaction EBM feature, plus a `new_prediction` column
                holding `ebm.predict` on the counterfactuals.
        """
        if verbose:
            headline = "Top {} solution to a MILP model with {} variables and {} constraints.".format(
                self.data.shape[0],
                self.model.numVariables(),
                self.model.numConstraints(),
            )
            print(headline)

        # Interaction terms have no column of their own in `data`
        main_effect_positions = [
            position
            for position, kind in enumerate(self.ebm.feature_types)
            if kind != "interaction"
        ]

        summary = pd.DataFrame(self.data)
        summary.columns = np.array(self.ebm.feature_names)[main_effect_positions]
        summary["new_prediction"] = self.ebm.predict(self.data)

        return summary

    def __repr__(self) -> str:
        """Return the summary table as text (also prints the model headline)."""
        summary = self.model_summary()
        return summary.to_string()

    def to_df(self):
        """Return the summary table as a DataFrame without printing anything."""
        summary = self.model_summary(False)
        return summary
class
Counterfactuals:
class Counterfactuals:
    """Class to represent GAM counterfactual explanations."""

    def __init__(
        self,
        solutions: list,
        isSuccessful: bool,
        model: pulp.LpProblem,
        variables: dict,
        ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
        cur_example: np.ndarray,
        options: dict,
    ):
        """Initialize a Counterfactuals object.

        Args:
            solutions (list): List of generated `(active_variables, optimal value)`.
                If successful, it should have `total_cfs` items.
            isSuccessful (bool): True if the mixed-integer linear problem has
                `total_cfs` number of solutions under all constraints.
            model (LpProblem): Linear programming model
            variables (dict): Dictionary containing all MILP variables,
                `feature_name` -> [`variables`],
            ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
                The trained EBM model.
            cur_example (np.ndarray): The original data point. Only its first
                row (`cur_example[0]`) is kept.
            options (dict): Dictionary containing all eligible options for each
                selected features. `feature_name` -> `[[target, score_gain,
                distance, bin_id]]`. For interaction terms `bin_id` is a pair
                of bin indexes.
        """
        self.isSuccessful = isSuccessful
        """Boolean to indicate if the optimization is successful."""

        self.model = model
        """MILP program model."""

        self.variables = variables
        """MILP program variables."""

        self.ebm = ebm
        """The trained EBM model."""

        self.cur_example = cur_example[0]
        """The original data point."""

        self.options = options
        """All possible options."""

        self.solutions = solutions
        """Solutions for MILP."""

        self.data: np.ndarray
        """Generated CFs in the original data format."""

        self.target_ranges: list
        """New targets used in each row of `data`: an interval string such as
        `[start, end)` for continuous features, the target value otherwise."""

        self.values: list
        """Corresponding objective values (total distance) of each `data` row."""

        # Populates `data`, `values` and `target_ranges`.
        self.convert_cfs_to_data(solutions)

    def convert_cfs_to_data(self, solutions):
        """Convert optimal CF solutions to the original data format.

        Sets three attributes:

        - `self.data`: 2D array with one row per solution; each row is a copy
          of `self.cur_example` with the selected features replaced by their
          target values.
        - `self.values`: the objective value of each solution.
        - `self.target_ranges`: for each solution, the target of every changed
          feature (an interval string for continuous features, the target
          value otherwise).

        Args:
            solutions (list): List of `(active_variables, optimal value)`.
                An empty list yields an empty `(0, n_features)` array.

        Raises:
            ValueError: If a main-effect variable name is not of the form
                `feature_name:bin_index` or the feature is unknown to the EBM.
        """
        feature_names = self.ebm.feature_names
        feature_types = self.ebm.feature_types

        rows = []
        values = []
        target_ranges = []

        for active_variables, value in solutions:
            cur_cf = self.cur_example.copy()
            cur_target_ranges = []

            for var in active_variables:
                # Interaction variables do not change any feature value.
                # In EBM, interaction names are `f1 x f2`, pulp's variable name
                # is `f1_x_f2`
                if "_x_" in var.name:
                    continue

                parsed = re.fullmatch(r"(.+):(\d+)", var.name)
                if parsed is None:
                    raise ValueError(
                        "Unexpected variable name: {!r}".format(var.name)
                    )
                f_name = parsed.group(1)
                bin_i = int(parsed.group(2))

                f_index = feature_names.index(f_name)
                is_continuous = feature_types[f_index] == "continuous"

                if is_continuous:
                    # Drop the last label so only bin start points remain
                    bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                        :-1
                    ]
                    if bin_i + 1 < len(bin_starts):
                        upper = bin_starts[bin_i + 1]
                    else:
                        upper = "inf"
                    target_bin = "[{}, {})".format(bin_starts[bin_i], upper)

                # Apply the first option that targets the selected bin
                for option in self.options[f_name]:
                    if option[3] == bin_i:
                        cur_cf[f_index] = option[0]
                        cur_target_ranges.append(
                            target_bin if is_continuous else option[0]
                        )
                        break

            rows.append(cur_cf)
            values.append(value)
            target_ranges.append(cur_target_ranges)

        self.values = values
        self.target_ranges = target_ranges

        if rows:
            self.data = np.vstack(rows)
        else:
            # np.vstack raises on an empty sequence; keep a well-formed array
            template = np.asarray(self.cur_example)
            self.data = np.empty((0, template.size), dtype=template.dtype)

    def show(self):
        """Print the optimal solutions.

        For every solution a `## Strategy k ##` header is printed, followed by
        one entry per active variable (a feature change or a triggered
        interaction term) with its score gain and distance cost, and a blank
        line.

        Raises:
            ValueError: If a variable name does not follow the
                `feature_name:bin_index` (main effect) or
                `f1_x_f2:bin_index,bin_index` (interaction) format.
        """
        feature_names = self.ebm.feature_names
        feature_types = self.ebm.feature_types

        for count, (active_variables, _) in enumerate(self.solutions, start=1):
            print("## Strategy {} ##".format(count))

            for var in active_variables:
                if "_x_" in var.name:
                    # Interaction term: pulp's `f1_x_f2` is EBM's `f1 x f2`
                    parsed = re.fullmatch(r"(.+):(\d+),(\d+)", var.name)
                    if parsed is None:
                        raise ValueError(
                            "Unexpected variable name: {!r}".format(var.name)
                        )
                    f_name = parsed.group(1).replace("_x_", " x ")
                    bin_0 = int(parsed.group(2))
                    bin_1 = int(parsed.group(3))

                    for option in self.options[f_name]:
                        if option[3][0] == bin_0 and option[3][1] == bin_1:
                            print("Trigger interaction term: <{}>".format(f_name))
                            print(
                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                                    option[1], 0
                                )
                            )
                            break
                    continue

                parsed = re.fullmatch(r"(.+):(\d+)", var.name)
                if parsed is None:
                    raise ValueError(
                        "Unexpected variable name: {!r}".format(var.name)
                    )
                f_name = parsed.group(1)
                bin_i = int(parsed.group(2))

                f_index = feature_names.index(f_name)
                is_continuous = feature_types[f_index] == "continuous"
                org_value = self.cur_example[f_index]

                if is_continuous:
                    # Drop the last label so only bin start points remain
                    bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                        :-1
                    ]
                    if bin_i + 1 < len(bin_starts):
                        upper = bin_starts[bin_i + 1]
                    else:
                        upper = "inf"
                    target_bin = "[{}, {})".format(bin_starts[bin_i], upper)
                else:
                    # Non-continuous values are shown quoted, without a range
                    target_bin = ""
                    org_value = '"{}"'.format(org_value)

                for option in self.options[f_name]:
                    if option[3] == bin_i:
                        if is_continuous:
                            new_value = option[0]
                        else:
                            new_value = '"{}"'.format(option[0])

                        print(
                            "Change <{}> from {} to {} {}".format(
                                f_name, org_value, new_value, target_bin
                            )
                        )
                        print(
                            "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                                option[1], option[2]
                            )
                        )
                        break

            print()

    def model_summary(self, verbose=True):
        """Summarize the generated counterfactuals as a DataFrame.

        Args:
            verbose (bool): If True, first print the number of solutions and
                the number of variables and constraints of the MILP model.

        Returns:
            pd.DataFrame: One row per counterfactual, one column per
                non-interaction EBM feature, plus a `new_prediction` column
                holding `ebm.predict` on the counterfactuals.
        """
        if verbose:
            headline = "Top {} solution to a MILP model with {} variables and {} constraints.".format(
                self.data.shape[0],
                self.model.numVariables(),
                self.model.numConstraints(),
            )
            print(headline)

        # Interaction terms have no column of their own in `data`
        main_effect_positions = [
            position
            for position, kind in enumerate(self.ebm.feature_types)
            if kind != "interaction"
        ]

        summary = pd.DataFrame(self.data)
        summary.columns = np.array(self.ebm.feature_names)[main_effect_positions]
        summary["new_prediction"] = self.ebm.predict(self.data)

        return summary

    def __repr__(self) -> str:
        """Return the summary table as text (also prints the model headline)."""
        summary = self.model_summary()
        return summary.to_string()

    def to_df(self):
        """Return the summary table as a DataFrame without printing anything."""
        summary = self.model_summary(False)
        return summary
Class to represent GAM counterfactual explanations.
Counterfactuals( solutions: list, isSuccessful: bool, model: pulp.pulp.LpProblem, variables: dict, ebm: Union[interpret.glassbox.ebm.ebm.ExplainableBoostingClassifier, interpret.glassbox.ebm.ebm.ExplainableBoostingRegressor], cur_example: numpy.ndarray, options: dict)
def __init__(
    self,
    solutions: list,
    isSuccessful: bool,
    model: pulp.LpProblem,
    variables: dict,
    ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
    cur_example: np.ndarray,
    options: dict,
):
    """Initialize a Counterfactuals object.

    Args:
        solutions (list): List of generated `(active_variables, optimal value)`.
            If successful, it should have `total_cfs` items.
        isSuccessful (bool): True if the mixed-integer linear problem has
            `total_cfs` number of solutions under all constraints.
        model (LpProblem): Linear programming model
        variables (dict): Dictionary containing all MILP variables,
            `feature_name` -> [`variables`],
        ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
            The trained EBM model.
        cur_example (np.ndarray): The original data point. Only its first
            row (`cur_example[0]`) is kept.
        options (dict): Dictionary containing all eligible options for each
            selected features. `feature_name` -> `[[target, score_gain,
            distance, bin_id]]`. For interaction terms `bin_id` is a pair
            of bin indexes.
    """
    self.isSuccessful = isSuccessful
    """Boolean to indicate if the optimization is successful."""

    self.model = model
    """MILP program model."""

    self.variables = variables
    """MILP program variables."""

    self.ebm = ebm
    """The trained EBM model."""

    self.cur_example = cur_example[0]
    """The original data point."""

    self.options = options
    """All possible options."""

    self.solutions = solutions
    """Solutions for MILP."""

    self.data: np.ndarray
    """Generated CFs in the original data format."""

    # Named `target_ranges` to match the attribute that
    # `convert_cfs_to_data` actually assigns.
    self.target_ranges: list
    """New targets used in each row of `data`: an interval string such as
    `[start, end)` for continuous features, the target value otherwise."""

    self.values: list
    """Corresponding objective values (total distance) of each `data` row."""

    # Populates `data`, `values` and `target_ranges`.
    self.convert_cfs_to_data(solutions)
Initialize a Counterfactuals object.
Args
- solutions (list): List of generated `(active_variables, optimal value)`. If successful, it should have `total_cfs` items.
- isSuccessful (bool): True if the mixed-integer linear problem has `total_cfs` number of solutions under all constraints.
- model (LpProblem): Linear programming model.
- variables (dict): Dictionary containing all MILP variables, `feature_name` -> [`variables`].
- ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]): The trained EBM model.
- cur_example (np.ndarray): The original data point.
- options (dict): Dictionary containing all eligible options for each selected feature. `feature_name` -> `[[target, score_gain, distance, bin_id]]`
def
convert_cfs_to_data(self, solutions):
def convert_cfs_to_data(self, solutions):
    """Convert optimal CF solutions to the original data format.

    Sets three attributes:

    - `self.data`: 2D array with one row per solution; each row is a copy
      of `self.cur_example` with the selected features replaced by their
      target values.
    - `self.values`: the objective value of each solution.
    - `self.target_ranges`: for each solution, the target of every changed
      feature (an interval string for continuous features, the target
      value otherwise).

    Args:
        solutions (list): List of `(active_variables, optimal value)`.
            An empty list yields an empty `(0, n_features)` array.

    Raises:
        ValueError: If a main-effect variable name is not of the form
            `feature_name:bin_index` or the feature is unknown to the EBM.
    """
    feature_names = self.ebm.feature_names
    feature_types = self.ebm.feature_types

    rows = []
    values = []
    target_ranges = []

    for active_variables, value in solutions:
        cur_cf = self.cur_example.copy()
        cur_target_ranges = []

        for var in active_variables:
            # Interaction variables do not change any feature value.
            # In EBM, interaction names are `f1 x f2`, pulp's variable name
            # is `f1_x_f2`
            if "_x_" in var.name:
                continue

            parsed = re.fullmatch(r"(.+):(\d+)", var.name)
            if parsed is None:
                raise ValueError(
                    "Unexpected variable name: {!r}".format(var.name)
                )
            f_name = parsed.group(1)
            bin_i = int(parsed.group(2))

            f_index = feature_names.index(f_name)
            is_continuous = feature_types[f_index] == "continuous"

            if is_continuous:
                # Drop the last label so only bin start points remain
                bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                    :-1
                ]
                if bin_i + 1 < len(bin_starts):
                    upper = bin_starts[bin_i + 1]
                else:
                    upper = "inf"
                target_bin = "[{}, {})".format(bin_starts[bin_i], upper)

            # Apply the first option that targets the selected bin
            for option in self.options[f_name]:
                if option[3] == bin_i:
                    cur_cf[f_index] = option[0]
                    cur_target_ranges.append(
                        target_bin if is_continuous else option[0]
                    )
                    break

        rows.append(cur_cf)
        values.append(value)
        target_ranges.append(cur_target_ranges)

    self.values = values
    self.target_ranges = target_ranges

    if rows:
        self.data = np.vstack(rows)
    else:
        # np.vstack raises on an empty sequence; keep a well-formed array
        template = np.asarray(self.cur_example)
        self.data = np.empty((0, template.size), dtype=template.dtype)
Convert optimal CF solutions to the original data format.
def
show(self):
def show(self):
    """Print the optimal solutions.

    For every solution a `## Strategy k ##` header is printed, followed by
    one entry per active variable (a feature change or a triggered
    interaction term) with its score gain and distance cost, and a blank
    line.

    Raises:
        ValueError: If a variable name does not follow the
            `feature_name:bin_index` (main effect) or
            `f1_x_f2:bin_index,bin_index` (interaction) format.
    """
    feature_names = self.ebm.feature_names
    feature_types = self.ebm.feature_types

    for count, (active_variables, _) in enumerate(self.solutions, start=1):
        print("## Strategy {} ##".format(count))

        for var in active_variables:
            if "_x_" in var.name:
                # Interaction term: pulp's `f1_x_f2` is EBM's `f1 x f2`
                parsed = re.fullmatch(r"(.+):(\d+),(\d+)", var.name)
                if parsed is None:
                    raise ValueError(
                        "Unexpected variable name: {!r}".format(var.name)
                    )
                f_name = parsed.group(1).replace("_x_", " x ")
                bin_0 = int(parsed.group(2))
                bin_1 = int(parsed.group(3))

                for option in self.options[f_name]:
                    if option[3][0] == bin_0 and option[3][1] == bin_1:
                        print("Trigger interaction term: <{}>".format(f_name))
                        print(
                            "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                                option[1], 0
                            )
                        )
                        break
                continue

            parsed = re.fullmatch(r"(.+):(\d+)", var.name)
            if parsed is None:
                raise ValueError(
                    "Unexpected variable name: {!r}".format(var.name)
                )
            f_name = parsed.group(1)
            bin_i = int(parsed.group(2))

            f_index = feature_names.index(f_name)
            is_continuous = feature_types[f_index] == "continuous"
            org_value = self.cur_example[f_index]

            if is_continuous:
                # Drop the last label so only bin start points remain
                bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
                    :-1
                ]
                if bin_i + 1 < len(bin_starts):
                    upper = bin_starts[bin_i + 1]
                else:
                    upper = "inf"
                target_bin = "[{}, {})".format(bin_starts[bin_i], upper)
            else:
                # Non-continuous values are shown quoted, without a range
                target_bin = ""
                org_value = '"{}"'.format(org_value)

            for option in self.options[f_name]:
                if option[3] == bin_i:
                    if is_continuous:
                        new_value = option[0]
                    else:
                        new_value = '"{}"'.format(option[0])

                    print(
                        "Change <{}> from {} to {} {}".format(
                            f_name, org_value, new_value, target_bin
                        )
                    )
                    print(
                        "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
                            option[1], option[2]
                        )
                    )
                    break

        print()
Print the optimal solutions.
def
model_summary(self, verbose=True):
def model_summary(self, verbose=True):
    """Summarize the generated counterfactuals as a DataFrame.

    Args:
        verbose (bool): If True, first print the number of solutions and
            the number of variables and constraints of the MILP model.

    Returns:
        pd.DataFrame: One row per counterfactual, one column per
            non-interaction EBM feature, plus a `new_prediction` column
            holding `ebm.predict` on the counterfactuals.
    """
    if verbose:
        headline = "Top {} solution to a MILP model with {} variables and {} constraints.".format(
            self.data.shape[0],
            self.model.numVariables(),
            self.model.numConstraints(),
        )
        print(headline)

    # Interaction terms have no column of their own in `data`
    main_effect_positions = [
        position
        for position, kind in enumerate(self.ebm.feature_types)
        if kind != "interaction"
    ]

    summary = pd.DataFrame(self.data)
    summary.columns = np.array(self.ebm.feature_names)[main_effect_positions]
    summary["new_prediction"] = self.ebm.predict(self.data)

    return summary
Print out a summary of the MILP model.