Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from collections import defaultdict
- from functools import cache
- import networkx as nx
- CapacityMap = dict[str, int]
- def parse(filepath) -> tuple[nx.Graph, CapacityMap]:
- G = nx.Graph()
- capacity: dict[str, int] = defaultdict(int)
- with open(filepath, "r") as f:
- for line in f.readlines():
- left, right = line.split(";")
- name = left[6:8].strip()
- node_capacity = int(left[23:])
- capacity[name] = node_capacity
- for child in (node.strip() for node in right[23:].split(",")):
- G.add_edge(name, child)
- return G, capacity
- def find_max_open_valve_path(g: nx.Graph, capacities: CapacityMap, steps=30):
- @cache
- def distance(start, end):
- return nx.shortest_path(g, start, end)
- @cache
- def _search(current, closed_nodes, open_nodes, steps) -> int:
- if not closed_nodes:
- return 0
- max_value = 0
- for dest_node in closed_nodes:
- path = distance(current, dest_node)
- if len(path) < steps:
- flow_lifetime = steps - len(path)
- amortized_value = flow_lifetime * capacities[dest_node]
- next_closed_nodes = frozenset(
- node for node in closed_nodes if node != dest_node
- )
- next_opened_nodes = frozenset([*open_nodes, dest_node])
- max_value = max(
- max_value,
- amortized_value
- + _search(
- dest_node, next_closed_nodes, next_opened_nodes, flow_lifetime
- ),
- )
- return max_value
- open_nodes = frozenset(["AA"])
- closed_with_capacity = frozenset(k for (k, v) in capacities.items() if v)
- start = "AA"
- steps = 30
- return _search(start, closed_with_capacity, open_nodes, steps)
- def find_max_open_valve_path_elephant(g: nx.Graph, capacities: CapacityMap, steps=26):
- @cache
- def distance(start, end):
- return nx.shortest_path(g, start, end)
- @cache
- def _search(
- h_position, e_position, closed_nodes, steps, h_residual, e_residual
- ) -> int:
- if not closed_nodes:
- return 0
- max_value = 0
- if (h_residual == e_residual) or (h_residual < e_residual):
- start_node = h_position
- current_residual = h_residual
- else:
- start_node = e_position
- current_residual = e_residual
- for dest_node in closed_nodes:
- path = distance(start_node, dest_node)
- if len(path) < (steps - current_residual):
- flow_lifetime = steps - len(path) - current_residual
- amortized_value = flow_lifetime * capacities[dest_node]
- next_closed_nodes = frozenset(
- node for node in closed_nodes if node != dest_node
- )
- if (h_residual == e_residual) or (h_residual < e_residual):
- next_e_residual = e_residual - len(path)
- next_h_residual = h_residual + len(path)
- next_h_position = dest_node
- next_e_position = e_position
- if next_h_residual < next_e_residual:
- next_steps = steps
- else:
- next_steps = steps - next_h_residual
- else:
- next_h_residual = h_residual - len(path)
- next_e_residual = e_residual + len(path)
- next_e_position = dest_node
- next_h_position = h_position
- if next_e_residual < next_h_residual:
- next_steps = steps
- else:
- next_steps = steps - next_e_residual
- max_value = max(
- max_value,
- amortized_value
- + _search(
- next_h_position,
- next_e_position,
- next_closed_nodes,
- next_steps,
- next_h_residual,
- next_e_residual,
- ),
- )
- return max_value
- init_open_nodes = frozenset(["AA"])
- closed_with_capacity = frozenset(k for (k, v) in capacities.items() if v)
- init_e_position = "AA"
- init_h_position = "AA"
- init_h_residual = 0
- init_e_residual = 0
- return _search(
- init_h_position,
- init_e_position,
- closed_with_capacity,
- steps,
- init_h_residual,
- init_e_residual,
- )
- def part_one(filepath) -> int:
- g, capacity = parse(filepath)
- return find_max_open_valve_path(g, capacity, 30)
- def part_two(filepath) -> int:
- g, capacity = parse(filepath)
- return find_max_open_valve_path_elephant(g, capacity, 26)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement