Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
class markov_node:
    """One state of the Markov chain.

    Attributes:
        name: Label used when printing the node.
        arcs: Outgoing transitions as a list of ``(target_node, weight)``
            pairs.
        ssr_flag, head_flag, guarantee_flag, connector_flag: Role markers,
            all ``False`` on construction; the cluster builders in this file
            switch them on for specific nodes.
    """

    def __init__(self, name, arcs=None):
        """Create a node.

        Args:
            name: Label for the node.
            arcs: Optional initial list of ``(target_node, weight)`` pairs.
                When omitted, the node starts with its own fresh empty list.
        """
        # The original declared __init__ twice; Python keeps only the last
        # definition, so the (name, arcs) form was unreachable.  A single
        # constructor with an optional argument supports both call shapes.
        self.name = name
        self.arcs = [] if arcs is None else arcs
        self.ssr_flag = False
        self.head_flag = False
        self.guarantee_flag = False
        self.connector_flag = False

    def print_indent_details(self):
        """Print the node name, each outgoing arc, and the sum of arc weights."""
        print("\tnode: " + self.name)
        summation = 0.0
        for e in self.arcs:
            print("\t\tto " + e[0].name + ", weight = %5.4f" % (e[1]))
            summation = summation + e[1]
        print("\t\tSum of weight = %5.4f" % (summation))
class markov_cluster:
    """A named group of ``markov_node`` objects sharing one set of weights.

    Attributes:
        name: Label used when printing the cluster.
        nodes: List of nodes belonging to the cluster.
        wp_w8, stg_w8, dup_w8: Integer weights (defaults 2, 3, 0).  Their sum
            multiplied by ``single_target_rate`` gives the per-step "target"
            rate used by the builders below.
    """

    def __init__(self, name, nodes=None):
        """Create a cluster.

        Args:
            name: Label for the cluster.
            nodes: Optional initial node list.  When omitted, the cluster
                starts with its own fresh empty list.
        """
        # The original declared __init__ twice; only the last definition
        # survives in Python, so the (name, nodes) form was unreachable.
        self.name = name
        self.nodes = [] if nodes is None else nodes
        self.wp_w8 = 2
        self.stg_w8 = 3
        self.dup_w8 = 0

    def set_weight(self, wp_w8, stg_w8, dup_w8):
        """Overwrite the three cluster weights."""
        self.wp_w8 = wp_w8
        self.stg_w8 = stg_w8
        self.dup_w8 = dup_w8

    def make_normal_cluster(self, r_rate, ssr_rate, single_target_rate):
        """Replace ``self.nodes`` with a ten-node chain named "0".."9".

        Node 0 gets ``ssr_flag`` and node 1 gets ``head_flag``.  Nodes 0-8
        each receive an arc back to node 0 weighted
        ``ssr_rate - target_rate`` and an arc to the next node weighted
        ``r_rate``.  Node 9 gets ``guarantee_flag`` and a single arc back to
        node 0 weighted ``1 - target_rate / ssr_rate``.

        Args:
            r_rate: Weight of the arc to the next node in the chain.
            ssr_rate: Rate from which the target rate is subtracted (or by
                which it is divided on the guarantee node).
            single_target_rate: Rate per unit of cluster weight.
        """
        self.nodes = []
        target_rate = (self.wp_w8 + self.stg_w8 + self.dup_w8)*single_target_rate
        for i in range(10):
            self.nodes.append(markov_node("%i" % (i)))
        self.nodes[0].ssr_flag = True
        self.nodes[1].head_flag = True
        for i in range(10):
            if i == 9:
                # Last node of the chain: no "next" node, only the return arc.
                self.nodes[i].arcs.append((self.nodes[0], 1 - (target_rate/ssr_rate)))
                self.nodes[i].guarantee_flag = True
            else:
                self.nodes[i].arcs.append((self.nodes[0], ssr_rate - target_rate))
                self.nodes[i].arcs.append((self.nodes[i+1], r_rate))

    def make_connector(self, r_rate, ssr_rate, single_target_rate):
        """Append an entry node named ``<name>_connector`` to the cluster.

        The connector gets ``connector_flag`` and one arc to every existing
        node with ``ssr_flag`` (weight ``ssr_rate - target_rate``) and to
        every existing node with ``head_flag`` (weight ``r_rate``).
        """
        target_rate = (self.wp_w8 + self.stg_w8 + self.dup_w8)*single_target_rate
        temp_node = markov_node(self.name + "_connector")
        temp_node.connector_flag = True
        for v in self.nodes:
            if v.ssr_flag:
                temp_node.arcs.append((v, ssr_rate - target_rate))
            if v.head_flag:
                temp_node.arcs.append((v, r_rate))
        # Appended only after the scan so the connector never links to itself.
        self.nodes.append(temp_node)

    def make_internal_cluster(self, r_rate, ssr_rate, single_target_rate):
        """Build the ten-node chain and then add its connector node."""
        self.make_normal_cluster(r_rate, ssr_rate, single_target_rate)
        self.make_connector(r_rate, ssr_rate, single_target_rate)

    def make_terminal_cluster(self):
        """Replace ``self.nodes`` with one connector node that loops to itself
        with weight 1.0 (an absorbing state)."""
        self.nodes = []
        temp_node = markov_node(self.name + "_connector")
        temp_node.connector_flag = True
        temp_node.arcs.append((temp_node, 1.0))
        self.nodes.append(temp_node)

    def print_details(self):
        """Print the cluster name, its weights, and the details of each node."""
        print("Cluster: " + self.name)
        print("wp weight = %i, stg weight = %i, dup weight = %i" % (self.wp_w8,
                                                                      self.stg_w8,
                                                                      self.dup_w8))
        for v in self.nodes:
            v.print_indent_details()
- ####################################################################################
- ## Wishing Well Strategy
- ####################################################################################
def make_chains():
    """Build every cluster of the chain and wire the arcs between clusters.

    Each non-terminal cluster carries a ``state`` triple ``(wp, stg, dup)``.
    Starting from ``(0, 0, 0)``, the list of clusters is walked while it
    grows (breadth-first): for every counter still below its entry in
    ``target_limit`` the successor state with that counter incremented is
    looked up or created, and every node of the current cluster receives an
    arc to the successor's connector node.  Any successor state listed in
    ``end_state`` is collapsed into one shared absorbing cluster whose
    ``state`` is the string ``"terminal"``.

    Returns:
        list: All clusters in creation order, except that the terminal
        cluster (when one exists) is moved to the very end so its node is
        the last node overall.
    """
    ####################################################################
    ## begin parameters
    r_rate = 0.876
    ssr_rate = 0.124
    single_target_rate = 0.0124  ## 3 stg * 1, 1 wp * 2
    target_limit = (1, 3, 2)
    end_state = [(1, 2, 2), (1, 3, 0), (1, 3, 1), (1, 3, 2)]
    ## if stg == 0 no dup
    ## end parameters
    ####################################################################
    terminal = "terminal"  # state marker of the shared absorbing cluster

    clusters = []
    by_state = {}

    def _register(cluster, state):
        # Record a freshly built cluster under its state key.
        cluster.state = state
        clusters.append(cluster)
        by_state[state] = cluster
        return cluster

    def _target_for(next_state, weights):
        # Return the cluster for next_state, creating it on first use.
        is_end = next_state in end_state
        key = terminal if is_end else next_state
        if key in by_state:
            return by_state[key]
        if is_end:
            cluster = markov_cluster("terminal_cluster")
            cluster.make_terminal_cluster()
        else:
            cluster = markov_cluster("C(%d,%d,%d)" % next_state)
            cluster.set_weight(*weights)
            cluster.make_internal_cluster(r_rate, ssr_rate, single_target_rate)
        return _register(cluster, key)

    def _link(source, target, weight):
        # Add an arc from every node of source to each connector of target.
        rate = single_target_rate * weight
        for u in source.nodes:
            for v in target.nodes:
                if v.connector_flag:
                    if u.guarantee_flag:
                        # Guarantee nodes use the rate rescaled by ssr_rate.
                        u.arcs.append((v, rate / ssr_rate))
                    else:
                        u.arcs.append((v, rate))

    root = markov_cluster("C(0,0,0)")
    root.set_weight(2, 3, 0)
    root.make_normal_cluster(r_rate, ssr_rate, single_target_rate)
    _register(root, (0, 0, 0))

    # `clusters` grows while it is being iterated; newly appended clusters
    # are visited later in this same loop, which is what drives the walk.
    for C in clusters:
        if C.state == terminal:
            continue
        for axis in range(3):  # 0 = got wp, 1 = got stg, 2 = got dup
            if not C.state[axis] < target_limit[axis]:
                continue
            next_state = tuple(
                value + 1 if position == axis else value
                for position, value in enumerate(C.state)
            )
            # `new_weights` only matters when the successor is created here;
            # an already existing successor keeps the weights it was built
            # with.
            if axis == 0:
                new_weights = (C.wp_w8 - 2, C.stg_w8, C.dup_w8)
                arc_weight = C.wp_w8
            elif axis == 1:
                new_weights = (C.wp_w8, C.stg_w8 - 1, C.dup_w8 + 1)
                arc_weight = C.stg_w8
            else:
                # Once dup reaches its limit the successor's dup weight is 0.
                capped = next_state[2] == target_limit[2]
                new_weights = (C.wp_w8, C.stg_w8, 0 if capped else C.dup_w8)
                arc_weight = C.dup_w8
            _link(C, _target_for(next_state, new_weights), arc_weight)

    # Move the terminal cluster to the end.  The original assumed it always
    # existed and would otherwise have rotated the first cluster to the back.
    terminal_cluster = by_state.get(terminal)
    if terminal_cluster is not None:
        clusters.remove(terminal_cluster)
        clusters.append(terminal_cluster)
    return clusters
def print_clusters(clusters):
    """Print the name of each cluster, one per line, in list order."""
    for cluster in clusters:
        label = cluster.name
        print(label)
- ##def count(clusters):
- ## m = 0
- ## n = 0
- ## for c in clusters:
- ## for u in c.nodes:
- ## n+=1
- ## m+=1
- ## print("m = %i" % (m))
- ## print("n = %i" % (n))
def node_count(clusters):
    """Return the total number of nodes across all given clusters."""
    return sum(1 for cluster in clusters for _ in cluster.nodes)
def make_matrix(clusters):
    """Return the transition matrix of all nodes in ``clusters``.

    Nodes are numbered in the order they appear (cluster by cluster, node by
    node).  Entry ``[to, from]`` of the result is the summed weight of all
    arcs going from node ``from`` to node ``to``, so each column describes
    one source node.

    Args:
        clusters: Iterable of objects with a ``nodes`` list; every node has
            an ``arcs`` list of ``(target_node, weight)`` pairs whose targets
            are also contained in ``clusters``.

    Returns:
        numpy.ndarray: Square float array with one row/column per node
        (shape ``(0, 0)`` when there are no nodes).
    """
    index = []
    mapping = {}
    for c in clusters:
        for v in c.nodes:
            mapping[v] = len(index)
            index.append(v)
    counts = len(index)

    # Fill a preallocated array instead of growing a flat one with np.append,
    # which copies the entire buffer on every row.
    by_source = np.zeros((counts, counts))
    for row, node in enumerate(index):
        for target, weight in node.arcs:
            by_source[row, mapping[target]] += weight
    # Rows are indexed by source node; transpose to the [to, from] layout.
    return by_source.transpose()
def test():
    """Print two sanity checks for the freshly built transition matrix.

    First the vector of column sums (a row of ones multiplied by the
    matrix), then the last column of the matrix.
    """
    chain = make_chains()
    transition = make_matrix(chain)
    total = node_count(chain)
    column_sums = np.matmul(np.ones(total), transition)
    print(column_sums)
    last_column = transition[:, total - 1]
    print(last_column)
def test_pulls(numb):
    """Print entry ``[last, 0]`` of the transition matrix raised to ``numb``.

    With the ``[to, from]`` layout produced by ``make_matrix`` this is the
    weight of being in the last node after ``numb`` steps when starting from
    the first node.
    """
    chain = make_chains()
    powered = np.linalg.matrix_power(make_matrix(chain), numb)
    last = node_count(chain) - 1
    print("%10.9f" % (powered[last, 0]))
def prob_vec(numb):
    """Return the full transition matrix raised to the power ``numb``."""
    transition = make_matrix(make_chains())
    return np.linalg.matrix_power(transition, numb)
def expected_vec():
    """Return the first ten column sums of ``inv(I - Q)``.

    ``Q`` is the transition matrix with the row and column of the last node
    removed.  NOTE(review): with the last node absorbing, these column sums
    are presumably the expected number of steps before absorption from each
    of the first ten nodes; confirm against the intended model.
    """
    chain = make_chains()
    transition = make_matrix(chain)
    size = node_count(chain) - 1  # every node except the last one
    transient = transition[0:size, 0:size]
    fundamental = np.linalg.inv(np.identity(size) - transient)
    column_sums = np.matmul(np.ones(size), fundamental)
    return column_sums[0:10]
def calc_pulls(numb):
    """Return entry ``[last, 0]`` of the transition matrix raised to ``numb``.

    With the ``[to, from]`` layout produced by ``make_matrix`` this is the
    weight of being in the last node after ``numb`` steps when starting from
    the first node.
    """
    chain = make_chains()
    last = node_count(chain) - 1
    powered = np.linalg.matrix_power(make_matrix(chain), numb)
    return powered[last, 0]
def prob_table():
    """Print a 40-row by 5-column table for step counts 1 through 200.

    Each cell shows the step count and entry ``[last, 0]`` of the transition
    matrix raised to that power, i.e. the same value ``calc_pulls`` returns
    for that step count.
    """
    # The chain and its matrix do not depend on the step count, so build
    # them once instead of once per cell (200 rebuilds in the original).
    clusters = make_chains()
    transition = make_matrix(clusters)
    last = node_count(clusters) - 1
    for i in range(40):
        for j in range(5):
            pulls = i*5 + j + 1
            val = np.linalg.matrix_power(transition, pulls)[last, 0]
            print("%3i : %6.5f " % (pulls, val), end=" ")
        print(" ")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement