#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import hashlib
import json
import logging
import warnings
from abc import ABCMeta
from abc import abstractmethod
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Mapping
from typing import Tuple
from typing import Union
from urllib.parse import urlparse
try:
import vineyard
except (ImportError, TypeError):
vineyard = None
from graphscope.framework import dag_utils
from graphscope.framework import utils
from graphscope.framework.dag import DAGNode
from graphscope.framework.errors import check_argument
from graphscope.framework.graph_schema import GraphSchema
from graphscope.framework.graph_utils import EdgeLabel
from graphscope.framework.graph_utils import EdgeSubLabel
from graphscope.framework.graph_utils import VertexLabel
from graphscope.framework.operation import Operation
from graphscope.framework.utils import apply_docstring
from graphscope.framework.utils import data_type_to_cpp
from graphscope.proto import attr_value_pb2
from graphscope.proto import graph_def_pb2
from graphscope.proto import types_pb2
# Module-level logger; every message from this module goes to the shared
# "graphscope" logger rather than a per-module one.
logger = logging.getLogger("graphscope")
class GraphInterface(metaclass=ABCMeta):
    """Base class from which :class:`GraphDAGNode` and :class:`Graph` derive.

    It holds the loading options shared by both classes, declares the
    operations every graph must implement, and provides helpers that build
    the operation used to create a graph from a networkx graph, from an
    existing vineyard object, or from scratch (an empty graph).
    """

    def __init__(self):
        self._session = None
        self._directed = True
        self._generate_eid = True
        self._retain_oid = True
        self._oid_type = "int64"
        self._vid_type = "uint64"
        self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP
        self._compact_edges = False
        self._use_perfect_hash = False
        self._extend_label_data = 0

    @property
    def session_id(self):
        """Identifier of the owning session; must be provided by subclasses."""
        raise NotImplementedError

    @abstractmethod
    def add_column(self, results, selector):
        raise NotImplementedError

    @abstractmethod
    def add_vertices(self, vertices, label="_", properties=None, vid_field=0):
        raise NotImplementedError

    @abstractmethod
    def add_edges(
        self,
        edges,
        label="_",
        properties=None,
        src_label=None,
        dst_label=None,
        src_field=0,
        dst_field=1,
    ):
        raise NotImplementedError

    @abstractmethod
    def consolidate_columns(
        self,
        label: str,
        columns: Union[List[str], Tuple[str]],
        result_column: str,
    ):
        raise NotImplementedError

    def is_directed(self):
        """Return the value of the ``directed`` option of this graph."""
        return self._directed

    def to_numpy(self, selector, vertex_range=None):
        raise NotImplementedError

    def to_dataframe(self, selector, vertex_range=None):
        raise NotImplementedError

    def save_to(self, path, **kwargs):
        raise NotImplementedError

    @classmethod
    def load_from(cls, path, sess, **kwargs):
        raise NotImplementedError

    @abstractmethod
    def project(self, vertices, edges):
        raise NotImplementedError

    def unload(self):
        """Deprecated no-op; only emits a :class:`DeprecationWarning`."""
        with warnings.catch_warnings():
            # DeprecationWarning is hidden by default outside of ``__main__``,
            # force it to be displayed.
            warnings.simplefilter("always", DeprecationWarning)
            warnings.warn(
                "The Graph.unload() method has been deprecated, please using the `del` operator instead, i.e., `del graph`",
                DeprecationWarning,
                # attribute the warning to the caller of ``unload()``
                stacklevel=2,
            )

    def _from_nx_graph(self, g):
        """Create a gs graph from a nx graph.

        Args:
            g (:class:`graphscope.nx.graph`): A nx graph that contains graph data.

        Raises:
            RuntimeError: NX graph and gs graph not in the same session.
            TypeError: Convert a graph view of nx graph to gs graph.

        Returns: :class:`graphscope.framework.operation.Operation`
            that will be used to construct a :class:`graphscope.Graph`

        Examples:

        .. code:: python

            >>> import graphscope as gs
            >>> nx_g = gs.nx.path_graph(10)
            >>> gs_g = gs.Graph(nx_g)
        """
        if self.session_id != g.session_id:
            raise RuntimeError(
                "networkx graph and graphscope graph not in the same session."
            )
        if hasattr(g, "_graph"):
            raise TypeError("graph view can not convert to gs graph")
        return dag_utils.dynamic_to_arrow(g)

    def _from_vineyard(self, vineyard_object):
        """Load a graph from an already existing vineyard graph.

        Args:
            vineyard_object (:class:`vineyard.Object`, :class:`vineyard.ObjectID`
                or :class:`vineyard.ObjectName`): vineyard object,
                which represents a graph.

        Raises:
            TypeError: If ``vineyard_object`` is none of the supported types.

        Returns:
            :class:`graphscope.framework.operation.Operation`
        """
        if isinstance(vineyard_object, vineyard.Object):
            return self._construct_op_from_vineyard_id(vineyard_object.id)
        if isinstance(vineyard_object, vineyard.ObjectID):
            return self._construct_op_from_vineyard_id(vineyard_object)
        if isinstance(vineyard_object, vineyard.ObjectName):
            return self._construct_op_from_vineyard_name(vineyard_object)
        # Fail loudly instead of handing ``None`` back as the operation.
        raise TypeError(
            f"Unsupported vineyard object type: {type(vineyard_object).__name__}"
        )

    def _construct_op_from_vineyard(self, locator_key, locator_attr):
        """Build the create-graph op for a graph that already lives in vineyard.

        Args:
            locator_key: attribute key (``types_pb2.VINEYARD_ID`` or
                ``types_pb2.VINEYARD_NAME``) telling how the graph is located.
            locator_attr: attribute value holding the id or the name.
        """
        config = {
            types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(True),
            locator_key: locator_attr,
            # FIXME(hetao) hardcode oid/vid type for codegen, when loading from
            # vineyard; the metadata should be retrieved from vineyard
            types_pb2.OID_TYPE: utils.s_to_attr("int64_t"),
            types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
        }
        return dag_utils.create_graph(
            self.session_id, graph_def_pb2.ARROW_PROPERTY, attrs=config
        )

    def _construct_op_from_vineyard_id(self, vineyard_id):
        """Build the op that loads the graph identified by ``vineyard_id``."""
        assert self._session is not None
        return self._construct_op_from_vineyard(
            types_pb2.VINEYARD_ID, utils.i_to_attr(int(vineyard_id))
        )

    def _construct_op_from_vineyard_name(self, vineyard_name):
        """Build the op that loads the graph registered as ``vineyard_name``."""
        assert self._session is not None
        return self._construct_op_from_vineyard(
            types_pb2.VINEYARD_NAME, utils.s_to_attr(str(vineyard_name))
        )

    def _construct_op_of_empty_graph(self):
        """Build the op creating an empty graph configured with this object's options."""
        config = {
            types_pb2.ARROW_PROPERTY_DEFINITION: attr_value_pb2.AttrValue(),
            types_pb2.DIRECTED: utils.b_to_attr(self._directed),
            types_pb2.GENERATE_EID: utils.b_to_attr(self._generate_eid),
            types_pb2.RETAIN_OID: utils.b_to_attr(self._retain_oid),
            types_pb2.OID_TYPE: utils.s_to_attr(self._oid_type),
            types_pb2.VID_TYPE: utils.s_to_attr(self._vid_type),
            types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
            types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
            types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(self._vertex_map),
            types_pb2.COMPACT_EDGES: utils.b_to_attr(self._compact_edges),
            types_pb2.USE_PERFECT_HASH: utils.b_to_attr(self._use_perfect_hash),
            types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(self._extend_label_data),
        }
        return dag_utils.create_graph(
            self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config
        )
class GraphDAGNode(DAGNode, GraphInterface):
    """A class represents a graph node in a DAG.

    In GraphScope, all operations that generate a new graph will return
    an instance of :class:`GraphDAGNode`, which will be automatically
    executed by :meth:`Session.run` in `eager` mode.

    The following example demonstrates its usage:

    .. code:: python

        >>> # lazy mode
        >>> import graphscope as gs
        >>> sess = gs.session(mode="lazy")
        >>> g = sess.g()
        >>> g1 = g.add_vertices("person.csv","person")
        >>> print(g1) # <graphscope.framework.graph.GraphDAGNode object>
        >>> g2 = sess.run(g1)
        >>> print(g2) # <graphscope.framework.graph.Graph object>

        >>> # eager mode
        >>> import graphscope as gs
        >>> sess = gs.session(mode="eager")
        >>> g = sess.g()
        >>> g1 = g.add_vertices("person.csv","person")
        >>> print(g1) # <graphscope.framework.graph.Graph object>
        >>> del g1
    """

    def __init__(
        self,
        session,
        incoming_data=None,
        oid_type="int64",
        vid_type="uint64",
        directed=True,
        generate_eid=True,
        retain_oid=True,
        vertex_map: Union[str, int] = "global",
        compact_edges=False,
        use_perfect_hash=False,
    ):
        """Construct a :class:`GraphDAGNode` object.

        Args:
            session (:class:`Session`): A graphscope session instance.
            incoming_data: Graph can be initialized through various type of sources,
                which can be one of:

                    - :class:`graphscope.framework.operation.Operation`
                    - :class:`graphscope.nx.Graph`
                    - :class:`graphscope.Graph`
                    - :class:`vineyard.Object`, :class:`vineyard.ObjectID` or :class:`vineyard.ObjectName`

            oid_type: (str, optional): Type of vertex original id. Defaults to "int64".
            vid_type: (str, optional): Type of vertex internal id. Defaults to "uint64".
            directed: (bool, optional): Directed graph or not. Defaults to True.
            generate_eid: (bool, optional): Generate id for each edge when set True. Defaults to True.
            retain_oid: (bool, optional): Keep original ID in vertex table when set True. Defaults to True.
            vertex_map (str, optional): Indicate use global vertex map or local vertex map. Can be "global" or "local".
                Defaults to global.
            compact_edges (bool, optional): Compact edges (CSR) using varint and delta encoding. Defaults to False.
                Note that compact edges helps to half the memory usage of edges in graph data structure, but may cause
                at most 10%~20% performance degeneration in some algorithms. Defaults to False.
            use_perfect_hash (bool, optional): Use perfect hash in vertex map to optimize the memory usage. Defaults to False.

        Raises:
            ValueError: If ``oid_type`` or ``vid_type`` is not a supported type.
        """
        super().__init__()
        self._session = session
        oid_type = utils.normalize_data_type_str(oid_type)
        if oid_type not in ("int32_t", "int64_t", "std::string"):
            raise ValueError("oid_type can only be int32_t, int64_t or string.")
        vid_type = utils.normalize_data_type_str(vid_type)
        if vid_type not in ("uint32_t", "uint64_t"):
            raise ValueError("vid_type can only be uint32_t or uint64_t.")
        self._oid_type = oid_type
        self._vid_type = vid_type
        self._directed = directed
        self._generate_eid = generate_eid
        self._retain_oid = retain_oid
        self._graph_type = graph_def_pb2.ARROW_PROPERTY
        self._vertex_map = utils.vertex_map_type_to_enum(vertex_map)
        self._compact_edges = compact_edges
        self._use_perfect_hash = use_perfect_hash
        # for need to extend label in 'eager mode' when add_vertices and add_edges
        # 0 - not extending label
        # 1 - extend vertex label
        # 2 - extend edge label
        self._extend_label_data = 0

        # list of pair <parent_op_key, VertexLabel/EdgeLabel>
        self._unsealed_vertices_and_edges = list()
        # check for newly added vertices and edges.
        self._v_labels = list()
        self._e_labels = list()
        self._e_relationships = list()
        self._base_graph = None
        # add op to dag
        self._resolve_op(incoming_data)
        self._session.dag.add_op(self._op)
        # statically create the unload op, as the op may change, the
        # unload op should be refreshed as well.
        if self._op is None:
            self._unload_op = None
        else:
            self._unload_op = dag_utils.unload_graph(self)

    @property
    def v_labels(self):
        return self._v_labels

    @v_labels.setter
    def v_labels(self, value):
        self._v_labels = value

    @property
    def e_labels(self):
        return self._e_labels

    @e_labels.setter
    def e_labels(self, value):
        self._e_labels = value

    @property
    def e_relationships(self):
        return self._e_relationships

    @e_relationships.setter
    def e_relationships(self, value):
        self._e_relationships = value

    @property
    def graph_type(self):
        """The type of the graph object.

        Returns:
            type (`types_pb2.GraphType`): the type of the graph.
        """
        return self._graph_type

    @property
    def oid_type(self):
        return utils.normalize_data_type_str(self._oid_type)

    @property
    def vid_type(self):
        return utils.normalize_data_type_str(self._vid_type)

    def _derive_node(self, op, **overrides):
        """Wrap ``op`` in a new :class:`GraphDAGNode` inheriting this node's options.

        Args:
            op: The operation that produces the new graph.
            **overrides: Constructor keyword arguments replacing the inherited ones.

        Returns:
            :class:`GraphDAGNode`: The node created for ``op``.
        """
        options = {
            "oid_type": self._oid_type,
            "vid_type": self._vid_type,
            "directed": self._directed,
            "generate_eid": self._generate_eid,
            "retain_oid": self._retain_oid,
            "vertex_map": self._vertex_map,
            "compact_edges": self._compact_edges,
            "use_perfect_hash": self._use_perfect_hash,
        }
        options.update(overrides)
        return GraphDAGNode(self._session, op, **options)

    def _project_to_simple(self, v_prop=None, e_prop=None):
        """Project this property graph to a simple graph on the given properties.

        Args:
            v_prop: Vertex property to keep; passed to the engine as ``str(v_prop)``.
            e_prop: Edge property to keep; passed to the engine as ``str(e_prop)``.

        Returns:
            :class:`GraphDAGNode`: The node of the projected graph.
        """
        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        op = dag_utils.project_to_simple(self, str(v_prop), str(e_prop))
        # construct dag node
        graph_dag_node = self._derive_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def _resolve_op(self, incoming_data):
        """Set ``self._op`` (and the matching unload op) according to ``incoming_data``.

        Raises:
            RuntimeError: If the data lives in another session or is unsupported.
            NotImplementedError: If ``incoming_data`` is a :class:`GraphDAGNode`.
        """
        if incoming_data is None:
            # create dag node of empty graph
            self._op = self._construct_op_of_empty_graph()
        elif isinstance(incoming_data, Operation):
            self._op = incoming_data
            if self._op.type == types_pb2.PROJECT_TO_SIMPLE:
                self._graph_type = graph_def_pb2.ARROW_PROJECTED
        elif isinstance(incoming_data, Graph):
            self._op = dag_utils.copy_graph(incoming_data)
            self._graph_type = incoming_data.graph_type
        elif isinstance(incoming_data, GraphDAGNode):
            if incoming_data.session_id != self.session_id:
                raise RuntimeError(f"{incoming_data} not in the same session.")
            raise NotImplementedError
        elif vineyard is not None and isinstance(
            incoming_data, (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName)
        ):
            self._op = self._from_vineyard(incoming_data)
        else:
            # Don't import the :code:`NXGraph` in top-level statements to improve the
            # performance of :code:`import graphscope`.
            from graphscope import nx

            if isinstance(incoming_data, nx.classes.graph._GraphBase):
                self._op = self._from_nx_graph(incoming_data)
            else:
                raise RuntimeError("Not supported incoming data.")
        # update the unload op
        self._unload_op = dag_utils.unload_graph(self)

    def to_numpy(self, selector, vertex_range=None):
        """Select some elements of the graph and output to numpy.

        Args:
            selector (str): Select a portion of graph as a numpy.ndarray.
            vertex_range(dict, optional): Slice vertices. Defaults to None.

        Returns:
            :class:`graphscope.framework.context.ResultDAGNode`:
                A result holds the `numpy.ndarray`, evaluated in eager mode.
        """
        # avoid circular import
        from graphscope.framework.context import ResultDAGNode

        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        vertex_range = utils.transform_vertex_range(vertex_range)
        op = dag_utils.graph_to_numpy(self, selector, vertex_range)
        return ResultDAGNode(self, op)

    def to_dataframe(self, selector, vertex_range=None):
        """Select some elements of the graph and output as a pandas.DataFrame

        Args:
            selector (dict): Select some portions of graph.
            vertex_range (dict, optional): Slice vertices. Defaults to None.

        Returns:
            :class:`graphscope.framework.context.ResultDAGNode`:
                A result holds the `pandas.DataFrame`, evaluated in eager mode.
        """
        # avoid circular import
        from graphscope.framework.context import ResultDAGNode

        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        check_argument(
            isinstance(selector, Mapping),
            "selector of to dataframe must be a dict",
        )
        selector = json.dumps(selector)
        vertex_range = utils.transform_vertex_range(vertex_range)
        op = dag_utils.graph_to_dataframe(self, selector, vertex_range)
        return ResultDAGNode(self, op)

    def to_directed(self):
        """Return the node of a directed version of this graph.

        The new node inherits this node's id types and storage options.
        """
        op = dag_utils.to_directed(self)
        return self._derive_node(op, directed=True)

    def to_undirected(self):
        """Return the node of an undirected version of this graph.

        The new node inherits this node's id types and storage options.
        """
        op = dag_utils.to_undirected(self)
        return self._derive_node(op, directed=False)

    def add_vertices(
        self, vertices, label="_", properties=None, vid_field: Union[int, str] = 0
    ):
        """Add vertices to the graph, and return a new graph.

        Args:
            vertices (Union[str, Loader]): Vertex data source.
            label (str, optional): Vertex label name. Defaults to "_".
            properties (list[str], optional): List of column names loaded as properties. Defaults to None.
            vid_field (int or str, optional): Column index or property name used as id field. Defaults to 0.

        Raises:
            ValueError: If the given value is invalid or conflict with current graph.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with vertex added, evaluated in eager mode.
        """
        if self._vertex_map == graph_def_pb2.LOCAL_VERTEX_MAP:
            raise ValueError(
                "Cannot incrementally add vertices to graphs with local vertex map, "
                "please use `graphscope.load_from()` instead."
            )
        if self._compact_edges:
            raise ValueError(
                "Cannot incrementally add vertices to graphs with compacted edges, "
                "please use `graphscope.load_from()` instead."
            )
        if not self._v_labels and self._e_labels:
            raise ValueError("Cannot manually add vertices after inferred vertices.")

        # Recompute the flag on every call: a value left over from a previous
        # add_vertices/add_edges call on this node must not leak into this one.
        extends_existing_label = label in self._v_labels
        self._extend_label_data = 1 if extends_existing_label else 0
        if extends_existing_label:
            warnings.warn(
                f"Label {label} already existed in graph"
                ", origin label data will be extend."
            )
        unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
        vertex_label = VertexLabel(
            label=label,
            loader=vertices,
            properties=properties,
            vid_field=vid_field,
            id_type=self._oid_type,
            session_id=self._session.session_id,
        )
        unsealed_vertices_and_edges.append((self.op.key, vertex_label))
        v_labels = deepcopy(self._v_labels)
        if not extends_existing_label:
            v_labels.append(label)
        # generate and add a loader op to dag
        loader_op = dag_utils.create_loader(vertex_label)
        self._session.dag.add_op(loader_op)
        # construct add label op
        op = dag_utils.add_labels_to_graph(self, loader_op)
        # construct dag node
        graph_dag_node = self._derive_node(op)
        graph_dag_node._v_labels = v_labels
        graph_dag_node._e_labels = self._e_labels
        graph_dag_node._e_relationships = self._e_relationships
        graph_dag_node._unsealed_vertices_and_edges = unsealed_vertices_and_edges
        graph_dag_node._base_graph = self
        return graph_dag_node

    def add_edges(
        self,
        edges,
        label="_e",
        properties=None,
        src_label=None,
        dst_label=None,
        src_field: Union[int, str] = 0,
        dst_field: Union[int, str] = 1,
    ):
        """Add edges to the graph, and return a new graph.
        Here the src_label and dst_label must be both specified or both unspecified,

        i. src_label and dst_label both unspecified and current graph has no vertex label.

            We deduce vertex label from edge table, and set vertex label name to '_'.

        ii. src_label and dst_label both unspecified and current graph has one vertex label.

            We set src_label and dst label to this single vertex label.

        iii. src_label and dst_label both specified and existed in current graph's vertex labels.

        iv. src_label and dst_label both specified and some are not existed in current graph's vertex labels.

            We deduce missing vertex labels from edge tables.

        Args:
            edges (Union[str, Loader]): Edge data source.
            label (str, optional): Edge label name. Defaults to "_e".
            properties (list[str], optional): List of column names loaded as properties. Defaults to None.
            src_label (str, optional): Source vertex label. Defaults to None.
            dst_label (str, optional): Destination vertex label. Defaults to None.
            src_field (int or str, optional): Column index or name used as src field. Defaults to 0.
            dst_field (int or str, optional): Column index or name used as dst field. Defaults to 1.

        Raises:
            ValueError: If the given value is invalid or conflict with current graph.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with edge added, evaluated in eager mode.
        """
        if self._compact_edges:
            raise ValueError(
                "Cannot incrementally add edges to graphs with compacted edges, "
                "please use `graphscope.load_from()` instead."
            )
        if src_label is None and dst_label is None:
            check_argument(
                len(self._v_labels) <= 1,
                "Ambiguous vertex label, please specify the src_label and dst_label.",
            )
            if len(self._v_labels) == 1:
                src_label = dst_label = self._v_labels[0]
            else:
                src_label = dst_label = "_"
        if src_label is None or dst_label is None:
            raise ValueError(
                "src and dst label must be both specified or both unspecified."
            )
        check_argument(
            src_field != dst_field, "src and dst field cannot refer to the same field"
        )

        # Recompute the flag on every call so that a value left over from a
        # previous add_vertices/add_edges call on this node is never reused.
        extends_existing_label = self.evaluated and label in self._e_labels
        self._extend_label_data = 2 if extends_existing_label else 0

        def new_sub_label():
            # one fresh sub label per use, describing the edges being added
            return EdgeSubLabel(
                edges,
                properties,
                src_label,
                dst_label,
                src_field,
                dst_field,
                id_type=self._oid_type,
            )

        unsealed_vertices = list()
        unsealed_edges = list()

        v_labels = deepcopy(self._v_labels)
        e_labels = deepcopy(self._e_labels)
        relations = deepcopy(self._e_relationships)
        if src_label not in self._v_labels:
            logger.warning("Deducing vertex labels %s", src_label)
            v_labels.append(src_label)
        if src_label != dst_label and dst_label not in self._v_labels:
            logger.warning("Deducing vertex labels %s", dst_label)
            v_labels.append(dst_label)

        parent = self
        if not self.evaluated and label in self.e_labels:
            # aggregate op with the same edge label
            fork = False
            unsealed_vertices_and_edges = list()
            for parent_op_key, unsealed_v_or_e in self._unsealed_vertices_and_edges:
                if (
                    isinstance(unsealed_v_or_e, EdgeLabel)
                    and unsealed_v_or_e.label == label
                ):
                    # Re-issue the pending edge label from the node it was first
                    # added to, extended with the new sub label.
                    # NOTE(review): the EdgeLabel object is shared with this
                    # node's own unsealed list and is extended in place.
                    parent = self._backtrack_graph_dag_node_by_op_key(parent_op_key)
                    cur_label = unsealed_v_or_e
                    cur_label.add_sub_label(new_sub_label())
                    fork = True
                else:
                    unsealed_vertices_and_edges.append((parent_op_key, unsealed_v_or_e))
                    if fork:
                        # labels added after the fork point are loaded again
                        # together with the merged edge label
                        if isinstance(unsealed_v_or_e, VertexLabel):
                            unsealed_vertices.append(unsealed_v_or_e)
                        else:
                            unsealed_edges.append(unsealed_v_or_e)
            unsealed_edges.append(cur_label)
            unsealed_vertices_and_edges.append((parent.op.key, cur_label))
        else:
            unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
            e_labels.append(label)
            relations.append([(src_label, dst_label)])
            cur_label = EdgeLabel(label, self._oid_type, self._session.session_id)
            cur_label.add_sub_label(new_sub_label())
            unsealed_edges.append(cur_label)
            unsealed_vertices_and_edges.append((parent.op.key, cur_label))
        # generate and add a loader op to dag
        loader_op = dag_utils.create_loader(unsealed_vertices + unsealed_edges)
        self._session.dag.add_op(loader_op)
        # construct add label op
        op = dag_utils.add_labels_to_graph(parent, loader_op)
        # construct dag node
        graph_dag_node = self._derive_node(op)
        graph_dag_node._v_labels = v_labels
        graph_dag_node._e_labels = e_labels
        graph_dag_node._e_relationships = relations
        graph_dag_node._unsealed_vertices_and_edges = unsealed_vertices_and_edges
        graph_dag_node._base_graph = parent
        return graph_dag_node

    def consolidate_columns(
        self,
        label: str,
        columns: Union[List[str], Tuple[str]],
        result_column: str,
    ):
        """Consolidate columns of given vertex / edge properties (of same type) into one column.

        For example, if we have a graph with vertex label "person", and edge labels "knows"
        and "follows", and we want to consolidate the "weight0", "weight1" properties of the
        vertex and both edges into a new column "weight", we can do:

        .. code:: python

            >>> g = ...
            >>> g = g.consolidate_columns("person", ["weight0", "weight1"], "weight")
            >>> g = g.consolidate_columns("knows", ["weight0", "weight1"], "weight")
            >>> g = g.consolidate_columns("follows", ["weight0", "weight1"], "weight")

        Args:
            label: the label of the vertex or edge.
            columns (list or tuple of str): the properties of given vertex or edge to be consolidated.
            result_column: the name of the new column.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with column consolidated, evaluated in eager mode.
        """
        check_argument(
            isinstance(columns, (list, tuple)),
            "columns must be a list or tuple of strings",
        )
        op = dag_utils.consolidate_columns(self, label, columns, result_column)
        graph_dag_node = self._derive_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def _backtrack_graph_dag_node_by_op_key(self, key):
        """Walk the ``_base_graph`` chain and return the node whose op key is ``key``.

        Returns ``None`` when neither this node nor any ancestor matches.
        """
        graph_dag_node = self
        while graph_dag_node is not None:
            if graph_dag_node.op.key == key:
                return graph_dag_node
            graph_dag_node = graph_dag_node._base_graph
        return None

    def add_column(self, results, selector):
        """Add the results as a column to the graph. Modification rules are given by the selector.

        Args:
            results: A instance of concrete class derive from (:class:`graphscope.framework.context.BaseContextDAGNode`):
                A context that created by doing an app query on a graph, and holds the corresponding results.
            selector (dict): Select results to add as column.
                Format is similar to selectors in :class:`graphscope.framework.context.Context`

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with new columns, evaluated in eager mode.
        """
        check_argument(
            isinstance(selector, Mapping), "selector of add column must be a dict"
        )
        for value in selector.values():
            results._check_selector(value)
        selector = json.dumps(selector)
        op = dag_utils.add_column(self, results, selector)
        # Inherit every option (id types included), like the other methods
        # deriving a graph from this one.
        graph_dag_node = self._derive_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def __del__(self):
        # Best-effort unload: the session may already be closed or destroyed.
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass

    def _unload(self):
        """Unload this graph from graphscope engine.

        Returns:
            :class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode.
        """
        return UnloadedGraph(self._session, self._unload_op)

    def project(
        self,
        vertices: Mapping[str, Union[List[str], None]],
        edges: Mapping[str, Union[List[str], None]],
    ):
        """Project a subgraph from the property graph, and return a new graph.
        A graph produced by project just like a normal property graph, and can be projected further.

        Args:
            vertices (dict):
                key is the vertex label name, the value is a list of str, which represents the
                name of properties. Specifically, it will select all properties if value is None.
                Note that, the label of the vertex in all edges you want to project should be included.
            edges (dict):
                key is the edge label name, the value is a list of str, which represents the
                name of properties. Specifically, it will select all properties if value is None.

        Raises:
            ValueError: If ``vertices`` or ``edges`` is a list or a set.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph projected from the property graph, evaluated in eager mode.
        """
        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        if isinstance(vertices, (list, set)) or isinstance(edges, (list, set)):
            raise ValueError(
                "\nThe project vertices or edges cannot be a set or a list, rather, a dict is expected, \n"
                "where the key is the label name and the value is a list of property name. E.g.,\n"
                "\n"
                " g.project(vertices={'person': ['name', 'age']},\n"
                " edges={'knows': ['weight']})\n"
                "\n"
                "The property list for vertices and edges can be empty if not needed, e.g.,\n"
                "\n"
                " g.project(vertices={'person': []}, edges={'knows': []})\n"
            )
        op = dag_utils.project_arrow_property_graph(
            self, json.dumps(vertices), json.dumps(edges)
        )
        # construct dag node
        graph_dag_node = self._derive_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node
