Source code for spectral_libraries.core.ies

# -*- coding: utf-8 -*-
"""
| ----------------------------------------------------------------------------------------------------------------------
| Date                : August 2018
| Copyright           : © 2018 - 2020 by Ann Crabbé (KU Leuven)
| Email               : acrabbe.foss@gmail.com
| Acknowledgements    : Translated from VIPER Tools 2.0 (UC Santa Barbara, VIPER Lab).
|                       Dar Roberts, Kerry Halligan, Philip Dennison, Kenneth Dudley, Ben Somers, Ann Crabbé
|
| This file is part of the Spectral Libraries QGIS plugin and python package.
|
| This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
| License as published by the Free Software Foundation, either version 3 of the License, or any later version.
|
| This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
| warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
|
| You should have received a copy of the GNU General Public License (COPYING.txt). If not, see www.gnu.org/licenses.
| ----------------------------------------------------------------------------------------------------------------------
"""
import os
import copy
import numpy as np
from functools import partial
from multiprocessing import Pool

from .library_metrics import square_rmse


class Ies:
    """
    Iterative Endmember Selection (IES) is used to identify the spectral library subset that provides the best class
    separability. The basis for this is an RMSE-based kappa coefficient. In an iterative manner, endmembers are added
    and removed from the subset until the kappa coefficient no longer improves.

    Citations:

    Schaaf, A.N., Dennison, P.E., Fryer, G.K., Roth, K.L., and Roberts, D.A., 2011, Mapping Plant Functional Types at
    Multiple Spatial Resolutions using Imaging Spectrometer Data, GIScience Remote Sensing, 48, p. 324-344.

    Roth, K.L., Dennison, P.E., and Roberts, D.A., 2012, Comparing endmember selection techniques for accurate mapping
    of plant species and land cover using imaging spectrometer data, Remote Sensing of Environment, 127, p. 139-152.
    """

    def __init__(self):
        self.rmse_band = None
        self.mask = None
        self.original_classes = None
        self.original_classes_multiplied = None
        self.forced_list = None
        self.n_endmembers = None
        self.n_classes = None
        self.n_processes = 1
        self.summary = {}

    def _confusion_matrix(self, modeled_classes: np.array) -> np.array:
        """
        :param modeled_classes: the classes modeled based on a subset of models
        :return: confusion matrix (= 2D histogram) of the original vs. modeled classes
        """
        # self.original_classes_multiplied is the original classes multiplied by (n_classes + 1)
        # by adding the modeled classes, we create (n_classes + 1)^2 separate histogram bins
        confusion_matrix = self.original_classes_multiplied + modeled_classes
        # create a 1D histogram
        confusion_matrix = np.bincount(confusion_matrix, minlength=(self.n_classes + 1) ** 2)
        # reshape to 2D histogram
        return confusion_matrix.reshape([self.n_classes + 1, self.n_classes + 1]).T

    def _kappa(self, confusion_matrix: np.array) -> np.float32:
        """
        :param confusion_matrix: the confusion matrix
        :return: kappa coefficient: a measure of class separability
        """
        theta1 = np.float32(np.trace(confusion_matrix)) / self.n_endmembers
        theta2 = np.float32(np.sum(np.dot(confusion_matrix, confusion_matrix))) / (self.n_endmembers ** 2)
        return np.float32((theta1 - theta2) / (1 - theta2))

    def _add_model(self, current_selection: np.array):
        """
        Routine for adding a new model to the selection, if it provides a better kappa than the previous situation.

        :param current_selection: current pool of models
        :return: the kappa and index of the newly found model
        """
        n_models = current_selection.shape[0]
        selected_classes = self.original_classes[current_selection]

        # get the current min RMSE and model for each spectrum
        if n_models == 1:
            current_min_rmse = self.rmse_band[current_selection][0]
            current_modeled_classes = np.repeat(selected_classes, self.n_endmembers)
            current_modeled_classes[self.mask[current_selection][0]] = self.n_classes
        else:
            current_min_rmse = np.amin(self.rmse_band[current_selection], axis=0)
            min_index_current = np.argmin(self.rmse_band[current_selection], axis=0)
            min_index_current[np.all(self.mask[current_selection], axis=0)] = -1
            selected_classes = np.append(selected_classes, self.n_classes)
            current_modeled_classes = selected_classes[min_index_current]

        # calculate the kappa array
        if self.n_processes == 1:
            # avoid adding a model that is already in the current pool
            potential_indices = np.arange(self.n_endmembers)
            potential_indices = np.delete(potential_indices, current_selection)
            # try adding each model in an iterative way
            kappa_array = np.zeros(self.n_endmembers, dtype=np.float32)
            for i in potential_indices:
                kappa_array[i] = self._add_model_thread(i, min_rmse=current_min_rmse,
                                                        modeled_classes=current_modeled_classes)
        else:  # not used for now - has no improvements
            pool = Pool(processes=self.n_processes)
            temp = partial(self._add_model_thread, min_rmse=current_min_rmse, modeled_classes=current_modeled_classes)
            kappa_array = pool.map(temp, np.arange(self.n_endmembers))
            kappa_array = np.array(kappa_array)
            kappa_array[current_selection] = 0

        # return only the model with the best kappa, and its index
        return kappa_array.max(), np.argmax(kappa_array)

    def _add_model_thread(self, i, min_rmse=None, modeled_classes=None):
        # the indices where the new model has a lower RMSE
        new_model_indices = np.where(self.rmse_band[i] < min_rmse)
        # change the current modeled classes where the new model has a better RMSE
        modeled_classes = copy.deepcopy(modeled_classes)
        modeled_classes[new_model_indices] = self.original_classes[i]
        confusion_matrix = self._confusion_matrix(modeled_classes)
        return self._kappa(confusion_matrix)

    def _remove_model(self, current_selection: np.array):
        """
        Routine for removing a model from a selection, if it provides a better kappa than the previous situation.

        :param current_selection: current pool of models
        :return: the kappa and index of the model to remove
        """
        n_models = current_selection.shape[0]

        # create these before the loop to save some time
        mask = np.ones(n_models, dtype=bool)
        current_rmse = self.rmse_band[current_selection]
        current_mask = self.mask[current_selection]
        current_classes = self.original_classes[current_selection]

        # calculate the kappa array
        if self.n_processes == 1:
            # try removing each model in an iterative way
            kappa_array = np.zeros(n_models, dtype=np.float32)
            for i in np.arange(n_models):
                kappa_array[i] = self._remove_model_thread(i, ones=mask, mask=current_mask, rmse=current_rmse,
                                                           classes=current_classes)
        else:
            pool = Pool(processes=self.n_processes)
            temp = partial(self._remove_model_thread, ones=mask, mask=current_mask, rmse=current_rmse,
                           classes=current_classes)
            kappa_array = pool.map(temp, np.arange(n_models))
            kappa_array = np.array(kappa_array)

        # no subtracting forced models
        kappa_array[np.where(np.in1d(current_selection, self.forced_list))] = 0

        # return only the model with the best kappa, and its index
        return kappa_array.max(), np.argmax(kappa_array)

    def _remove_model_thread(self, i, ones=None, mask=None, rmse=None, classes=None):
        # find the modeled classes after removing one model
        ones = copy.deepcopy(ones)
        ones[i] = False
        min_index_removed = np.argmin(rmse[ones], axis=0)
        min_index_removed[np.all(mask[ones], axis=0)] = -1
        classes_removed = np.append(classes[ones], self.n_classes)
        modeled_classes_removed = classes_removed[min_index_removed]
        # confusion matrix and kappa
        confusion_matrix = self._confusion_matrix(modeled_classes_removed)
        return self._kappa(confusion_matrix)

    def _evaluate_selection(self, selection: np.array):
        """
        Routine for evaluating the kappa and confusion matrix of a given selection of models.

        :param selection: current pool of models
        :return: the kappa and confusion matrix for this selection
        """
        n_models = selection.shape[0]
        selected_classes = self.original_classes[selection]

        # get the current min RMSE and model for each spectrum
        if n_models == 1:
            current_modeled_classes = np.repeat(selected_classes, self.n_endmembers)
            current_modeled_classes[self.mask[selection][0]] = self.n_classes
        else:
            min_index_current = np.argmin(self.rmse_band[selection], axis=0)
            min_index_current[np.all(self.mask[selection], axis=0)] = -1
            selected_classes = np.append(selected_classes, self.n_classes)
            current_modeled_classes = selected_classes[min_index_current]

        confusion_matrix = self._confusion_matrix(current_modeled_classes)
        kappa = self._kappa(confusion_matrix)

        # return the kappa and confusion matrix for this selection
        return kappa, confusion_matrix
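    # ------------------------------------------------------------------------------------------------------------------
    # Illustrative note (not part of the original module): a worked example of the kappa computation used in _kappa,
    # assuming a hypothetical 2-class library of N = 100 spectra with confusion matrix C = [[40, 10], [5, 45]]
    # (rows = modeled classes, columns = original classes; the extra "unmodeled" class is ignored here for brevity):
    #
    #   theta1 = trace(C) / N        = (40 + 45) / 100                = 0.85   (observed agreement)
    #   theta2 = sum(C @ C) / N ** 2 = (45 * 50 + 55 * 50) / 100 ** 2 = 0.50   (chance agreement: sum over classes of
    #                                                                           column total * row total)
    #   kappa  = (theta1 - theta2) / (1 - theta2) = (0.85 - 0.50) / 0.50 = 0.70
    #
    # In the class itself N is self.n_endmembers and the matrix carries one extra row/column for constraint-masked
    # spectra.
    # ------------------------------------------------------------------------------------------------------------------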
    def execute(self, library: np.array, class_list: np.array, constraints: tuple = (-0.05, 1.05, 0.025),
                forced_list: np.array = None, forced_step: int = None, multiprocessing: bool = True,
                summary: bool = False, set_progress: callable = None, log: callable = print):
        """
        Execute the IES algorithm. The result is a 1-D numpy array of selected endmembers. In case a summary is
        requested, it is delivered as a second output variable.

        :param library: spectral library [spectra as columns], scaled to reflectance values, without bad bands
        :param class_list: int array with the *numerical* class for each spectrum (e.g. GV = 1, SOIL = 2)
        :param constraints: min fraction, max fraction and max RMSE
        :param forced_list: int array with indices of the endmembers that should be forcefully included
        :param forced_step: the loop in which the forced_list should be included (starting from 0)
        :param multiprocessing: use multiprocessing or not (option is deactivated)
        :param summary: return a summary of the process or not
        :param set_progress: communicate progress (refer to the progress bar in case of GUI; otherwise print to console)
        :param log: communicate messages (refer to the print_log tab in the GUI; otherwise print to the console)
        :return: numpy array with the indices of the selected endmembers [+ summary as a dict in case requested]
        """
        log('IES calculations started')
        set_progress = set_progress if set_progress else printProgress
        progress_int = 0

        # store the variables
        self.original_classes = class_list
        self.n_endmembers = self.original_classes.shape[0]
        self.n_classes = self.original_classes.max() + 1
        self.original_classes_multiplied = class_list * (self.n_classes + 1)  # for later use in the confusion matrix
        self.forced_list = forced_list

        if multiprocessing:
            self.n_processes = 1  # option is turned off for now
            # try:
            #     self.n_processes = len(os.sched_getaffinity(0))
            #     log("CPU cores available for use: {}".format(self.n_processes))
            # except AttributeError:
            #     self.n_processes = 1

        log('Calculating the RMSE')
        self.rmse_band, constraints_band = square_rmse(library=library, constraints=constraints, reset=True)
        self.mask = constraints_band > 0
        self.rmse_band[self.mask] = 9999

        stop_adding = 0
        stop_removing = 0

        # find the first endmember: with one model, the modeled class is always the model's own class, except on a
        # constraint breach, unless we have to use the forced library right away
        if forced_step == 0:
            selected_indices = forced_list
            max_kappa, confusion_matrix = self._evaluate_selection(selected_indices)
            log("0: forced library entered: " + np.array2string(forced_list, separator=", ") +
                " - Kappa at this point: " + str(max_kappa))
            progress_int = progress_int + 1
            set_progress(progress_int)
            if summary:
                self.summary[0] = {'add': forced_list, 'kappa': max_kappa, 'confusion_matrix': confusion_matrix}
        else:
            modeled_classes_one_model = np.repeat(self.original_classes,
                                                  self.n_endmembers).reshape((self.n_endmembers, self.n_endmembers))
            modeled_classes_one_model[self.mask] = self.n_classes
            kappa_array = np.zeros(self.n_endmembers, dtype=np.float32)
            for i in np.arange(self.n_endmembers):
                confusion_matrix = self._confusion_matrix(modeled_classes_one_model[i])
                kappa_array[i] = self._kappa(confusion_matrix)
            max_kappa = kappa_array.max()
            new_index = np.argmax(kappa_array)
            selected_indices = np.array([new_index])
            log("0: new endmember: " + str(new_index) + " - Kappa at this point: " + str(max_kappa))
            progress_int = progress_int + 1
            set_progress(progress_int)
            if summary:
                self.summary[0] = {'add': new_index, 'kappa': max_kappa,
                                   'confusion_matrix': self._confusion_matrix(modeled_classes_one_model[new_index])}

        # find the second endmember, unless we have to use the forced library in this step or unless we already have 2
        if forced_step == 1:
            selected_indices = np.sort(np.append(selected_indices, forced_list))
            max_kappa, confusion_matrix = self._evaluate_selection(selected_indices)
            log("1: forced library entered: " + np.array2string(forced_list, separator=", ") +
                " - Kappa at this point: " + str(max_kappa))
            progress_int = progress_int + 1
            set_progress(progress_int)
            if summary:
                self.summary[1] = {'add': forced_list, 'kappa': max_kappa, 'confusion_matrix': confusion_matrix}
        elif selected_indices.shape[0] < 2:
            new_kappa, new_index = self._add_model(selected_indices)
            if new_kappa > max_kappa:
                max_kappa = new_kappa
                selected_indices = np.sort(np.append(selected_indices, new_index))
                log("1: new endmember: " + str(new_index) + " - Kappa at this point: " + str(max_kappa))
                progress_int = progress_int + 1
                set_progress(progress_int)
                if summary:
                    self.summary[1] = {'add': new_index, 'kappa': max_kappa,
                                       'confusion_matrix': self._evaluate_selection(selected_indices)[1]}
            else:
                set_progress(100)
                raise Exception("No second endmember found. Returning without result.")
        else:
            log("1: second loop skipped because forced library contained more than one endmember")
            progress_int = progress_int + 1
            set_progress(progress_int)
            if summary:
                self.summary[1] = {'add': None}

        # IES loop
        loop_counter = 2
        while stop_adding == 0 or stop_removing == 0:

            if forced_step == loop_counter:
                selected_indices = np.sort(np.append(selected_indices, forced_list))
                max_kappa, confusion_matrix = self._evaluate_selection(selected_indices)
                log(str(loop_counter) + ": forced library entered: " + np.array2string(forced_list, separator=", ") +
                    " - Kappa at this point: " + str(max_kappa))
                progress_int = progress_int + 1 if progress_int < 99 else 0
                set_progress(progress_int)
                if summary:
                    self.summary[loop_counter] = {'add': forced_list, 'kappa': max_kappa,
                                                  'confusion_matrix': confusion_matrix}
            else:
                # process of adding a new model
                new_kappa, new_index = self._add_model(selected_indices)
                if new_kappa > max_kappa:
                    max_kappa = new_kappa
                    selected_indices = np.sort(np.append(selected_indices, new_index))
                    log(str(loop_counter) + ": new endmember: " + str(new_index) +
                        " - Kappa at this point: " + str(max_kappa))
                    progress_int = progress_int + 1 if progress_int < 99 else 0
                    set_progress(progress_int)
                    stop_adding = 0
                    if summary:
                        self.summary[loop_counter] = {'add': new_index, 'kappa': max_kappa,
                                                      'confusion_matrix': self._evaluate_selection(selected_indices)[1]}
                else:
                    stop_adding = 1

                # process of subtracting a selected model
                new_kappa, remove_index = self._remove_model(selected_indices)
                if new_kappa > max_kappa:
                    max_kappa = new_kappa
                    removed_endmember = selected_indices[remove_index]  # look up the endmember before deleting it
                    log(str(loop_counter) + ": removed endmember: " + str(removed_endmember) +
                        " - Kappa at this point: " + str(max_kappa))
                    progress_int = progress_int + 1 if progress_int < 99 else 0
                    set_progress(progress_int)
                    selected_indices = np.delete(selected_indices, remove_index)
                    stop_removing = 0
                    if summary:
                        self.summary[loop_counter] = {'remove': removed_endmember, 'r_kappa': max_kappa,
                                                      'r_confusion_matrix':
                                                          self._evaluate_selection(selected_indices)[1]}
                else:
                    stop_removing = 1

            loop_counter += 1

        if summary:
            return selected_indices, self.summary
        else:
            return selected_indices
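    # ------------------------------------------------------------------------------------------------------------------
    # Illustrative note (not part of the original module): when summary=True, execute() also returns a dict keyed by
    # loop number. Based on the assignments above, an entry looks roughly like one of (numbers hypothetical):
    #
    #   {'add': 12, 'kappa': 0.83, 'confusion_matrix': <(n_classes + 1) x (n_classes + 1) array>}
    #   {'remove': 7, 'r_kappa': 0.85, 'r_confusion_matrix': <(n_classes + 1) x (n_classes + 1) array>}
    #
    # Note that a successful removal in a given loop replaces the 'add' entry recorded earlier in that same loop.
    # ------------------------------------------------------------------------------------------------------------------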
def printProgress(value: int):
    """ Replacement for the GUI progress bar """
    print('progress: {} %'.format(value))
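

# ----------------------------------------------------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It builds a small synthetic library in which each class is a
# set of noisy copies of one prototype spectrum, then runs IES on it. Shapes and value ranges follow the execute()
# docstring (spectra as columns, reflectance values, one numerical class per spectrum); the class count, noise level and
# data themselves are purely illustrative assumptions.
# ----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    n_bands, n_classes, n_per_class = 50, 3, 6

    # one prototype spectrum per class, repeated per class and perturbed with a little noise
    prototypes = rng.uniform(0.1, 0.9, size=(n_bands, n_classes))
    library = np.repeat(prototypes, n_per_class, axis=1) + rng.normal(0, 0.005, size=(n_bands, n_classes * n_per_class))
    class_list = np.repeat(np.arange(n_classes), n_per_class)

    selection, ies_summary = Ies().execute(library=library, class_list=class_list, summary=True)
    print('selected endmembers: {}'.format(selection))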