Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy
- import random
class eGreedyAgent:
    """Multi-armed-bandit agent with several action-selection and update rules.

    Selection rules: uniform random, epsilon-greedy, pursuit, and softmax
    (exponential).  Update rules: sample-average, constant step-size
    (non-stationary), and reinforcement comparison (with and without the
    ``1 - pi(a)`` factor).

    State:
        value       -- ``value[a] == [a, Q_a]``: arm index and value estimate.
        counter     -- ``counter[a]``: number of times arm ``a`` was selected.
        probability -- pursuit-method selection probabilities (sum ~ 1).
        score       -- cumulative reward received.
        preference  -- reference reward for reinforcement comparison.

    NOTE: the original kept ``value``/``counter``/``probability`` as mutable
    class attributes; ``probability`` was shared (and mutated) across ALL
    instances.  All state is now created per-instance in ``__init__``.
    """

    def __init__(self, e1=0.2, n=10, temp=1):
        """e1: exploration rate for epsilon-greedy; n: number of arms;
        temp: softmax temperature used by the feedback rules."""
        self.e = e1
        self.temp = temp
        self.n = n
        self.score = 0
        self.preference = 0
        self.counter_exploration = 0   # epsilon-greedy bookkeeping
        self.counter_exploitation = 0
        self.value = [[s, 0] for s in range(n)]
        self.counter = [0] * n
        # uniform initial pursuit probabilities (was a hard-coded [0.1] * 10)
        self.probability = [1.0 / n] * n

    def random_choice(self):
        """Select an arm uniformly at random."""
        action = random.randint(0, self.n - 1)
        self.counter[action] += 1
        return action

    def greedy_choice(self):
        """Epsilon-greedy: exploit the best-looking arm with prob 1 - e,
        otherwise explore uniformly among the remaining arms."""
        r = numpy.random.random()
        max_action = self.max_finder()
        if r > self.e:
            # exploit
            self.counter[max_action] += 1
            self.counter_exploitation += 1
            return max_action
        # explore: draw from n-1 arms, skipping the greedy one
        action = random.randint(0, self.n - 2)
        if action >= max_action:
            action += 1
        self.counter[action] += 1
        self.counter_exploration += 1
        return action

    def persuit_choice(self):
        """Pursuit method: nudge selection probabilities toward the current
        greedy arm (beta = 0.01), then sample an arm from them."""
        r = numpy.random.random_sample()
        max_action = self.max_finder()
        self.probability[max_action] += 0.01 * (1 - self.probability[max_action])
        for u in range(self.n):
            if u != max_action:
                self.probability[u] += 0.01 * (0 - self.probability[u])
        # inverse-CDF sampling: shrink the remaining tail mass until r exceeds it
        tail = 1
        for i in range(self.n):
            tail -= self.probability[i]
            if r > tail:
                self.counter[i] += 1
                return i
        # numerical fall-through (probabilities may sum to slightly less than 1);
        # the original returned None here — default to the last arm instead
        self.counter[self.n - 1] += 1
        return self.n - 1

    def _softmax_weights(self, temp):
        """Return ``exp(Q_a / temp)`` for every arm, with all estimates shifted
        by the same constant so the largest lands in [10, 30] (overflow guard).
        A uniform shift leaves the resulting softmax distribution unchanged."""
        maxrange = max(entry[1] for entry in self.value)
        if maxrange > 30:
            shift = maxrange - 30
        elif maxrange < 10:
            shift = maxrange - 10
        else:
            shift = 0
        return [numpy.exp((entry[1] - shift) / temp) for entry in self.value]

    def exponential_choice(self, temp=1):
        """Softmax (Boltzmann) selection over the value estimates."""
        r = numpy.random.random()
        weights = self._softmax_weights(temp)
        total = sum(weights)
        tail = total
        for pointer in range(self.n):
            tail -= weights[pointer]
            if tail / total < r:
                self.counter[pointer] += 1
                return pointer
        # fall-through only possible when r == 0.0; original returned None
        self.counter[self.n - 1] += 1
        return self.n - 1

    def value_feedback(self, action=0, reward=0):
        """Sample-average update: Q += (reward - Q) / pulls(action).
        Requires counter[action] > 0 (the choice methods increment it)."""
        self.score += reward
        self.value[action][1] += (reward - self.value[action][1]) / self.counter[action]

    def nonstationary_value_feedback(self, action=0, reward=0):
        """Constant step-size update (alpha = 0.1) for non-stationary problems."""
        self.score += reward
        self.value[action][1] += 0.1 * (reward - self.value[action][1])

    def reinforcement_comparison_feedback(self, action=0, reward=0):
        """Reinforcement comparison: move the preference of the taken action by
        (reward - reference) scaled by ``1 - pi(action)``, then update the
        reference reward."""
        self.score += reward
        weights = self._softmax_weights(self.temp)
        self.value[action][1] += (
            0.1 * (reward - self.preference) * (1 - weights[action] / sum(weights))
        )
        self.preference += 0.1 * (reward - self.preference)

    def reinforcement_comparison_feedback_without_factor(self, action=0, reward=0):
        """Reinforcement comparison without the ``1 - pi(a)`` factor.
        (The original also computed the softmax weights here but never used
        them — that dead code is removed.)"""
        self.score += reward
        self.value[action][1] += 0.1 * (reward - self.preference)
        self.preference += 0.1 * (reward - self.preference)

    def max_finder(self):
        """Return the index of the arm with the highest value estimate
        (ties broken toward the lowest index)."""
        max_action = 0
        # seed with arm 0's estimate — the original used 0, which wrongly
        # returned arm 0 whenever every estimate was negative
        max_value = self.value[0][1]
        for a in range(1, self.n):
            if self.value[a][1] > max_value:
                max_value = self.value[a][1]
                max_action = self.value[a][0]
        return max_action

    def reset(self):
        """Restore the agent to its freshly-constructed state.

        The original emptied ``value``/``counter`` without rebuilding them,
        which broke any further use of the agent; they are now rebuilt for the
        same arm count, and the comparison/exploration bookkeeping is cleared.
        """
        self.score = 0
        self.preference = 0
        self.counter_exploration = 0
        self.counter_exploitation = 0
        self.value = [[s, 0] for s in range(self.n)]
        self.counter = [0] * self.n
        self.probability = [1.0 / self.n] * self.n
class task:
    """A bank of 10-armed bandit problems.

    Each entry of ``tasks`` holds the ten true mean rewards of one bandit.
    Stationary banks draw their means uniformly from [0, 1); non-stationary
    banks start every mean at zero and drift them via ``nonstationarymove``.
    """

    tasks = []  # always rebound per-instance in __init__

    def __init__(self, num=2000, stationary=True):
        """num: how many independent bandit problems to create."""
        if stationary:
            self.tasks = [
                [numpy.random.rand() for _ in range(10)] for _ in range(num)
            ]
        else:
            self.tasks = [[0] * 10 for _ in range(num)]

    def reward(self, subtask=0, action=0):
        """Sample a reward for one pull: a unit-variance Gaussian centered on
        the chosen arm's true mean."""
        return numpy.random.normal(self.tasks[subtask][action])

    def nonstationarymove(self, subtask=0):
        """Random-walk every arm mean of one bandit: with probability 1/2 add
        a fresh U[0, 1) step, otherwise subtract one."""
        means = self.tasks[subtask]
        for arm in range(10):
            # draw the coin first, then the step — same RNG order as before
            dice = numpy.random.rand()
            if dice > 0.5:
                means[arm] += numpy.random.rand()
            else:
                means[arm] -= numpy.random.rand()
Add Comment
Please, Sign In to add comment