class Evolver(object):
    """Evolve a population of ``Host`` parameter sets with a simple genetic algorithm.

    Each host carries a parameter sequence (used by ``createModel`` as layer
    sizes) and a score. ``runTest`` trains and scores every host whose score
    is still 0, ``evolve`` replaces the weaker half of the population with
    crossover children of the two best hosts plus mutated copies of
    survivors.

    ``Host``, ``execute_layers``, the Keras names and the training data
    (``data_train``, ``labels_train``, ``data_test``, ``labels_test``,
    ``max_review_length``, ``NUM_LABELS``) are expected to be defined
    elsewhere in the module.
    """

    def __init__(self, mask="111111", decay_rate=0.022, mutate_prob=0.8, log_file=None):
        """Set up an empty population and make sure the log directory exists.

        Args:
            mask: String of "0"/"1" flags used by ``masking``/``_maskHost``
                to select which parameter positions are kept.
            decay_rate: Rate ``r`` of the exponential decay ``exp(-r * t)``
                applied to the mutation probability and mutation range.
            mutate_prob: Base per-parameter mutation probability (at t=0).
            log_file: Name of the log file inside ``self.log_dir``; defaults
                to a name built from the current date and time.
        """
        self.mask = mask
        self.decay_rate = decay_rate
        self.decay = self.getDecay(t=0)
        self.mutate_prob = mutate_prob
        self.hosts = []
        self.scores = []
        self.host_num = 0
        self.log_dir = "Log"
        self.log_file = log_file or "Log {date}.txt".format(
            date=datetime.now().strftime("%d.%m.%Y %H.%M"))
        if not os.path.exists(self.log_dir):
            print("- Creating log directory {}".format(self.log_dir))
            # exist_ok guards against the directory appearing between the
            # existence check and the creation.
            os.makedirs(self.log_dir, exist_ok=True)

    def generateModels(self, num_of_models=10, param=None, file_name=None):
        """Replace the population with hosts from parameters, a file, or random values.

        Precedence: a non-empty ``param`` wins, then ``file_name``, otherwise
        ``num_of_models`` random hosts of 4 integers in [1, 255] are created.

        Args:
            num_of_models: Number of random hosts to create (random mode only).
            param: List/tuple of parameter sets; each becomes a host with score 0.
            file_name: Path of a file readable by ``loadParameters``.
        """
        self.hosts.clear()
        if param:
            if isinstance(param, (list, tuple)):
                print("Generating model from parameters!")
                self.hosts += [Host(p, score=0) for p in param]
            else:
                print("x Invalid param type received; {}".format(type(param)))
        elif file_name:
            print("Generating model from a file!")
            self.loadParameters(file_name)
        else:
            print("Generating random models!")
            for _ in range(num_of_models):
                random_param = np.random.randint(low=1, high=256, size=4)
                self.hosts.append(Host(random_param, 0))
        self.host_num = len(self.hosts)

    def loadParameters(self, file_name):
        """Replace the population with hosts read from ``file_name``.

        Each non-blank line must look like ``p1,p2,...=score`` (the format
        written by ``saveParameters``). Blank lines are skipped.

        Args:
            file_name: Path of the file to read (used as given; it is not
                joined with ``self.log_dir``).
        """
        def str2list(string):
            # "1,2,3=0.5\n" -> ([1, 2, 3], 0.5)
            string = string.rstrip("\n")
            param, score = string.split("=")
            return (list(map(int, param.split(","))), float(score))

        with open(file_name, "r") as file:
            self.hosts.clear()
            for line in file:
                if not line.strip():
                    continue
                _param, _score = str2list(line)
                self.hosts.append(Host(_param, _score))
        # Keep the cached population size in step when called directly.
        self.host_num = len(self.hosts)
        print("Parameters loaded from {}!".format(file_name))

    def saveParameters(self, file_name, file_mode="w"):
        """Write every host as a ``param_str=score`` line to ``log_dir/file_name``.

        Args:
            file_name: File name relative to ``self.log_dir``.
            file_mode: Mode passed to ``open`` ("w" overwrites, "a" appends).
        """
        with open(os.path.join(self.log_dir, file_name), file_mode) as out_file:
            for host in self.hosts:
                out_file.write("{}={}\n".format(host.param_str, host.score))
        print("Parameters saved at {}!".format(file_name))

    def createModel(self, param, input_shape, output_size, name="Unknown"):
        """Build a three-input Keras classifier sized by ``param``.

        The x/y/z inputs go through the same LSTM -> Dense -> Dense layers
        via ``execute_layers`` (defined elsewhere; presumably it applies the
        shared layers to each input), the results are concatenated and fed
        through one more Dense layer and a softmax output.

        Args:
            param: Sequence of at least 4 layer sizes: LSTM units, two shared
                Dense sizes, and the post-concatenation Dense size.
            input_shape: Shape given to each of the three ``Input`` layers.
            output_size: Number of units in the softmax output layer.
            name: Accepted for the caller's convenience; currently unused.

        Returns:
            The uncompiled ``Model``.
        """
        input_x = Input(shape=input_shape, name="Acceleration_x")
        input_y = Input(shape=input_shape, name="Acceleration_y")
        input_z = Input(shape=input_shape, name="Acceleration_z")
        shared_layers = (LSTM(param[0], activation="tanh", name="Shared_LSTM", dropout=0.5),
                         Dense(param[1], activation="relu", name="Shared_Dense_1"),
                         Dense(param[2], activation="relu", name="Shared_Dense_2"))
        shared_output = execute_layers(inputs=(input_x, input_y, input_z), layers=shared_layers)
        concat = keras.layers.concatenate(shared_output, name="Concatenate")
        dense_1 = Dense(param[3], activation="relu", name="Dense_1")(concat)
        main_output = Dense(output_size, activation="softmax", name="Classification_Layer")(dense_1)
        return Model(inputs=[input_x, input_y, input_z], outputs=main_output)

    def testModel(self, model, epochs=8, batch_size=10, verbose=0):
        """Compile, train and evaluate ``model`` on the module-level data sets.

        Uses the globals ``data_train``/``labels_train`` for fitting and
        ``data_test``/``labels_test`` for evaluation.

        Returns:
            The second value returned by ``model.evaluate`` (the accuracy
            metric requested at compile time).
        """
        model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
        model.fit(x=[*data_train], y=labels_train, epochs=epochs, batch_size=batch_size, verbose=verbose)
        scores = model.evaluate(x=[*data_test], y=labels_test, batch_size=batch_size, verbose=verbose)
        return scores[1]

    def runTest(self):
        """Score every unscored host, sort, append to the log and record the average.

        A host whose score equals 0 is treated as not yet evaluated; hosts
        with any other score are left untouched.
        """
        # Use the real population size so progress never divides by a stale
        # or zero ``host_num``.
        total = len(self.hosts)
        out = display(IPython.display.Pretty(" "), display_id=True)
        for i, host in enumerate(self.hosts):
            out.update(IPython.display.Pretty(
                "@ Training progress: {:>4.1f}%".format((i + 1) / total * 100)))
            if host.score == 0:
                model = self.createModel(host.param, (1, max_review_length), NUM_LABELS,
                                         name="Model {}".format(i))
                fitness = self.testModel(model, epochs=5)
                # Drop the Keras graph/session state before building the next model.
                keras.backend.clear_session()
                host.score = fitness
        self.sortFitness()
        self.saveParameters(self.log_file, "a")
        self.scores.append(self.averageScore(self.host_num))

    def evolve(self, t):
        """Replace the weaker half of the population with new, unscored hosts.

        Expects ``self.hosts`` to be sorted best-first (as ``runTest`` leaves
        it). The last ``host_num // 2`` hosts are removed and exactly that
        many new hosts are added: up to two crossover children of the top
        two survivors, the rest mutated copies of random survivors.

        Populations too small to evolve are left unchanged instead of being
        wiped (``del hosts[-0:]`` would delete everything).

        Args:
            t: Generation index used to update ``self.decay``.
        """
        evolve_num = self.host_num // 2
        print("\n- Evolving the hosts")
        if evolve_num < 1:
            print(" x Not enough hosts to evolve; population left unchanged")
            return

        print(" - Removing half of the weaker hosts")
        del self.hosts[-evolve_num:]

        new_hosts = []
        # Crossover needs two distinct survivors; with fewer, fall back to
        # mutation only.
        if len(self.hosts) >= 2:
            print(" - Crossovering top 2 hosts")
            parent_a = self.hosts[0].param.copy()
            parent_b = self.hosts[1].param.copy()
            crossover_param = self.crossover(parent_a, parent_b, min(4, len(parent_a)))
            # Never add more children than hosts were removed.
            for param in crossover_param[:evolve_num]:
                new_hosts.append(Host(param, 0))

        print(" - Mutating remaining top hosts")
        self.decay = self.getDecay(t)
        for _ in range(evolve_num - len(new_hosts)):
            param = random.choice(self.hosts).param.copy()
            mutated_param = self.mutate(param)
            new_hosts.append(Host(mutated_param, 0))

        print(" - Adding hosts")
        print(" {}".format(", ".join("[{}]".format(host.param_str) for host in new_hosts)))
        self.hosts += new_hosts

    def sortFitness(self):
        """Sort ``self.hosts`` by score, best first."""
        self.hosts = sorted(self.hosts, key=lambda host: host.score, reverse=True)

    def _maskHost(self, host, mask):
        """Return the values of ``host`` whose matching ``mask`` character is "1".

        Positions beyond the shorter of the two sequences are dropped.
        """
        return [value for value, mask_bit in zip(host, mask) if mask_bit == "1"]

    def masking(*host_indexs):
        """Decorator factory that masks selected positional arguments of a method.

        Takes no ``self``: it is meant to be applied inside the class body
        as ``@masking(0, 1)``. For each index in ``host_indexs`` the
        corresponding positional argument (counted after ``self``) is
        replaced by ``self._maskHost(arg, mask)`` before the wrapped method
        runs. The mask is the ``mask`` keyword argument when it is given and
        truthy, otherwise ``self.mask``.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                # ``mask=None`` (the usual default) must fall back to the
                # instance mask rather than being zipped as None.
                mask = kwargs.get("mask") or self.mask
                if args:
                    args = list(args)
                    for i in host_indexs:
                        args[i] = self._maskHost(args[i], mask)
                return func(self, *args, **kwargs)
            return wrapper
        return decorator

    def crossover(self, paramA, paramB, max_crossover=None, mask=None):
        """Swap randomly chosen values between two equal-length parameter sets.

        Up to ``max_crossover`` distinct positions are drawn; position ``i``
        of ``paramA`` is exchanged with a (shuffled) position ``j`` of
        ``paramB``. Both sequences are modified in place.

        Args:
            paramA: First mutable parameter sequence.
            paramB: Second mutable parameter sequence, same length as ``paramA``.
            max_crossover: Maximum number of positions to swap; ``None``
                means the full length, ``0`` means no swap at all.
            mask: Unused here; kept for the ``masking`` decorator interface.

        Returns:
            The tuple ``(paramA, paramB)`` after swapping.

        Raises:
            AssertionError: If the lengths differ or ``max_crossover`` is
                outside ``[0, len(paramA)]``.
        """
        assert len(paramA) == len(paramB), "Host A and B are not of same length; {} and {}".format(len(paramA), len(paramB))
        HOST_LEN = len(paramA)
        # ``is None`` rather than ``or`` so that an explicit 0 is honoured.
        if max_crossover is None:
            max_crossover = HOST_LEN
        assert 0 <= max_crossover <= HOST_LEN, "Maximum cross number greater than crossable host length"
        # A set: repeated draws collapse, so fewer than max_crossover swaps may happen.
        cross_index = {random.randint(0, HOST_LEN - 1) for _ in range(max_crossover)}
        cross_index_A = list(cross_index)
        cross_index_B = list(cross_index)
        random.shuffle(cross_index_B)
        print(" {} & {}".format(paramA, paramB), end=" >> ")
        print(cross_index_A, cross_index_B, sep=" & ", end=" >> ")
        for i, j in zip(cross_index_A, cross_index_B):
            paramA[i], paramB[j] = paramB[j], paramA[i]
        print("{} & {}".format(paramA, paramB))
        return paramA, paramB

    def getDecay(self, t=0):
        """Return the exponential decay factor ``exp(-decay_rate * t)``."""
        import math  # local import: the file's import block is not visible from here
        return math.exp(-self.decay_rate * t)

    def mutate(self, param, mask=None):
        """Randomly nudge the values of ``param`` in place and return it.

        Each value is changed with probability ``mutate_prob * decay`` by a
        random integer in ``[-limit, limit]`` where
        ``limit = max(1, round(20 * decay))``; results are floored at 8.

        Args:
            param: Mutable sequence of integer parameters.
            mask: Unused here; kept for the ``masking`` decorator interface.

        Returns:
            The same ``param`` object after mutation.
        """
        print(" ", param, end=" >> ")
        mutate_prob = self.mutate_prob * self.decay
        mutate_limit = max(1, round(20 * self.decay))
        for i in range(len(param)):
            if random.random() <= mutate_prob:
                rand_value = random.randint(-mutate_limit, mutate_limit)
                param[i] = max(8, param[i] + rand_value)
        print(param)
        return param

    def printHosts(self):
        """Print every host two per row, then the latest average score if any."""
        print("\n# All hosts")
        count = 0
        for count, host in enumerate(self.hosts, start=1):
            print(" {:>2}) {}: {:.5f}".format(count, host.param, host.score), end="\t\t")
            if not count % 2:
                print()
        if count % 2:
            # Terminate a dangling half-filled row.
            print()
        if self.scores:
            print("- Average score: {:.5f}".format(self.scores[-1]))

    def averageScore(self, num=3):
        """Return the mean score of the first ``num`` hosts.

        Raises:
            AssertionError: If ``num`` is not in ``(0, host_num]``.
        """
        assert 0 < num <= self.host_num, "Given number out of range!"
        return sum(host.score for host in self.hosts[:num]) / num