"""
This module contains Machine Learning tools.
Created on Wed Sep 8 00:00:00 2021
@author: Jose Manuel Tapia Avitia, e-mail: josetapia@exatec.tec.mx
"""
from os import makedirs as _create_path
from os.path import exists as _check_path
from timeit import default_timer as timer
import numpy as np
import pandas as pd
import tensorflow as tf
[docs]
def obtain_sample_weight(sample_fitness, fitness_to_weight="rank"):
"""
Using decreasing functions to give more priority to samples with less fitness
:param list sample_fitness: The fitness associated value for each sample
:param str fitness_to_weight: Specify which function use to convert fitness to weight
:return: An array that associates a weight to each sample
"""
a = min(sample_fitness)
b = max(sample_fitness)
if fitness_to_weight == "linear_reciprocal":
# f: [a, b] -> (0, 1]
def weight_conversion(fitness):
return fitness / a
# weight_conversion = lambda fitness: a / fitness
elif fitness_to_weight == "linear_reciprocal_translated":
# f: [a, b] -> [1, b-a+1]
def weight_conversion(fitness):
return a * b / fitness - a + 1
# weight_conversion = lambda fitness: a * b / fitness - a + 1
elif fitness_to_weight == "linear_percentage":
# f: [a, b] -> [1, 100]
def weight_conversion(fitness):
return 100 * (b - fitness) / (b - a) + 1
# weight_conversion = lambda fitness: 100 * (b - fitness) / (b - a) + 1
elif fitness_to_weight == "rank":
# f: [fitness] -> [0, n-1]
indices = list(range(len(sample_fitness)))
indices.sort(key=lambda idx: -sample_fitness[idx])
indices_sorted = [0 for _ in indices]
for i, idx in enumerate(indices):
indices_sorted[idx] = i
return indices_sorted
else:
# Default linear conversion
# f: [a, b] -> [a, b]
def weight_conversion(fitness):
return a + b - fitness
# weight_conversion = lambda fitness: a + b - fitness
return [weight_conversion(fitness) for fitness in sample_fitness]
[docs]
class DatasetSequences:
[docs]
def __init__(self, sequences, fitnesses, num_operators=None, fitness_to_weight=None):
"Pre-process sequences to generate training data for HHNN"
X, y, sample_fitness = [], [], []
for sequence, fitness in zip(sequences, fitnesses, strict=True):
if len(sequence) > 0 and sequence[0] == -1:
sequence.pop(0)
fitness.pop(0)
while len(sequence) > 0:
# Per each prefix, predict the next operator
y.append(sequence.pop())
X.append(sequence.copy())
sample_fitness.append(fitness.pop())
self._X = X
self._y = y
if fitness_to_weight is not None:
self._sample_weight = tf.constant(obtain_sample_weight(sample_fitness, fitness_to_weight))
else:
self._sample_weight = None
self._one_hot_encoded = None
if num_operators is not None:
self.apply_one_hot_encoding(num_operators)
[docs]
def apply_one_hot_encoding(self, num_operators):
"One-Hot encode the output of the training data"
if self._one_hot_encoded is not None:
if self._one_hot_encoded != num_operators:
self._y = [np.where(y_one == 1)[0][0] for y_one in self._y]
else:
return
self._one_hot_encoded = num_operators
if num_operators is not None:
self._y = tf.one_hot(indices=self._y, depth=num_operators, dtype=tf.int64).numpy()
[docs]
def obtain_dataset(self):
"Retrieve the pre-processed data"
return self._X, self._y, self._sample_weight
[docs]
def retrieve_model_info(params):
# Check essential attributes
essential_attributes = ["file_label", "model_architecture", "encoder", "num_steps"]
if not all(attribute in params for attribute in essential_attributes):
left_attributes = [attribute for attribute in essential_attributes if attribute not in params]
raise Exception(f"The following attributes left while retrieving the model info: {left_attributes}")
# Names
architecture_name = params["model_architecture"]
encoder_name = params["encoder"]
# Model label
memory_length = params.get("memory_length", params["num_steps"])
attribute_labels = [params["file_label"], f"mem{memory_length}"]
if "pretrained_model" in params:
attribute_labels.append(params["pretrained_model"])
model_label = "-".join(attribute_labels)
personal_label = params["file_label"].split("_")
if personal_label[-1] == "extended":
personal_label.pop()
personal_labels = "_".join(personal_label)
# Filenames
model_directory = "./data_files/ml_models/"
model_filename = model_label
if architecture_name == "transformer":
model_directory = model_directory + f"{model_label}_dir/"
model_filename = "trained_model"
filename_dict = {
"model_directory": model_directory,
"model_label": model_label,
"model_path": model_directory + f"{personal_labels}.h5",
"log_path": model_directory + f"{model_filename}_log.csv",
"log_time_path": model_directory + f"{model_filename}_log_time.csv",
}
return architecture_name, encoder_name, filename_dict
[docs]
class Encoder:
def __init__(self, params):
self._encoder_name = params["encoder"]
self._architecture_name = params["model_architecture"]
self._memory_length = params.get("memory_length", 100)
self._num_operators = params["num_operators"]
def one_hot_encode(sequence):
return tf.one_hot(indices=sequence, depth=self._num_operators, dtype=tf.int64).numpy()
def compose(f, g):
return lambda x: f(g(x))
# Choice identity encoder
if self._architecture_name in ["transformer", "transformer_orig", "LSTM_Ragged"]:
# Keep original values but element -1
self.__identity_encoder = self.__clean_sequence
else:
# Keep original values but fix the length
self.__identity_encoder = compose(self.__fix_sequence_length, self.__clean_sequence)
# Get encoder module
if self._encoder_name in ["identity", "default"]:
encoder = self.__identity_encoder
elif self._encoder_name in ["one_hot_encoder"]:
# Fix sequence, then one-hot encode it
encoder = compose(one_hot_encode, self.__identity_encoder)
else:
raise Exception("Encoder name does not exists")
# Prepare if LSTM is used
if self._architecture_name in ["LSTM"]:
# Encode sequence, then reshape
self._encoder = compose(self.__lstm_sequence, encoder)
else:
self._encoder = encoder
[docs]
def encode(self, sequence):
return self._encoder(sequence)
def __fix_sequence_length(self, sequence):
"Fill a sequence with a dummy value until a fixed length"
suffix = sequence[: self._memory_length]
left_len = self._memory_length - len(suffix)
prefix = [self._num_operators for _ in range(left_len)]
return prefix + suffix
def __clean_sequence(self, sequence):
"Keep original values but first -1 element"
sequence_copy = sequence.copy()
while len(sequence_copy) > 0 and sequence_copy[0] == -1:
sequence_copy.pop(0)
if len(sequence_copy) == 0:
sequence_copy.append(self._num_operators)
return sequence_copy
@staticmethod
def __lstm_sequence(sequence):
"Reshape sequence for LSTM architecture usage"
return [[x] for x in sequence]
[docs]
class ModelPredictorKeras:
# Keras TensorFlow Artificial Neural Network Model
def __init__(self, params):
# Get encoder
params["memory_length"] = params.get("memory_length", params["num_steps"])
self._params = params.copy()
self._encoder = Encoder(params.copy()).encode
self.__create_keras_model()
def __create_keras_model(self):
# Create model
self._model = tf.keras.Sequential()
self._num_operators = self._params["num_operators"]
architecture_name = self._params["model_architecture"]
input_size = self._params["memory_length"]
hidden_layers = self._params["model_architecture_layers"]
# Input layer
if architecture_name in ["MLP"]:
# MLP input
self._model.add(tf.keras.Input(shape=input_size))
elif architecture_name in ["LSTM_Ragged"]:
# Variable length, supported using ragged tensors
max_length_sequence = self._params["num_steps"]
self._model.add(tf.keras.Input(shape=(max_length_sequence,)))
# Embedding layer
if len(hidden_layers) > 0:
first_layer_size, _, _ = hidden_layers[0]
else:
first_layer_size = self._num_operators
self._model.add(tf.keras.layers.Embedding(self._num_operators + 1, first_layer_size))
elif architecture_name in ["LSTM"]:
# LSTM input
self._model.add(tf.keras.Input(shape=(input_size, 1)))
# Hidden layers
num_lstm_layers = sum("LSTM" == layer_type for _, _, layer_type in hidden_layers)
for idx, (layer_size, layer_activation, layer_type) in enumerate(hidden_layers):
if layer_type == "Dense":
self._model.add(tf.keras.layers.Dense(units=layer_size, activation=layer_activation))
elif layer_type == "LSTM":
self._model.add(
tf.keras.layers.LSTM(
units=layer_size, activation=layer_activation, return_sequences=idx + 1 < num_lstm_layers
)
)
# Output layer
self._model.add(tf.keras.layers.Dense(self._num_operators, activation="softmax"))
# Compile model
self._model.compile(
loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=tf.keras.optimizers.Adam(), metrics=["accuracy"]
)
def __convert_tensor(self, tensor):
if self._params["model_architecture"] in ["LSTM_Ragged"]:
return tf.ragged.constant(tensor)
else:
return tf.constant(tensor)
[docs]
def fit(
self, X, y, epochs=100, sample_weight=None, verbose=False, early_stopping_params=None, verbose_statistics=False
):
# Pre-process dataset
X_encoded = [self._encoder(x) for x in X]
X_tensor = self.__convert_tensor(X_encoded)
y_tensor = self.__convert_tensor(y)
# Callbacks
callbacks = []
_, _, filename_dict = retrieve_model_info(self._params)
# History Logger
if verbose_statistics:
if not _check_path(filename_dict["model_directory"]):
_create_path(filename_dict["model_directory"])
history_logger = tf.keras.callbacks.CSVLogger(filename_dict["log_path"], separator=",", append=True)
callbacks.append(history_logger)
class TimingCallback(tf.keras.callbacks.Callback):
def __init__(self, logs=None):
self.logs = []
def on_epoch_begin(self, epoch, logs=None):
self.start_time = timer()
def on_epoch_end(self, epoch, logs=None):
self.logs.append(timer() - self.start_time)
timing_cb = TimingCallback()
if verbose_statistics:
callbacks.append(timing_cb)
# Early stopping
if early_stopping_params is not None and verbose_statistics:
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor=early_stopping_params["monitor"],
patience=early_stopping_params["patience"],
mode=early_stopping_params["mode"],
)
callbacks.append(early_stopping)
# Train model
self._model.fit(
X_tensor, y_tensor, epochs=epochs, sample_weight=sample_weight, verbose=verbose, callbacks=callbacks
)
if verbose_statistics:
df_times = pd.DataFrame({"time": timing_cb.logs})
df_times.to_csv(filename_dict["log_time_path"])
# Save predict function
self._predict = self._model.predict
[docs]
def predict(self, sequence):
# Use model to predict weights
tensor = self.__convert_tensor([self._encoder(sequence)])
return self._predict(tensor)[0]
[docs]
def load(self, model_path=None):
if model_path is None:
_, _, filename_dict = retrieve_model_info(self._params)
model_path = filename_dict["model_path"]
if _check_path(model_path):
self._model = tf.keras.models.load_model(model_path)
# Save predict function
self._predict = self._model.predict
return True
else:
raise Exception(f'model_path "{model_path}" does not exists')
[docs]
def save(self, model_path=None):
if model_path is None:
_, _, filename_dict = retrieve_model_info(self._params)
if not _check_path(filename_dict["model_directory"]):
_create_path(filename_dict["model_directory"])
model_path = filename_dict["model_path"]
self._model.save(model_path)
[docs]
def ModelPredictor(params):
# Function that decide which ML model uses.
# For now, it is only supported the NN architecture with dense
# and lstm layers. Future version will support Transformers.
return ModelPredictorKeras(params)