| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084 |
- """Read and write graphs in GEXF format.
- .. warning::
- This parser uses the standard xml library present in Python, which is
- insecure - see :external+python:mod:`xml` for additional information.
- Only parse GEFX files you trust.
- GEXF (Graph Exchange XML Format) is a language for describing complex
- network structures, their associated data and dynamics.
- This implementation does not support mixed graphs (directed and
- undirected edges together).
- Format
- ------
- GEXF is an XML format. See http://gexf.net/schema.html for the
- specification and http://gexf.net/basic.html for examples.
- """
- import itertools
- import time
- from xml.etree.ElementTree import (
- Element,
- ElementTree,
- SubElement,
- register_namespace,
- tostring,
- )
- import networkx as nx
- from networkx.utils import open_file
- __all__ = ["write_gexf", "read_gexf", "relabel_gexf_graph", "generate_gexf"]
- @open_file(1, mode="wb")
- def write_gexf(G, path, encoding="utf-8", prettyprint=True, version="1.2draft"):
- """Write G in GEXF format to path.
- "GEXF (Graph Exchange XML Format) is a language for describing
- complex networks structures, their associated data and dynamics" [1]_.
- Node attributes are checked according to the version of the GEXF
- schemas used for parameters which are not user defined,
- e.g. visualization 'viz' [2]_. See example for usage.
- .. warning::
- The `GEXF specification <https://gexf.net/schema.html>`_ reserves some
- keywords (e.g. ``id``, ``pid``, ``label``, etc.) for specifying node/edge
- metadata in the file format. Ensure NetworkX node/edge attribute names
- do not use these special keywords to guarantee all attributes are preserved
- as expected when roundtripping to/from GEXF format.
- Parameters
- ----------
- G : graph
- A NetworkX graph
- path : file or string
- File or file name to write.
- File names ending in .gz or .bz2 will be compressed.
- encoding : string (optional, default: 'utf-8')
- Encoding for text data.
- prettyprint : bool (optional, default: True)
- If True use line breaks and indenting in output XML.
- version: string (optional, default: '1.2draft')
- The version of GEXF to be used for nodes attributes checking
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> nx.write_gexf(G, "test.gexf")
- # visualization data
- >>> G.nodes[0]["viz"] = {"size": 54}
- >>> G.nodes[0]["viz"]["position"] = {"x": 0, "y": 1}
- >>> G.nodes[0]["viz"]["color"] = {"r": 0, "g": 0, "b": 256}
- Notes
- -----
- This implementation does not support mixed graphs (directed and undirected
- edges together).
- The node id attribute is set to be the string of the node label.
- If you want to specify an id use set it as node data, e.g.
- node['a']['id']=1 to set the id of node 'a' to 1.
- References
- ----------
- .. [1] GEXF File Format, http://gexf.net/
- .. [2] GEXF schema, http://gexf.net/schema.html
- """
- writer = GEXFWriter(encoding=encoding, prettyprint=prettyprint, version=version)
- writer.add_graph(G)
- writer.write(path)
- def generate_gexf(G, encoding="utf-8", prettyprint=True, version="1.2draft"):
- """Generate lines of GEXF format representation of G.
- "GEXF (Graph Exchange XML Format) is a language for describing
- complex networks structures, their associated data and dynamics" [1]_.
- Parameters
- ----------
- G : graph
- A NetworkX graph
- encoding : string (optional, default: 'utf-8')
- Encoding for text data.
- prettyprint : bool (optional, default: True)
- If True use line breaks and indenting in output XML.
- version : string (default: 1.2draft)
- Version of GEFX File Format (see http://gexf.net/schema.html)
- Supported values: "1.1draft", "1.2draft"
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> linefeed = chr(10) # linefeed=\n
- >>> s = linefeed.join(nx.generate_gexf(G))
- >>> for line in nx.generate_gexf(G): # doctest: +SKIP
- ... print(line)
- Notes
- -----
- This implementation does not support mixed graphs (directed and undirected
- edges together).
- The node id attribute is set to be the string of the node label.
- If you want to specify an id use set it as node data, e.g.
- node['a']['id']=1 to set the id of node 'a' to 1.
- References
- ----------
- .. [1] GEXF File Format, https://gephi.org/gexf/format/
- """
- writer = GEXFWriter(encoding=encoding, prettyprint=prettyprint, version=version)
- writer.add_graph(G)
- yield from str(writer).splitlines()
- @open_file(0, mode="rb")
- @nx._dispatchable(graphs=None, returns_graph=True)
- def read_gexf(path, node_type=None, relabel=False, version="1.2draft"):
- """Read graph in GEXF format from path.
- "GEXF (Graph Exchange XML Format) is a language for describing
- complex networks structures, their associated data and dynamics" [1]_.
- Parameters
- ----------
- path : file or string
- Filename or file handle to read.
- Filenames ending in .gz or .bz2 will be decompressed.
- node_type: Python type (default: None)
- Convert node ids to this type if not None.
- relabel : bool (default: False)
- If True relabel the nodes to use the GEXF node "label" attribute
- instead of the node "id" attribute as the NetworkX node label.
- version : string (default: 1.2draft)
- Version of GEFX File Format (see http://gexf.net/schema.html)
- Supported values: "1.1draft", "1.2draft"
- Returns
- -------
- graph: NetworkX graph
- If no parallel edges are found a Graph or DiGraph is returned.
- Otherwise a MultiGraph or MultiDiGraph is returned.
- Notes
- -----
- This implementation does not support mixed graphs (directed and undirected
- edges together).
- References
- ----------
- .. [1] GEXF File Format, http://gexf.net/
- """
- reader = GEXFReader(node_type=node_type, version=version)
- if relabel:
- G = relabel_gexf_graph(reader(path))
- else:
- G = reader(path)
- return G
- class GEXF:
- versions = {
- "1.1draft": {
- "NS_GEXF": "http://www.gexf.net/1.1draft",
- "NS_VIZ": "http://www.gexf.net/1.1draft/viz",
- "NS_XSI": "http://www.w3.org/2001/XMLSchema-instance",
- "SCHEMALOCATION": " ".join(
- [
- "http://www.gexf.net/1.1draft",
- "http://www.gexf.net/1.1draft/gexf.xsd",
- ]
- ),
- "VERSION": "1.1",
- },
- "1.2draft": {
- "NS_GEXF": "http://www.gexf.net/1.2draft",
- "NS_VIZ": "http://www.gexf.net/1.2draft/viz",
- "NS_XSI": "http://www.w3.org/2001/XMLSchema-instance",
- "SCHEMALOCATION": " ".join(
- [
- "http://www.gexf.net/1.2draft",
- "http://www.gexf.net/1.2draft/gexf.xsd",
- ]
- ),
- "VERSION": "1.2",
- },
- "1.3": {
- "NS_GEXF": "http://gexf.net/1.3",
- "NS_VIZ": "http://gexf.net/1.3/viz",
- "NS_XSI": "http://w3.org/2001/XMLSchema-instance",
- "SCHEMALOCATION": " ".join(
- [
- "http://gexf.net/1.3",
- "http://gexf.net/1.3/gexf.xsd",
- ]
- ),
- "VERSION": "1.3",
- },
- }
- def construct_types(self):
- types = [
- (int, "integer"),
- (float, "float"),
- (float, "double"),
- (bool, "boolean"),
- (list, "string"),
- (dict, "string"),
- (int, "long"),
- (str, "liststring"),
- (str, "anyURI"),
- (str, "string"),
- ]
- # These additions to types allow writing numpy types
- try:
- import numpy as np
- except ImportError:
- pass
- else:
- # prepend so that python types are created upon read (last entry wins)
- types = [
- (np.float64, "float"),
- (np.float32, "float"),
- (np.float16, "float"),
- (np.int_, "int"),
- (np.int8, "int"),
- (np.int16, "int"),
- (np.int32, "int"),
- (np.int64, "int"),
- (np.uint8, "int"),
- (np.uint16, "int"),
- (np.uint32, "int"),
- (np.uint64, "int"),
- (np.int_, "int"),
- (np.intc, "int"),
- (np.intp, "int"),
- ] + types
- self.xml_type = dict(types)
- self.python_type = dict(reversed(a) for a in types)
- # http://www.w3.org/TR/xmlschema-2/#boolean
- convert_bool = {
- "true": True,
- "false": False,
- "True": True,
- "False": False,
- "0": False,
- 0: False,
- "1": True,
- 1: True,
- }
- def set_version(self, version):
- d = self.versions.get(version)
- if d is None:
- raise nx.NetworkXError(f"Unknown GEXF version {version}.")
- self.NS_GEXF = d["NS_GEXF"]
- self.NS_VIZ = d["NS_VIZ"]
- self.NS_XSI = d["NS_XSI"]
- self.SCHEMALOCATION = d["SCHEMALOCATION"]
- self.VERSION = d["VERSION"]
- self.version = version
- class GEXFWriter(GEXF):
- # class for writing GEXF format files
- # use write_gexf() function
- def __init__(
- self, graph=None, encoding="utf-8", prettyprint=True, version="1.2draft"
- ):
- self.construct_types()
- self.prettyprint = prettyprint
- self.encoding = encoding
- self.set_version(version)
- self.xml = Element(
- "gexf",
- {
- "xmlns": self.NS_GEXF,
- "xmlns:xsi": self.NS_XSI,
- "xsi:schemaLocation": self.SCHEMALOCATION,
- "version": self.VERSION,
- },
- )
- # Make meta element a non-graph element
- # Also add lastmodifieddate as attribute, not tag
- meta_element = Element("meta")
- subelement_text = f"NetworkX {nx.__version__}"
- SubElement(meta_element, "creator").text = subelement_text
- meta_element.set("lastmodifieddate", time.strftime("%Y-%m-%d"))
- self.xml.append(meta_element)
- register_namespace("viz", self.NS_VIZ)
- # counters for edge and attribute identifiers
- self.edge_id = itertools.count()
- self.attr_id = itertools.count()
- self.all_edge_ids = set()
- # default attributes are stored in dictionaries
- self.attr = {}
- self.attr["node"] = {}
- self.attr["edge"] = {}
- self.attr["node"]["dynamic"] = {}
- self.attr["node"]["static"] = {}
- self.attr["edge"]["dynamic"] = {}
- self.attr["edge"]["static"] = {}
- if graph is not None:
- self.add_graph(graph)
- def __str__(self):
- if self.prettyprint:
- self.indent(self.xml)
- s = tostring(self.xml).decode(self.encoding)
- return s
- def add_graph(self, G):
- # first pass through G collecting edge ids
- for u, v, dd in G.edges(data=True):
- eid = dd.get("id")
- if eid is not None:
- self.all_edge_ids.add(str(eid))
- # set graph attributes
- if G.graph.get("mode") == "dynamic":
- mode = "dynamic"
- else:
- mode = "static"
- # Add a graph element to the XML
- if G.is_directed():
- default = "directed"
- else:
- default = "undirected"
- name = G.graph.get("name", "")
- graph_element = Element("graph", defaultedgetype=default, mode=mode, name=name)
- self.graph_element = graph_element
- self.add_nodes(G, graph_element)
- self.add_edges(G, graph_element)
- self.xml.append(graph_element)
- def add_nodes(self, G, graph_element):
- nodes_element = Element("nodes")
- for node, data in G.nodes(data=True):
- node_data = data.copy()
- node_id = str(node_data.pop("id", node))
- kw = {"id": node_id}
- label = str(node_data.pop("label", node))
- kw["label"] = label
- try:
- pid = node_data.pop("pid")
- kw["pid"] = str(pid)
- except KeyError:
- pass
- try:
- start = node_data.pop("start")
- kw["start"] = str(start)
- self.alter_graph_mode_timeformat(start)
- except KeyError:
- pass
- try:
- end = node_data.pop("end")
- kw["end"] = str(end)
- self.alter_graph_mode_timeformat(end)
- except KeyError:
- pass
- # add node element with attributes
- node_element = Element("node", **kw)
- # add node element and attr subelements
- default = G.graph.get("node_default", {})
- node_data = self.add_parents(node_element, node_data)
- if self.VERSION == "1.1":
- node_data = self.add_slices(node_element, node_data)
- else:
- node_data = self.add_spells(node_element, node_data)
- node_data = self.add_viz(node_element, node_data)
- node_data = self.add_attributes("node", node_element, node_data, default)
- nodes_element.append(node_element)
- graph_element.append(nodes_element)
- def add_edges(self, G, graph_element):
- def edge_key_data(G):
- # helper function to unify multigraph and graph edge iterator
- if G.is_multigraph():
- for u, v, key, data in G.edges(data=True, keys=True):
- edge_data = data.copy()
- edge_data.update(key=key)
- edge_id = edge_data.pop("id", None)
- if edge_id is None:
- edge_id = next(self.edge_id)
- while str(edge_id) in self.all_edge_ids:
- edge_id = next(self.edge_id)
- self.all_edge_ids.add(str(edge_id))
- yield u, v, edge_id, edge_data
- else:
- for u, v, data in G.edges(data=True):
- edge_data = data.copy()
- edge_id = edge_data.pop("id", None)
- if edge_id is None:
- edge_id = next(self.edge_id)
- while str(edge_id) in self.all_edge_ids:
- edge_id = next(self.edge_id)
- self.all_edge_ids.add(str(edge_id))
- yield u, v, edge_id, edge_data
- edges_element = Element("edges")
- for u, v, key, edge_data in edge_key_data(G):
- kw = {"id": str(key)}
- try:
- edge_label = edge_data.pop("label")
- kw["label"] = str(edge_label)
- except KeyError:
- pass
- try:
- edge_weight = edge_data.pop("weight")
- kw["weight"] = str(edge_weight)
- except KeyError:
- pass
- try:
- edge_type = edge_data.pop("type")
- kw["type"] = str(edge_type)
- except KeyError:
- pass
- try:
- start = edge_data.pop("start")
- kw["start"] = str(start)
- self.alter_graph_mode_timeformat(start)
- except KeyError:
- pass
- try:
- end = edge_data.pop("end")
- kw["end"] = str(end)
- self.alter_graph_mode_timeformat(end)
- except KeyError:
- pass
- source_id = str(G.nodes[u].get("id", u))
- target_id = str(G.nodes[v].get("id", v))
- edge_element = Element("edge", source=source_id, target=target_id, **kw)
- default = G.graph.get("edge_default", {})
- if self.VERSION == "1.1":
- edge_data = self.add_slices(edge_element, edge_data)
- else:
- edge_data = self.add_spells(edge_element, edge_data)
- edge_data = self.add_viz(edge_element, edge_data)
- edge_data = self.add_attributes("edge", edge_element, edge_data, default)
- edges_element.append(edge_element)
- graph_element.append(edges_element)
- def add_attributes(self, node_or_edge, xml_obj, data, default):
- # Add attrvalues to node or edge
- attvalues = Element("attvalues")
- if len(data) == 0:
- return data
- mode = "static"
- for k, v in data.items():
- # rename generic multigraph key to avoid any name conflict
- if k == "key":
- k = "networkx_key"
- val_type = type(v)
- if val_type not in self.xml_type:
- raise TypeError(f"attribute value type is not allowed: {val_type}")
- if isinstance(v, list):
- # dynamic data
- for val, start, end in v:
- val_type = type(val)
- if start is not None or end is not None:
- mode = "dynamic"
- self.alter_graph_mode_timeformat(start)
- self.alter_graph_mode_timeformat(end)
- break
- attr_id = self.get_attr_id(
- str(k), self.xml_type[val_type], node_or_edge, default, mode
- )
- for val, start, end in v:
- e = Element("attvalue")
- e.attrib["for"] = attr_id
- e.attrib["value"] = str(val)
- # Handle nan, inf, -inf differently
- if val_type is float:
- if e.attrib["value"] == "inf":
- e.attrib["value"] = "INF"
- elif e.attrib["value"] == "nan":
- e.attrib["value"] = "NaN"
- elif e.attrib["value"] == "-inf":
- e.attrib["value"] = "-INF"
- if start is not None:
- e.attrib["start"] = str(start)
- if end is not None:
- e.attrib["end"] = str(end)
- attvalues.append(e)
- else:
- # static data
- mode = "static"
- attr_id = self.get_attr_id(
- str(k), self.xml_type[val_type], node_or_edge, default, mode
- )
- e = Element("attvalue")
- e.attrib["for"] = attr_id
- if isinstance(v, bool):
- e.attrib["value"] = str(v).lower()
- else:
- e.attrib["value"] = str(v)
- # Handle float nan, inf, -inf differently
- if val_type is float:
- if e.attrib["value"] == "inf":
- e.attrib["value"] = "INF"
- elif e.attrib["value"] == "nan":
- e.attrib["value"] = "NaN"
- elif e.attrib["value"] == "-inf":
- e.attrib["value"] = "-INF"
- attvalues.append(e)
- xml_obj.append(attvalues)
- return data
- def get_attr_id(self, title, attr_type, edge_or_node, default, mode):
- # find the id of the attribute or generate a new id
- try:
- return self.attr[edge_or_node][mode][title]
- except KeyError:
- # generate new id
- new_id = str(next(self.attr_id))
- self.attr[edge_or_node][mode][title] = new_id
- attr_kwargs = {"id": new_id, "title": title, "type": attr_type}
- attribute = Element("attribute", **attr_kwargs)
- # add subelement for data default value if present
- default_title = default.get(title)
- if default_title is not None:
- default_element = Element("default")
- default_element.text = str(default_title)
- attribute.append(default_element)
- # new insert it into the XML
- attributes_element = None
- for a in self.graph_element.findall("attributes"):
- # find existing attributes element by class and mode
- a_class = a.get("class")
- a_mode = a.get("mode", "static")
- if a_class == edge_or_node and a_mode == mode:
- attributes_element = a
- if attributes_element is None:
- # create new attributes element
- attr_kwargs = {"mode": mode, "class": edge_or_node}
- attributes_element = Element("attributes", **attr_kwargs)
- self.graph_element.insert(0, attributes_element)
- attributes_element.append(attribute)
- return new_id
- def add_viz(self, element, node_data):
- viz = node_data.pop("viz", False)
- if viz:
- color = viz.get("color")
- if color is not None:
- if self.VERSION == "1.1":
- e = Element(
- f"{{{self.NS_VIZ}}}color",
- r=str(color.get("r")),
- g=str(color.get("g")),
- b=str(color.get("b")),
- )
- else:
- e = Element(
- f"{{{self.NS_VIZ}}}color",
- r=str(color.get("r")),
- g=str(color.get("g")),
- b=str(color.get("b")),
- a=str(color.get("a", 1.0)),
- )
- element.append(e)
- size = viz.get("size")
- if size is not None:
- e = Element(f"{{{self.NS_VIZ}}}size", value=str(size))
- element.append(e)
- thickness = viz.get("thickness")
- if thickness is not None:
- e = Element(f"{{{self.NS_VIZ}}}thickness", value=str(thickness))
- element.append(e)
- shape = viz.get("shape")
- if shape is not None:
- if shape.startswith("http"):
- e = Element(
- f"{{{self.NS_VIZ}}}shape", value="image", uri=str(shape)
- )
- else:
- e = Element(f"{{{self.NS_VIZ}}}shape", value=str(shape))
- element.append(e)
- position = viz.get("position")
- if position is not None:
- e = Element(
- f"{{{self.NS_VIZ}}}position",
- x=str(position.get("x")),
- y=str(position.get("y")),
- z=str(position.get("z")),
- )
- element.append(e)
- return node_data
- def add_parents(self, node_element, node_data):
- parents = node_data.pop("parents", False)
- if parents:
- parents_element = Element("parents")
- for p in parents:
- e = Element("parent")
- e.attrib["for"] = str(p)
- parents_element.append(e)
- node_element.append(parents_element)
- return node_data
- def add_slices(self, node_or_edge_element, node_or_edge_data):
- slices = node_or_edge_data.pop("slices", False)
- if slices:
- slices_element = Element("slices")
- for start, end in slices:
- e = Element("slice", start=str(start), end=str(end))
- slices_element.append(e)
- node_or_edge_element.append(slices_element)
- return node_or_edge_data
- def add_spells(self, node_or_edge_element, node_or_edge_data):
- spells = node_or_edge_data.pop("spells", False)
- if spells:
- spells_element = Element("spells")
- for start, end in spells:
- e = Element("spell")
- if start is not None:
- e.attrib["start"] = str(start)
- self.alter_graph_mode_timeformat(start)
- if end is not None:
- e.attrib["end"] = str(end)
- self.alter_graph_mode_timeformat(end)
- spells_element.append(e)
- node_or_edge_element.append(spells_element)
- return node_or_edge_data
- def alter_graph_mode_timeformat(self, start_or_end):
- # If 'start' or 'end' appears, set timeformat
- if start_or_end is not None:
- if isinstance(start_or_end, str):
- timeformat = "date"
- elif isinstance(start_or_end, float):
- timeformat = "double"
- elif isinstance(start_or_end, int):
- timeformat = "long"
- else:
- raise nx.NetworkXError(
- "timeformat should be of the type int, float or str"
- )
- self.graph_element.set("timeformat", timeformat)
- # If Graph mode is static, alter to dynamic
- if self.graph_element.get("mode") == "static":
- self.graph_element.set("mode", "dynamic")
- def write(self, fh):
- # Serialize graph G in GEXF to the open fh
- if self.prettyprint:
- self.indent(self.xml)
- document = ElementTree(self.xml)
- document.write(fh, encoding=self.encoding, xml_declaration=True)
- def indent(self, elem, level=0):
- # in-place prettyprint formatter
- i = "\n" + " " * level
- if len(elem):
- if not elem.text or not elem.text.strip():
- elem.text = i + " "
- if not elem.tail or not elem.tail.strip():
- elem.tail = i
- for elem in elem:
- self.indent(elem, level + 1)
- if not elem.tail or not elem.tail.strip():
- elem.tail = i
- else:
- if level and (not elem.tail or not elem.tail.strip()):
- elem.tail = i
- class GEXFReader(GEXF):
- # Class to read GEXF format files
- # use read_gexf() function
- def __init__(self, node_type=None, version="1.2draft"):
- self.construct_types()
- self.node_type = node_type
- # assume simple graph and test for multigraph on read
- self.simple_graph = True
- self.set_version(version)
- def __call__(self, stream):
- self.xml = ElementTree(file=stream)
- g = self.xml.find(f"{{{self.NS_GEXF}}}graph")
- if g is not None:
- return self.make_graph(g)
- # try all the versions
- for version in self.versions:
- self.set_version(version)
- g = self.xml.find(f"{{{self.NS_GEXF}}}graph")
- if g is not None:
- return self.make_graph(g)
- raise nx.NetworkXError("No <graph> element in GEXF file.")
- def make_graph(self, graph_xml):
- # start with empty DiGraph or MultiDiGraph
- edgedefault = graph_xml.get("defaultedgetype", None)
- if edgedefault == "directed":
- G = nx.MultiDiGraph()
- else:
- G = nx.MultiGraph()
- # graph attributes
- graph_name = graph_xml.get("name", "")
- if graph_name != "":
- G.graph["name"] = graph_name
- graph_start = graph_xml.get("start")
- if graph_start is not None:
- G.graph["start"] = graph_start
- graph_end = graph_xml.get("end")
- if graph_end is not None:
- G.graph["end"] = graph_end
- graph_mode = graph_xml.get("mode", "")
- if graph_mode == "dynamic":
- G.graph["mode"] = "dynamic"
- else:
- G.graph["mode"] = "static"
- # timeformat
- self.timeformat = graph_xml.get("timeformat")
- if self.timeformat == "date":
- self.timeformat = "string"
- # node and edge attributes
- attributes_elements = graph_xml.findall(f"{{{self.NS_GEXF}}}attributes")
- # dictionaries to hold attributes and attribute defaults
- node_attr = {}
- node_default = {}
- edge_attr = {}
- edge_default = {}
- for a in attributes_elements:
- attr_class = a.get("class")
- if attr_class == "node":
- na, nd = self.find_gexf_attributes(a)
- node_attr.update(na)
- node_default.update(nd)
- G.graph["node_default"] = node_default
- elif attr_class == "edge":
- ea, ed = self.find_gexf_attributes(a)
- edge_attr.update(ea)
- edge_default.update(ed)
- G.graph["edge_default"] = edge_default
- else:
- raise # unknown attribute class
- # Hack to handle Gephi0.7beta bug
- # add weight attribute
- ea = {"weight": {"type": "double", "mode": "static", "title": "weight"}}
- ed = {}
- edge_attr.update(ea)
- edge_default.update(ed)
- G.graph["edge_default"] = edge_default
- # add nodes
- nodes_element = graph_xml.find(f"{{{self.NS_GEXF}}}nodes")
- if nodes_element is not None:
- for node_xml in nodes_element.findall(f"{{{self.NS_GEXF}}}node"):
- self.add_node(G, node_xml, node_attr)
- # add edges
- edges_element = graph_xml.find(f"{{{self.NS_GEXF}}}edges")
- if edges_element is not None:
- for edge_xml in edges_element.findall(f"{{{self.NS_GEXF}}}edge"):
- self.add_edge(G, edge_xml, edge_attr)
- # switch to Graph or DiGraph if no parallel edges were found.
- if self.simple_graph:
- if G.is_directed():
- G = nx.DiGraph(G)
- else:
- G = nx.Graph(G)
- return G
- def add_node(self, G, node_xml, node_attr, node_pid=None):
- # add a single node with attributes to the graph
- # get attributes and subattributues for node
- data = self.decode_attr_elements(node_attr, node_xml)
- data = self.add_parents(data, node_xml) # add any parents
- if self.VERSION == "1.1":
- data = self.add_slices(data, node_xml) # add slices
- else:
- data = self.add_spells(data, node_xml) # add spells
- data = self.add_viz(data, node_xml) # add viz
- data = self.add_start_end(data, node_xml) # add start/end
- # find the node id and cast it to the appropriate type
- node_id = node_xml.get("id")
- if self.node_type is not None:
- node_id = self.node_type(node_id)
- # every node should have a label
- node_label = node_xml.get("label")
- data["label"] = node_label
- # parent node id
- node_pid = node_xml.get("pid", node_pid)
- if node_pid is not None:
- data["pid"] = node_pid
- # check for subnodes, recursive
- subnodes = node_xml.find(f"{{{self.NS_GEXF}}}nodes")
- if subnodes is not None:
- for node_xml in subnodes.findall(f"{{{self.NS_GEXF}}}node"):
- self.add_node(G, node_xml, node_attr, node_pid=node_id)
- G.add_node(node_id, **data)
- def add_start_end(self, data, xml):
- # start and end times
- ttype = self.timeformat
- node_start = xml.get("start")
- if node_start is not None:
- data["start"] = self.python_type[ttype](node_start)
- node_end = xml.get("end")
- if node_end is not None:
- data["end"] = self.python_type[ttype](node_end)
- return data
- def add_viz(self, data, node_xml):
- # add viz element for node
- viz = {}
- color = node_xml.find(f"{{{self.NS_VIZ}}}color")
- if color is not None:
- if self.VERSION == "1.1":
- viz["color"] = {
- "r": int(color.get("r")),
- "g": int(color.get("g")),
- "b": int(color.get("b")),
- }
- else:
- viz["color"] = {
- "r": int(color.get("r")),
- "g": int(color.get("g")),
- "b": int(color.get("b")),
- "a": float(color.get("a", 1)),
- }
- size = node_xml.find(f"{{{self.NS_VIZ}}}size")
- if size is not None:
- viz["size"] = float(size.get("value"))
- thickness = node_xml.find(f"{{{self.NS_VIZ}}}thickness")
- if thickness is not None:
- viz["thickness"] = float(thickness.get("value"))
- shape = node_xml.find(f"{{{self.NS_VIZ}}}shape")
- if shape is not None:
- viz["shape"] = shape.get("shape")
- if viz["shape"] == "image":
- viz["shape"] = shape.get("uri")
- position = node_xml.find(f"{{{self.NS_VIZ}}}position")
- if position is not None:
- viz["position"] = {
- "x": float(position.get("x", 0)),
- "y": float(position.get("y", 0)),
- "z": float(position.get("z", 0)),
- }
- if len(viz) > 0:
- data["viz"] = viz
- return data
- def add_parents(self, data, node_xml):
- parents_element = node_xml.find(f"{{{self.NS_GEXF}}}parents")
- if parents_element is not None:
- data["parents"] = []
- for p in parents_element.findall(f"{{{self.NS_GEXF}}}parent"):
- parent = p.get("for")
- data["parents"].append(parent)
- return data
- def add_slices(self, data, node_or_edge_xml):
- slices_element = node_or_edge_xml.find(f"{{{self.NS_GEXF}}}slices")
- if slices_element is not None:
- data["slices"] = []
- for s in slices_element.findall(f"{{{self.NS_GEXF}}}slice"):
- start = s.get("start")
- end = s.get("end")
- data["slices"].append((start, end))
- return data
- def add_spells(self, data, node_or_edge_xml):
- spells_element = node_or_edge_xml.find(f"{{{self.NS_GEXF}}}spells")
- if spells_element is not None:
- data["spells"] = []
- ttype = self.timeformat
- for s in spells_element.findall(f"{{{self.NS_GEXF}}}spell"):
- start = self.python_type[ttype](s.get("start"))
- end = self.python_type[ttype](s.get("end"))
- data["spells"].append((start, end))
- return data
- def add_edge(self, G, edge_element, edge_attr):
- # add an edge to the graph
- # raise error if we find mixed directed and undirected edges
- edge_direction = edge_element.get("type")
- if G.is_directed() and edge_direction == "undirected":
- raise nx.NetworkXError("Undirected edge found in directed graph.")
- if (not G.is_directed()) and edge_direction == "directed":
- raise nx.NetworkXError("Directed edge found in undirected graph.")
- # Get source and target and recast type if required
- source = edge_element.get("source")
- target = edge_element.get("target")
- if self.node_type is not None:
- source = self.node_type(source)
- target = self.node_type(target)
- data = self.decode_attr_elements(edge_attr, edge_element)
- data = self.add_start_end(data, edge_element)
- if self.VERSION == "1.1":
- data = self.add_slices(data, edge_element) # add slices
- else:
- data = self.add_spells(data, edge_element) # add spells
- # GEXF stores edge ids as an attribute
- # NetworkX uses them as keys in multigraphs
- # if networkx_key is not specified as an attribute
- edge_id = edge_element.get("id")
- if edge_id is not None:
- data["id"] = edge_id
- # check if there is a 'multigraph_key' and use that as edge_id
- multigraph_key = data.pop("networkx_key", None)
- if multigraph_key is not None:
- edge_id = multigraph_key
- weight = edge_element.get("weight")
- if weight is not None:
- data["weight"] = float(weight)
- edge_label = edge_element.get("label")
- if edge_label is not None:
- data["label"] = edge_label
- if G.has_edge(source, target):
- # seen this edge before - this is a multigraph
- self.simple_graph = False
- G.add_edge(source, target, key=edge_id, **data)
- if edge_direction == "mutual":
- G.add_edge(target, source, key=edge_id, **data)
- def decode_attr_elements(self, gexf_keys, obj_xml):
- # Use the key information to decode the attr XML
- attr = {}
- # look for outer '<attvalues>' element
- attr_element = obj_xml.find(f"{{{self.NS_GEXF}}}attvalues")
- if attr_element is not None:
- # loop over <attvalue> elements
- for a in attr_element.findall(f"{{{self.NS_GEXF}}}attvalue"):
- key = a.get("for") # for is required
- try: # should be in our gexf_keys dictionary
- title = gexf_keys[key]["title"]
- except KeyError as err:
- raise nx.NetworkXError(f"No attribute defined for={key}.") from err
- atype = gexf_keys[key]["type"]
- value = a.get("value")
- if atype == "boolean":
- value = self.convert_bool[value]
- else:
- value = self.python_type[atype](value)
- if gexf_keys[key]["mode"] == "dynamic":
- # for dynamic graphs use list of three-tuples
- # [(value1,start1,end1), (value2,start2,end2), etc]
- ttype = self.timeformat
- start = self.python_type[ttype](a.get("start"))
- end = self.python_type[ttype](a.get("end"))
- if title in attr:
- attr[title].append((value, start, end))
- else:
- attr[title] = [(value, start, end)]
- else:
- # for static graphs just assign the value
- attr[title] = value
- return attr
- def find_gexf_attributes(self, attributes_element):
- # Extract all the attributes and defaults
- attrs = {}
- defaults = {}
- mode = attributes_element.get("mode")
- for k in attributes_element.findall(f"{{{self.NS_GEXF}}}attribute"):
- attr_id = k.get("id")
- title = k.get("title")
- atype = k.get("type")
- attrs[attr_id] = {"title": title, "type": atype, "mode": mode}
- # check for the 'default' subelement of key element and add
- default = k.find(f"{{{self.NS_GEXF}}}default")
- if default is not None:
- if atype == "boolean":
- value = self.convert_bool[default.text]
- else:
- value = self.python_type[atype](default.text)
- defaults[title] = value
- return attrs, defaults
- def relabel_gexf_graph(G):
- """Relabel graph using "label" node keyword for node label.
- Parameters
- ----------
- G : graph
- A NetworkX graph read from GEXF data
- Returns
- -------
- H : graph
- A NetworkX graph with relabeled nodes
- Raises
- ------
- NetworkXError
- If node labels are missing or not unique while relabel=True.
- Notes
- -----
- This function relabels the nodes in a NetworkX graph with the
- "label" attribute. It also handles relabeling the specific GEXF
- node attributes "parents", and "pid".
- """
- # build mapping of node labels, do some error checking
- try:
- mapping = [(u, G.nodes[u]["label"]) for u in G]
- except KeyError as err:
- raise nx.NetworkXError(
- "Failed to relabel nodes: missing node labels found. Use relabel=False."
- ) from err
- x, y = zip(*mapping)
- if len(set(y)) != len(G):
- raise nx.NetworkXError(
- "Failed to relabel nodes: duplicate node labels found. Use relabel=False."
- )
- mapping = dict(mapping)
- H = nx.relabel_nodes(G, mapping)
- # relabel attributes
- for n in G:
- m = mapping[n]
- H.nodes[m]["id"] = n
- H.nodes[m].pop("label")
- if "pid" in H.nodes[m]:
- H.nodes[m]["pid"] = mapping[G.nodes[n]["pid"]]
- if "parents" in H.nodes[m]:
- H.nodes[m]["parents"] = [mapping[p] for p in G.nodes[n]["parents"]]
- return H
|