[docs]class Graph(GraphInterface):
"""A class for representing metadata of a graph in the GraphScope.
A :class:`Graph` object holds the metadata of a graph, such as its key, its schema, and whether the graph is directed.
It is worth noticing that the graph is stored by the backend such as Analytical Engine, Vineyard.
In other words, the graph object holds nothing but metadata.
The following example demonstrates its usage:
.. code:: python
>>> import graphscope as gs
>>> sess = gs.session()
>>> graph = sess.g()
>>> graph = graph.add_vertices("person.csv", "person")
>>> graph = graph.add_vertices("software.csv", "software")
>>> graph = graph.add_edges("knows.csv", "knows", src_label="person", dst_label="person")
>>> graph = graph.add_edges("created.csv", "created", src_label="person", dst_label="software")
>>> print(graph)
>>> print(graph.schema)
"""
def __init__(
    self,
    graph_node,
):
    """Construct a :class:`Graph` object around an evaluated graph node.

    Args:
        graph_node: The graph DAG node this object wraps; its op is replaced
            by a copy and the node is flagged as evaluated.
    """
    self._graph_node = graph_node
    node = self._graph_node
    self._session = node.session

    # Give the node its own copy of the op, mark it evaluated, refresh the
    # unload op accordingly and register the copied op in the session dag.
    node.op = deepcopy(node.op)
    node.evaluated = True
    node._unload_op = dag_utils.unload_graph(node)
    self._session.dag.add_op(node.op)

    # Placeholders for the metadata of the graph.
    self._key = None
    self._vineyard_id = 0
    self._fragments = None
    self._schema = GraphSchema()
    self._detached = False

    # Options taken over from the wrapped node.
    self._vertex_map = graph_node._vertex_map
    self._compact_edges = graph_node._compact_edges
    self._use_perfect_hash = graph_node._use_perfect_hash

    self._interactive_instance_list = []
    self._learning_instance_list = []
