Source code for mindquantum.algorithm.compiler.dag.dag

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""DAG Circuit."""
import typing

from mindquantum.core import Circuit, gates
from mindquantum.utils.type_value_check import _check_input_type

# pylint: disable=invalid-name


class DAGNode:
    """
    Basic node in Directed Acyclic Graph.

    A DAG node has local indices, which label the legs of the node, and keeps,
    for every leg, the child node and the father node attached to that leg.
    """

    def __init__(self):
        """Initialize a DAGNode object with no legs and no neighbours."""
        self.child: typing.Dict[int, "DAGNode"] = {}  # key: local index, value: child DAGNode
        self.father: typing.Dict[int, "DAGNode"] = {}  # key: local index, value: father DAGNode
        self.local: typing.List[int] = []

    def clean(self):
        """Clean node and set it to empty."""
        # Fresh containers are bound (instead of clearing in place) so that any
        # external reference to the old dicts/list is left untouched.
        self.child = {}
        self.father = {}
        self.local = []

    def insert_after(self, other_node: "DAGNode"):
        """
        Insert other node after this dag node.

        For every leg shared by both nodes, ``other_node`` becomes the child of
        this node on that leg. If this node already had a child there, that
        child is re-attached behind ``other_node``.

        Args:
            other_node (:class:`~.algorithm.compiler.DAGNode`): other DAG node.
        """
        _check_input_type("other_node", DAGNode, other_node)
        for local in self.local:
            if local not in other_node.local:
                continue
            other_node.father[local] = self
            if local in self.child:
                old_child = self.child[local]
                other_node.child[local] = old_child
                # The attribute is ``father``; the former spelling ``fathre``
                # raised AttributeError whenever a child already existed.
                old_child.father[local] = other_node
            self.child[local] = other_node

    def insert_before(self, other_node: "DAGNode"):
        """
        Insert other node before this dag node.

        For every leg shared by both nodes, ``other_node`` becomes the father
        of this node on that leg. If this node already had a father there, that
        father is re-attached in front of ``other_node``.

        Args:
            other_node (:class:`~.algorithm.compiler.DAGNode`): other DAG node.
        """
        _check_input_type("other_node", DAGNode, other_node)
        for local in self.local:
            if local not in other_node.local:
                continue
            other_node.child[local] = self
            if local in self.father:
                old_father = self.father[local]
                other_node.father[local] = old_father
                old_father.child[local] = other_node
            self.father[local] = other_node
def connect_two_node(father_node: "DAGNode", child_node: "DAGNode", local_index: int):
    """
    Link two DAG nodes as father and child on one leg.

    Args:
        father_node (DAGNode): The father DAG node.
        child_node (DAGNode): The child DAG node.
        local_index (int): which leg you want to connect.

    Raises:
        ValueError: If ``local_index`` is not a leg of both nodes.
    """
    shared_leg = local_index in father_node.local and local_index in child_node.local
    if not shared_leg:
        raise ValueError(
            f"local_index {local_index} not in father_node" f" {father_node} or not in child_node {child_node}."
        )
    # Record the link on both ends so the graph can be walked in either direction.
    child_node.father[local_index] = father_node
    father_node.child[local_index] = child_node
class DAGQubitNode(DAGNode):
    """
    DAG node that stands for a qubit line rather than a gate.

    Its only leg is the qubit id itself.

    Args:
        qubit (int): id of qubit.
    """

    def __init__(self, qubit: int):
        """Initialize a DAGQubitNode object."""
        super().__init__()
        _check_input_type("qubit", int, qubit)
        self.local = [qubit]
        self.qubit = qubit

    def __str__(self):
        """Return the qubit label, i.e. ``q`` followed by the qubit id."""
        label = f"q{self.qubit}"
        return label

    def __repr__(self):
        """Return the same text as :meth:`__str__`."""
        return str(self)
class GateNode(DAGNode):
    """
    DAG node that wraps a quantum gate.

    The legs of the node are the object qubits of the gate followed by its
    control qubits.

    Args:
        gate (:class:`~.core.gates.BasicGate`): Quantum gate.
    """

    def __init__(self, gate: gates.BasicGate):
        """Initialize a GateNode object."""
        super().__init__()
        _check_input_type("gate", gates.BasicGate, gate)
        acting_qubits = gate.obj_qubits + gate.ctrl_qubits
        self.gate = gate
        self.local = acting_qubits

    def __str__(self):
        """Return the string form of the wrapped gate."""
        gate_text = str(self.gate)
        return gate_text

    def __repr__(self):
        """Return the same text as :meth:`__str__`."""
        return str(self)
class BarrierNode(GateNode):
    """
    DAG node that works as a barrier.

    Unlike a plain :class:`GateNode`, its legs are given explicitly by the
    caller instead of being read from the gate.
    """

    def __init__(self, gate: gates.BasicGate, all_qubits: typing.List[int]):
        """Initialize a BarrierNode object whose legs are ``all_qubits``."""
        super().__init__(gate)
        # Override the legs computed by GateNode with the requested qubits.
        self.local = all_qubits
class DAGCircuit:
    """
    A Directed Acyclic Graph of a quantum circuit.

    Every qubit owns a head :class:`DAGQubitNode` and a final
    :class:`DAGQubitNode`; gate nodes are chained between them along the legs
    (qubits) they act on.

    Args:
        circuit (:class:`~.core.circuit.Circuit`): the input quantum circuit.

    Examples:
        >>> from mindquantum.algorithm.compiler import DAGCircuit
        >>> from mindquantum.core.circuit import Circuit
        >>> circ = Circuit().h(0).x(1, 0)
        >>> dag_circ = DAGCircuit(circ)
        >>> dag_circ.head_node[0]
        q0
        >>> dag_circ.head_node[0].child
        {0: H(0)}
    """

    def __init__(self, circuit: Circuit):
        """Initialize a DAGCircuit object."""
        _check_input_type("circuit", Circuit, circuit)
        qubits = sorted(circuit.all_qubits.keys())
        self.head_node = {i: DAGQubitNode(i) for i in qubits}
        self.final_node = {i: DAGQubitNode(i) for i in qubits}
        # Start from an empty line on every qubit: head -> final.
        for i in self.head_node:
            self.head_node[i].insert_after(self.final_node[i])
        for gate in circuit:
            if isinstance(gate, gates.BarrierGate):
                # A barrier without explicit qubits spans every qubit of the circuit.
                # Each barrier gets its own list so the nodes never share ``local``.
                span = sorted(gate.obj_qubits) if gate.obj_qubits else list(qubits)
                self.append_node(BarrierNode(gate, span))
            else:
                self.append_node(GateNode(gate))
        self.global_phase = gates.GlobalPhase(0)

    @staticmethod
    def replace_node_with_dag_circuit(node: DAGNode, coming: "DAGCircuit"):
        """
        Replace a node with a DAGCircuit.

        On every leg of ``node``, the father of ``node`` is linked to the first
        node of ``coming`` on that leg and the last node of ``coming`` is linked
        to the child of ``node``.

        Args:
            node (:class:`~.algorithm.compiler.DAGNode`): the original DAG node.
            coming (:class:`~.algorithm.compiler.DAGCircuit`): the coming DAG circuit.

        Raises:
            ValueError: If the qubits of ``coming`` differ from the legs of ``node``.

        Examples:
            >>> from mindquantum.algorithm.compiler import DAGCircuit
            >>> from mindquantum.core.circuit import Circuit
            >>> circ = Circuit().x(1, 0)
            >>> circ
            q0: ────■─────
                  ┏━┻━┓
            q1: ──┨╺╋╸┠───
                  ┗━━━┛
            >>> dag_circ = DAGCircuit(circ)
            >>> node = dag_circ.head_node[0].child[0]
            >>> node
            X(1 <-: 0)
            >>> sub_dag = DAGCircuit(Circuit().h(1).z(1, 0).h(1))
            >>> DAGCircuit.replace_node_with_dag_circuit(node, sub_dag)
            >>> dag_circ.to_circuit()
            q0: ──────────■───────────
                  ┏━━━┓ ┏━┻━┓ ┏━━━┓
            q1: ──┨ H ┠─┨ Z ┠─┨ H ┠───
                  ┗━━━┛ ┗━━━┛ ┗━━━┛
        """
        if set(node.local) != {head.qubit for head in coming.head_node.values()}:
            raise ValueError(f"Circuit in coming DAG is not aligned with gate in node: {node}")
        for local in node.local:
            connect_two_node(node.father[local], coming.head_node[local].child[local], local)
            connect_two_node(coming.final_node[local].father[local], node.child[local], local)

    def append_node(self, node: DAGNode):
        """
        Append a quantum gate node.

        Qubits that the DAG has not seen yet get a fresh head/final pair before
        the node is inserted in front of the final node of each of its legs.

        Args:
            node (:class:`~.algorithm.compiler.DAGNode`): the DAG node you want to append.

        Examples:
            >>> from mindquantum.algorithm.compiler import DAGCircuit, GateNode
            >>> from mindquantum.core.circuit import Circuit
            >>> import mindquantum.core.gates as G
            >>> circ = Circuit().h(0).x(1, 0)
            >>> circ
                  ┏━━━┓
            q0: ──┨ H ┠───■─────
                  ┗━━━┛   ┃
                        ┏━┻━┓
            q1: ────────┨╺╋╸┠───
                        ┗━━━┛
            >>> dag_circ = DAGCircuit(circ)
            >>> node = GateNode(G.RX('a').on(0, 2))
            >>> dag_circ.append_node(node)
            >>> dag_circ.to_circuit()
                  ┏━━━┓       ┏━━━━━━━┓
            q0: ──┨ H ┠───■───┨ RX(a) ┠───
                  ┗━━━┛   ┃   ┗━━━┳━━━┛
                        ┏━┻━┓     ┃
            q1: ────────┨╺╋╸┠─────╂───────
                        ┗━━━┛     ┃
            q2: ──────────────────■───────
        """
        _check_input_type('node', DAGNode, node)
        for local in node.local:
            if local not in self.head_node:
                self.head_node[local] = DAGQubitNode(local)
                self.final_node[local] = DAGQubitNode(local)
                self.head_node[local].insert_after(self.final_node[local])
            self.final_node[local].insert_before(node)

    def depth(self) -> int:
        """
        Return the depth of quantum circuit.

        The depth is the number of non-empty layers returned by :meth:`layering`.

        Examples:
            >>> from mindquantum.core.circuit import Circuit
            >>> from mindquantum.algorithm.compiler import DAGCircuit
            >>> circ = Circuit().h(0).h(1).x(1, 0)
            >>> circ
                  ┏━━━┓
            q0: ──┨ H ┠───■─────
                  ┗━━━┛   ┃
                  ┏━━━┓ ┏━┻━┓
            q1: ──┨ H ┠─┨╺╋╸┠───
                  ┗━━━┛ ┗━━━┛
            >>> DAGCircuit(circ).depth()
            2
        """
        return len(self.layering())

    def find_all_gate_node(self) -> typing.List[GateNode]:
        """
        Find all gate node in this :class:`~.algorithm.compiler.DAGCircuit`.

        The result is collected from a set, so the order of the returned list
        is not guaranteed.

        Returns:
            List[:class:`~.algorithm.compiler.GateNode`], a list of all :class:`~.algorithm.compiler.GateNode`
            of this :class:`~.algorithm.compiler.DAGCircuit`.

        Examples:
            >>> from mindquantum.algorithm.compiler import DAGCircuit
            >>> from mindquantum.core.circuit import Circuit
            >>> circ = Circuit().h(0).x(1, 0)
            >>> dag_circ = DAGCircuit(circ)
            >>> dag_circ.find_all_gate_node()
            [H(0), X(1 <-: 0)]
        """
        found = set(self.head_node.values())
        # An explicit stack replaces recursion: the walk can be as deep as the
        # number of nodes, which would exceed the interpreter recursion limit
        # on long circuits.
        stack = [child for head in self.head_node.values() for child in head.child.values()]
        while stack:
            current_node = stack.pop()
            if current_node in found:
                continue
            found.add(current_node)
            stack.extend(current_node.father.values())
            stack.extend(current_node.child.values())
        return [i for i in found if not isinstance(i, DAGQubitNode)]

    def layering(self) -> typing.List[Circuit]:
        r"""
        Layering the quantum circuit.

        Each node is assigned a depth equal to one plus the largest depth of its
        fathers (head qubit nodes have depth 0). Gates with the same depth end
        up in the same layer; barrier nodes take part in the depth computation
        but are not written to any layer, and empty layers are dropped.

        Returns:
            List[:class:`~.core.circuit.Circuit`], a list of layered quantum circuit.

        Examples:
            >>> from mindquantum.algorithm.compiler import DAGCircuit
            >>> from mindquantum.utils import random_circuit
            >>> circ = random_circuit(3, 5, seed=42)
            >>> circ
                  ┏━━━━━━━━━━━━━┓   ┏━━━━━━━━━━━━━┓
            q0: ──┨             ┠─╳─┨ RY(-6.1944) ┠───────────────────
                  ┃             ┃ ┃ ┗━━━━━━┳━━━━━━┛
                  ┃ Rxx(1.2171) ┃ ┃        ┃        ┏━━━━━━━━━━━━━┓
            q1: ──┨             ┠─┃────────╂────────┨             ┠───
                  ┗━━━━━━━━━━━━━┛ ┃        ┃        ┃             ┃
                  ┏━━━━━━━━━━━━┓  ┃        ┃        ┃ Rzz(-0.552) ┃
            q2: ──┨ PS(2.6147) ┠──╳────────■────────┨             ┠───
                  ┗━━━━━━━━━━━━┛                    ┗━━━━━━━━━━━━━┛
            >>> dag_circ = DAGCircuit(circ)
            >>> for idx, c in enumerate(dag_circ.layering()):
            ...     print(f"layer {idx}:")
            ...     print(c)
            layer 0:
                  ┏━━━━━━━━━━━━━┓
            q0: ──┨             ┠───
                  ┃             ┃
                  ┃ Rxx(1.2171) ┃
            q1: ──┨             ┠───
                  ┗━━━━━━━━━━━━━┛
                  ┏━━━━━━━━━━━━┓
            q2: ──┨ PS(2.6147) ┠────
                  ┗━━━━━━━━━━━━┛
            layer 1:
            q0: ──╳───
            q2: ──╳───
            layer 2:
                  ┏━━━━━━━━━━━━━┓
            q0: ──┨ RY(-6.1944) ┠───
                  ┗━━━━━━┳━━━━━━┛
            q2: ─────────■──────────
            layer 3:
                  ┏━━━━━━━━━━━━━┓
            q1: ──┨             ┠───
                  ┃             ┃
                  ┃ Rzz(-0.552) ┃
            q2: ──┨             ┠───
                  ┗━━━━━━━━━━━━━┛
        """

        def _layering(current_node: GateNode, depth_map):
            """Assign a depth to ``current_node`` and to everything reachable from it."""
            if current_node.father:
                prev_depth = []
                for father_node in current_node.father.values():
                    # A father on another leg may not have been visited yet.
                    if father_node not in depth_map:
                        _layering(father_node, depth_map)
                    prev_depth.append(depth_map[father_node])
                depth_map[current_node] = max(prev_depth) + 1
            for child in current_node.child.values():
                # Final qubit nodes terminate a line and carry no gate.
                if not isinstance(child, DAGQubitNode):
                    if child not in depth_map:
                        _layering(child, depth_map)

        depth_map = {i: 0 for i in self.head_node.values()}
        for current_node in self.head_node.values():
            _layering(current_node, depth_map)
        # Depth 0 belongs to the head nodes, hence one layer less than distinct depths.
        layer = [Circuit() for _ in range(len(set(depth_map.values())) - 1)]
        for k, v in depth_map.items():
            if v != 0:
                if not isinstance(k, BarrierNode):
                    layer[v - 1] += k.gate
        return [c for c in layer if len(c) != 0]

    def to_circuit(self) -> Circuit:
        """
        Convert :class:`~.algorithm.compiler.DAGCircuit` to quantum circuit.

        Gates are emitted in the order given by :meth:`topological_sort`, so
        every gate appears after all gates it depends on.

        Returns:
            :class:`~.core.circuit.Circuit`, the quantum circuit of this DAG.

        Examples:
            >>> from mindquantum.core.circuit import Circuit
            >>> from mindquantum.algorithm.compiler import DAGCircuit
            >>> circ = Circuit().h(0).h(1).x(1, 0)
            >>> circ
                  ┏━━━┓
            q0: ──┨ H ┠───■─────
                  ┗━━━┛   ┃
                  ┏━━━┓ ┏━┻━┓
            q1: ──┨ H ┠─┨╺╋╸┠───
                  ┗━━━┛ ┗━━━┛
            >>> dag_circ = DAGCircuit(circ)
            >>> dag_circ.to_circuit()
                  ┏━━━┓
            q0: ──┨ H ┠───■─────
                  ┗━━━┛   ┃
                  ┏━━━┓ ┏━┻━┓
            q1: ──┨ H ┠─┨╺╋╸┠───
                  ┗━━━┛ ┗━━━┛
        """
        circuit = Circuit()
        for node in self.topological_sort():
            # Only gate-carrying nodes contribute to the circuit.
            if isinstance(node, GateNode):
                circuit += node.gate
        return circuit

    def topological_sort(self) -> typing.List[DAGNode]:
        """
        Perform topological sorting on the DAG and return the nodes in topologically sorted order.

        Kahn's algorithm is used: a node is emitted only after every edge that
        points to it has been consumed, so a multi-qubit gate always comes after
        all of its fathers. Head and final qubit nodes are not part of the result.

        Returns:
            List[DAGNode], List of nodes in topologically sorted order.
        """
        from collections import defaultdict, deque  # pylint: disable=import-outside-toplevel

        # Count every father -> child edge reachable from the head nodes. A node
        # with several legs has one incoming edge per leg, and all of them have
        # to be counted, not only the first edge that discovers the node.
        in_degree = defaultdict(int)
        visited = set(self.head_node.values())
        pending = deque(self.head_node.values())
        while pending:
            current_node = pending.popleft()
            for child in current_node.child.values():
                in_degree[child] += 1
                if child not in visited:
                    visited.add(child)
                    pending.append(child)

        zero_in_degree_queue = deque(node for node in self.head_node.values() if in_degree[node] == 0)

        sorted_nodes = []
        while zero_in_degree_queue:
            node = zero_in_degree_queue.popleft()
            if not isinstance(node, DAGQubitNode):
                sorted_nodes.append(node)
            for child in node.child.values():
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    zero_in_degree_queue.append(child)
        return sorted_nodes
# pylint: disable=too-many-return-statements,too-many-branches
def try_merge(
    father_node: GateNode, child_node: GateNode
) -> typing.Tuple[bool, typing.List[GateNode], typing.Optional[gates.GlobalPhase]]:
    """
    Try to merge two gate nodes.

    Following this method, we merge two hermitian conjugated into identity, and also
    merge two same kind parameterized gate into single parameterized gate.

    The merge is only attempted when ``child_node`` is the single child of
    ``father_node`` and ``father_node`` is the single father of ``child_node``.
    On success both input nodes are removed from the graph (and cleaned) and
    replaced by the merged gate node, if there is one. A global phase that has
    control qubits is kept in the graph as an extra gate node; one without
    control qubits is handed back to the caller instead.

    Args:
        father_node (:class:`~.algorithm.compiler.GateNode`): the father node want to merge.
        child_node (:class:`~.algorithm.compiler.GateNode`): the child node want to merge.

    Returns:
        - bool, whether successfully merged.
        - List[:class:`~.algorithm.compiler.GateNode`], the distinct fathers of ``father_node``
          after a successful merge (in no particular order), an empty list otherwise.
        - :class:`~.core.gates.GlobalPhase`, the global phase gate after merge two given gate node,
          or ``None`` when there is none or it has been inserted into the graph.
    """
    if len(set(father_node.child.values())) != 1 or len(set(child_node.father.values())) != 1:
        return False, [], None
    for node in father_node.child.values():
        if node != child_node:
            return False, [], None
    state, res, global_phase = father_node.gate.__merge__(child_node.gate)
    if not state or len(res) > 1:
        # Failure always reports an empty list, like the other failure paths.
        return False, [], None

    merged_node = GateNode(res[0]) if res else None
    # A controlled global phase is not a plain number, so it stays in the DAG as a node.
    ctrl_gp = GateNode(global_phase) if global_phase and global_phase.ctrl_qubits else None
    # Node that takes the place of the pair: the merged gate if any, else the
    # controlled phase, else nothing (the neighbours are linked directly).
    bridge = merged_node if merged_node is not None else ctrl_gp

    father_of_father = []
    for i in father_node.local:
        grand_father = father_node.father[i]
        father_of_father.append(grand_father)
        if bridge is None:
            connect_two_node(grand_father, child_node.child[i], i)
        else:
            connect_two_node(grand_father, bridge, i)
            connect_two_node(bridge, child_node.child[i], i)
    father_node.clean()
    child_node.clean()

    if merged_node is not None and ctrl_gp is not None:
        for i in ctrl_gp.local:
            # Remember the node currently behind the merged gate before the
            # link is overwritten; otherwise the phase node would be connected
            # to itself.
            successor = merged_node.child[i]
            connect_two_node(merged_node, ctrl_gp, i)
            connect_two_node(ctrl_gp, successor, i)

    remaining_phase = global_phase if (global_phase and ctrl_gp is None) else None
    return True, list(set(father_of_father)), remaining_phase