gamcoach.counterfactuals

Counterfactuals Class.

This module implements the Counterfactuals class. We use it to represent the generated counterfactual explanations.

  1"""Counterfactuals Class.
  2
  3This module implements the Counterfactuals class. We use it to represent the
  4generated counterfactual explanations.
  5"""
  6
  7import numpy as np
  8import pandas as pd
  9import re
 10import pulp
 11
 12from tqdm import tqdm
 13from interpret.glassbox import (
 14    ExplainableBoostingClassifier,
 15    ExplainableBoostingRegressor,
 16)
 17from time import time
 18from collections import Counter
 19from typing import Union
 20
 21SEED = 922
 22
 23
 24class Counterfactuals:
 25    """Class to represent GAM counterfactual explanations."""
 26
 27    def __init__(
 28        self,
 29        solutions: list,
 30        isSuccessful: bool,
 31        model: pulp.LpProblem,
 32        variables: dict,
 33        ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
 34        cur_example: np.ndarray,
 35        options: dict,
 36    ):
 37        """Initialize a Counterfactuals object.
 38
 39        Args:
 40            solutions (list): List of generated `(active_variables, optimal value)`.
 41                If successful, it should have `total_cfs` items.
 42            isSuccessful (bool): True if the mixed-integer linear problem has
 43                `total_cfs` number of solutions under all constraints.
 44            model (LpProblem): Linear programming model
 45            variables (dict): Dictionary containing all MILP variables,
 46                `feature_name` -> [`variables`],
 47            ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
 48                The trained EBM model.
 49            cur_example (np.ndarray): The original data point.
 50            options (dict): Dictionary containing all eligible options for each
 51                selected features. `feature_name` -> `[[target, score_gain,
 52                distance, bin_id]]`
 53        """
 54        self.isSuccessful = isSuccessful
 55        """Boolean to indicate if the optimization is successful."""
 56
 57        self.model = model
 58        """MILP program model."""
 59
 60        self.variables = variables
 61        """MILP program variabels."""
 62
 63        self.ebm = ebm
 64        """The trained EBM model."""
 65
 66        self.cur_example = cur_example[0]
 67        """The original data point."""
 68
 69        self.options = options
 70        """All possible options."""
 71
 72        self.solutions = solutions
 73        """Solutions for MILP."""
 74
 75        self.data: np.ndarray
 76        """Generated CFs in the original data dataformat."""
 77
 78        self.target_bins: list
 79        """New bins used in each row of `data`."""
 80
 81        self.values: list
 82        """Corresponding objective values (total distance) of each `data` row."""
 83
 84        self.convert_cfs_to_data(solutions)
 85
 86    def convert_cfs_to_data(self, solutions):
 87        """Convert optimal CF solutions to the original dataformat."""
 88
 89        self.data = []
 90        self.values = []
 91        self.target_ranges = []
 92
 93        for active_variables, value in solutions:
 94            cur_cf = self.cur_example.copy()
 95            cur_target_ranges = []
 96
 97            for var in active_variables:
 98                # Skip interaction vars (included)
 99                # In EBM, interaction names are `f1 x f2`, pulp's variable name
100                # is `f1_x_f2`
101                if "_x_" not in var.name:
102                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
103                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
104
105                    # Find the original value
106                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
107
108                    # Find the target bin
109                    f_index = self.ebm.feature_names.index(f_name)
110                    f_type = self.ebm.feature_types[f_index]
111
112                    if f_type == "continuous":
113                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
114                            :-1
115                        ]
116
117                        target_bin = "[{},".format(bin_starts[bin_i])
118
119                        if bin_i + 1 < len(bin_starts):
120                            target_bin += " {})".format(bin_starts[bin_i + 1])
121                        else:
122                            target_bin += " inf)"
123                    else:
124                        target_bin = ""
125                        org_value = '"{}"'.format(org_value)
126
127                    for option in self.options[f_name]:
128                        if option[3] == bin_i:
129                            target_value = option[0]
130                            cur_cf[f_index] = target_value
131
132                            if f_type == "continuous":
133                                cur_target_ranges.append(target_bin)
134                            else:
135                                cur_target_ranges.append(option[0])
136                            break
137
138            self.data.append(cur_cf)
139            self.values.append(value)
140            self.target_ranges.append(cur_target_ranges)
141
142        self.data = np.vstack(self.data)
143
144    def show(self):
145        """
146        Print the optimal solutions.
147        """
148        count = 0
149
150        for active_variables, value in self.solutions:
151            count += 1
152            print("## Strategy {} ##".format(count))
153
154            for var in active_variables:
155                # Skip interaction vars (included)
156                if "_x_" not in var.name:
157                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
158                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
159
160                    # Find the original value
161                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
162
163                    # Find the target bin
164                    f_index = self.ebm.feature_names.index(f_name)
165                    f_type = self.ebm.feature_types[f_index]
166
167                    if f_type == "continuous":
168                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
169                            :-1
170                        ]
171
172                        target_bin = "[{},".format(bin_starts[bin_i])
173
174                        if bin_i + 1 < len(bin_starts):
175                            target_bin += " {})".format(bin_starts[bin_i + 1])
176                        else:
177                            target_bin += " inf)"
178                    else:
179                        target_bin = ""
180                        org_value = '"{}"'.format(org_value)
181
182                    for option in self.options[f_name]:
183                        if option[3] == bin_i:
184                            new_value = (
185                                option[0]
186                                if f_type == "continuous"
187                                else '"{}"'.format(option[0])
188                            )
189
190                            print(
191                                "Change <{}> from {} to {} {}".format(
192                                    f_name, org_value, new_value, target_bin
193                                )
194                            )
195                            print(
196                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
197                                    option[1], option[2]
198                                )
199                            )
200                            break
201
202                else:
203                    f_name = re.sub(r"(.+):.+", r"\1", var.name)
204                    f_name = f_name.replace("_x_", " x ")
205                    bin_0 = int(re.sub(r".+:(\d+),\d+", r"\1", var.name))
206                    bin_1 = int(re.sub(r".+:\d+,(\d+)", r"\1", var.name))
207
208                    for option in self.options[f_name]:
209                        if option[3][0] == bin_0 and option[3][1] == bin_1:
210                            print("Trigger interaction term: <{}>".format(f_name))
211                            print(
212                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
213                                    option[1], 0
214                                )
215                            )
216                            break
217            print()
218
219    def model_summary(self, verbose=True):
220        """Print out a summary of the MILP model."""
221
222        if verbose:
223            print(
224                "Top {} solution to a MILP model with {} variables and {} constraints.".format(
225                    self.data.shape[0],
226                    self.model.numVariables(),
227                    self.model.numConstraints(),
228                )
229            )
230
231        data_df = pd.DataFrame(self.data)
232        data_df.columns = np.array(self.ebm.feature_names)[
233            [
234                i
235                for i in range(len(self.ebm.feature_types))
236                if self.ebm.feature_types[i] != "interaction"
237            ]
238        ]
239
240        new_predictions = self.ebm.predict(self.data)
241        data_df["new_prediction"] = new_predictions
242
243        return data_df
244
245    def __repr__(self) -> str:
246        summary = self.model_summary()
247        return summary.to_string()
248
249    def to_df(self):
250        summary = self.model_summary(False)
251        return summary
class Counterfactuals:
 25class Counterfactuals:
 26    """Class to represent GAM counterfactual explanations."""
 27
 28    def __init__(
 29        self,
 30        solutions: list,
 31        isSuccessful: bool,
 32        model: pulp.LpProblem,
 33        variables: dict,
 34        ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
 35        cur_example: np.ndarray,
 36        options: dict,
 37    ):
 38        """Initialize a Counterfactuals object.
 39
 40        Args:
 41            solutions (list): List of generated `(active_variables, optimal value)`.
 42                If successful, it should have `total_cfs` items.
 43            isSuccessful (bool): True if the mixed-integer linear problem has
 44                `total_cfs` number of solutions under all constraints.
 45            model (LpProblem): Linear programming model
 46            variables (dict): Dictionary containing all MILP variables,
 47                `feature_name` -> [`variables`],
 48            ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
 49                The trained EBM model.
 50            cur_example (np.ndarray): The original data point.
 51            options (dict): Dictionary containing all eligible options for each
 52                selected features. `feature_name` -> `[[target, score_gain,
 53                distance, bin_id]]`
 54        """
 55        self.isSuccessful = isSuccessful
 56        """Boolean to indicate if the optimization is successful."""
 57
 58        self.model = model
 59        """MILP program model."""
 60
 61        self.variables = variables
 62        """MILP program variabels."""
 63
 64        self.ebm = ebm
 65        """The trained EBM model."""
 66
 67        self.cur_example = cur_example[0]
 68        """The original data point."""
 69
 70        self.options = options
 71        """All possible options."""
 72
 73        self.solutions = solutions
 74        """Solutions for MILP."""
 75
 76        self.data: np.ndarray
 77        """Generated CFs in the original data dataformat."""
 78
 79        self.target_bins: list
 80        """New bins used in each row of `data`."""
 81
 82        self.values: list
 83        """Corresponding objective values (total distance) of each `data` row."""
 84
 85        self.convert_cfs_to_data(solutions)
 86
 87    def convert_cfs_to_data(self, solutions):
 88        """Convert optimal CF solutions to the original dataformat."""
 89
 90        self.data = []
 91        self.values = []
 92        self.target_ranges = []
 93
 94        for active_variables, value in solutions:
 95            cur_cf = self.cur_example.copy()
 96            cur_target_ranges = []
 97
 98            for var in active_variables:
 99                # Skip interaction vars (included)
100                # In EBM, interaction names are `f1 x f2`, pulp's variable name
101                # is `f1_x_f2`
102                if "_x_" not in var.name:
103                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
104                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
105
106                    # Find the original value
107                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
108
109                    # Find the target bin
110                    f_index = self.ebm.feature_names.index(f_name)
111                    f_type = self.ebm.feature_types[f_index]
112
113                    if f_type == "continuous":
114                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
115                            :-1
116                        ]
117
118                        target_bin = "[{},".format(bin_starts[bin_i])
119
120                        if bin_i + 1 < len(bin_starts):
121                            target_bin += " {})".format(bin_starts[bin_i + 1])
122                        else:
123                            target_bin += " inf)"
124                    else:
125                        target_bin = ""
126                        org_value = '"{}"'.format(org_value)
127
128                    for option in self.options[f_name]:
129                        if option[3] == bin_i:
130                            target_value = option[0]
131                            cur_cf[f_index] = target_value
132
133                            if f_type == "continuous":
134                                cur_target_ranges.append(target_bin)
135                            else:
136                                cur_target_ranges.append(option[0])
137                            break
138
139            self.data.append(cur_cf)
140            self.values.append(value)
141            self.target_ranges.append(cur_target_ranges)
142
143        self.data = np.vstack(self.data)
144
145    def show(self):
146        """
147        Print the optimal solutions.
148        """
149        count = 0
150
151        for active_variables, value in self.solutions:
152            count += 1
153            print("## Strategy {} ##".format(count))
154
155            for var in active_variables:
156                # Skip interaction vars (included)
157                if "_x_" not in var.name:
158                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
159                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
160
161                    # Find the original value
162                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
163
164                    # Find the target bin
165                    f_index = self.ebm.feature_names.index(f_name)
166                    f_type = self.ebm.feature_types[f_index]
167
168                    if f_type == "continuous":
169                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
170                            :-1
171                        ]
172
173                        target_bin = "[{},".format(bin_starts[bin_i])
174
175                        if bin_i + 1 < len(bin_starts):
176                            target_bin += " {})".format(bin_starts[bin_i + 1])
177                        else:
178                            target_bin += " inf)"
179                    else:
180                        target_bin = ""
181                        org_value = '"{}"'.format(org_value)
182
183                    for option in self.options[f_name]:
184                        if option[3] == bin_i:
185                            new_value = (
186                                option[0]
187                                if f_type == "continuous"
188                                else '"{}"'.format(option[0])
189                            )
190
191                            print(
192                                "Change <{}> from {} to {} {}".format(
193                                    f_name, org_value, new_value, target_bin
194                                )
195                            )
196                            print(
197                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
198                                    option[1], option[2]
199                                )
200                            )
201                            break
202
203                else:
204                    f_name = re.sub(r"(.+):.+", r"\1", var.name)
205                    f_name = f_name.replace("_x_", " x ")
206                    bin_0 = int(re.sub(r".+:(\d+),\d+", r"\1", var.name))
207                    bin_1 = int(re.sub(r".+:\d+,(\d+)", r"\1", var.name))
208
209                    for option in self.options[f_name]:
210                        if option[3][0] == bin_0 and option[3][1] == bin_1:
211                            print("Trigger interaction term: <{}>".format(f_name))
212                            print(
213                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
214                                    option[1], 0
215                                )
216                            )
217                            break
218            print()
219
220    def model_summary(self, verbose=True):
221        """Print out a summary of the MILP model."""
222
223        if verbose:
224            print(
225                "Top {} solution to a MILP model with {} variables and {} constraints.".format(
226                    self.data.shape[0],
227                    self.model.numVariables(),
228                    self.model.numConstraints(),
229                )
230            )
231
232        data_df = pd.DataFrame(self.data)
233        data_df.columns = np.array(self.ebm.feature_names)[
234            [
235                i
236                for i in range(len(self.ebm.feature_types))
237                if self.ebm.feature_types[i] != "interaction"
238            ]
239        ]
240
241        new_predictions = self.ebm.predict(self.data)
242        data_df["new_prediction"] = new_predictions
243
244        return data_df
245
246    def __repr__(self) -> str:
247        summary = self.model_summary()
248        return summary.to_string()
249
250    def to_df(self):
251        summary = self.model_summary(False)
252        return summary

Class to represent GAM counterfactual explanations.

Counterfactuals( solutions: list, isSuccessful: bool, model: pulp.pulp.LpProblem, variables: dict, ebm: Union[interpret.glassbox.ebm.ebm.ExplainableBoostingClassifier, interpret.glassbox.ebm.ebm.ExplainableBoostingRegressor], cur_example: numpy.ndarray, options: dict)
28    def __init__(
29        self,
30        solutions: list,
31        isSuccessful: bool,
32        model: pulp.LpProblem,
33        variables: dict,
34        ebm: Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor],
35        cur_example: np.ndarray,
36        options: dict,
37    ):
38        """Initialize a Counterfactuals object.
39
40        Args:
41            solutions (list): List of generated `(active_variables, optimal value)`.
42                If successful, it should have `total_cfs` items.
43            isSuccessful (bool): True if the mixed-integer linear problem has
44                `total_cfs` number of solutions under all constraints.
45            model (LpProblem): Linear programming model
46            variables (dict): Dictionary containing all MILP variables,
47                `feature_name` -> [`variables`],
48            ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]):
49                The trained EBM model.
50            cur_example (np.ndarray): The original data point.
51            options (dict): Dictionary containing all eligible options for each
52                selected features. `feature_name` -> `[[target, score_gain,
53                distance, bin_id]]`
54        """
55        self.isSuccessful = isSuccessful
56        """Boolean to indicate if the optimization is successful."""
57
58        self.model = model
59        """MILP program model."""
60
61        self.variables = variables
62        """MILP program variabels."""
63
64        self.ebm = ebm
65        """The trained EBM model."""
66
67        self.cur_example = cur_example[0]
68        """The original data point."""
69
70        self.options = options
71        """All possible options."""
72
73        self.solutions = solutions
74        """Solutions for MILP."""
75
76        self.data: np.ndarray
77        """Generated CFs in the original data dataformat."""
78
79        self.target_bins: list
80        """New bins used in each row of `data`."""
81
82        self.values: list
83        """Corresponding objective values (total distance) of each `data` row."""
84
85        self.convert_cfs_to_data(solutions)

Initialize a Counterfactuals object.

Args
  • solutions (list): List of generated (active_variables, optimal value). If successful, it should have total_cfs items.
  • isSuccessful (bool): True if the mixed-integer linear problem has total_cfs number of solutions under all constraints.
  • model (LpProblem): Linear programming model
  • variables (dict): Dictionary containing all MILP variables, feature_name -> [variables].
  • ebm (Union[ExplainableBoostingClassifier, ExplainableBoostingRegressor]): The trained EBM model.
  • cur_example (np.ndarray): The original data point.
  • options (dict): Dictionary containing all eligible options for each selected features. feature_name -> [[target, score_gain, distance, bin_id]]
isSuccessful

Boolean to indicate if the optimization is successful.

model

MILP program model.

variables

MILP program variables.

ebm

The trained EBM model.

cur_example

The original data point.

options

All possible options.

solutions

Solutions for MILP.

data: numpy.ndarray

Generated CFs in the original data format.

target_bins: list

New bins used in each row of data.

values: list

Corresponding objective values (total distance) of each data row.

def convert_cfs_to_data(self, solutions):
 87    def convert_cfs_to_data(self, solutions):
 88        """Convert optimal CF solutions to the original dataformat."""
 89
 90        self.data = []
 91        self.values = []
 92        self.target_ranges = []
 93
 94        for active_variables, value in solutions:
 95            cur_cf = self.cur_example.copy()
 96            cur_target_ranges = []
 97
 98            for var in active_variables:
 99                # Skip interaction vars (included)
100                # In EBM, interaction names are `f1 x f2`, pulp's variable name
101                # is `f1_x_f2`
102                if "_x_" not in var.name:
103                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
104                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
105
106                    # Find the original value
107                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
108
109                    # Find the target bin
110                    f_index = self.ebm.feature_names.index(f_name)
111                    f_type = self.ebm.feature_types[f_index]
112
113                    if f_type == "continuous":
114                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
115                            :-1
116                        ]
117
118                        target_bin = "[{},".format(bin_starts[bin_i])
119
120                        if bin_i + 1 < len(bin_starts):
121                            target_bin += " {})".format(bin_starts[bin_i + 1])
122                        else:
123                            target_bin += " inf)"
124                    else:
125                        target_bin = ""
126                        org_value = '"{}"'.format(org_value)
127
128                    for option in self.options[f_name]:
129                        if option[3] == bin_i:
130                            target_value = option[0]
131                            cur_cf[f_index] = target_value
132
133                            if f_type == "continuous":
134                                cur_target_ranges.append(target_bin)
135                            else:
136                                cur_target_ranges.append(option[0])
137                            break
138
139            self.data.append(cur_cf)
140            self.values.append(value)
141            self.target_ranges.append(cur_target_ranges)
142
143        self.data = np.vstack(self.data)

Convert optimal CF solutions to the original data format.

def show(self):
145    def show(self):
146        """
147        Print the optimal solutions.
148        """
149        count = 0
150
151        for active_variables, value in self.solutions:
152            count += 1
153            print("## Strategy {} ##".format(count))
154
155            for var in active_variables:
156                # Skip interaction vars (included)
157                if "_x_" not in var.name:
158                    f_name = re.sub(r"(.+):\d+", r"\1", var.name)
159                    bin_i = int(re.sub(r".+:(\d+)", r"\1", var.name))
160
161                    # Find the original value
162                    org_value = self.cur_example[self.ebm.feature_names.index(f_name)]
163
164                    # Find the target bin
165                    f_index = self.ebm.feature_names.index(f_name)
166                    f_type = self.ebm.feature_types[f_index]
167
168                    if f_type == "continuous":
169                        bin_starts = self.ebm.preprocessor_._get_bin_labels(f_index)[
170                            :-1
171                        ]
172
173                        target_bin = "[{},".format(bin_starts[bin_i])
174
175                        if bin_i + 1 < len(bin_starts):
176                            target_bin += " {})".format(bin_starts[bin_i + 1])
177                        else:
178                            target_bin += " inf)"
179                    else:
180                        target_bin = ""
181                        org_value = '"{}"'.format(org_value)
182
183                    for option in self.options[f_name]:
184                        if option[3] == bin_i:
185                            new_value = (
186                                option[0]
187                                if f_type == "continuous"
188                                else '"{}"'.format(option[0])
189                            )
190
191                            print(
192                                "Change <{}> from {} to {} {}".format(
193                                    f_name, org_value, new_value, target_bin
194                                )
195                            )
196                            print(
197                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
198                                    option[1], option[2]
199                                )
200                            )
201                            break
202
203                else:
204                    f_name = re.sub(r"(.+):.+", r"\1", var.name)
205                    f_name = f_name.replace("_x_", " x ")
206                    bin_0 = int(re.sub(r".+:(\d+),\d+", r"\1", var.name))
207                    bin_1 = int(re.sub(r".+:\d+,(\d+)", r"\1", var.name))
208
209                    for option in self.options[f_name]:
210                        if option[3][0] == bin_0 and option[3][1] == bin_1:
211                            print("Trigger interaction term: <{}>".format(f_name))
212                            print(
213                                "\t* score gain: {:.4f}\n\t* distance cost: {:.4f}".format(
214                                    option[1], 0
215                                )
216                            )
217                            break
218            print()

Print the optimal solutions.

def model_summary(self, verbose=True):
220    def model_summary(self, verbose=True):
221        """Print out a summary of the MILP model."""
222
223        if verbose:
224            print(
225                "Top {} solution to a MILP model with {} variables and {} constraints.".format(
226                    self.data.shape[0],
227                    self.model.numVariables(),
228                    self.model.numConstraints(),
229                )
230            )
231
232        data_df = pd.DataFrame(self.data)
233        data_df.columns = np.array(self.ebm.feature_names)[
234            [
235                i
236                for i in range(len(self.ebm.feature_types))
237                if self.ebm.feature_types[i] != "interaction"
238            ]
239        ]
240
241        new_predictions = self.ebm.predict(self.data)
242        data_df["new_prediction"] = new_predictions
243
244        return data_df

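Build a summary table (DataFrame) of the counterfactuals and their new predictions; when verbose is true, also print a one-line summary of the MILP model.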

def to_df(self):
250    def to_df(self):
251        summary = self.model_summary(False)
252        return summary