Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from collections.abc import Mapping, Set
- from collections import defaultdict
- # NodeViews
- class NodeView(Mapping, Set):
- __slots__ = '_nodes',
- def __getstate__(self):
- return {'_nodes': self._nodes}
- def __setstate__(self, state):
- self._nodes = state['_nodes']
- def __init__(self, graph):
- self._nodes = graph._node
- # Mapping methods
- def __len__(self):
- return len(self._nodes)
- def __iter__(self):
- return iter(self._nodes)
- def __getitem__(self, n):
- return self._nodes[n]
- # Set methods
- def __contains__(self, n):
- return n in self._nodes
- @classmethod
- def _from_iterable(cls, it):
- return set(it)
- # DataView method
- def __call__(self, data=False, default=None):
- if data is False:
- return self
- return NodeDataView(self._nodes, data, default)
- def data(self, data=True, default=None):
- if data is False:
- return self
- return NodeDataView(self._nodes, data, default)
- def __str__(self):
- return str(list(self))
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, tuple(self))
- class NodeDataView(Set):
- __slots__ = ('_nodes', '_data', '_default')
- def __getstate__(self):
- return {'_nodes': self._nodes,
- '_data': self._data,
- '_default': self._default}
- def __setstate__(self, state):
- self._nodes = state['_nodes']
- self._data = state['_data']
- self._default = state['_default']
- def __init__(self, nodedict, data=False, default=None):
- self._nodes = nodedict
- self._data = data
- self._default = default
- @classmethod
- def _from_iterable(cls, it):
- try:
- return set(it)
- except TypeError as err:
- if "unhashable" in str(err):
- msg = " : Could be b/c data=True or your values are unhashable"
- raise TypeError(str(err) + msg)
- raise
- def __len__(self):
- return len(self._nodes)
- def __iter__(self):
- data = self._data
- if data is False:
- return iter(self._nodes)
- if data is True:
- return iter(self._nodes.items())
- return ((n, dd[data] if data in dd else self._default)
- for n, dd in self._nodes.items())
- def __contains__(self, n):
- try:
- node_in = n in self._nodes
- except TypeError:
- n, d = n
- return n in self._nodes and self[n] == d
- if node_in is True:
- return node_in
- try:
- n, d = n
- except (TypeError, ValueError):
- return False
- return n in self._nodes and self[n] == d
- def __getitem__(self, n):
- ddict = self._nodes[n]
- data = self._data
- if data is False or data is True:
- return ddict
- return ddict[data] if data in ddict else self._default
- def __str__(self):
- return str(list(self))
- def __repr__(self):
- if self._data is False:
- return '%s(%r)' % (self.__class__.__name__, tuple(self))
- if self._data is True:
- return '%s(%r)' % (self.__class__.__name__, dict(self))
- return '%s(%r, data=%r)' % \
- (self.__class__.__name__, dict(self), self._data)
- # DegreeViews
- class DiDegreeView(object):
- def __init__(self, G, nbunch=None, weight=None):
- self._graph = G
- self._succ = G._succ if hasattr(G, "_succ") else G._adj
- self._pred = G._pred if hasattr(G, "_pred") else G._adj
- self._nodes = self._succ if nbunch is None \
- else list(G.nbunch_iter(nbunch))
- self._weight = weight
- def __call__(self, nbunch=None, weight=None):
- if nbunch is None:
- if weight == self._weight:
- return self
- return self.__class__(self._graph, None, weight)
- try:
- if nbunch in self._nodes:
- if weight == self._weight:
- return self[nbunch]
- return self.__class__(self._graph, None, weight)[nbunch]
- except TypeError:
- pass
- return self.__class__(self._graph, nbunch, weight)
- def __getitem__(self, n):
- weight = self._weight
- succs = self._succ[n]
- preds = self._pred[n]
- if weight is None:
- return len(succs) + len(preds)
- return sum(dd.get(weight, 1) for dd in succs.values()) + \
- sum(dd.get(weight, 1) for dd in preds.values())
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- succs = self._succ[n]
- preds = self._pred[n]
- yield (n, len(succs) + len(preds))
- else:
- for n in self._nodes:
- succs = self._succ[n]
- preds = self._pred[n]
- deg = sum(dd.get(weight, 1) for dd in succs.values()) \
- + sum(dd.get(weight, 1) for dd in preds.values())
- yield (n, deg)
- def __len__(self):
- return len(self._nodes)
- def __str__(self):
- return str(list(self))
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, dict(self))
- class DegreeView(DiDegreeView):
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._succ[n]
- if weight is None:
- return len(nbrs) + (n in nbrs)
- return sum(dd.get(weight, 1) for dd in nbrs.values()) + \
- (n in nbrs and nbrs[n].get(weight, 1))
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- nbrs = self._succ[n]
- yield (n, len(nbrs) + (n in nbrs))
- else:
- for n in self._nodes:
- nbrs = self._succ[n]
- deg = sum(dd.get(weight, 1) for dd in nbrs.values()) + \
- (n in nbrs and nbrs[n].get(weight, 1))
- yield (n, deg)
- class OutDegreeView(DiDegreeView):
- """A DegreeView class to report out_degree for a DiGraph; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._succ[n]
- if self._weight is None:
- return len(nbrs)
- return sum(dd.get(self._weight, 1) for dd in nbrs.values())
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- succs = self._succ[n]
- yield (n, len(succs))
- else:
- for n in self._nodes:
- succs = self._succ[n]
- deg = sum(dd.get(weight, 1) for dd in succs.values())
- yield (n, deg)
- class InDegreeView(DiDegreeView):
- """A DegreeView class to report in_degree for a DiGraph; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._pred[n]
- if weight is None:
- return len(nbrs)
- return sum(dd.get(weight, 1) for dd in nbrs.values())
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- preds = self._pred[n]
- yield (n, len(preds))
- else:
- for n in self._nodes:
- preds = self._pred[n]
- deg = sum(dd.get(weight, 1) for dd in preds.values())
- yield (n, deg)
- class MultiDegreeView(DiDegreeView):
- """A DegreeView class for undirected multigraphs; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._succ[n]
- if weight is None:
- return sum(len(keys) for keys in nbrs.values()) + \
- (n in nbrs and len(nbrs[n]))
- # edge weighted graph - degree is sum of nbr edge weights
- deg = sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- if n in nbrs:
- deg += sum(d.get(weight, 1) for d in nbrs[n].values())
- return deg
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- nbrs = self._succ[n]
- deg = sum(len(keys) for keys in nbrs.values()) + \
- (n in nbrs and len(nbrs[n]))
- yield (n, deg)
- else:
- for n in self._nodes:
- nbrs = self._succ[n]
- deg = sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- if n in nbrs:
- deg += sum(d.get(weight, 1) for d in nbrs[n].values())
- yield (n, deg)
- class DiMultiDegreeView(DiDegreeView):
- """A DegreeView class for MultiDiGraph; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- succs = self._succ[n]
- preds = self._pred[n]
- if weight is None:
- return sum(len(keys) for keys in succs.values()) + \
- sum(len(keys) for keys in preds.values())
- # edge weighted graph - degree is sum of nbr edge weights
- deg = sum(d.get(weight, 1) for key_dict in succs.values()
- for d in key_dict.values()) + \
- sum(d.get(weight, 1) for key_dict in preds.values()
- for d in key_dict.values())
- return deg
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- succs = self._succ[n]
- preds = self._pred[n]
- deg = sum(len(keys) for keys in succs.values()) + \
- sum(len(keys) for keys in preds.values())
- yield (n, deg)
- else:
- for n in self._nodes:
- succs = self._succ[n]
- preds = self._pred[n]
- deg = sum(d.get(weight, 1) for key_dict in succs.values()
- for d in key_dict.values()) + \
- sum(d.get(weight, 1) for key_dict in preds.values()
- for d in key_dict.values())
- yield (n, deg)
- class InMultiDegreeView(DiDegreeView):
- """A DegreeView class for inward degree of MultiDiGraph; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._pred[n]
- if weight is None:
- return sum(len(data) for data in nbrs.values())
- # edge weighted graph - degree is sum of nbr edge weights
- return sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- nbrs = self._pred[n]
- deg = sum(len(data) for data in nbrs.values())
- yield (n, deg)
- else:
- for n in self._nodes:
- nbrs = self._pred[n]
- deg = sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- yield (n, deg)
- class OutMultiDegreeView(DiDegreeView):
- """A DegreeView class for outward degree of MultiDiGraph; See DegreeView"""
- def __getitem__(self, n):
- weight = self._weight
- nbrs = self._succ[n]
- if weight is None:
- return sum(len(data) for data in nbrs.values())
- # edge weighted graph - degree is sum of nbr edge weights
- return sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- def __iter__(self):
- weight = self._weight
- if weight is None:
- for n in self._nodes:
- nbrs = self._succ[n]
- deg = sum(len(data) for data in nbrs.values())
- yield (n, deg)
- else:
- for n in self._nodes:
- nbrs = self._succ[n]
- deg = sum(d.get(weight, 1) for key_dict in nbrs.values()
- for d in key_dict.values())
- yield (n, deg)
- # EdgeDataViews
- class OutEdgeDataView(object):
- """EdgeDataView for outward edges of DiGraph; See EdgeDataView"""
- __slots__ = ('_viewer', '_nbunch', '_data', '_default',
- '_adjdict', '_nodes_nbrs', '_report')
- def __getstate__(self):
- return {'viewer': self._viewer,
- 'nbunch': self._nbunch,
- 'data': self._data,
- 'default': self._default}
- def __setstate__(self, state):
- self.__init__(**state)
- def __init__(self, viewer, nbunch=None, data=False, default=None):
- self._viewer = viewer
- adjdict = self._adjdict = viewer._adjdict
- if nbunch is None:
- self._nodes_nbrs = adjdict.items
- else:
- nbunch = list(viewer._graph.nbunch_iter(nbunch))
- self._nodes_nbrs = lambda: [(n, adjdict[n]) for n in nbunch]
- self._nbunch = nbunch
- self._data = data
- self._default = default
- # Set _report based on data and default
- if data is True:
- self._report = lambda n, nbr, dd: (n, nbr, dd)
- elif data is False:
- self._report = lambda n, nbr, dd: (n, nbr)
- else: # data is attribute name
- self._report = lambda n, nbr, dd: \
- (n, nbr, dd[data]) if data in dd else (n, nbr, default)
- def __len__(self):
- return sum(len(nbrs) for n, nbrs in self._nodes_nbrs())
- def __iter__(self):
- return (self._report(n, nbr, dd) for n, nbrs in self._nodes_nbrs()
- for nbr, dd in nbrs.items())
- def __contains__(self, e):
- try:
- u, v = e[:2]
- ddict = self._adjdict[u][v]
- except KeyError:
- return False
- return e == self._report(u, v, ddict)
- def __str__(self):
- return str(list(self))
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, list(self))
- class EdgeDataView(OutEdgeDataView):
- __slots__ = ()
- def __len__(self):
- return sum(1 for e in self)
- def __iter__(self):
- seen = {}
- for n, nbrs in self._nodes_nbrs():
- for nbr, dd in nbrs.items():
- if nbr not in seen:
- yield self._report(n, nbr, dd)
- seen[n] = 1
- del seen
- def __contains__(self, e):
- try:
- u, v = e[:2]
- ddict = self._adjdict[u][v]
- except KeyError:
- try:
- ddict = self._adjdict[v][u]
- except KeyError:
- return False
- return e == self._report(u, v, ddict)
- class InEdgeDataView(OutEdgeDataView):
- """An EdgeDataView class for outward edges of DiGraph; See EdgeDataView"""
- __slots__ = ()
- def __iter__(self):
- return (self._report(nbr, n, dd) for n, nbrs in self._nodes_nbrs()
- for nbr, dd in nbrs.items())
- def __contains__(self, e):
- try:
- u, v = e[:2]
- ddict = self._adjdict[v][u]
- except KeyError:
- return False
- return e == self._report(u, v, ddict)
- class OutMultiEdgeDataView(OutEdgeDataView):
- """An EdgeDataView for outward edges of MultiDiGraph; See EdgeDataView"""
- __slots__ = ('keys',)
- def __getstate__(self):
- return {'viewer': self._viewer,
- 'nbunch': self._nbunch,
- 'keys': self.keys,
- 'data': self._data,
- 'default': self._default}
- def __setstate__(self, state):
- self.__init__(**state)
- def __init__(self, viewer, nbunch=None,
- data=False, keys=False, default=None):
- self._viewer = viewer
- adjdict = self._adjdict = viewer._adjdict
- self.keys = keys
- if nbunch is None:
- self._nodes_nbrs = adjdict.items
- else:
- nbunch = list(viewer._graph.nbunch_iter(nbunch))
- self._nodes_nbrs = lambda: [(n, adjdict[n]) for n in nbunch]
- self._nbunch = nbunch
- self._data = data
- self._default = default
- # Set _report based on data and default
- if data is True:
- if keys is True:
- self._report = lambda n, nbr, k, dd: (n, nbr, k, dd)
- else:
- self._report = lambda n, nbr, k, dd: (n, nbr, dd)
- elif data is False:
- if keys is True:
- self._report = lambda n, nbr, k, dd: (n, nbr, k)
- else:
- self._report = lambda n, nbr, k, dd: (n, nbr)
- else: # data is attribute name
- if keys is True:
- self._report = lambda n, nbr, k, dd: (n, nbr, k, dd[data]) \
- if data in dd else (n, nbr, k, default)
- else:
- self._report = lambda n, nbr, k, dd: (n, nbr, dd[data]) \
- if data in dd else (n, nbr, default)
- def __len__(self):
- return sum(1 for e in self)
- def __iter__(self):
- return (self._report(n, nbr, k, dd) for n, nbrs in self._nodes_nbrs()
- for nbr, kd in nbrs.items() for k, dd in kd.items())
- def __contains__(self, e):
- u, v = e[:2]
- try:
- kdict = self._adjdict[u][v]
- except KeyError:
- return False
- if self.keys is True:
- k = e[2]
- try:
- dd = kdict[k]
- except KeyError:
- return False
- return e == self._report(u, v, k, dd)
- for k, dd in kdict.items():
- if e == self._report(u, v, k, dd):
- return True
- return False
- class MultiEdgeDataView(OutMultiEdgeDataView):
- """An EdgeDataView class for edges of MultiGraph; See EdgeDataView"""
- __slots__ = ()
- def __iter__(self):
- seen = {}
- for n, nbrs in self._nodes_nbrs():
- for nbr, kd in nbrs.items():
- if nbr not in seen:
- for k, dd in kd.items():
- yield self._report(n, nbr, k, dd)
- seen[n] = 1
- del seen
- def __contains__(self, e):
- u, v = e[:2]
- try:
- kdict = self._adjdict[u][v]
- except KeyError:
- try:
- kdict = self._adjdict[v][u]
- except KeyError:
- return False
- if self.keys is True:
- k = e[2]
- try:
- dd = kdict[k]
- except KeyError:
- return False
- return e == self._report(u, v, k, dd)
- for k, dd in kdict.items():
- if e == self._report(u, v, k, dd):
- return True
- return False
- class InMultiEdgeDataView(OutMultiEdgeDataView):
- """An EdgeDataView for inward edges of MultiDiGraph; See EdgeDataView"""
- __slots__ = ()
- def __iter__(self):
- return (self._report(nbr, n, k, dd) for n, nbrs in self._nodes_nbrs()
- for nbr, kd in nbrs.items() for k, dd in kd.items())
- def __contains__(self, e):
- u, v = e[:2]
- try:
- kdict = self._adjdict[v][u]
- except KeyError:
- return False
- if self.keys is True:
- k = e[2]
- dd = kdict[k]
- return e == self._report(u, v, k, dd)
- for k, dd in kdict.items():
- if e == self._report(u, v, k, dd):
- return True
- return False
- # EdgeViews have set operations and no data reported
- class OutEdgeView(Set, Mapping):
- """A EdgeView class for outward edges of a DiGraph"""
- __slots__ = ('_adjdict', '_graph', '_nodes_nbrs')
- def __getstate__(self):
- return {'_graph': self._graph}
- def __setstate__(self, state):
- self._graph = G = state['_graph']
- self._adjdict = G._succ if hasattr(G, "succ") else G._adj
- self._nodes_nbrs = self._adjdict.items
- @classmethod
- def _from_iterable(cls, it):
- return set(it)
- dataview = OutEdgeDataView
- def __init__(self, G):
- self._graph = G
- self._adjdict = G._succ if hasattr(G, "succ") else G._adj
- self._nodes_nbrs = self._adjdict.items
- # Set methods
- def __len__(self):
- return sum(len(nbrs) for n, nbrs in self._nodes_nbrs())
- def __iter__(self):
- for n, nbrs in self._nodes_nbrs():
- for nbr in nbrs:
- yield (n, nbr)
- def __contains__(self, e):
- try:
- u, v = e
- return v in self._adjdict[u]
- except KeyError:
- return False
- # Mapping Methods
- def __getitem__(self, e):
- u, v = e
- return self._adjdict[u][v]
- # EdgeDataView methods
- def __call__(self, nbunch=None, data=False, default=None):
- if nbunch is None and data is False:
- return self
- return self.dataview(self, nbunch, data, default)
- def data(self, data=True, default=None, nbunch=None):
- if nbunch is None and data is False:
- return self
- return self.dataview(self, nbunch, data, default)
- # String Methods
- def __str__(self):
- return str(list(self))
- def __repr__(self):
- return "{0.__class__.__name__}({1!r})".format(self, list(self))
- class EdgeView(OutEdgeView):
- __slots__ = ()
- dataview = EdgeDataView
- def __len__(self):
- num_nbrs = (len(nbrs) + (n in nbrs) for n, nbrs in self._nodes_nbrs())
- return sum(num_nbrs) // 2
- def __iter__(self):
- seen = {}
- for n, nbrs in self._nodes_nbrs():
- for nbr in list(nbrs):
- if nbr not in seen:
- yield (n, nbr)
- seen[n] = 1
- del seen
- def __contains__(self, e):
- try:
- u, v = e[:2]
- return v in self._adjdict[u] or u in self._adjdict[v]
- except (KeyError, ValueError):
- return False
- class InEdgeView(OutEdgeView):
- """A EdgeView class for inward edges of a DiGraph"""
- __slots__ = ()
- def __setstate__(self, state):
- self._graph = G = state['_graph']
- self._adjdict = G._pred if hasattr(G, "pred") else G._adj
- self._nodes_nbrs = self._adjdict.items
- dataview = InEdgeDataView
- def __init__(self, G):
- self._graph = G
- self._adjdict = G._pred if hasattr(G, "pred") else G._adj
- self._nodes_nbrs = self._adjdict.items
- def __iter__(self):
- for n, nbrs in self._nodes_nbrs():
- for nbr in nbrs:
- yield (nbr, n)
- def __contains__(self, e):
- try:
- u, v = e
- return u in self._adjdict[v]
- except KeyError:
- return False
- def __getitem__(self, e):
- u, v = e
- return self._adjdict[v][u]
- class OutMultiEdgeView(OutEdgeView):
- """A EdgeView class for outward edges of a MultiDiGraph"""
- __slots__ = ()
- dataview = OutMultiEdgeDataView
- def __len__(self):
- return sum(len(kdict) for n, nbrs in self._nodes_nbrs()
- for nbr, kdict in nbrs.items())
- def __iter__(self):
- for n, nbrs in self._nodes_nbrs():
- for nbr, kdict in nbrs.items():
- for key in kdict:
- yield (n, nbr, key)
- def __contains__(self, e):
- N = len(e)
- if N == 3:
- u, v, k = e
- elif N == 2:
- u, v = e
- k = 0
- else:
- raise ValueError("MultiEdge must have length 2 or 3")
- try:
- return k in self._adjdict[u][v]
- except KeyError:
- return False
- def __getitem__(self, e):
- u, v, k = e
- return self._adjdict[u][v][k]
- def __call__(self, nbunch=None, data=False, keys=False, default=None):
- if nbunch is None and data is False and keys is True:
- return self
- return self.dataview(self, nbunch, data, keys, default)
- def data(self, data=True, keys=False, default=None, nbunch=None):
- if nbunch is None and data is False and keys is True:
- return self
- return self.dataview(self, nbunch, data, keys, default)
- class MultiEdgeView(OutMultiEdgeView):
- """A EdgeView class for edges of a MultiGraph"""
- __slots__ = ()
- dataview = MultiEdgeDataView
- def __len__(self):
- return sum(1 for e in self)
- def __iter__(self):
- seen = {}
- for n, nbrs in self._nodes_nbrs():
- for nbr, kd in nbrs.items():
- if nbr not in seen:
- for k, dd in kd.items():
- yield (n, nbr, k)
- seen[n] = 1
- del seen
- class InMultiEdgeView(OutMultiEdgeView):
- """A EdgeView class for inward edges of a MultiDiGraph"""
- __slots__ = ()
- def __setstate__(self, state):
- self._graph = G = state['_graph']
- self._adjdict = G._pred if hasattr(G, "pred") else G._adj
- self._nodes_nbrs = self._adjdict.items
- dataview = InMultiEdgeDataView
- def __init__(self, G):
- self._graph = G
- self._adjdict = G._pred if hasattr(G, "pred") else G._adj
- self._nodes_nbrs = self._adjdict.items
- def __iter__(self):
- for n, nbrs in self._nodes_nbrs():
- for nbr, kdict in nbrs.items():
- for key in kdict:
- yield (nbr, n, key)
- def __contains__(self, e):
- N = len(e)
- if N == 3:
- u, v, k = e
- elif N == 2:
- u, v = e
- k = 0
- else:
- raise ValueError("MultiEdge must have length 2 or 3")
- try:
- return k in self._adjdict[v][u]
- except KeyError:
- return False
- def __getitem__(self, e):
- u, v, k = e
- return self._adjdict[v][u][k]
- # __all__ = ['AtlasView', 'AdjacencyView', 'MultiAdjacencyView',
- # 'UnionAtlas', 'UnionAdjacency',
- # 'UnionMultiInner', 'UnionMultiAdjacency',
- # 'FilterAtlas', 'FilterAdjacency',
- # 'FilterMultiInner', 'FilterMultiAdjacency',
- # ]
- class AtlasView(Mapping):
- __slots__ = ('_atlas',)
- def __getstate__(self):
- return {'_atlas': self._atlas}
- def __setstate__(self, state):
- self._atlas = state['_atlas']
- def __init__(self, d):
- self._atlas = d
- def __len__(self):
- return len(self._atlas)
- def __iter__(self):
- return iter(self._atlas)
- def __getitem__(self, key):
- return self._atlas[key]
- def copy(self):
- return {n: self[n].copy() for n in self._atlas}
- def __str__(self):
- return str(self._atlas) # {nbr: self[nbr] for nbr in self})
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, self._atlas)
- class AdjacencyView(AtlasView):
- __slots__ = () # Still uses AtlasView slots names _atlas
- def __getitem__(self, name):
- return AtlasView(self._atlas[name])
- def copy(self):
- return {n: self[n].copy() for n in self._atlas}
- class MultiAdjacencyView(AdjacencyView):
- __slots__ = () # Still uses AtlasView slots names _atlas
- def __getitem__(self, name):
- return AdjacencyView(self._atlas[name])
- def copy(self):
- return {n: self[n].copy() for n in self._atlas}
- class UnionAtlas(Mapping):
- __slots__ = ('_succ', '_pred')
- def __getstate__(self):
- return {'_succ': self._succ, '_pred': self._pred}
- def __setstate__(self, state):
- self._succ = state['_succ']
- self._pred = state['_pred']
- def __init__(self, succ, pred):
- self._succ = succ
- self._pred = pred
- def __len__(self):
- return len(self._succ) + len(self._pred)
- def __iter__(self):
- return iter(set(self._succ.keys()) | set(self._pred.keys()))
- def __getitem__(self, key):
- try:
- return self._succ[key]
- except KeyError:
- return self._pred[key]
- def copy(self):
- result = {nbr: dd.copy() for nbr, dd in self._succ.items()}
- for nbr, dd in self._pred.items():
- if nbr in result:
- result[nbr].update(dd)
- else:
- result[nbr] = dd.copy()
- return result
- def __str__(self):
- return str({nbr: self[nbr] for nbr in self})
- def __repr__(self):
- return '%s(%r, %r)' % (self.__class__.__name__, self._succ, self._pred)
- class UnionAdjacency(Mapping):
- __slots__ = ('_succ', '_pred')
- def __getstate__(self):
- return {'_succ': self._succ, '_pred': self._pred}
- def __setstate__(self, state):
- self._succ = state['_succ']
- self._pred = state['_pred']
- def __init__(self, succ, pred):
- # keys must be the same for two input dicts
- assert (len(set(succ.keys()) ^ set(pred.keys())) == 0)
- self._succ = succ
- self._pred = pred
- def __len__(self):
- return len(self._succ) # length of each dict should be the same
- def __iter__(self):
- return iter(self._succ)
- def __getitem__(self, nbr):
- return UnionAtlas(self._succ[nbr], self._pred[nbr])
- def copy(self):
- return {n: self[n].copy() for n in self._succ}
- def __str__(self):
- return str({nbr: self[nbr] for nbr in self})
- def __repr__(self):
- return '%s(%r, %r)' % (self.__class__.__name__, self._succ, self._pred)
- class UnionMultiInner(UnionAtlas):
- __slots__ = () # Still uses UnionAtlas slots names _succ, _pred
- def __getitem__(self, node):
- in_succ = node in self._succ
- in_pred = node in self._pred
- if in_succ:
- if in_pred:
- return UnionAtlas(self._succ[node], self._pred[node])
- return UnionAtlas(self._succ[node], {})
- return UnionAtlas({}, self._pred[node])
- def copy(self):
- nodes = set(self._succ.keys()) | set(self._pred.keys())
- return {n: self[n].copy() for n in nodes}
- class UnionMultiAdjacency(UnionAdjacency):
- __slots__ = () # Still uses UnionAdjacency slots names _succ, _pred
- def __getitem__(self, node):
- return UnionMultiInner(self._succ[node], self._pred[node])
- class FilterAtlas(Mapping): # nodedict, nbrdict, keydict
- def __init__(self, d, NODE_OK):
- self._atlas = d
- self.NODE_OK = NODE_OK
- def __len__(self):
- return sum(1 for n in self)
- def __iter__(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- return (n for n in self.NODE_OK.nodes if n in self._atlas)
- return (n for n in self._atlas if self.NODE_OK(n))
- def __getitem__(self, key):
- if key in self._atlas and self.NODE_OK(key):
- return self._atlas[key]
- raise KeyError("Key {} not found".format(key))
- def copy(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- return {u: self._atlas[u] for u in self.NODE_OK.nodes
- if u in self._atlas}
- return {u: d for u, d in self._atlas.items()
- if self.NODE_OK(u)}
- def __str__(self):
- return str({nbr: self[nbr] for nbr in self})
- def __repr__(self):
- return '%s(%r, %r)' % (self.__class__.__name__, self._atlas,
- self.NODE_OK)
- class FilterAdjacency(Mapping): # edgedict
- def __init__(self, d, NODE_OK, EDGE_OK):
- self._atlas = d
- self.NODE_OK = NODE_OK
- self.EDGE_OK = EDGE_OK
- def __len__(self):
- return sum(1 for n in self)
- def __iter__(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- return (n for n in self.NODE_OK.nodes if n in self._atlas)
- return (n for n in self._atlas if self.NODE_OK(n))
- def __getitem__(self, node):
- if node in self._atlas and self.NODE_OK(node):
- def new_node_ok(nbr):
- return self.NODE_OK(nbr) and self.EDGE_OK(node, nbr)
- return FilterAtlas(self._atlas[node], new_node_ok)
- raise KeyError("Key {} not found".format(node))
- def copy(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- return {u: {v: d for v, d in self._atlas[u].items()
- if self.NODE_OK(v) if self.EDGE_OK(u, v)}
- for u in self.NODE_OK.nodes if u in self._atlas}
- return {u: {v: d for v, d in nbrs.items() if self.NODE_OK(v)
- if self.EDGE_OK(u, v)}
- for u, nbrs in self._atlas.items()
- if self.NODE_OK(u)}
- def __str__(self):
- return str({nbr: self[nbr] for nbr in self})
- def __repr__(self):
- return '%s(%r, %r, %r)' % (self.__class__.__name__, self._atlas,
- self.NODE_OK, self.EDGE_OK)
- class FilterMultiInner(FilterAdjacency): # muliedge_seconddict
- def __iter__(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- my_nodes = (n for n in self.NODE_OK.nodes if n in self._atlas)
- else:
- my_nodes = (n for n in self._atlas if self.NODE_OK(n))
- for n in my_nodes:
- some_keys_ok = False
- for key in self._atlas[n]:
- if self.EDGE_OK(n, key):
- some_keys_ok = True
- break
- if some_keys_ok is True:
- yield n
- def __getitem__(self, nbr):
- if nbr in self._atlas and self.NODE_OK(nbr):
- def new_node_ok(key):
- return self.EDGE_OK(nbr, key)
- return FilterAtlas(self._atlas[nbr], new_node_ok)
- raise KeyError("Key {} not found".format(nbr))
- def copy(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- return {v: {k: d for k, d in self._atlas[v].items()
- if self.EDGE_OK(v, k)}
- for v in self.NODE_OK.nodes if v in self._atlas}
- return {v: {k: d for k, d in nbrs.items() if self.EDGE_OK(v, k)}
- for v, nbrs in self._atlas.items() if self.NODE_OK(v)}
- class FilterMultiAdjacency(FilterAdjacency): # multiedgedict
- def __getitem__(self, node):
- if node in self._atlas and self.NODE_OK(node):
- def edge_ok(nbr, key):
- return self.NODE_OK(nbr) and self.EDGE_OK(node, nbr, key)
- return FilterMultiInner(self._atlas[node], self.NODE_OK, edge_ok)
- raise KeyError("Key {} not found".format(node))
- def copy(self):
- try: # check that NODE_OK has attr 'nodes'
- node_ok_shorter = 2 * len(self.NODE_OK.nodes) < len(self._atlas)
- except AttributeError:
- node_ok_shorter = False
- if node_ok_shorter:
- my_nodes = self.NODE_OK.nodes
- return {u: {v: {k: d for k, d in kd.items()
- if self.EDGE_OK(u, v, k)}
- for v, kd in self._atlas[u].items() if v in my_nodes}
- for u in my_nodes if u in self._atlas}
- return {u: {v: {k: d for k, d in kd.items()
- if self.EDGE_OK(u, v, k)}
- for v, kd in nbrs.items() if self.NODE_OK(v)}
- for u, nbrs in self._atlas.items() if self.NODE_OK(u)}
- class Graph(object):
- node_dict_factory = dict
- node_attr_dict_factory = dict
- adjlist_outer_dict_factory = dict
- adjlist_inner_dict_factory = dict
- edge_attr_dict_factory = dict
- graph_attr_dict_factory = dict
- def to_directed_class(self):
- return nx.DiGraph
- def to_undirected_class(self):
- return Graph
- def __init__(self, incoming_graph_data=None, **attr):
- self.graph_attr_dict_factory = self.graph_attr_dict_factory
- self.node_dict_factory = self.node_dict_factory
- self.node_attr_dict_factory = self.node_attr_dict_factory
- self.adjlist_outer_dict_factory = self.adjlist_outer_dict_factory
- self.adjlist_inner_dict_factory = self.adjlist_inner_dict_factory
- self.edge_attr_dict_factory = self.edge_attr_dict_factory
- self.graph = self.graph_attr_dict_factory() # dictionary for graph attributes
- self._node = self.node_dict_factory() # empty node attribute dict
- self._adj = self.adjlist_outer_dict_factory() # empty adjacency dict
- # attempt to load graph with data
- if incoming_graph_data is not None:
- convert.to_networkx_graph(incoming_graph_data, create_using=self)
- # load graph attributes (must be after convert)
- self.graph.update(attr)
- @property
- def adj(self):
- return AdjacencyView(self._adj)
- @property
- def name(self):
- return self.graph.get('name', '')
- @name.setter
- def name(self, s):
- self.graph['name'] = s
- def __str__(self):
- return self.name
- def __iter__(self):
- return iter(self._node)
- def __contains__(self, n):
- try:
- return n in self._node
- except TypeError:
- return False
- def __len__(self):
- return len(self._node)
- def __getitem__(self, n):
- return self.adj[n]
- def add_node(self, node_for_adding, **attr):
- if node_for_adding not in self._node:
- self._adj[node_for_adding] = self.adjlist_inner_dict_factory()
- attr_dict = self._node[node_for_adding] = self.node_attr_dict_factory()
- attr_dict.update(attr)
- else: # update attr even if node already exists
- self._node[node_for_adding].update(attr)
- def add_nodes_from(self, nodes_for_adding, **attr):
- for n in nodes_for_adding:
- # keep all this inside try/except because
- # CPython throws TypeError on n not in self._node,
- # while pre-2.7.5 ironpython throws on self._adj[n]
- try:
- if n not in self._node:
- self._adj[n] = self.adjlist_inner_dict_factory()
- attr_dict = self._node[n] = self.node_attr_dict_factory()
- attr_dict.update(attr)
- else:
- self._node[n].update(attr)
- except TypeError:
- nn, ndict = n
- if nn not in self._node:
- self._adj[nn] = self.adjlist_inner_dict_factory()
- newdict = attr.copy()
- newdict.update(ndict)
- attr_dict = self._node[nn] = self.node_attr_dict_factory()
- attr_dict.update(newdict)
- else:
- olddict = self._node[nn]
- olddict.update(attr)
- olddict.update(ndict)
- def remove_node(self, n):
- adj = self._adj
- try:
- nbrs = list(adj[n]) # list handles self-loops (allows mutation)
- del self._node[n]
- except KeyError: # NetworkXError if n not in self
- raise NetworkXError("The node %s is not in the graph." % (n,))
- for u in nbrs:
- del adj[u][n] # remove all edges n-u in graph
- del adj[n] # now remove node
- def remove_nodes_from(self, nodes):
- adj = self._adj
- for n in nodes:
- try:
- del self._node[n]
- for u in list(adj[n]): # list handles self-loops
- del adj[u][n] # (allows mutation of dict in loop)
- del adj[n]
- except KeyError:
- pass
- @property
- def nodes(self):
- nodes = NodeView(self)
- # Lazy View creation: overload the (class) property on the instance
- # Then future G.nodes use the existing View
- # setattr doesn't work because attribute already exists
- self.__dict__['nodes'] = nodes
- return nodes
- def number_of_nodes(self):
- return len(self._node)
- def order(self):
- return len(self._node)
- def has_node(self, n):
- try:
- return n in self._node
- except TypeError:
- return False
- def add_edge(self, u_of_edge, v_of_edge, **attr):
- u, v = u_of_edge, v_of_edge
- # add nodes
- if u not in self._node:
- self._adj[u] = self.adjlist_inner_dict_factory()
- self._node[u] = self.node_attr_dict_factory()
- if v not in self._node:
- self._adj[v] = self.adjlist_inner_dict_factory()
- self._node[v] = self.node_attr_dict_factory()
- # add the edge
- datadict = self._adj[u].get(v, self.edge_attr_dict_factory())
- datadict.update(attr)
- self._adj[u][v] = datadict
- self._adj[v][u] = datadict
- def add_edges_from(self, ebunch_to_add, **attr):
- for e in ebunch_to_add:
- ne = len(e)
- if ne == 3:
- u, v, dd = e
- elif ne == 2:
- u, v = e
- dd = {} # doesn't need edge_attr_dict_factory
- else:
- raise NetworkXError(
- "Edge tuple %s must be a 2-tuple or 3-tuple." % (e,))
- if u not in self._node:
- self._adj[u] = self.adjlist_inner_dict_factory()
- self._node[u] = self.node_attr_dict_factory()
- if v not in self._node:
- self._adj[v] = self.adjlist_inner_dict_factory()
- self._node[v] = self.node_attr_dict_factory()
- datadict = self._adj[u].get(v, self.edge_attr_dict_factory())
- datadict.update(attr)
- datadict.update(dd)
- self._adj[u][v] = datadict
- self._adj[v][u] = datadict
- def add_weighted_edges_from(self, ebunch_to_add, weight='weight', **attr):
- self.add_edges_from(((u, v, {weight: d}) for u, v, d in ebunch_to_add),
- **attr)
- def remove_edge(self, u, v):
- try:
- del self._adj[u][v]
- if u != v: # self-loop needs only one entry removed
- del self._adj[v][u]
- except KeyError:
- raise NetworkXError("The edge %s-%s is not in the graph" % (u, v))
- def remove_edges_from(self, ebunch):
- adj = self._adj
- for e in ebunch:
- u, v = e[:2] # ignore edge data if present
- if u in adj and v in adj[u]:
- del adj[u][v]
- if u != v: # self loop needs only one entry removed
- del adj[v][u]
- def update(self, edges=None, nodes=None):
- if edges is not None:
- if nodes is not None:
- self.add_nodes_from(nodes)
- self.add_edges_from(edges)
- else:
- # check if edges is a Graph object
- try:
- graph_nodes = edges.nodes
- graph_edges = edges.edges
- except AttributeError:
- # edge not Graph-like
- self.add_edges_from(edges)
- else: # edges is Graph-like
- self.add_nodes_from(graph_nodes.data())
- self.add_edges_from(graph_edges.data())
- self.graph.update(edges.graph)
- elif nodes is not None:
- self.add_nodes_from(nodes)
- else:
- raise NetworkXError("update needs nodes or edges input")
- def has_edge(self, u, v):
- try:
- return v in self._adj[u]
- except KeyError:
- return False
- def neighbors(self, n):
- try:
- return iter(self._adj[n])
- except KeyError:
- raise NetworkXError("The node %s is not in the graph." % (n,))
- @property
- def edges(self):
- return EdgeView(self)
- def get_edge_data(self, u, v, default=None):
- try:
- return self._adj[u][v]
- except KeyError:
- return default
- def adjacency(self):
- return iter(self._adj.items())
- @property
- def degree(self):
- return DegreeView(self)
- def clear(self):
- self._adj.clear()
- self._node.clear()
- self.graph.clear()
- def is_multigraph(self):
- """Returns True if graph is a multigraph, False otherwise."""
- return False
- def is_directed(self):
- """Returns True if graph is directed, False otherwise."""
- return False
- def copy(self, as_view=False):
- if as_view is True:
- return nx.graphviews.generic_graph_view(self)
- G = self.__class__()
- G.graph.update(self.graph)
- G.add_nodes_from((n, d.copy()) for n, d in self._node.items())
- G.add_edges_from((u, v, datadict.copy())
- for u, nbrs in self._adj.items()
- for v, datadict in nbrs.items())
- return G
- def to_directed(self, as_view=False):
- graph_class = self.to_directed_class()
- if as_view is True:
- return nx.graphviews.generic_graph_view(self, graph_class)
- # deepcopy when not a view
- G = graph_class()
- G.graph.update(deepcopy(self.graph))
- G.add_nodes_from((n, deepcopy(d)) for n, d in self._node.items())
- G.add_edges_from((u, v, deepcopy(data))
- for u, nbrs in self._adj.items()
- for v, data in nbrs.items())
- return G
- def to_undirected(self, as_view=False):
- graph_class = self.to_undirected_class()
- if as_view is True:
- return nx.graphviews.generic_graph_view(self, graph_class)
- # deepcopy when not a view
- G = graph_class()
- G.graph.update(deepcopy(self.graph))
- G.add_nodes_from((n, deepcopy(d)) for n, d in self._node.items())
- G.add_edges_from((u, v, deepcopy(d))
- for u, nbrs in self._adj.items()
- for v, d in nbrs.items())
- return G
- def subgraph(self, nodes):
- induced_nodes = nx.filters.show_nodes(self.nbunch_iter(nodes))
- # if already a subgraph, don't make a chain
- subgraph = nx.graphviews.subgraph_view
- if hasattr(self, '_NODE_OK'):
- return subgraph(self._graph, induced_nodes, self._EDGE_OK)
- return subgraph(self, induced_nodes)
- def edge_subgraph(self, edges):
- return nx.edge_subgraph(self, edges)
- def size(self, weight=None):
- s = sum(d for v, d in self.degree(weight=weight))
- # If `weight` is None, the sum of the degrees is guaranteed to be
- # even, so we can perform integer division and hence return an
- # integer. Otherwise, the sum of the weighted degrees is not
- # guaranteed to be an integer, so we perform "real" division.
- return s // 2 if weight is None else s / 2
- def number_of_edges(self, u=None, v=None):
- if u is None:
- return int(self.size())
- if v in self._adj[u]:
- return 1
- return 0
- def nbunch_iter(self, nbunch=None):
- if nbunch is None: # include all nodes via iterator
- bunch = iter(self._adj)
- elif nbunch in self: # if nbunch is a single node
- bunch = iter([nbunch])
- else: # if nbunch is a sequence of nodes
- def bunch_iter(nlist, adj):
- try:
- for n in nlist:
- if n in adj:
- yield n
- except TypeError as e:
- message = e.args[0]
- # capture error for non-sequence/iterator nbunch.
- if 'iter' in message:
- msg = "nbunch is not a node or a sequence of nodes."
- raise NetworkXError(msg)
- # capture error for unhashable node.
- elif 'hashable' in message:
- msg = "Node {} in sequence nbunch is not a valid node."
- raise NetworkXError(msg.format(n))
- else:
- raise
- bunch = bunch_iter(nbunch, self._adj)
- return bunch
- class DiGraph(Graph):
- def __init__(self, incoming_graph_data=None, **attr):
- self.graph_attr_dict_factory = self.graph_attr_dict_factory
- self.node_dict_factory = self.node_dict_factory
- self.node_attr_dict_factory = self.node_attr_dict_factory
- self.adjlist_outer_dict_factory = self.adjlist_outer_dict_factory
- self.adjlist_inner_dict_factory = self.adjlist_inner_dict_factory
- self.edge_attr_dict_factory = self.edge_attr_dict_factory
- self.graph = self.graph_attr_dict_factory() # dictionary for graph attributes
- self._node = self.node_dict_factory() # dictionary for node attr
- # We store two adjacency lists:
- # the predecessors of node n are stored in the dict self._pred
- # the successors of node n are stored in the dict self._succ=self._adj
- self._adj = self.adjlist_outer_dict_factory() # empty adjacency dict
- self._pred = self.adjlist_outer_dict_factory() # predecessor
- self._succ = self._adj # successor
- # attempt to load graph with data
- if incoming_graph_data is not None:
- convert.to_networkx_graph(incoming_graph_data, create_using=self)
- # load graph attributes (must be after convert)
- self.graph.update(attr)
- @property
- def adj(self):
- return AdjacencyView(self._succ)
- @property
- def succ(self):
- return AdjacencyView(self._succ)
- @property
- def pred(self):
- return AdjacencyView(self._pred)
- def add_node(self, node_for_adding, **attr):
- if node_for_adding not in self._succ:
- self._succ[node_for_adding] = self.adjlist_inner_dict_factory()
- self._pred[node_for_adding] = self.adjlist_inner_dict_factory()
- attr_dict = self._node[node_for_adding] = self.node_attr_dict_factory()
- attr_dict.update(attr)
- else: # update attr even if node already exists
- self._node[node_for_adding].update(attr)
- def add_nodes_from(self, nodes_for_adding, **attr):
- for n in nodes_for_adding:
- # keep all this inside try/except because
- # CPython throws TypeError on n not in self._succ,
- # while pre-2.7.5 ironpython throws on self._succ[n]
- try:
- if n not in self._succ:
- self._succ[n] = self.adjlist_inner_dict_factory()
- self._pred[n] = self.adjlist_inner_dict_factory()
- attr_dict = self._node[n] = self.node_attr_dict_factory()
- attr_dict.update(attr)
- else:
- self._node[n].update(attr)
- except TypeError:
- nn, ndict = n
- if nn not in self._succ:
- self._succ[nn] = self.adjlist_inner_dict_factory()
- self._pred[nn] = self.adjlist_inner_dict_factory()
- newdict = attr.copy()
- newdict.update(ndict)
- attr_dict = self._node[nn] = self.node_attr_dict_factory()
- attr_dict.update(newdict)
- else:
- olddict = self._node[nn]
- olddict.update(attr)
- olddict.update(ndict)
- def remove_node(self, n):
- try:
- nbrs = self._succ[n]
- del self._node[n]
- except KeyError: # NetworkXError if n not in self
- raise NetworkXError("The node %s is not in the digraph." % (n,))
- for u in nbrs:
- del self._pred[u][n] # remove all edges n-u in digraph
- del self._succ[n] # remove node from succ
- for u in self._pred[n]:
- del self._succ[u][n] # remove all edges n-u in digraph
- del self._pred[n] # remove node from pred
- def remove_nodes_from(self, nodes):
- for n in nodes:
- try:
- succs = self._succ[n]
- del self._node[n]
- for u in succs:
- del self._pred[u][n] # remove all edges n-u in digraph
- del self._succ[n] # now remove node
- for u in self._pred[n]:
- del self._succ[u][n] # remove all edges n-u in digraph
- del self._pred[n] # now remove node
- except KeyError:
- pass # silent failure on remove
- def add_edge(self, u_of_edge, v_of_edge, **attr):
- u, v = u_of_edge, v_of_edge
- # add nodes
- if u not in self._succ:
- self._succ[u] = self.adjlist_inner_dict_factory()
- self._pred[u] = self.adjlist_inner_dict_factory()
- self._node[u] = self.node_attr_dict_factory()
- if v not in self._succ:
- self._succ[v] = self.adjlist_inner_dict_factory()
- self._pred[v] = self.adjlist_inner_dict_factory()
- self._node[v] = self.node_attr_dict_factory()
- # add the edge
- datadict = self._adj[u].get(v, self.edge_attr_dict_factory())
- datadict.update(attr)
- self._succ[u][v] = datadict
- self._pred[v][u] = datadict
- def add_edges_from(self, ebunch_to_add, **attr):
- for e in ebunch_to_add:
- ne = len(e)
- if ne == 3:
- u, v, dd = e
- elif ne == 2:
- u, v = e
- dd = {}
- else:
- raise NetworkXError(
- "Edge tuple %s must be a 2-tuple or 3-tuple." % (e,))
- if u not in self._succ:
- self._succ[u] = self.adjlist_inner_dict_factory()
- self._pred[u] = self.adjlist_inner_dict_factory()
- self._node[u] = self.node_attr_dict_factory()
- if v not in self._succ:
- self._succ[v] = self.adjlist_inner_dict_factory()
- self._pred[v] = self.adjlist_inner_dict_factory()
- self._node[v] = self.node_attr_dict_factory()
- datadict = self._adj[u].get(v, self.edge_attr_dict_factory())
- datadict.update(attr)
- datadict.update(dd)
- self._succ[u][v] = datadict
- self._pred[v][u] = datadict
- def remove_edge(self, u, v):
- try:
- del self._succ[u][v]
- del self._pred[v][u]
- except KeyError:
- raise NetworkXError("The edge %s-%s not in graph." % (u, v))
- def remove_edges_from(self, ebunch):
- for e in ebunch:
- u, v = e[:2] # ignore edge data
- if u in self._succ and v in self._succ[u]:
- del self._succ[u][v]
- del self._pred[v][u]
- def has_successor(self, u, v):
- """Returns True if node u has successor v.
- This is true if graph has the edge u->v.
- """
- return (u in self._succ and v in self._succ[u])
- def has_predecessor(self, u, v):
- """Returns True if node u has predecessor v.
- This is true if graph has the edge u<-v.
- """
- return (u in self._pred and v in self._pred[u])
- def successors(self, n):
- try:
- return iter(self._succ[n])
- except KeyError:
- raise NetworkXError("The node %s is not in the digraph." % (n,))
- # digraph definitions
- neighbors = successors
- def predecessors(self, n):
- try:
- return iter(self._pred[n])
- except KeyError:
- raise NetworkXError("The node %s is not in the digraph." % (n,))
- @property
- def edges(self):
- return OutEdgeView(self)
- # alias out_edges to edges
- out_edges = edges
- @property
- def in_edges(self):
- return InEdgeView(self)
- @property
- def degree(self):
- return DiDegreeView(self)
- @property
- def in_degree(self):
- return InDegreeView(self)
- @property
- def out_degree(self):
- self._succ.clear()
- self._pred.clear()
- self._node.clear()
- self.graph.clear()
- def is_multigraph(self):
- """Returns True if graph is a multigraph, False otherwise."""
- return False
- def is_directed(self):
- """Returns True if graph is directed, False otherwise."""
- return True
- def to_undirected(self, reciprocal=False, as_view=False):
- graph_class = self.to_undirected_class()
- if as_view is True:
- return nx.graphviews.generic_graph_view(self, Graph)
- # deepcopy when not a view
- G = Graph()
- G.graph.update(deepcopy(self.graph))
- G.add_nodes_from((n, deepcopy(d)) for n, d in self._node.items())
- if reciprocal is True:
- G.add_edges_from((u, v, deepcopy(d))
- for u, nbrs in self._adj.items()
- for v, d in nbrs.items()
- if v in self._pred[u])
- else:
- G.add_edges_from((u, v, deepcopy(d))
- for u, nbrs in self._adj.items()
- for v, d in nbrs.items())
- return G
- def reverse(self, copy=True):
- """Returns the reverse of the graph.
- The reverse is a graph with the same nodes and edges
- but with the directions of the edges reversed.
- Parameters
- ----------
- copy : bool optional (default=True)
- If True, return a new DiGraph holding the reversed edges.
- If False, the reverse graph is created using a view of
- the original graph.
- """
- if copy:
- H = self.__class__()
- H.graph.update(deepcopy(self.graph))
- H.add_nodes_from((n, deepcopy(d)) for n, d in self.nodes.items())
- H.add_edges_from((v, u, deepcopy(d)) for u, v, d
- in self.edges(data=True))
- return H
- return nx.graphviews.reverse_view(self)
- def check_planarity(G, counterexample=False):
- planarity_state = LRPlanarity(G)
- embedding = planarity_state.lr_planarity()
- if embedding is None:
- # graph is not planar
- if counterexample:
- return False, get_counterexample(G)
- else:
- return False, None
- else:
- # graph is planar
- return True, embedding
- def get_counterexample(G):
- # copy graph
- G = nx.Graph(G)
- if check_planarity(G)[0]:
- raise nx.NetworkXException("G is planar - no counter example.")
- # find Kuratowski subgraph
- subgraph = nx.Graph()
- for u in G:
- nbrs = list(G[u])
- for v in nbrs:
- G.remove_edge(u, v)
- if check_planarity(G)[0]:
- G.add_edge(u, v)
- subgraph.add_edge(u, v)
- return subgraph
- class Interval(object):
- def __init__(self, low=None, high=None):
- self.low = low
- self.high = high
- def empty(self):
- """Check if the interval is empty"""
- return self.low is None and self.high is None
- def copy(self):
- """Returns a copy of this interval"""
- return Interval(self.low, self.high)
- def conflicting(self, b, planarity_state):
- """Returns True if interval I conflicts with edge b"""
- return (not self.empty() and
- planarity_state.lowpt[self.high] > planarity_state.lowpt[b])
- class ConflictPair(object):
- """Represents a different constraint between two intervals.
- The edges in the left interval must have a different orientation than
- the one in the right interval.
- """
- def __init__(self, left=Interval(), right=Interval()):
- self.left = left
- self.right = right
- def swap(self):
- """Swap left and right intervals"""
- temp = self.left
- self.left = self.right
- self.right = temp
- def lowest(self, planarity_state):
- """Returns the lowest lowpoint of a conflict pair"""
- if self.left.empty():
- return planarity_state.lowpt[self.right.low]
- if self.right.empty():
- return planarity_state.lowpt[self.left.low]
- return min(planarity_state.lowpt[self.left.low],
- planarity_state.lowpt[self.right.low])
- def top_of_stack(l):
- """Returns the element on top of the stack."""
- if not l:
- return None
- return l[-1]
- class LRPlanarity(object):
- """A class to maintain the state during planarity check."""
- __slots__ = [
- 'G', 'roots', 'height', 'lowpt', 'lowpt2', 'nesting_depth',
- 'parent_edge', 'DG', 'adjs', 'ordered_adjs', 'ref', 'side', 'S',
- 'stack_bottom', 'lowpt_edge', 'left_ref', 'right_ref', 'embedding'
- ]
- def __init__(self, G):
- # copy G without adding self-loops
- self.G = Graph()
- self.G.add_nodes_from(G.nodes)
- for e in G.edges:
- if e[0] != e[1]:
- self.G.add_edge(e[0], e[1])
- self.roots = []
- # distance from tree root
- self.height = defaultdict(lambda: None)
- self.lowpt = {} # height of lowest return point of an edge
- self.lowpt2 = {} # height of second lowest return point
- self.nesting_depth = {} # for nesting order
- # None -> missing edge
- self.parent_edge = defaultdict(lambda: None)
- # oriented DFS graph
- self.DG = DiGraph()
- self.DG.add_nodes_from(G.nodes)
- self.adjs = {}
- self.ordered_adjs = {}
- self.ref = defaultdict(lambda: None)
- self.side = defaultdict(lambda: 1)
- # stack of conflict pairs
- self.S = []
- self.stack_bottom = {}
- self.lowpt_edge = {}
- self.left_ref = {}
- self.right_ref = {}
- self.embedding = PlanarEmbedding()
- def lr_planarity(self):
- """Execute the LR planarity test.
- Returns
- -------
- embedding : dict
- If the graph is planar an embedding is returned. Otherwise None.
- """
- if self.G.order() > 2 and self.G.size() > 3 * self.G.order() - 6:
- # graph is not planar
- return None
- # make adjacency lists for dfs
- for v in self.G:
- self.adjs[v] = list(self.G[v])
- # orientation of the graph by depth first search traversal
- for v in self.G:
- if self.height[v] is None:
- self.height[v] = 0
- self.roots.append(v)
- self.dfs_orientation(v)
- # Free no longer used variables
- self.G = None
- self.lowpt2 = None
- self.adjs = None
- # testing
- for v in self.DG: # sort the adjacency lists by nesting depth
- # note: this sorting leads to non linear time
- self.ordered_adjs[v] = sorted(
- self.DG[v], key=lambda x: self.nesting_depth[(v, x)])
- for v in self.roots:
- if not self.dfs_testing(v):
- return None
- # Free no longer used variables
- self.height = None
- self.lowpt = None
- self.S = None
- self.stack_bottom = None
- self.lowpt_edge = None
- for e in self.DG.edges:
- self.nesting_depth[e] = self.sign(e) * self.nesting_depth[e]
- self.embedding.add_nodes_from(self.DG.nodes)
- for v in self.DG:
- # sort the adjacency lists again
- self.ordered_adjs[v] = sorted(
- self.DG[v], key=lambda x: self.nesting_depth[(v, x)])
- # initialize the embedding
- previous_node = None
- for w in self.ordered_adjs[v]:
- self.embedding.add_half_edge_cw(v, w, previous_node)
- previous_node = w
- # Free no longer used variables
- self.DG = None
- self.nesting_depth = None
- self.ref = None
- # compute the complete embedding
- for v in self.roots:
- self.dfs_embedding(v)
- # Free no longer used variables
- self.roots = None
- self.parent_edge = None
- self.ordered_adjs = None
- self.left_ref = None
- self.right_ref = None
- self.side = None
- return self.embedding
- def dfs_orientation(self, v):
- """Orient the graph by DFS, compute lowpoints and nesting order.
- """
- # the recursion stack
- dfs_stack = [v]
- # index of next edge to handle in adjacency list of each node
- ind = defaultdict(lambda: 0)
- # boolean to indicate whether to skip the initial work for an edge
- skip_init = defaultdict(lambda: False)
- while dfs_stack:
- v = dfs_stack.pop()
- e = self.parent_edge[v]
- for w in self.adjs[v][ind[v]:]:
- vw = (v, w)
- if not skip_init[vw]:
- if (v, w) in self.DG.edges or (w, v) in self.DG.edges:
- ind[v] += 1
- continue # the edge was already oriented
- self.DG.add_edge(v, w) # orient the edge
- self.lowpt[vw] = self.height[v]
- self.lowpt2[vw] = self.height[v]
- if self.height[w] is None: # (v, w) is a tree edge
- self.parent_edge[w] = vw
- self.height[w] = self.height[v] + 1
- dfs_stack.append(v) # revisit v after finishing w
- dfs_stack.append(w) # visit w next
- skip_init[vw] = True # don't redo this block
- break # handle next node in dfs_stack (i.e. w)
- else: # (v, w) is a back edge
- self.lowpt[vw] = self.height[w]
- # determine nesting graph
- self.nesting_depth[vw] = 2 * self.lowpt[vw]
- if self.lowpt2[vw] < self.height[v]: # chordal
- self.nesting_depth[vw] += 1
- # update lowpoints of parent edge e
- if e is not None:
- if self.lowpt[vw] < self.lowpt[e]:
- self.lowpt2[e] = min(self.lowpt[e], self.lowpt2[vw])
- self.lowpt[e] = self.lowpt[vw]
- elif self.lowpt[vw] > self.lowpt[e]:
- self.lowpt2[e] = min(self.lowpt2[e], self.lowpt[vw])
- else:
- self.lowpt2[e] = min(self.lowpt2[e], self.lowpt2[vw])
- ind[v] += 1
- def dfs_testing(self, v):
- """Test for LR partition."""
- # the recursion stack
- dfs_stack = [v]
- # index of next edge to handle in adjacency list of each node
- ind = defaultdict(lambda: 0)
- # boolean to indicate whether to skip the initial work for an edge
- skip_init = defaultdict(lambda: False)
- while dfs_stack:
- v = dfs_stack.pop()
- e = self.parent_edge[v]
- # to indicate whether to skip the final block after the for loop
- skip_final = False
- for w in self.ordered_adjs[v][ind[v]:]:
- ei = (v, w)
- if not skip_init[ei]:
- self.stack_bottom[ei] = top_of_stack(self.S)
- if ei == self.parent_edge[w]: # tree edge
- dfs_stack.append(v) # revisit v after finishing w
- dfs_stack.append(w) # visit w next
- skip_init[ei] = True # don't redo this block
- skip_final = True # skip final work after breaking
- break # handle next node in dfs_stack (i.e. w)
- else: # back edge
- self.lowpt_edge[ei] = ei
- self.S.append(ConflictPair(right=Interval(ei, ei)))
- # integrate new return edges
- if self.lowpt[ei] < self.height[v]:
- if w == self.ordered_adjs[v][0]: # e_i has return edge
- self.lowpt_edge[e] = self.lowpt_edge[ei]
- else: # add constraints of e_i
- if not self.add_constraints(ei, e):
- # graph is not planar
- return False
- ind[v] += 1
- if not skip_final:
- # remove back edges returning to parent
- if e is not None: # v isn't root
- self.remove_back_edges(e)
- return True
- def add_constraints(self, ei, e):
- P = ConflictPair()
- # merge return edges of e_i into P.right
- while True:
- Q = self.S.pop()
- if not Q.left.empty():
- Q.swap()
- if not Q.left.empty(): # not planar
- return False
- if self.lowpt[Q.right.low] > self.lowpt[e]:
- # merge intervals
- if P.right.empty(): # topmost interval
- P.right = Q.right.copy()
- else:
- self.ref[P.right.low] = Q.right.high
- P.right.low = Q.right.low
- else: # align
- self.ref[Q.right.low] = self.lowpt_edge[e]
- if top_of_stack(self.S) == self.stack_bottom[ei]:
- break
- # merge conflicting return edges of e_1,...,e_i-1 into P.L
- while (top_of_stack(self.S).left.conflicting(ei, self) or
- top_of_stack(self.S).right.conflicting(ei, self)):
- Q = self.S.pop()
- if Q.right.conflicting(ei, self):
- Q.swap()
- if Q.right.conflicting(ei, self): # not planar
- return False
- # merge interval below lowpt(e_i) into P.R
- self.ref[P.right.low] = Q.right.high
- if Q.right.low is not None:
- P.right.low = Q.right.low
- if P.left.empty(): # topmost interval
- P.left = Q.left.copy()
- else:
- self.ref[P.left.low] = Q.left.high
- P.left.low = Q.left.low
- if not (P.left.empty() and P.right.empty()):
- self.S.append(P)
- return True
- def remove_back_edges(self, e):
- u = e[0]
- # trim back edges ending at parent u
- # drop entire conflict pairs
- while self.S and top_of_stack(self.S).lowest(self) == self.height[u]:
- P = self.S.pop()
- if P.left.low is not None:
- self.side[P.left.low] = -1
- if self.S: # one more conflict pair to consider
- P = self.S.pop()
- # trim left interval
- while P.left.high is not None and P.left.high[1] == u:
- P.left.high = self.ref[P.left.high]
- if P.left.high is None and P.left.low is not None:
- # just emptied
- self.ref[P.left.low] = P.right.low
- self.side[P.left.low] = -1
- P.left.low = None
- # trim right interval
- while P.right.high is not None and P.right.high[1] == u:
- P.right.high = self.ref[P.right.high]
- if P.right.high is None and P.right.low is not None:
- # just emptied
- self.ref[P.right.low] = P.left.low
- self.side[P.right.low] = -1
- P.right.low = None
- self.S.append(P)
- # side of e is side of a highest return edge
- if self.lowpt[e] < self.height[u]: # e has return edge
- hl = top_of_stack(self.S).left.high
- hr = top_of_stack(self.S).right.high
- if hl is not None and (
- hr is None or self.lowpt[hl] > self.lowpt[hr]):
- self.ref[e] = hl
- else:
- self.ref[e] = hr
- def dfs_embedding(self, v):
- """Completes the embedding."""
- # the recursion stack
- dfs_stack = [v]
- # index of next edge to handle in adjacency list of each node
- ind = defaultdict(lambda: 0)
- while dfs_stack:
- v = dfs_stack.pop()
- for w in self.ordered_adjs[v][ind[v]:]:
- ind[v] += 1
- ei = (v, w)
- if ei == self.parent_edge[w]: # tree edge
- self.embedding.add_half_edge_first(w, v)
- self.left_ref[v] = w
- self.right_ref[v] = w
- dfs_stack.append(v) # revisit v after finishing w
- dfs_stack.append(w) # visit w next
- break # handle next node in dfs_stack (i.e. w)
- else: # back edge
- if self.side[ei] == 1:
- self.embedding.add_half_edge_cw(w, v,
- self.right_ref[w])
- else:
- self.embedding.add_half_edge_ccw(w, v,
- self.left_ref[w])
- self.left_ref[w] = v
- def sign(self, e):
- """Resolve the relative side of an edge to the absolute side."""
- # the recursion stack
- dfs_stack = [e]
- # dict to remember reference edges
- old_ref = defaultdict(lambda: None)
- while dfs_stack:
- e = dfs_stack.pop()
- if self.ref[e] is not None:
- dfs_stack.append(e) # revisit e after finishing self.ref[e]
- dfs_stack.append(self.ref[e]) # visit self.ref[e] next
- old_ref[e] = self.ref[e] # remember value of self.ref[e]
- self.ref[e] = None
- else:
- self.side[e] *= self.side[old_ref[e]]
- return self.side[e]
- class PlanarEmbedding(DiGraph):
- def get_data(self):
- embedding = dict()
- for v in self:
- embedding[v] = list(self.neighbors_cw_order(v))
- return embedding
- def set_data(self, data):
- for v in data:
- for w in reversed(data[v]):
- self.add_half_edge_first(v, w)
- def neighbors_cw_order(self, v):
- if len(self[v]) == 0:
- # v has no neighbors
- return
- start_node = self.nodes[v]['first_nbr']
- yield start_node
- current_node = self[v][start_node]['cw']
- while start_node != current_node:
- yield current_node
- current_node = self[v][current_node]['cw']
- def check_structure(self):
- for v in self:
- try:
- sorted_nbrs = set(self.neighbors_cw_order(v))
- except KeyError:
- msg = "Bad embedding. " \
- "Missing orientation for a neighbor of {}".format(v)
- raise nx.NetworkXException(msg)
- unsorted_nbrs = set(self[v])
- if sorted_nbrs != unsorted_nbrs:
- msg = "Bad embedding. Edge orientations not set correctly."
- raise nx.NetworkXException(msg)
- for w in self[v]:
- # Check if opposite half-edge exists
- if not self.has_edge(w, v):
- msg = "Bad embedding. Opposite half-edge is missing."
- raise nx.NetworkXException(msg)
- # Check planarity
- counted_half_edges = set()
- for component in nx.connected_components(self):
- if len(component) == 1:
- # Don't need to check single node component
- continue
- num_nodes = len(component)
- num_half_edges = 0
- num_faces = 0
- for v in component:
- for w in self.neighbors_cw_order(v):
- num_half_edges += 1
- if (v, w) not in counted_half_edges:
- # We encountered a new face
- num_faces += 1
- # Mark all half-edges belonging to this face
- self.traverse_face(v, w, counted_half_edges)
- num_edges = num_half_edges // 2 # num_half_edges is even
- if num_nodes - num_edges + num_faces != 2:
- # The result does not match Euler's formula
- msg = "Bad embedding. The graph does not match Euler's formula"
- raise nx.NetworkXException(msg)
- def add_half_edge_ccw(self, start_node, end_node, reference_neighbor):
- if reference_neighbor is None:
- # The start node has no neighbors
- self.add_edge(start_node, end_node) # Add edge to graph
- self[start_node][end_node]['cw'] = end_node
- self[start_node][end_node]['ccw'] = end_node
- self.nodes[start_node]['first_nbr'] = end_node
- else:
- ccw_reference = self[start_node][reference_neighbor]['ccw']
- self.add_half_edge_cw(start_node, end_node, ccw_reference)
- if reference_neighbor == self.nodes[start_node].get('first_nbr',
- None):
- # Update first neighbor
- self.nodes[start_node]['first_nbr'] = end_node
- def add_half_edge_cw(self, start_node, end_node, reference_neighbor):
- self.add_edge(start_node, end_node) # Add edge to graph
- if reference_neighbor is None:
- # The start node has no neighbors
- self[start_node][end_node]['cw'] = end_node
- self[start_node][end_node]['ccw'] = end_node
- self.nodes[start_node]['first_nbr'] = end_node
- return
- if reference_neighbor not in self[start_node]:
- raise nx.NetworkXException(
- "Cannot add edge. Reference neighbor does not exist")
- # Get half-edge at the other side
- cw_reference = self[start_node][reference_neighbor]['cw']
- # Alter half-edge data structures
- self[start_node][reference_neighbor]['cw'] = end_node
- self[start_node][end_node]['cw'] = cw_reference
- self[start_node][cw_reference]['ccw'] = end_node
- self[start_node][end_node]['ccw'] = reference_neighbor
- def connect_components(self, v, w):
- self.add_half_edge_first(v, w)
- self.add_half_edge_first(w, v)
- def add_half_edge_first(self, start_node, end_node):
- if start_node in self and 'first_nbr' in self.nodes[start_node]:
- reference = self.nodes[start_node]['first_nbr']
- else:
- reference = None
- self.add_half_edge_ccw(start_node, end_node, reference)
- def next_face_half_edge(self, v, w):
- new_node = self[w][v]['ccw']
- return w, new_node
- def traverse_face(self, v, w, mark_half_edges=None):
- if mark_half_edges is None:
- mark_half_edges = set()
- face_nodes = [v]
- mark_half_edges.add((v, w))
- prev_node = v
- cur_node = w
- # Last half-edge is (incoming_node, v)
- incoming_node = self[v][w]['cw']
- while cur_node != v or prev_node != incoming_node:
- face_nodes.append(cur_node)
- prev_node, cur_node = self.next_face_half_edge(prev_node, cur_node)
- if (prev_node, cur_node) in mark_half_edges:
- raise nx.NetworkXException(
- "Bad planar embedding. Impossible face.")
- mark_half_edges.add((prev_node, cur_node))
- return face_nodes
- def is_directed(self):
- return False
- def solve(left_vertex, right_vertex, len):
- if len == 0:
- return True
- graph = Graph()
- for i in range(5 + len):
- graph.add_node(i)
- for i in range(4):
- graph.add_edge(0, i + 1)
- graph.add_edge(i + 1, (i + 1) % 4 + 1)
- for i in range(len):
- u = left_vertex[i]
- v = right_vertex[i]
- graph.add_edge(i + 5, u)
- graph.add_edge(i + 5, v)
- return check_planarity(graph)
- def main():
- input_file = open("input.txt", "r")
- test = int(input_file.readline().strip())
- while test:
- n = int(input_file.readline().strip())
- left_vertex = []
- right_vertex = []
- for i in range(n):
- tokens = input_file.readline().strip().split()
- left_vertex.append(int(tokens[0]))
- right_vertex.append(int(tokens[1]))
- left = 0
- right = n
- # result, emb = solve(left_vertex, right_vertex, n)
- # print (result)
- # exit(0)
- while right - left > 1:
- mid = (left + right) // 2
- if solve(left_vertex, right_vertex, mid)[0]:
- left = mid
- else:
- right = mid
- answer = left
- if solve(left_vertex, right_vertex, right)[0]:
- answer = right
- print(answer)
- test -= 1
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement