Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import numpy as np
- from copy import deepcopy
- with open('./data/01.txt', 'r') as f:
- data = [line.strip() for line in f.readlines()]
- # %%
- # extract all need data from line
- def interpret_line(line: str):
- valvepart, tunnelpart = line.split(';')
- valvepart, flowrate = valvepart.split('=')
- flowrate = int(flowrate)
- valve = valvepart.split(' ')[1].strip()
- tunnels = tunnelpart.split(',')
- connected = [tunnels[0][-2:].strip(), *[t.strip() for t in tunnels[1:]]]
- return valve, flowrate, connected
- # %%
- # save data in DataFrame
- df = pd.DataFrame(columns=['Valve', 'Flowrate', 'Connected'])
- for line in data:
- df = df.append(
- pd.Series(
- interpret_line(line),
- index=['Valve', 'Flowrate', 'Connected']),
- ignore_index=True)
- # %%
- def build_adjacency_matrix(df: pd.DataFrame) -> pd.DataFrame:
- # build adjacency matrix by exploding and pivoting dataframe
- index_dict = {}
- for key, val in dict(df.Valve).items():
- index_dict[val] = key
- df_exploded = df.explode(column='Connected')
- df_exploded.drop(columns=['Flowrate'], inplace=True)
- df_exploded['temp_vals'] = 1
- df_exploded.Connected = df_exploded.Connected.str.strip().map(index_dict)
- mat = df_exploded.pivot(
- columns=['Connected'],
- values=['temp_vals'])
- mat = mat.fillna(0)
- mat[mat.columns] = mat[mat.columns].astype(int)
- return mat
- # %%
- def build_distance_matrix_from_adjacency(A: np.ndarray) -> np.ndarray:
- # using the fact that A@A returns a matrix that contains
- # the number of ways to move from i to j in 2 steps
- # repeated use of this rule and saving the lowest steps for which
- # you can get from i to j gives distance matrix
- retA = A.copy()
- newA = A.copy()
- # have to set unknown values to "infinity" for np.minimum
- retA[retA == 0] = 10000
- for n in range(2, A.shape[0]):
- newA = newA@A
- mathA = newA.copy()
- # values that are not 0 are reacheable with n steps in at least one way
- # replace all non-zero values with n
- mathA[mathA != 0] = n
- # have to set unknown values to "infinity" for np.minimum
- mathA[mathA == 0] = 10000
- retA = np.minimum(retA, mathA) # keep minimum known n for each entry
- np.fill_diagonal(retA, 0)
- return retA
- # %%
- # build adjacency matrix from dataframe
- mat = build_adjacency_matrix(df).to_numpy()
- # build distance matrix from adjacency matrix
- dist = build_distance_matrix_from_adjacency(mat)
- # %%
- # ------ PUZZLE 16-01 ------
- # calculate the total flow for one path
- def calculate_total_flow_for_path(path: list[int], dist, df):
- minutes = 30 # minutes until volcano explodes
- # find index of valve AA
- cur_pos = df.Valve[df.Valve == 'AA'].index.tolist()[0]
- total_flow = 0
- # run through path and calculate pressure release for each node
- for node in path:
- minutes -= dist[cur_pos, node] # subtract minutes for traveling
- minutes -= 1 # subtract minutes for opening valve
- # break loop if all minutes used up
- if minutes < 0:
- break
- cur_pos = node # update current position after traveling
- # add flow of opened valve to total flow
- total_flow += df.Flowrate.iloc[node] * minutes
- return total_flow
- # list of valves with flowrate > 0
- potential_valves = df[df.Flowrate != 0].index.tolist()
- path = np.random.permutation(potential_valves) # start with random permutation
- # calculate current pressue released, use negative of
- # pressure release to optimize via minimization
- old_E = -calculate_total_flow_for_path(path, dist, df)
- # #---# SIMULATED ANNEALING #---#
- Temp = 100 # start temp
- Tlow = 0.01 # stop temp
- Steps = 5 # thermalization steps at each temp
- dT = -0.01 # temp change with every while loop step
- lowest_path = deepcopy(path)
- count = 0 # while loop counter, used for printing annealing info
- # annealing loop
- while Temp > Tlow:
- # thermalization loop
- for n in range(Steps):
- new_path = path.copy()
- # choose valves i and j to swap in path
- i = np.random.randint(len(potential_valves))
- j = np.random.randint(len(potential_valves))
- while j == i:
- j = np.random.randint(len(potential_valves))
- new_path[[i, j]] = new_path[[j, i]]
- # calculate pressure released for new path
- new_E = -calculate_total_flow_for_path(new_path, dist, df)
- dE = new_E - old_E
- if dE <= 0.0:
- # if new path releases more pressure make new path the
- # reference path for next step in loop
- path = deepcopy(new_path)
- old_E = deepcopy(new_E)
- # new pressure release better than best known pressure release
- # save current path as best path
- best_E = -calculate_total_flow_for_path(lowest_path, dist, df)
- if (new_E - best_E < 0):
- lowest_path = deepcopy(new_path)
- elif np.exp(-dE/Temp) > np.random.random():
- # if new path worse than last path still change to new path
- # with probability proportional to boltzmann distribution
- path = deepcopy(new_path)
- old_E = deepcopy(new_E)
- count += 1
- if count % 100 == 0:
- print(
- Temp,
- new_E,
- old_E,
- dE,
- new_path,
- path,
- lowest_path,
- calculate_total_flow_for_path(lowest_path, dist, df))
- Temp += dT
- print(
- "Most pressue Part 1:",
- calculate_total_flow_for_path(lowest_path, dist, df))
- # %%
- # ------ PUZZLE 16-02 ------
- # calculate the total flow for two paths
- def calculate_total_flow_for_paths(
- path0: list[int],
- path1: list[int],
- dist,
- df):
- total_flow = 0
- for path in [path0, path1]:
- minutes = 26 # minutes left at beginning
- # starting position at valve AA
- cur_pos = df.Valve[df.Valve == 'AA'].index.tolist()[0]
- # calculate total flow for path
- total_flow_path = 0
- for node in path:
- minutes -= dist[cur_pos, node] # minutes used traveling
- minutes -= 1 # minutes used opening valve
- # break loop if all minutes are used up
- if minutes < 0:
- break
- cur_pos = node # change current position to node
- # add flow of valve to total flow
- total_flow_path += df.Flowrate.iloc[node] * minutes
- total_flow += total_flow_path # add up flow for both paths
- return total_flow
- # %%
- # only get valves with flow rate higher than 0
- potential_valves = df[df.Flowrate != 0].index.tolist()
- path = np.random.permutation(potential_valves) # start with random path
- # randomly distribute valves between path of elefant and path
- # of person
- choice = np.random.choice(
- range(path.shape[0]),
- size=int(len(path)/2),
- replace=False)
- ind = np.zeros(path.shape[0], dtype=bool)
- ind[choice] = True
- path_p = list(path[ind]) # path of person
- path_e = list(path[~ind]) # path of elefant
- # calculate current pressue released, use negative of
- # pressure release to optimize via minimization
- old_E = -calculate_total_flow_for_paths(path_p, path_e, dist, df)
- # #---# SIMULATED ANNEALING #---#
- Temp = 100 # start temp
- Tlow = 0.01 # stop temp
- Steps = 5 # thermalization steps
- dT = -0.01 # temp change after thermalization
- # store paths with lowest know "negative pressure release"
- lowest_path_p = deepcopy(path_p)
- lowest_path_e = deepcopy(path_e)
- count = 0 # used to print info every x steps of while loop
- # annealing loop
- while Temp > Tlow:
- # thermalization loop
- for n in range(Steps):
- new_path_p = deepcopy(path_p)
- new_path_e = deepcopy(path_e)
- # metropolis update
- start_list = np.random.randint(2) # which list to start from
- # should the valve in question swap lists?
- change_list = np.random.randint(2)
- if change_list:
- # randomly choose item position i in start_list to take item from
- # and item position j in other list to insert item to
- if start_list == 0:
- i = np.random.randint(len(new_path_p))
- j = np.random.randint(len(new_path_e))
- num = new_path_p.pop(i)
- new_path_e.insert(j, num)
- else:
- i = np.random.randint(len(new_path_e))
- j = np.random.randint(len(new_path_p))
- num = new_path_e.pop(i)
- new_path_p.insert(j, num)
- else:
- # randomly choose item position i in list start_list to take item
- # from and item position j in same list to insert item to
- if start_list == 0:
- i = np.random.randint(len(new_path_p))
- j = np.random.randint(len(new_path_p))
- num = new_path_p.pop(i)
- new_path_p.insert(j, num)
- else:
- i = np.random.randint(len(new_path_e))
- j = np.random.randint(len(new_path_e))
- num = new_path_e.pop(i)
- new_path_e.insert(j, num)
- # calculate pressure released with new paths
- new_E = -calculate_total_flow_for_paths(
- new_path_p,
- new_path_e,
- dist,
- df)
- # check if new paths create lower negative pressure release
- # (-> release more pressure than old path)
- dE = new_E - old_E
- if dE <= 0.0:
- # if new paths release more pressure, make these paths
- # the current paths for next iteration
- path_p = deepcopy(new_path_p)
- path_e = deepcopy(new_path_e)
- old_E = deepcopy(new_E)
- # if pressure release better than lowest know solution
- # update lowest solution
- lowest_E = -calculate_total_flow_for_paths(
- lowest_path_p, lowest_path_e, dist, df)
- if (new_E - lowest_E < 0):
- lowest_path_p = deepcopy(new_path_p)
- lowest_path_e = deepcopy(new_path_e)
- elif np.exp(-dE/Temp) > np.random.random():
- # if new paths don't release more pressure still update these
- # paths to current paths for next iteration with probabilities
- # based on boltzman weights
- path_e = deepcopy(new_path_e)
- path_p = deepcopy(new_path_p)
- old_E = deepcopy(new_E)
- count += 1
- # print annealing results
- if count % 100 == 0:
- print(
- Temp,
- new_E,
- old_E,
- dE,
- lowest_path_e,
- lowest_path_p,
- calculate_total_flow_for_paths(
- lowest_path_e,
- lowest_path_p,
- dist,
- df))
- Temp += dT
- print(
- "Most pressue:",
- calculate_total_flow_for_paths(
- lowest_path_e,
- lowest_path_p,
- dist,
- df))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement