# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""DAG Circuit."""
import typing
from mindquantum.core import Circuit, gates
from mindquantum.utils.type_value_check import _check_input_type
# pylint: disable=invalid-name
[docs]class DAGNode:
"""
Basic node in Directed Acyclic Graph.
A DAG node has local index, which label the index of leg of node, and child nodes and father nodes.
"""
def __init__(self):
"""Initialize a DAGNode object."""
self.child: typing.Dict[int, "DAGNode"] = {} # key: local index, value: child DAGNode
self.father: typing.Dict[int, "DAGNode"] = {} # key: local index, value: father DAGNode
self.local: typing.List[int] = []
[docs] def clean(self):
"""Clean node and set it to empty."""
self.child = {}
self.father = {}
self.local = []
[docs] def insert_after(self, other_node: "DAGNode"):
"""
Insert other node after this dag node.
Args:
other_node (:class:`~.algorithm.compiler.DAGNode`): other DAG node.
"""
_check_input_type("other_node", DAGNode, other_node)
for local in self.local:
if local in other_node.local:
other_node.father[local] = self
if local in self.child:
other_node.child[local] = self.child.get(local)
self.child.get(local).fathre[local] = other_node
self.child[local] = other_node
[docs] def insert_before(self, other_node: "DAGNode"):
"""
Insert other node before this dag node.
Args:
other_node (:class:`~.algorithm.compiler.DAGNode`): other DAG node.
"""
_check_input_type("other_node", DAGNode, other_node)
for local in self.local:
if local in other_node.local:
other_node.child[local] = self
if local in self.father:
other_node.father[local] = self.father.get(local)
self.father.get(local).child[local] = other_node
self.father[local] = other_node
[docs]def connect_two_node(father_node: DAGNode, child_node: DAGNode, local_index: int):
"""
Connect two DAG node through given local_index.
Args:
father_node (DAGNode): The father DAG node.
child_node (DAGNode): The child DAG node.
local_index (int): which leg you want to connect.
"""
if local_index not in father_node.local or local_index not in child_node.local:
raise ValueError(
f"local_index {local_index} not in father_node" f" {father_node} or not in child_node {child_node}."
)
father_node.child[local_index] = child_node
child_node.father[local_index] = father_node
[docs]class DAGQubitNode(DAGNode):
"""
DAG node that work as quantum qubit.
Args:
qubit (int): id of qubit.
"""
def __init__(self, qubit: int):
"""Initialize a DAGQubitNode object."""
super().__init__()
_check_input_type("qubit", int, qubit)
self.qubit = qubit
self.local = [qubit]
def __str__(self):
"""Return a string representation of qubit node."""
return f"q{self.qubit}"
def __repr__(self):
"""Return a string representation of qubit node."""
return self.__str__()
[docs]class GateNode(DAGNode):
"""
DAG node that work as quantum gate.
Args:
gate (:class:`~.core.gates.BasicGate`): Quantum gate.
"""
def __init__(self, gate: gates.BasicGate):
"""Initialize a GateNode object."""
super().__init__()
_check_input_type("gate", gates.BasicGate, gate)
self.gate = gate
self.local = gate.obj_qubits + gate.ctrl_qubits
def __str__(self):
"""Return a string representation of gate node."""
return str(self.gate)
def __repr__(self):
"""Return a string representation of gate node."""
return self.__str__()
class BarrierNode(GateNode):
"""DAG node that work as barrier."""
def __init__(self, gate: gates.BasicGate, all_qubits: typing.List[int]):
"""Initialize a BarrierNode object."""
super().__init__(gate)
self.local = all_qubits
[docs]class DAGCircuit:
"""
A Directed Acyclic Graph of a quantum circuit.
Args:
circuit (:class:`~.core.circuit.Circuit`): the input quantum circuit.
Examples:
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> from mindquantum.core.circuit import Circuit
>>> circ = Circuit().h(0).x(1, 0)
>>> dag_circ = DAGCircuit(circ)
>>> dag_circ.head_node[0]
q0
>>> dag_circ.head_node[0].child
{0: H(0)}
"""
def __init__(self, circuit: Circuit):
"""Initialize a DAGCircuit object."""
_check_input_type("circuit", Circuit, circuit)
self.head_node = {i: DAGQubitNode(i) for i in sorted(circuit.all_qubits.keys())}
self.final_node = {i: DAGQubitNode(i) for i in sorted(circuit.all_qubits.keys())}
for i in self.head_node:
self.head_node[i].insert_after(self.final_node[i])
for gate in circuit:
if isinstance(gate, gates.BarrierGate):
if gate.obj_qubits:
self.append_node(BarrierNode(gate, sorted(gate.obj_qubits)))
else:
self.append_node(BarrierNode(gate, sorted(circuit.all_qubits.keys())))
else:
self.append_node(GateNode(gate))
self.global_phase = gates.GlobalPhase(0)
[docs] @staticmethod
def replace_node_with_dag_circuit(node: DAGNode, coming: "DAGCircuit"):
"""
Replace a node with a DAGCircuit.
Args:
node (:class:`~.algorithm.compiler.DAGNode`): the original DAG node.
coming (:class:`~.algorithm.compiler.DAGCircuit`): the coming DAG circuit.
Examples:
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> from mindquantum.core.circuit import Circuit
>>> circ = Circuit().x(1, 0)
>>> circ
q0: ────■─────
┃
┏━┻━┓
q1: ──┨╺╋╸┠───
┗━━━┛
>>> dag_circ = DAGCircuit(circ)
>>> node = dag_circ.head_node[0].child[0]
>>> node
X(1 <-: 0)
>>> sub_dag = DAGCircuit(Circuit().h(1).z(1, 0).h(1))
>>> DAGCircuit.replace_node_with_dag_circuit(node, sub_dag)
>>> dag_circ.to_circuit()
q0: ──────────■───────────
┃
┏━━━┓ ┏━┻━┓ ┏━━━┓
q1: ──┨ H ┠─┨ Z ┠─┨ H ┠───
┗━━━┛ ┗━━━┛ ┗━━━┛
"""
if set(node.local) != {head.qubit for head in coming.head_node.values()}:
raise ValueError(f"Circuit in coming DAG is not aligned with gate in node: {node}")
for local in node.local:
connect_two_node(node.father[local], coming.head_node[local].child[local], local)
connect_two_node(coming.final_node[local].father[local], node.child[local], local)
[docs] def append_node(self, node: DAGNode):
"""
Append a quantum gate node.
Args:
node (:class:`~.algorithm.compiler.DAGNode`): the DAG node you want to append.
Examples:
>>> from mindquantum.algorithm.compiler import DAGCircuit, GateNode
>>> from mindquantum.core.circuit import Circuit
>>> import mindquantum.core.gates as G
>>> circ = Circuit().h(0).x(1, 0)
>>> circ
┏━━━┓
q0: ──┨ H ┠───■─────
┗━━━┛ ┃
┏━┻━┓
q1: ────────┨╺╋╸┠───
┗━━━┛
>>> dag_circ = DAGCircuit(circ)
>>> node = GateNode(G.RX('a').on(0, 2))
>>> dag_circ.append_node(node)
>>> dag_circ.to_circuit()
┏━━━┓ ┏━━━━━━━┓
q0: ──┨ H ┠───■───┨ RX(a) ┠───
┗━━━┛ ┃ ┗━━━┳━━━┛
┏━┻━┓ ┃
q1: ────────┨╺╋╸┠─────╂───────
┗━━━┛ ┃
┃
q2: ──────────────────■───────
"""
_check_input_type('node', DAGNode, node)
for local in node.local:
if local not in self.head_node:
self.head_node[local] = DAGQubitNode(local)
self.final_node[local] = DAGQubitNode(local)
self.head_node[local].insert_after(self.final_node[local])
self.final_node[local].insert_before(node)
[docs] def depth(self) -> int:
"""
Return the depth of quantum circuit.
Examples:
>>> from mindquantum.core.circuit import Circuit
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> circ = Circuit().h(0).h(1).x(1, 0)
>>> circ
┏━━━┓
q0: ──┨ H ┠───■─────
┗━━━┛ ┃
┏━━━┓ ┏━┻━┓
q1: ──┨ H ┠─┨╺╋╸┠───
┗━━━┛ ┗━━━┛
>>> DAGCircuit(circ).depth()
2
"""
return len(self.layering())
[docs] def find_all_gate_node(self) -> typing.List[GateNode]:
"""
Find all gate node in this :class:`~.algorithm.compiler.DAGCircuit`.
Returns:
List[:class:`~.algorithm.compiler.GateNode`], a list of all :class:`~.algorithm.compiler.GateNode`
of this :class:`~.algorithm.compiler.DAGCircuit`.
Examples:
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> from mindquantum.core.circuit import Circuit
>>> circ = Circuit().h(0).x(1, 0)
>>> dag_circ = DAGCircuit(circ)
>>> dag_circ.find_all_gate_node()
[H(0), X(1 <-: 0)]
"""
found = set(self.head_node.values())
def _find(current_node: DAGNode, found):
if current_node not in found:
found.add(current_node)
for node in current_node.father.values():
_find(node, found)
for node in current_node.child.values():
_find(node, found)
for head_node in self.head_node.values():
for current_node in head_node.child.values():
_find(current_node, found)
return [i for i in found if not isinstance(i, DAGQubitNode)]
[docs] def layering(self) -> typing.List[Circuit]:
r"""
Layering the quantum circuit.
Returns:
List[:class:`~.core.circuit.Circuit`], a list of layered quantum circuit.
Examples:
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> from mindquantum.utils import random_circuit
>>> circ = random_circuit(3, 5, seed=42)
>>> circ
┏━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━┓
q0: ──┨ ┠─╳─┨ RY(-6.1944) ┠───────────────────
┃ ┃ ┃ ┗━━━━━━┳━━━━━━┛
┃ Rxx(1.2171) ┃ ┃ ┃ ┏━━━━━━━━━━━━━┓
q1: ──┨ ┠─┃────────╂────────┨ ┠───
┗━━━━━━━━━━━━━┛ ┃ ┃ ┃ ┃
┏━━━━━━━━━━━━┓ ┃ ┃ ┃ Rzz(-0.552) ┃
q2: ──┨ PS(2.6147) ┠──╳────────■────────┨ ┠───
┗━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━┛
>>> dag_circ = DAGCircuit(circ)
>>> for idx, c in enumerate(dag_circ.layering()):
... print(f"layer {idx}:")
... print(c)
layer 0:
┏━━━━━━━━━━━━━┓
q0: ──┨ ┠───
┃ ┃
┃ Rxx(1.2171) ┃
q1: ──┨ ┠───
┗━━━━━━━━━━━━━┛
┏━━━━━━━━━━━━┓
q2: ──┨ PS(2.6147) ┠────
┗━━━━━━━━━━━━┛
layer 1:
q0: ──╳───
┃
┃
q2: ──╳───
layer 2:
┏━━━━━━━━━━━━━┓
q0: ──┨ RY(-6.1944) ┠───
┗━━━━━━┳━━━━━━┛
┃
q2: ─────────■──────────
layer 3:
┏━━━━━━━━━━━━━┓
q1: ──┨ ┠───
┃ ┃
┃ Rzz(-0.552) ┃
q2: ──┨ ┠───
┗━━━━━━━━━━━━━┛
"""
def _layering(current_node: GateNode, depth_map):
"""Layering the quantum circuit."""
if current_node.father:
prev_depth = []
for father_node in current_node.father.values():
if father_node not in depth_map:
_layering(father_node, depth_map)
prev_depth.append(depth_map[father_node])
depth_map[current_node] = max(prev_depth) + 1
for child in current_node.child.values():
if not isinstance(child, DAGQubitNode):
if child not in depth_map:
_layering(child, depth_map)
depth_map = {i: 0 for i in self.head_node.values()}
for current_node in self.head_node.values():
_layering(current_node, depth_map)
layer = [Circuit() for _ in range(len(set(depth_map.values())) - 1)]
for k, v in depth_map.items():
if v != 0:
if not isinstance(k, BarrierNode):
layer[v - 1] += k.gate
return [c for c in layer if len(c) != 0]
[docs] def to_circuit(self) -> Circuit:
"""
Convert :class:`~.algorithm.compiler.DAGCircuit` to quantum circuit.
Returns:
:class:`~.core.circuit.Circuit`, the quantum circuit of this DAG.
Examples:
>>> from mindquantum.core.circuit import Circuit
>>> from mindquantum.algorithm.compiler import DAGCircuit
>>> circ = Circuit().h(0).h(1).x(1, 0)
>>> circ
┏━━━┓
q0: ──┨ H ┠───■─────
┗━━━┛ ┃
┏━━━┓ ┏━┻━┓
q1: ──┨ H ┠─┨╺╋╸┠───
┗━━━┛ ┗━━━┛
>>> dag_circ = DAGCircuit(circ)
>>> dag_circ.to_circuit()
┏━━━┓
q0: ──┨ H ┠───■─────
┗━━━┛ ┃
┏━━━┓ ┏━┻━┓
q1: ──┨ H ┠─┨╺╋╸┠───
┗━━━┛ ┗━━━┛
"""
sorted_nodes = self.topological_sort()
circuit = Circuit()
considered_nodes = set(self.head_node.values())
def add_node_to_circuit(node, circuit, considered_nodes):
if node not in considered_nodes:
for parent in node.father.values():
add_node_to_circuit(parent, circuit, considered_nodes)
considered_nodes.add(node)
if isinstance(node, GateNode):
circuit += node.gate
for node in sorted_nodes:
add_node_to_circuit(node, circuit, considered_nodes)
return circuit
[docs] def topological_sort(self) -> typing.List[DAGNode]:
"""
Perform topological sorting on the DAG and return the nodes in topologically sorted order.
Returns:
List[DAGNode], List of nodes in topologically sorted order.
"""
from collections import deque, defaultdict
in_degree = defaultdict(int)
visited = set()
zero_in_degree_queue = deque()
# Initialize in-degrees and find initial zero in-degree nodes
for node in self.head_node.values():
if node not in visited:
queue = deque([node])
visited.add(node)
while queue:
current_node = queue.popleft()
for child in current_node.child.values():
if child not in visited:
in_degree[child] += 1
queue.append(child)
visited.add(child)
if in_degree[node] == 0:
zero_in_degree_queue.append(node)
# Topologically sorted nodes
sorted_nodes = []
while zero_in_degree_queue:
node = zero_in_degree_queue.popleft()
sorted_nodes.append(node)
for child in node.child.values():
in_degree[child] -= 1
if in_degree[child] == 0:
zero_in_degree_queue.append(child)
return [node for node in sorted_nodes if not isinstance(node, DAGQubitNode)]
# pylint: disable=too-many-return-statements,too-many-branches
[docs]def try_merge(
father_node: GateNode, child_node: GateNode
) -> typing.Tuple[bool, typing.List[GateNode], gates.GlobalPhase]:
"""
Try to merge two gate nodes.
Following this method, we merge two hermitian conjugated into identity, and also merge two same kind
parameterized gate into single parameterized gate.
Args:
father_node (:class:`~.algorithm.compiler.GateNode`): the father node want to merge.
child_node (:class:`~.algorithm.compiler.GateNode`): the child node want to merge.
Returns:
- bool, whether successfully merged.
- List[:class:`~.algorithm.compiler.GateNode`], the father node after merged.
- :class:`~.core.gates.GlobalPhase`, the global phase gate after merge two given gate node.
"""
if len(set(father_node.child.values())) != 1 or len(set(child_node.father.values())) != 1:
return False, [], None
for node in father_node.child.values():
if node != child_node:
return False, [], None
state, res, global_phase = father_node.gate.__merge__(child_node.gate)
if not state or len(res) > 1:
return False, father_node, None
if res:
merged_node = GateNode(res[0])
father_of_father = []
for i in father_node.local:
father_of_father.append(father_node.father[i])
connect_two_node(father_node.father[i], merged_node, i)
connect_two_node(merged_node, child_node.child[i], i)
father_node.clean()
child_node.clean()
if global_phase:
if global_phase.ctrl_qubits:
ctrl_gp = GateNode(global_phase).local
for i in ctrl_gp:
connect_two_node(merged_node, ctrl_gp, i)
connect_two_node(ctrl_gp, merged_node.child[i], i)
return True, list(set(father_of_father)), None
return True, list(set(father_of_father)), global_phase
return True, list(set(father_of_father)), None
father_of_father = []
if global_phase:
if global_phase.ctrl_qubits:
ctrl_gp = GateNode(global_phase)
for i in father_node.local:
father_of_father.append(father_node.father[i])
connect_two_node(father_node.father[i], ctrl_gp, i)
connect_two_node(ctrl_gp, child_node.child[i], i)
else:
for i in father_node.local:
father_of_father.append(father_node.father[i])
connect_two_node(father_node.father[i], child_node.child[i], i)
father_node.clean()
child_node.clean()
return True, list(set(father_of_father)), (None if global_phase.ctrl_qubits else global_phase)
for i in father_node.local:
father_of_father.append(father_node.father[i])
connect_two_node(father_node.father[i], child_node.child[i], i)
father_node.clean()
child_node.clean()
return True, list(set(father_of_father)), None