def update_from_graph_def(self, graph_def):
    """Refresh the metadata of this graph from ``graph_def``.

    Args:
        graph_def: Graph definition message; its ``extension`` field is
            unpacked as a ``VineyardInfoPb``.
    """
    node = self._graph_node
    # A flattened graph overrides the type recorded on the wrapped node.
    if graph_def.graph_type == graph_def_pb2.ARROW_FLATTENED:
        node._graph_type = graph_def_pb2.ARROW_FLATTENED
    check_argument(
        node.graph_type == graph_def.graph_type,
        f"Graph type doesn't match {node.graph_type} versus {graph_def.graph_type}",
    )

    # Fields stored directly on the graph definition.
    self._key = graph_def.key
    self._directed = graph_def.directed
    self._is_multigraph = graph_def.is_multigraph
    self._compact_edges = graph_def.compact_edges
    self._use_perfect_hash = graph_def.use_perfect_hash

    # Fields stored in the vineyard extension of the graph definition.
    vineyard_info = graph_def_pb2.VineyardInfoPb()
    graph_def.extension.Unpack(vineyard_info)
    self._vineyard_id = vineyard_info.vineyard_id
    self._fragments = list(vineyard_info.fragments)
    self._oid_type = data_type_to_cpp(vineyard_info.oid_type)
    self._vid_type = data_type_to_cpp(vineyard_info.vid_type)
    self._generate_eid = vineyard_info.generate_eid
    self._retain_oid = vineyard_info.retain_oid
    self._schema_path = vineyard_info.schema_path

    # Schema and the label lists derived from it.
    schema = self._schema
    schema.from_graph_def(graph_def)
    self._v_labels = schema.vertex_labels
    self._e_labels = schema.edge_labels
    self._e_relationships = schema.edge_relationships

    # init saved_signature (must be after init schema)
    self._saved_signature = self.signature
def __getattr__(self, name):
if hasattr(self._graph_node, name):
return getattr(self._graph_node, name)
raise AttributeError("{0} not found.".format(name))
@property
def key(self):
    """Key identifying the corresponding graph in the engine."""
    graph_key = self._key
    return graph_key
@property
def schema(self):
    """The schema object attached to this graph.

    Returns:
        :class:`GraphSchema`: the schema of the graph
    """
    graph_schema = self._schema
    return graph_schema
@property
def schema_path(self):
    """Path that the Coordinator writes the interactive schema to.

    Returns:
        str: The path containing the schema for the interactive engine.
    """
    path = self._schema_path
    return path
@property
def signature(self):
    """SHA-256 hex digest computed from the schema signature and the graph key."""
    # "<schema signature>.<key>", encoded as UTF-8 ignoring unencodable characters
    payload = f"{self._schema.signature()}.{self._key}"
    encoded = payload.encode("utf-8", errors="ignore")
    return hashlib.sha256(encoded).hexdigest()
@property
def op(self):
    """The op held by the wrapped graph node."""
    node = self._graph_node
    return node.op
@property
def oid_type(self):
    """The ``oid_type`` reported by the wrapped graph node."""
    node = self._graph_node
    return node.oid_type
@property
def vid_type(self):
    """The ``vid_type`` reported by the wrapped graph node."""
    node = self._graph_node
    return node.vid_type
@property
def template_str(self):
    """C++ template instantiation string matching this graph's type.

    Raises:
        ValueError: If the graph type has no associated template.
    """
    # transform str/string to std::string
    oid = utils.normalize_data_type_str(self._oid_type)
    vid = utils.normalize_data_type_str(self._vid_type)
    vdata = utils.data_type_to_cpp(self._schema.vdata_type)
    edata = utils.data_type_to_cpp(self._schema.edata_type)
    vmap = utils.vertex_map_type_to_cpp(self._vertex_map)
    vmap = f"{vmap}<{oid},{vid}>"
    compact = "true" if self._compact_edges else "false"

    # one template per supported graph type
    templates = {
        graph_def_pb2.ARROW_PROPERTY: (
            f"vineyard::ArrowFragment<{oid},{vid},{vmap},{compact}>"
        ),
        graph_def_pb2.ARROW_PROJECTED: (
            f"gs::ArrowProjectedFragment<{oid},{vid},{vdata},{edata},{vmap},{compact}>"
        ),
        graph_def_pb2.ARROW_FLATTENED: (
            f"ArrowFlattenedFragment<{oid},{vid},{vdata},{edata},{compact}>"
        ),
        graph_def_pb2.DYNAMIC_PROJECTED: (
            f"gs::DynamicProjectedFragment<{vdata},{edata}>"
        ),
    }
    graph_type = self._graph_type
    if graph_type not in templates:
        raise ValueError(f"Unsupported graph type: {graph_type}")
    return templates[graph_type]
@property
def vineyard_id(self):
    """The vineyard object id recorded for this graph.

    Returns:
        The vineyard id of this graph.
    """
    object_id = self._vineyard_id
    return object_id
@property
def fragments(self):
    """The fragments recorded for this graph (``None`` until they are known)."""
    recorded = self._fragments
    return recorded
@property
def session_id(self):
    """Get the current session_id.

    Returns:
        str: The id of the session that the graph belongs to.
    """
    session = self._session
    return session.session_id
def detach(self):
    """Mark this graph as detached.

    A detached graph is left in vineyard even when the variable holding this
    :class:`Graph` object leaves the lexical scope. The graph can be accessed
    later using the graph's :code:`ObjectID` or its name.
    """
    setattr(self, "_detached", True)
def loaded(self):
    """True if current graph has been loaded in the session."""
    if self._session.info["status"] != "active":
        return False
    return self._key is not None
def __str__(self):
    """Describe the graph: its type name, vertex labels and edge relations."""
    vertex_text = "\n".join(f"VERTEX: {name}" for name in self._v_labels)

    # One (label, src, dst) triple per relationship of every edge label.
    triples = []
    for idx, edge_label in enumerate(self._e_labels):
        for src, dst in self._e_relationships[idx]:
            triples.append((edge_label, src, dst))

    edge_text = "\n".join(
        f"EDGE: {edge_label}\tsrc: {src}\tdst: {dst}"
        for edge_label, src, dst in triples
    )
    type_name = graph_def_pb2.GraphTypePb.Name(self._graph_type)
    return f"graphscope.Graph\n{type_name}\n{vertex_text}\n{edge_text}"
def __repr__(self):
    """Use the same text as ``__str__`` for the representation."""
    text = self.__str__()
    return text
def _unload(self):
    """Unload this graph from graphscope engine.

    Returns:
        ``None`` when the session is not active, the graph has no key, or the
        graph is detached; otherwise the result of running the unload op
        through the session wrapper.
    """
    session_inactive = self._session.info["status"] != "active"
    if session_inactive or self._key is None or self._detached:
        return None

    # close the associated interactive and learning instances first
    self._close_interactive_instances()
    self._close_learning_instances()

    # then unload the graph itself and drop the key
    result = self._session._wrapper(self._graph_node._unload())
    self._key = None
    return result
def __del__(self):
    """Best-effort unload of the graph when the object is finalised."""
    # The session may already be closed or destroyed by now, so every
    # exception is deliberately ignored.
    try:
        run = self._session.run
        run(self._unload())
    except Exception:  # pylint: disable=broad-except
        pass
@apply_docstring(GraphDAGNode._project_to_simple)
def _project_to_simple(self, v_prop=None, e_prop=None):
    # Delegate to the underlying graph node and hand the resulting node to
    # the session wrapper.
    wrap = self._session._wrapper
    return wrap(self._graph_node._project_to_simple(v_prop, e_prop))
@apply_docstring(GraphDAGNode.add_column)
def add_column(self, results, selector):
    # Delegate to the underlying graph node and hand the resulting node to
    # the session wrapper.
    return self._session._wrapper(self._graph_node.add_column(results, selector))
def to_numpy(self, selector, vertex_range=None):
    """Select some elements of the graph and output to numpy.

    Args:
        selector (str): Select a portion of graph as a numpy.ndarray.
        vertex_range (dict, optional): Slice vertices. Defaults to None.

    Returns:
        `numpy.ndarray`
    """
    # Verify the graph has not been modified before exporting anything.
    self._check_unmodified()
    return self._session._wrapper(self._graph_node.to_numpy(selector, vertex_range))
def to_dataframe(self, selector, vertex_range=None):
    """Select some elements of the graph and output as a pandas.DataFrame

    Args:
        selector (dict): Select some portions of graph.
        vertex_range (dict, optional): Slice vertices. Defaults to None.

    Returns:
        `pandas.DataFrame`
    """
    # Verify the graph has not been modified before exporting anything.
    self._check_unmodified()
    return self._session._wrapper(
        self._graph_node.to_dataframe(selector, vertex_range)
    )
def to_directed(self):
    """Returns a directed representation of the graph.

    Returns:
        :class:`Graph`: A directed graph with the same name, same nodes, and
        with each edge (u, v, data) replaced by two directed edges
        (u, v, data) and (v, u, data). If this graph is already directed,
        the graph itself is returned.
    """
    if self._directed:
        return self
    return self._session._wrapper(self._graph_node.to_directed())
def to_undirected(self):
    """Returns an undirected representation of the digraph.

    Returns:
        :class:`Graph`: An undirected graph with the same name and nodes and
        with edge (u, v, data) if either (u, v, data) or (v, u, data) is in
        the digraph. If both edges exist in digraph, they will both be
        preserved. You must check and correct for this manually if desired.
        If this graph is already undirected, the graph itself is returned.
    """
    if not self._directed:
        return self
    return self._session._wrapper(self._graph_node.to_undirected())
def is_directed(self):
    """Return the graph's ``_directed`` flag."""
    directed = self._directed
    return directed
def is_multigraph(self):
    """Return the graph's ``_is_multigraph`` flag."""
    multigraph = self._is_multigraph
    return multigraph
def _check_unmodified(self):
    """Check that the current signature still equals the saved signature.

    The comparison result is passed to ``check_argument`` together with the
    message "Graph has been modified!".
    """
    unchanged = self.signature == self._saved_signature
    check_argument(unchanged, "Graph has been modified!")
@staticmethod
def _load_from_graphar(path, sess, **kwargs):
    """Create an ArrowProperty graph in ``sess`` from a GraphAr description.

    Args:
        path: Value passed on as the ``GRAPH_INFO_PATH`` attribute.
        sess: Session providing ``session_id`` and ``_wrapper``.
        **kwargs: JSON-serialised and passed on as ``STORAGE_OPTIONS``.

    Returns:
        The result of ``sess._wrapper`` applied to the new graph DAG node.
    """
    # graphar now only support global vertex map.
    global_vertex_map = utils.vertex_map_type_to_enum("global")
    attrs = {
        # graphar use vertex index as oid, so it always be int64_t
        types_pb2.OID_TYPE: utils.s_to_attr("int64_t"),
        types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(global_vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    create_op = dag_utils.create_graph(
        sess.session_id,
        graph_def_pb2.ARROW_PROPERTY,
        inputs=[],
        attrs=attrs,
    )
    return sess._wrapper(GraphDAGNode(sess, create_op))
@classmethod
def load_from(cls, uri, sess=None, **kwargs):
    """Load a ArrowProperty graph from a certain data source. The data source
    can be vineyard serialized files, graphar serialized files, or other data
    sources supported by graphscope.

    Args:
        uri (str): URI contains the description of the data source or
            path contains the serialization files,
            example: "graphar+file:///tmp/graphar/xxx"
        sess (`graphscope.Session`): The target session that the graph
            will be construct, if None, use the default session.
        selector (dict, optional): the selector to select the data to read.
        graphar_store_in_local (bool, optional): whether store graphar format
            in local, default is False.

    Returns:
        `Graph`: A new graph object.

    Raises:
        ValueError: If the URI names an unknown file system or source, or if
            a graphar load option has the wrong type.
    """

    def _check_load_options(load_options):
        # Options are checked in the order they were passed; the first
        # invalid one raises.
        for key, value in load_options.items():
            if key == "selector" and not isinstance(value, dict):
                raise ValueError(
                    f"selector should be a dict, but got {type(value)}"
                )
            if key == "graphar_store_in_local" and not isinstance(value, bool):
                raise ValueError(
                    f"graphar_store_in_local should be a bool, but got {value}"
                )

    if sess is None:
        # Only needed when no session was supplied, so import it lazily here.
        from graphscope.client.session import get_default_session

        sess = get_default_session()

    uri_str = uri
    parsed = urlparse(uri)
    if parsed.scheme and "+" in parsed.scheme:
        # "<source>+<filesystem>://...": the first segment names the source
        # and the last one names the file system.
        scheme_parts = parsed.scheme.split("+")
        source, filesystem = scheme_parts[0], scheme_parts[-1]
        if filesystem not in ("file", "s3", "oss", "hdfs"):
            raise ValueError(
                f"Unknown file system {filesystem}, "
                "currently only support file, s3, oss and hdfs"
            )
        if source != "graphar":
            raise ValueError(f"Unknown source {source} with uri {uri_str}")
        path = f"{filesystem}://{parsed.netloc}{parsed.path}"
        _check_load_options(kwargs)
        return cls._load_from_graphar(path, sess, **kwargs)

    # not a uri string, assume it is a path for deserialization
    op = dag_utils.deserialize_graph(uri_str, sess, **kwargs)
    return sess._wrapper(GraphDAGNode(sess, op))
def save_to(
    self,
    path,
    format="serialization",
    **kwargs,
):
    """Save graph to specified location with specified format.

    Args:
        path (str): the directory path to write graph.
        format (str): the format to write graph, default is "serialization".
        selector (dict, optional): the selector to select the data to write.
        graphar_graph_name (str, optional): the name of graph in graphar format.
        graphar_file_type (str, optional): the file type of graphar format,
            support "parquet", "orc", "csv", default is "parquet".
        graphar_vertex_chunk_size (int, optional): the chunk size of vertex in
            graphar format, default is 2^18.
        graphar_edge_chunk_size (int, optional): the chunk size of edge in
            graphar format, default is 2^22.
        graphar_store_in_local (bool, optional): whether store graphar format
            in local, default is False.

    Return (dict): A dict contains the type and uri string of output data.

    Raises:
        ValueError: If ``format`` is unknown, a graphar option is invalid, or
            ``path`` uses an unsupported file system scheme.
    """

    def _check_write_options(write_options):
        # Options are checked in the order they were passed; the first
        # invalid one raises.
        for key, value in write_options.items():
            if key == "graphar_graph_name":
                if not isinstance(value, str):
                    raise ValueError(
                        f"graphar_graph_name should be a string, but got {type(value)}"
                    )
            elif key == "graphar_file_type":
                if value not in ("parquet", "orc", "csv"):
                    raise ValueError(
                        "graphar_file_type should be one of "
                        f"['parquet', 'orc', 'csv'], but got {value}"
                    )
            elif key in ("graphar_vertex_chunk_size", "graphar_edge_chunk_size"):
                # bool is a subclass of int, so True would otherwise pass as
                # a "positive integer"; reject it explicitly.
                if (
                    isinstance(value, bool)
                    or not isinstance(value, int)
                    or value <= 0
                ):
                    raise ValueError(
                        f"{key} should be a positive integer, but got {value}"
                    )
            elif key == "graphar_store_in_local":
                if not isinstance(value, bool):
                    raise ValueError(
                        f"graphar_store_in_local should be a bool, but got {value}"
                    )
            elif key == "selector":
                if not isinstance(value, dict):
                    raise ValueError(
                        f"selector should be a dict, but got {type(value)}"
                    )

    if format == "graphar":
        kwargs.setdefault("graphar_graph_name", "graph")  # default graph name
        _check_write_options(kwargs)
        graph_name = kwargs["graphar_graph_name"]

        target = urlparse(path)
        if target.scheme and target.scheme not in ("file", "s3", "oss", "hdfs"):
            raise ValueError(
                f"Unknown file system {target.scheme}, "
                "currently only support file, s3, oss and hdfs"
            )
        if not target.scheme:
            # A bare path is reported back as a file:// URI.
            target = target._replace(scheme="file")

        op = dag_utils.save_to_graphar(self, path, **kwargs)
        self._session.dag.add_op(op)
        self._session._wrapper(op)
        # NOTE(review): the URI is built by plain concatenation, so `path` is
        # presumably expected to end with a separator -- confirm with callers.
        return {
            "type": format,
            "URI": "graphar+" + target.geturl() + graph_name + ".graph.yaml",
        }

    if format == "serialization":
        # serialize graph
        op = dag_utils.serialize_graph(self, path, **kwargs)
        self._session.dag.add_op(op)
        self._session._wrapper(op)
        return {"type": format, "URI": path}

    # f-string instead of "%" so a tuple-valued format cannot break the message
    raise ValueError(f"Unknown format: {format}")
@apply_docstring(GraphDAGNode.add_vertices)
def add_vertices(
    self, vertices, label="_", properties=None, vid_field: Union[int, str] = 0
) -> Union["Graph", GraphDAGNode]:
    # Only a loaded graph can be extended.
    if not self.loaded():
        raise RuntimeError("The graph is not loaded")
    return self._session._wrapper(
        self._graph_node.add_vertices(vertices, label, properties, vid_field)
    )
@apply_docstring(GraphDAGNode.add_edges)
def add_edges(
    self,
    edges,
    label="_",
    properties=None,
    src_label=None,
    dst_label=None,
    src_field: Union[int, str] = 0,
    dst_field: Union[int, str] = 1,
) -> Union["Graph", GraphDAGNode]:
    # Only a loaded graph can be extended.
    if not self.loaded():
        raise RuntimeError("The graph is not loaded")
    return self._session._wrapper(
        self._graph_node.add_edges(
            edges, label, properties, src_label, dst_label, src_field, dst_field
        )
    )
@apply_docstring(GraphDAGNode.consolidate_columns)
def consolidate_columns(
    self,
    label: str,
    columns: Union[List[str], Tuple[str]],
    result_column: str,
) -> Union["Graph", GraphDAGNode]:
    # Only a loaded graph can be transformed.
    if not self.loaded():
        raise RuntimeError("The graph is not loaded")
    return self._session._wrapper(
        self._graph_node.consolidate_columns(label, columns, result_column)
    )
@apply_docstring(GraphDAGNode.project)
def project(
    self,
    vertices: Mapping[str, Union[List[str], None]],
    edges: Mapping[str, Union[List[str], None]],
) -> Union["Graph", GraphDAGNode]:
    # Only a loaded graph can be projected.
    if not self.loaded():
        raise RuntimeError("The graph is not loaded")
    return self._session._wrapper(self._graph_node.project(vertices, edges))
def _attach_interactive_instance(self, instance):
    """Remember an interactive instance started on this graph.

    Args:
        instance: interactive instance
    """
    tracked = self._interactive_instance_list
    tracked.append(instance)
def _attach_learning_instance(self, instance):
    """Remember a learning instance created on this graph.

    Args:
        instance: learning instance
    """
    tracked = self._learning_instance_list
    tracked.append(instance)
def _close_interactive_instances(self):
    """Close every tracked interactive instance, then empty the list.

    A failure while closing one instance is logged and the remaining
    instances are still closed.
    """
    for interactive in self._interactive_instance_list:
        try:
            interactive.close()
        except Exception:  # pylint: disable=broad-except
            logger.exception("Failed to close interactive instances")
    self._interactive_instance_list.clear()
def _close_learning_instances(self):
    """Close every tracked learning instance, then empty the list.

    A failure while closing one instance is logged and the remaining
    instances are still closed.
    """
    for instance in self._learning_instance_list:
        try:
            instance.close()
        except Exception:  # pylint: disable=broad-except
            # Name the right kind of instance; the message used to say
            # "interactive", copied from the interactive closer.
            logger.exception("Failed to close learning instances")
    self._learning_instance_list.clear()
class UnloadedGraph(DAGNode):
"""Unloaded graph node in a DAG."""
def __init__(self, session, op):
self._session = session
self._op = op
# add op to dag
self._session.dag.add_op(self._op)