diff --git a/.gitignore b/.gitignore index f857bc2..936ccc7 100644 --- a/.gitignore +++ b/.gitignore @@ -114,4 +114,8 @@ dmypy.json .pyre/ # macOS -.DS_Store \ No newline at end of file +.DS_Store +*.code-workspace + +data/* +plots/* \ No newline at end of file diff --git a/qml_training.py b/qml_training.py index 68adee3..2195d88 100644 --- a/qml_training.py +++ b/qml_training.py @@ -2,25 +2,31 @@ import torch import math import random +from enum import Enum + DEBUG = False + +class EncodingType(Enum): + """Enumeration for quantum feature encoding types.""" + ANGLE = "angle" + AMPLITUDE = "amplitude" + from quantum_simulator import ( zero_state, + custom_state, apply_gate, + apply_circuit_to_ket, RY, RZ, RX, CNOT, - state_to_density, - build_full_unitary, - kraus_operator, - apply_named_gate_density, - apply_T1T2_noise_op, Z, I2 ) # sklearn only for data (this is standard + allowed) from sklearn.datasets import load_breast_cancer +from sklearn.datasets import make_moons from sklearn.preprocessing import StandardScaler from sklearn.decomposition import PCA from sklearn.model_selection import train_test_split @@ -28,7 +34,7 @@ # ============================================================ # Dataset (REAL-WORLD) # ============================================================ -def make_real_dataset(test_size=0.3, seed=42): +def make_real_dataset(test_size=0.3, seed=42, encoding=EncodingType.ANGLE): data = load_breast_cancer() X = data.data y = data.target @@ -39,11 +45,14 @@ def make_real_dataset(test_size=0.3, seed=42): # normalize X = StandardScaler().fit_transform(X) - # reduce to 2 features → 2 qubits - X = PCA(n_components=2).fit_transform(X) + # For angle encoding: n features → n qubits (use 2 features for 2 qubits) + # For amplitude encoding: 2^n amplitudes needed for n qubits (use 4 features for 2 qubits) + n_components = 4 if encoding == EncodingType.AMPLITUDE else 2 + X = PCA(n_components=n_components).fit_transform(X) - # scale to [-π, π] for angle encoding - X = math.pi * X / X.max(axis=0) + # scale to [-π, π] for angle encoding (or normalized range for amplitude) + if encoding == EncodingType.ANGLE: + X = math.pi * X / X.max(axis=0) X_train, X_test, y_train, y_test = train_test_split( X, y, @@ -59,6 +68,58 @@ def make_real_dataset(test_size=0.3, seed=42): torch.tensor(y_test, dtype=torch.float32), ) +# ============================================================ +# New: make_moons Dataset +# ============================================================ + +def make_moons_dataset(test_size=0.3, seed=42, noise=0.15, encoding=EncodingType.ANGLE): + """ + Generates a binary classification dataset (two interleaving moons). + Labels {0,1} are mapped to {-1,+1}. + """ + X, y = make_moons(n_samples=500, noise=noise, random_state=seed) # binary labels 0/1 + y = 2 * y - 1 # convert {0,1} to {-1,+1} + + # Standardize base 2D features + X = StandardScaler().fit_transform(X) + + # Match feature dimensionality to encoding logic (PCA where possible) + # - ANGLE encoding: 2 components → 2 qubits + # - AMPLITUDE encoding: need 4 real features → 2^2 amplitudes for 2 qubits + if encoding == EncodingType.AMPLITUDE: + if X.shape[1] < 4: + # Pad with zeros in alternating slots: [x1, 0, x2, 0] + # Each adjacent pair feeds one qubit amplitude; keeping one value non-zero per pair + # avoids initializing both amplitudes to zero and wasting that qubit. + X_pad = torch.zeros((X.shape[0], 4), dtype=torch.float32) + X_pad[:, 0] = torch.tensor(X[:, 0], dtype=torch.float32) + X_pad[:, 2] = torch.tensor(X[:, 1], dtype=torch.float32) + X = X_pad.numpy() + else: + X = PCA(n_components=4).fit_transform(X) + else: + X = PCA(n_components=2).fit_transform(X) + + # Scale to [-π, π] for ANGLE encoding (consistent with make_real_dataset) + if encoding == EncodingType.ANGLE: + X = math.pi * X / X.max(axis=0) + + # Split + X_train, X_test, y_train, y_test = train_test_split( + X, y, + test_size=test_size, + random_state=seed, + stratify=y + ) + + return ( + torch.tensor(X_train, dtype=torch.float32), + torch.tensor(X_test, dtype=torch.float32), + torch.tensor(y_train, dtype=torch.float32), + torch.tensor(y_test, dtype=torch.float32), + ) + + # ============================================================ # Metrics # ============================================================ @@ -127,16 +188,70 @@ def expectation_z(state, qubit, n): op = mat if op is None else torch.kron(op, mat) return torch.real(state.conj() @ (op @ state)) -# ============================================================ -# Deep Variational QNN -# ============================================================ -def deep_vqc_forward(x, theta, depth=3): - n = 2 - state = zero_state(n) + """ + Encoding methods - angle encoding, and amplitude encoding. Returns normalized ket vector state, and circuit + + x - input vector + n - amount of qubits + encoding - enum EncodingType + + Throws error if the encoding type is not known. + Will return state and circ, if circ is nonempty - the state encoding method needs it to be applied to the state to be initialized. This allows for error models that apply noise during state preparation. + + EX: + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + TO GET KET: + state = apply_circuit_to_ket(state, circuit, n) + + TO USE DENSITY SIM WITH NOISE: + density = state_to_density(state) + density = run_noisy_circuit_density(density, circuit, n, T1, T2, gate_durations) + """ +def state_encoding(x, n, encoding=EncodingType.ANGLE): + # Specify encoding type + # custom_state(x) -> Creates arbitrary normalized amplitude state (ignores gate errors, just creats perfect state) + if encoding == EncodingType.ANGLE: + assert len(x) == n, "Input dimension must match number of qubits for angle encoding" + state = zero_state(n) + elif encoding == EncodingType.AMPLITUDE: + # For amplitude encoding, we need 2^n amplitudes + # Pad the input if necessary + required_size = 2 ** n + if len(x) < required_size: + print("Warning: Input size less than required for amplitude encoding, padding with zeros.") + # Pad with zeros to match required size + padded_x = torch.zeros(required_size) + if isinstance(x, torch.Tensor): + padded_x[:len(x)] = x.clone().detach() + else: + padded_x[:len(x)] = torch.tensor(x) + state = custom_state(padded_x) + else: + # Use first 2^n elements if input is larger + state = custom_state(x[:required_size]) + else: + raise Exception("Unknown encoding") + + circuit = [] + # Angle encoding state, assuming x is mapped to [-π,π] + if encoding == EncodingType.ANGLE: + circuit = param_gate_layer("RY",x) - state = apply_gate(state, RY(x[0]), [0], n) - state = apply_gate(state, RY(x[1]), [1], n) + return state, circuit +# ============================================================ +# Deep Variational QNN, no errors +# ============================================================ +def deep_vqc_forward(x, theta, depth=3, encoding=EncodingType.ANGLE, n=None): + # Calculate number of qubits based on encoding type + if n is None: + n = len(x) if encoding == EncodingType.ANGLE else int(math.log2(len(x))) + + # Prep state with desired encoding method + state,circ = state_encoding(x,n,encoding) + state = apply_circuit_to_ket(state,circ,n) + + # Variational layers idx = 0 for _ in range(depth): state = apply_gate(state, RY(theta[idx]), [0], n) @@ -152,36 +267,49 @@ def deep_vqc_forward(x, theta, depth=3): # ============================================================ # Helper function to build layer of RX/Y/Z on selected qubits -def param_gate_layer(gate: str, x , qubits: tuple[int]): +def param_gate_layer(gate: str, x, specify_qubits: tuple[int] | None = None): op_list = [] - for qubit in qubits: - op_list.append((gate, [qubit], x[qubit])) + if specify_qubits == None: + # Apply gate to all qubits + n = len(x) + for qubit in range(n): + op_list.append((gate, [qubit], x[qubit])) + else: + # apply gate to specified qubits + for qubit in specify_qubits: + op_list.append((gate, [qubit], x[qubit])) return op_list -def noisy_qnn_forward(x, theta, - T1=100, - T2=200, - gate_durations={ - "CNOT": 1, - "RY" : 1, - } - ): +""" +Noisy-VQC forward pass (one layer), with T1/T2 noise model. +""" +def noisy_qnn_forward(x, theta, T1=100, T2=200, + gate_durations={ # needed for noise model + "CNOT": 1, + "RY" : 1, + }, + encoding=EncodingType.ANGLE): + + # Calculate number of qubits based on encoding type + n = len(x) if encoding == EncodingType.ANGLE else int(math.log2(len(x))) + + init_state, circ = state_encoding(x,n,encoding) # circ now has the first RY layer (if angle encoding) - n = 2 + # First RY trainable layer + circ += param_gate_layer("RY",theta,[0,1]) + + # CNOT layer (0 -> 1) + circ += [("CNOT", [0,1])] - init_state = zero_state(n) - x_RY_layer = param_gate_layer("RY",x,[0,1]) - theta_RY_layer = param_gate_layer("RY",theta,[0,1]) - cnot_layer = [("CNOT", [0,1])] - all_gates = x_RY_layer + theta_RY_layer + cnot_layer - all_gates.append(("RZ", [0], theta[2])) + # Last trainable RZ on qubit 0 + circ += [("RZ", [0], theta[2])] if DEBUG: - print(f"QNN layer gates:\n{all_gates}") + print(f"QNN layer gates:\n{circ}") density = run_noisy_circuit_density( initial_state=init_state, - circuit=all_gates, + circuit=circ, num_qubits = n, T1 = T1, T2 = T2, @@ -195,56 +323,88 @@ def noisy_qnn_forward(x, theta, # ============================================================ # Quantum Kernel Model # ============================================================ -def quantum_feature_map(x): - n = 2 - state = zero_state(n) - state = apply_gate(state, RY(x[0]), [0], n) - state = apply_gate(state, RY(x[1]), [1], n) + +def quantum_feature_map(x, encoding=EncodingType.ANGLE): + # Calculate number of qubits based on encoding type + n = len(x) if encoding == EncodingType.ANGLE else int(math.log2(len(x))) + init_state,circ = state_encoding(x,n,encoding) + state = apply_circuit_to_ket(init_state,circ,n) state = apply_gate(state, CNOT, [0,1], n) return state -def quantum_kernel(x1, x2): - ψ1 = quantum_feature_map(x1) - ψ2 = quantum_feature_map(x2) +def quantum_kernel(x1, x2, encoding = EncodingType.ANGLE): + ψ1 = quantum_feature_map(x1, encoding) + ψ2 = quantum_feature_map(x2, encoding) return torch.abs(torch.dot(ψ1.conj(), ψ2))**2 -def kernel_predict(x, X_train, y_train): - vals = torch.tensor([quantum_kernel(x, xi) for xi in X_train]) +def kernel_predict(x, X_train, y_train, encoding = EncodingType.ANGLE): + vals = torch.tensor([quantum_kernel(x, xi, encoding) for xi in X_train]) return torch.sign(torch.sum(vals * y_train)) # ============================================================ # Training # ============================================================ -def train(model_type="deep_vqc"): - X_train, X_test, y_train, y_test = make_real_dataset() - epochs = 25 + """Run model training + + model_type: + "deep_vqc" - variational quantum circuit with 3 layers + "noise_aware" - noise-aware QNN with 1 layer + "kernel" - quantum kernel model (no training, just prediction) + encoding: + EncodingType.ANGLE - angle encoding + EncodingType.AMPLITUDE - amplitude encoding + epochs: number of training epochs + dataset: + "real" - breast cancer dataset + "moons" - make_moons dataset + record_metrics: if True, returns training metrics (loss, accuracy) history for plotting + T1: T1 relaxation time for noise model (µs) + T2: T2 dephasing time for noise model (µs) + + Returns: + If record_metrics=True: dict with loss_history, acc_history, final_metrics, etc. + Otherwise: None + + """ +def train(model_type="deep_vqc", encoding=EncodingType.ANGLE, epochs=25, dataset="real", record_metrics=False, T1=100, T2=200): + if dataset == "real": + X_train, X_test, y_train, y_test = make_real_dataset(encoding=encoding) + elif dataset == "moons": + X_train, X_test, y_train, y_test = make_moons_dataset(encoding=encoding) + else: + raise Exception("Unknown dataset") + lr = 0.1 + deep_vqc_depth = 3 if model_type == "kernel": scores = torch.tensor([ - kernel_predict(x, X_train, y_train) for x in X_test + kernel_predict(x, X_train, y_train, encoding) for x in X_test ]) metrics = compute_metrics(scores, y_test) - print("KERNEL TEST METRICS:", metrics) + print(f"{dataset.upper()} KERNEL TEST METRICS:", metrics) return theta = torch.randn(9 if model_type == "deep_vqc" else 3) * 0.1 + # For plotting + loss_history = [] + acc_history = [] + for epoch in range(epochs): grads = torch.zeros_like(theta) + total_loss = 0.0 for i in range(len(X_train)): xi, yi = X_train[i], y_train[i] pred = ( - deep_vqc_forward(xi, theta) + deep_vqc_forward(xi, theta, deep_vqc_depth, encoding) if model_type == "deep_vqc" - else noisy_qnn_forward(xi, theta) + else noisy_qnn_forward(xi, theta, T1=T1, T2=T2, encoding=encoding) ) - - # Debug first sample of deep_vqc - if model_type == "deep_vqc" and epoch == 0 and i == 0: - print(f" Initial pred = {pred.item():.6f}, label = {yi.item():.1f}") + + total_loss += (pred - yi)**2 for p in range(len(theta)): shift = math.pi / 2 @@ -253,57 +413,80 @@ def train(model_type="deep_vqc"): tm[p] -= shift fp = ( - deep_vqc_forward(xi, tp) + deep_vqc_forward(xi, tp, deep_vqc_depth, encoding) if model_type == "deep_vqc" - else noisy_qnn_forward(xi, tp) + else noisy_qnn_forward(xi, tp, T1=T1, T2=T2, encoding=encoding) ) fm = ( - deep_vqc_forward(xi, tm) + deep_vqc_forward(xi, tm, deep_vqc_depth, encoding) if model_type == "deep_vqc" - else noisy_qnn_forward(xi, tm) + else noisy_qnn_forward(xi, tm, T1=T1, T2=T2, encoding=encoding) ) - - # Debug: check if outputs are changing - if model_type == "deep_vqc" and epoch == 0 and i == 0 and p == 0: - print(f" DEBUG: f+ = {fp.item():.6f}, f- = {fm.item():.6f}, diff = {(fp-fm).item():.6f}") grads[p] += 0.5 * ((fp - yi)**2 - (fm - yi)**2) + # Update theta -= lr * grads / len(X_train) + # Record loss + loss_avg = total_loss.item() / len(X_train) + loss_history.append(loss_avg) + + # Test accuracy scores_test = torch.tensor([ - deep_vqc_forward(x, theta) + deep_vqc_forward(x, theta, deep_vqc_depth, encoding) if model_type == "deep_vqc" - else noisy_qnn_forward(x, theta) + else noisy_qnn_forward(x, theta, T1=T1, T2=T2, encoding=encoding) for x in X_test ]) - - metrics = compute_metrics(scores_test, y_test) - - # Debug metrics - grad_norm = torch.norm(grads).item() - param_norm = torch.norm(theta).item() + metric_dict = compute_metrics(scores_test, y_test) + acc_history.append(metric_dict["accuracy"]) print( - f"{model_type.upper()} | Epoch {epoch:02d} | " - f"Acc {metrics['accuracy']:.3f} | " - f"Prec {metrics['precision']:.3f} | " - f"Rec {metrics['recall']:.3f} | " - f"F1 {metrics['f1']:.3f} | " - f"AUC {metrics['roc_auc']:.3f} | " - f"GradNorm {grad_norm:.6f} | " - f"ParamNorm {param_norm:.6f}" + f"{dataset.upper()} | {model_type.upper()} | Epoch {epoch+1:02d} | " + f"Loss {loss_avg:.4f} | Acc {metric_dict['accuracy']:.3f}" ) + # Return metrics history if requested + if record_metrics: + return { + 'loss_history': loss_history, + 'acc_history': acc_history, + 'final_metrics': metric_dict, + 'model_type': model_type, + 'dataset': dataset, + 'encoding': encoding.value, + 'epochs': epochs + } + + + # ============================================================ # Run # ============================================================ if __name__ == "__main__": - print("\nTraining Deep Variational QNN (Real Dataset)") - train("deep_vqc") + # Test angle encoding + print("Training Deep Variational QNN (Angle Encoding, Real Dataset)") + train("deep_vqc",EncodingType.ANGLE) + print("\nTraining Noise-Aware QNN (Angle Encoding, Real Dataset)") + train("noise_aware",EncodingType.ANGLE) + print("\nTraining Quantum Kernel Model (Angle Encoding, Real Dataset)") + train("kernel",EncodingType.ANGLE) + + # Amplitude encoding tests (may be slow) + print("\nTraining Deep Variational QNN (Amplitude Encoding, Real Dataset)") + train("deep_vqc",EncodingType.AMPLITUDE) + print("\nTraining Noise-Aware QNN (Amplitude Encoding, Real Dataset)") + train("noise_aware",EncodingType.AMPLITUDE) + print("\nTraining Quantum Kernel Model (Amplitude Encoding, Real Dataset)") + train("kernel",EncodingType.AMPLITUDE) + + + # print("\nTraining Deep Variational QNN (Real Dataset)") + # train("deep_vqc") - print("\nTraining Noise-Aware QNN (Real Dataset)") - train("noise_aware") + # print("\nTraining Noise-Aware QNN (Real Dataset)") + # train("noise_aware") - print("\nTraining Quantum Kernel Model (Real Dataset)") - train("kernel") + # print("\nTraining Quantum Kernel Model (Real Dataset)") + # train("kernel") diff --git a/quantum_simulator.py b/quantum_simulator.py index b32aa09..98aeea4 100644 --- a/quantum_simulator.py +++ b/quantum_simulator.py @@ -1,3 +1,4 @@ +from PIL.Image import new from locale import str import torch import math @@ -18,7 +19,10 @@ def random_state(num_qubits: int): return state / torch.linalg.norm(state) def custom_state(amplitudes): - state = torch.tensor(amplitudes, dtype=torch.cfloat) + if isinstance(amplitudes, torch.Tensor): + state = amplitudes.to(dtype=torch.cfloat).clone().detach() + else: + state = torch.tensor(amplitudes, dtype=torch.cfloat) return state / torch.linalg.norm(state) # ─────────────────────────────────────────────────────────────── @@ -152,6 +156,32 @@ def apply_gate(state: torch.Tensor, gate: torch.Tensor, targets: list, num_qubit # reshape back to a vector of length 2^n return result.reshape(-1) + +"""Apply circuit list (in the style of circuits fed to "run_noisy_circuit_density") to a state, but stay in ket/vector notation + +Example circuit: + circuit = [ + ("H", [0]), + ("CNOT",[0,1]), + (RY, [2], math.pi/4), # parametric gate + ] + +EX: +state = zero_state(3) +new_state = apply_circuit_to_ket(state, circuit, 3) +""" +def apply_circuit_to_ket(state: torch.Tensor, circuit: List[Tuple[str, List[int]]], num_qubits: int): + new_state = state.clone() + for op in circuit: + name = op[0] + qubits = op[1] + if name in PARAMETRIC_GATES: + param = op[2] + gate = PARAMETRIC_GATES[name](param) + else: + gate = GATE_LIBRARY[name] + new_state = apply_gate(new_state, gate, qubits, num_qubits) + return new_state # ─────────────────────────────────────────────────────────────── # Measurement # ─────────────────────────────────────────────────────────────── diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0aa4701 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +torch>=2.0.0 +numpy>=1.24.0 +matplotlib>=3.7.0 +scikit-learn>=1.3.0 +pandas>=2.0.0 diff --git a/run_experiments.py b/run_experiments.py new file mode 100644 index 0000000..d872a95 --- /dev/null +++ b/run_experiments.py @@ -0,0 +1,329 @@ +""" +Experiment Runner for QML Training + +Runs all training configurations with specified parameters and saves +results to CSV files in the data/ folder. + +Usage: + python run_experiments.py --epochs 25 --T1 100 --T2 200 + python run_experiments.py --epochs 50 --T1 50 --T2 100 --output_prefix "noisy_" +""" + +import os +import csv +import argparse +from datetime import datetime +from itertools import product + +from qml_training import train, EncodingType + +# Configuration options +MODEL_TYPES = ["deep_vqc", "noise_aware"] # kernel doesn't have epoch-based training +ENCODINGS = [EncodingType.ANGLE, EncodingType.AMPLITUDE] +DATASETS = ["real", "moons"] + + +def ensure_data_dir(): + """Create data directory if it doesn't exist.""" + data_dir = os.path.join(os.path.dirname(__file__), "data") + os.makedirs(data_dir, exist_ok=True) + return data_dir + + +def load_csv_history(filepath): + """ + Load training history from CSV file. + + Returns: + dict with 'loss_history' and 'acc_history' lists + """ + loss_history = [] + acc_history = [] + with open(filepath, 'r') as f: + reader = csv.DictReader(f) + for row in reader: + loss_history.append(float(row['loss'])) + acc_history.append(float(row['acc'])) + return {'loss_history': loss_history, 'acc_history': acc_history} + + +def find_matching_csvs(data_dir, dataset, T1, T2, epochs): + """ + Find CSV files matching the given parameters. + + Returns: + dict mapping model labels to file paths + """ + pattern_map = { + 'VQC Angle': f"deep_vqc_{dataset}_t1{int(T1)}_t2{int(T2)}_ep{epochs}", + 'VQC Amplitude': f"deep_vqc_{dataset}_t1{int(T1)}_t2{int(T2)}_ep{epochs}", + 'QNN Angle': f"noise_aware_{dataset}_t1{int(T1)}_t2{int(T2)}_ep{epochs}", + 'QNN Amplitude': f"noise_aware_{dataset}_t1{int(T1)}_t2{int(T2)}_ep{epochs}", + } + + # Actually we need to differentiate by encoding in the filename + # Current format: {model}_{dataset}_t1{T1}_t2{T2}_ep{epochs}_{date}.csv + # We need to find files and match by model type + + found_files = {} + for filename in os.listdir(data_dir): + if not filename.endswith('.csv') or filename.startswith('summary'): + continue + + # Parse filename: model_dataset_t1X_t2Y_epZ_date.csv + parts = filename.replace('.csv', '').split('_') + if len(parts) < 6: + continue + + file_model = parts[0] + if parts[0] == 'deep': + file_model = 'deep_vqc' + parts = ['deep_vqc'] + parts[2:] # rejoin + elif parts[0] == 'noise': + file_model = 'noise_aware' + parts = ['noise_aware'] + parts[2:] + + file_dataset = parts[1] + + # Check if matches our criteria + if file_dataset != dataset: + continue + if f"t1{int(T1)}" not in filename or f"t2{int(T2)}" not in filename: + continue + if f"ep{epochs}" not in filename: + continue + + # Determine encoding from the file content or use latest file + filepath = os.path.join(data_dir, filename) + + if file_model == 'deep_vqc': + # We'll need to track both VQC files and pick based on order + if 'VQC' not in str(found_files.keys()): + found_files[f"VQC_{filename}"] = filepath + elif file_model == 'noise_aware': + if 'QNN' not in str(found_files.keys()): + found_files[f"QNN_{filename}"] = filepath + + return found_files + + +def plot_from_results(results, dataset, T1=100, T2=200, epochs=25, save=False): + """ + Plot comparison from in-memory results. + + Args: + results: List of result dicts from run_all_experiments + dataset: Dataset to filter for + T1: T1 noise parameter + T2: T2 noise parameter + epochs: Number of epochs + save: If True, save plots to plots/ directory + """ + from visualizer import plot_model_comparison + + data_dict = {} + for r in results: + if r['dataset'] != dataset: + continue + + model = r['model_type'] + encoding = r['encoding'] + + if model == 'deep_vqc': + label = f"VQC {encoding.capitalize()}" + else: + label = f"QNN {encoding.capitalize()}" + + data_dict[label] = { + 'loss_history': r['loss_history'], + 'acc_history': r['acc_history'] + } + + if data_dict: + plot_model_comparison(data_dict, dataset_name=dataset, + save=save, T1=T1, T2=T2, epochs=epochs) + + +def save_training_history_csv(metrics, filepath): + """ + Save epoch-by-epoch training history to CSV. + + Columns: epoch, loss, acc + """ + with open(filepath, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['epoch', 'loss', 'acc']) + for epoch, (loss, acc) in enumerate(zip(metrics['loss_history'], metrics['acc_history']), start=1): + writer.writerow([epoch, f'{loss:.6f}', f'{acc:.4f}']) + + +def save_summary_csv(all_results, filepath): + """ + Save summary of all experiments to a single CSV. + """ + with open(filepath, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['model', 'enc', 'data', 'loss', 'acc', 'f1', 'auc']) + for r in all_results: + writer.writerow([ + r['model_type'], + r['encoding'], + r['dataset'], + f"{r['loss_history'][-1]:.4f}", + f"{r['acc_history'][-1]:.4f}", + f"{r['final_metrics']['f1']:.4f}", + f"{r['final_metrics']['roc_auc']:.4f}" + ]) + + +def run_all_experiments(epochs=25, T1=100, T2=200): + """ + Run all training configurations and save results. + + Args: + epochs: Number of training epochs + T1: T1 relaxation time for noise model (µs) + T2: T2 dephasing time for noise model (µs) + + Returns: + List of all results dictionaries + """ + data_dir = ensure_data_dir() + all_results = [] + + # Generate timestamp for this run (day_hour_min) + timestamp = datetime.now().strftime("%d_%H%M") + + # All configurations to run + configurations = list(product(MODEL_TYPES, ENCODINGS, DATASETS)) + total_configs = len(configurations) + + print(f"\n{'='*60}") + print(f"QML Experiment Runner") + print(f"{'='*60}") + print(f"Epochs: {epochs}") + print(f"Noise Parameters: T1={T1}, T2={T2}") + print(f"Total configurations: {total_configs}") + print(f"Output directory: {data_dir}") + print(f"{'='*60}\n") + + for idx, (model_type, encoding, dataset) in enumerate(configurations, start=1): + config_name = f"{model_type}_{encoding.value}_{dataset}" + print(f"\n[{idx}/{total_configs}] Running: {config_name}") + print("-" * 40) + + try: + # Run training with record_metrics=True + metrics = train( + model_type=model_type, + encoding=encoding, + epochs=epochs, + dataset=dataset, + record_metrics=True, + T1=T1, + T2=T2 + ) + + if metrics is None: + print(f" Warning: No metrics returned for {config_name}") + continue + + # Add noise parameters to metrics + metrics['T1'] = T1 + metrics['T2'] = T2 + + all_results.append(metrics) + + # Save individual training history CSV + # Format: {model}_{dataset}_t1{T1}_t2{T2}_ep{epochs}_{date}.csv + history_filename = f"{model_type}_{dataset}_t1{int(T1)}_t2{int(T2)}_ep{epochs}_{timestamp}.csv" + history_filepath = os.path.join(data_dir, history_filename) + save_training_history_csv(metrics, history_filepath) + print(f" Saved: {history_filename}") + + except Exception as e: + print(f" Error running {config_name}: {e}") + continue + + # Save summary CSV with all results + if all_results: + # Format: summary_t1{T1}_t2{T2}_ep{epochs}_{date}.csv + summary_filename = f"summary_t1{int(T1)}_t2{int(T2)}_ep{epochs}_{timestamp}.csv" + summary_filepath = os.path.join(data_dir, summary_filename) + save_summary_csv(all_results, summary_filepath) + print(f"\n{'='*60}") + print(f"Summary saved: {summary_filename}") + print(f"Total successful runs: {len(all_results)}/{total_configs}") + print(f"{'='*60}\n") + + return all_results + + +def main(): + parser = argparse.ArgumentParser( + description="Run QML training experiments and save results to CSV" + ) + parser.add_argument( + '--epochs', type=int, default=25, + help='Number of training epochs (default: 25)' + ) + parser.add_argument( + '--T1', type=float, default=100, + help='T1 relaxation time in µs for noise model (default: 100)' + ) + parser.add_argument( + '--T2', type=float, default=200, + help='T2 dephasing time in µs for noise model (default: 200)' + ) + parser.add_argument( + '--models', type=str, nargs='+', default=None, + choices=['deep_vqc', 'noise_aware'], + help='Specific models to run (default: all)' + ) + parser.add_argument( + '--encodings', type=str, nargs='+', default=None, + choices=['angle', 'amplitude'], + help='Specific encodings to run (default: all)' + ) + parser.add_argument( + '--datasets', type=str, nargs='+', default=None, + choices=['real', 'moons'], + help='Specific datasets to run (default: all)' + ) + parser.add_argument( + '--plot', action='store_true', + help='Plot accuracy and loss comparison after training' + ) + + args = parser.parse_args() + + # Override global configs if specific options provided + global MODEL_TYPES, ENCODINGS, DATASETS + + if args.models: + MODEL_TYPES = args.models + if args.encodings: + ENCODINGS = [EncodingType.ANGLE if e == 'angle' else EncodingType.AMPLITUDE + for e in args.encodings] + if args.datasets: + DATASETS = args.datasets + + # Run experiments + results = run_all_experiments( + epochs=args.epochs, + T1=args.T1, + T2=args.T2 + ) + + # Plot if requested (always save when --plot is used) + if args.plot and results: + datasets_to_plot = args.datasets if args.datasets else ['real', 'moons'] + for dataset in datasets_to_plot: + plot_from_results(results, dataset, + T1=args.T1, T2=args.T2, epochs=args.epochs, save=True) + + return results + + +if __name__ == "__main__": + main() diff --git a/test_qml_parallel_run.py b/test_qml_parallel_run.py new file mode 100644 index 0000000..ec1cce3 --- /dev/null +++ b/test_qml_parallel_run.py @@ -0,0 +1,55 @@ +import unittest +import torch +from qml_training_parallel import ( + parallel_forward_pass, + parallel_gradient_step, + train_kernel_parallel, +) +from qml_training import make_real_dataset + + +class TestParallelQMLRun(unittest.TestCase): + def setUp(self): + X_train, X_test, y_train, y_test = make_real_dataset(test_size=0.9) + # Keep tiny slices for fast tests + self.X_train = X_train[:5] + self.X_test = X_test[:4] + self.y_train = y_train[:5] + self.y_test = y_test[:4] + self.num_workers = 2 + + def test_parallel_forward_pass_deep_vqc_runs(self): + theta = torch.randn(9) * 0.1 + scores = parallel_forward_pass(self.X_test, theta, model_type="deep_vqc", num_workers=self.num_workers) + self.assertTrue(torch.is_tensor(scores)) + self.assertEqual(scores.shape[0], len(self.X_test)) + + def test_parallel_forward_pass_noise_aware_runs(self): + theta = torch.randn(3) * 0.1 + scores = parallel_forward_pass(self.X_test, theta, model_type="noise_aware", num_workers=self.num_workers) + self.assertTrue(torch.is_tensor(scores)) + self.assertEqual(scores.shape[0], len(self.X_test)) + + def test_parallel_gradient_step_deep_vqc_runs(self): + theta = torch.randn(9) * 0.1 + grads = parallel_gradient_step(self.X_train, self.y_train, theta, model_type="deep_vqc", num_workers=self.num_workers) + self.assertTrue(torch.is_tensor(grads)) + self.assertEqual(grads.shape, theta.shape) + + def test_parallel_gradient_step_noise_aware_runs(self): + theta = torch.randn(3) * 0.1 + grads = parallel_gradient_step(self.X_train, self.y_train, theta, model_type="noise_aware", num_workers=self.num_workers) + self.assertTrue(torch.is_tensor(grads)) + self.assertEqual(grads.shape, theta.shape) + + def test_kernel_parallel_metrics_runs(self): + # This uses parallel prediction; it prints metrics and returns a dict + metrics = train_kernel_parallel(num_workers=self.num_workers) + self.assertIsInstance(metrics, dict) + # Expect common keys + for key in ["accuracy", "precision", "recall", "f1", "roc_auc"]: + self.assertIn(key, metrics) + + +if __name__ == "__main__": + unittest.main() diff --git a/test_qml_run.py b/test_qml_run.py new file mode 100644 index 0000000..afb1b16 --- /dev/null +++ b/test_qml_run.py @@ -0,0 +1,122 @@ +import unittest +import torch +from qml_training import ( + make_real_dataset, + deep_vqc_forward, + noisy_qnn_forward, + quantum_feature_map, + quantum_kernel, + kernel_predict, + EncodingType, + train, +) + + +class TestSequentialQMLRun(unittest.TestCase): + def setUp(self): + # Use real dataset but keep computations tiny by slicing + X_train, X_test, y_train, y_test = make_real_dataset(test_size=0.9) + self.X_train = X_train[:5] + self.X_test = X_test[:5] + self.y_train = y_train[:5] + self.y_test = y_test[:5] + + def test_deep_vqc_forward_runs(self): + x = self.X_train[0] + # 3 parameters per layer * depth=3 → 9 + theta = torch.randn(9) * 0.1 + out = deep_vqc_forward(x, theta) + self.assertTrue(torch.is_tensor(out)) + self.assertEqual(out.numel(), 1) + + def test_noisy_qnn_forward_runs(self): + x = self.X_train[0] + # One layer with 3 params + theta = torch.randn(3) * 0.1 + out = noisy_qnn_forward(x, theta) + self.assertTrue(torch.is_tensor(out)) + self.assertEqual(out.numel(), 1) + + def test_kernel_functions_run(self): + x1 = self.X_test[0] + x2 = self.X_test[1] + # Feature map + kernel + psi1 = quantum_feature_map(x1, encoding=EncodingType.ANGLE) + psi2 = quantum_feature_map(x2, encoding=EncodingType.ANGLE) + self.assertTrue(torch.is_tensor(psi1)) + self.assertTrue(torch.is_tensor(psi2)) + k = quantum_kernel(x1, x2, encoding=EncodingType.ANGLE) + self.assertTrue(torch.is_tensor(k)) + self.assertEqual(k.numel(), 1) + + def test_kernel_predict_runs(self): + x = self.X_test[0] + pred = kernel_predict(x, self.X_train, self.y_train, encoding=EncodingType.ANGLE) + self.assertTrue(torch.is_tensor(pred)) + self.assertEqual(pred.numel(), 1) + + +class TestAllConfigurationsTraining(unittest.TestCase): + """Test training with 2 epochs for all configurations.""" + + def test_deep_vqc_angle_encoding(self): + """Test Deep VQC with Angle Encoding.""" + print("\n=== Testing Deep VQC with Angle Encoding (2 epochs) ===") + train(model_type="deep_vqc", encoding=EncodingType.ANGLE, epochs=2) + + def test_noise_aware_angle_encoding(self): + """Test Noise-Aware QNN with Angle Encoding.""" + print("\n=== Testing Noise-Aware QNN with Angle Encoding (2 epochs) ===") + train(model_type="noise_aware", encoding=EncodingType.ANGLE, epochs=2) + + def test_kernel_angle_encoding(self): + """Test Quantum Kernel with Angle Encoding.""" + print("\n=== Testing Quantum Kernel with Angle Encoding (2 epochs) ===") + train(model_type="kernel", encoding=EncodingType.ANGLE, epochs=2) + + def test_deep_vqc_amplitude_encoding(self): + """Test Deep VQC with Amplitude Encoding.""" + print("\n=== Testing Deep VQC with Amplitude Encoding (2 epochs) ===") + train(model_type="deep_vqc", encoding=EncodingType.AMPLITUDE, epochs=2) + + def test_noise_aware_amplitude_encoding(self): + """Test Noise-Aware QNN with Amplitude Encoding.""" + print("\n=== Testing Noise-Aware QNN with Amplitude Encoding (2 epochs) ===") + train(model_type="noise_aware", encoding=EncodingType.AMPLITUDE, epochs=2) + + def test_kernel_amplitude_encoding(self): + """Test Quantum Kernel with Amplitude Encoding.""" + print("\n=== Testing Quantum Kernel with Amplitude Encoding (2 epochs) ===") + train(model_type="kernel", encoding=EncodingType.AMPLITUDE, epochs=2) + + +class TestAllConfigurationsTrainingMoons(unittest.TestCase): + """Test training with 2 epochs for all configurations on moons dataset.""" + + def test_deep_vqc_angle_encoding_moons(self): + print("\n=== Testing Deep VQC with Angle Encoding on Moons (2 epochs) ===") + train(model_type="deep_vqc", encoding=EncodingType.ANGLE, epochs=2, dataset="moons") + + def test_noise_aware_angle_encoding_moons(self): + print("\n=== Testing Noise-Aware QNN with Angle Encoding on Moons (2 epochs) ===") + train(model_type="noise_aware", encoding=EncodingType.ANGLE, epochs=2, dataset="moons") + + def test_kernel_angle_encoding_moons(self): + print("\n=== Testing Quantum Kernel with Angle Encoding on Moons (2 epochs) ===") + train(model_type="kernel", encoding=EncodingType.ANGLE, epochs=2, dataset="moons") + + def test_deep_vqc_amplitude_encoding_moons(self): + print("\n=== Testing Deep VQC with Amplitude Encoding on Moons (2 epochs) ===") + train(model_type="deep_vqc", encoding=EncodingType.AMPLITUDE, epochs=2, dataset="moons") + + def test_noise_aware_amplitude_encoding_moons(self): + print("\n=== Testing Noise-Aware QNN with Amplitude Encoding on Moons (2 epochs) ===") + train(model_type="noise_aware", encoding=EncodingType.AMPLITUDE, epochs=2, dataset="moons") + + def test_kernel_amplitude_encoding_moons(self): + print("\n=== Testing Quantum Kernel with Amplitude Encoding on Moons (2 epochs) ===") + train(model_type="kernel", encoding=EncodingType.AMPLITUDE, epochs=2, dataset="moons") + + +if __name__ == "__main__": + unittest.main() diff --git a/test_qnn_layers.py b/test_qnn_layers.py index 5d87edf..a534fb4 100644 --- a/test_qnn_layers.py +++ b/test_qnn_layers.py @@ -103,6 +103,25 @@ def test_empty_qubit_list(self): result = param_gate_layer("RY", x, qubits) self.assertEqual(len(result), 0) + + def test_default_applies_all_qubits(self): + """Default specify_qubits=None applies gate to every qubit in x.""" + x = torch.tensor([0.1, 0.2, 0.3]) + + result = param_gate_layer("RX", x) + + self.assertEqual(len(result), 3) + self.assertEqual([entry[1][0] for entry in result], [0, 1, 2]) + self.assertAlmostEqual(result[2][2].item(), 0.3, places=5) + + def test_default_matches_explicit_all(self): + """specify_qubits=None should match explicitly listing all qubits.""" + x = torch.tensor([0.7, 0.8]) + + default_layer = param_gate_layer("RZ", x) + explicit_layer = param_gate_layer("RZ", x, (0, 1)) + + self.assertEqual(default_layer, explicit_layer) def test_gate_layer_in_circuit(self): """Test that param_gate_layer output works in a circuit""" diff --git a/test_run_experiments.py b/test_run_experiments.py new file mode 100644 index 0000000..e5db32b --- /dev/null +++ b/test_run_experiments.py @@ -0,0 +1,353 @@ +""" +Unit tests for run_experiments.py + +Tests verify: +- Training runs complete successfully +- CSV files are created with correct format +- Metrics are recorded properly +- File naming conventions are correct +""" + +import unittest +import os +import csv +import shutil +import tempfile +from datetime import datetime + +from qml_training import train, EncodingType +from run_experiments import ( + ensure_data_dir, + save_training_history_csv, + save_summary_csv, + run_all_experiments, + MODEL_TYPES, + DATASETS +) + + +class TestTrainFunction(unittest.TestCase): + """Test the train function with record_metrics=True.""" + + def test_train_returns_metrics_dict(self): + """Verify train returns a dict with expected keys when record_metrics=True.""" + metrics = train( + model_type="deep_vqc", + encoding=EncodingType.ANGLE, + epochs=2, + dataset="moons", + record_metrics=True + ) + + self.assertIsInstance(metrics, dict) + self.assertIn('loss_history', metrics) + self.assertIn('acc_history', metrics) + self.assertIn('final_metrics', metrics) + self.assertIn('model_type', metrics) + self.assertIn('dataset', metrics) + self.assertIn('encoding', metrics) + self.assertIn('epochs', metrics) + + def test_train_loss_history_length(self): + """Verify loss_history has correct number of epochs.""" + epochs = 2 + metrics = train( + model_type="deep_vqc", + encoding=EncodingType.ANGLE, + epochs=epochs, + dataset="moons", + record_metrics=True + ) + + self.assertEqual(len(metrics['loss_history']), epochs) + self.assertEqual(len(metrics['acc_history']), epochs) + + def test_train_accuracy_in_valid_range(self): + """Verify accuracy values are between 0 and 1.""" + metrics = train( + model_type="noise_aware", + encoding=EncodingType.ANGLE, + epochs=2, + dataset="moons", + record_metrics=True + ) + + for acc in metrics['acc_history']: + self.assertGreaterEqual(acc, 0.0) + self.assertLessEqual(acc, 1.0) + + def test_train_final_metrics_keys(self): + """Verify final_metrics contains expected metric keys.""" + metrics = train( + model_type="deep_vqc", + encoding=EncodingType.ANGLE, + epochs=2, + dataset="moons", + record_metrics=True + ) + + final = metrics['final_metrics'] + self.assertIn('accuracy', final) + self.assertIn('precision', final) + self.assertIn('recall', final) + self.assertIn('f1', final) + self.assertIn('roc_auc', final) + + +class TestSaveTrainingHistoryCSV(unittest.TestCase): + """Test the save_training_history_csv function.""" + + def setUp(self): + """Create a temporary directory for test files.""" + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + """Remove temporary directory.""" + shutil.rmtree(self.test_dir) + + def test_csv_file_created(self): + """Verify CSV file is created.""" + metrics = { + 'loss_history': [1.0, 0.8, 0.6], + 'acc_history': [0.5, 0.7, 0.85] + } + filepath = os.path.join(self.test_dir, "test_history.csv") + + save_training_history_csv(metrics, filepath) + + self.assertTrue(os.path.exists(filepath)) + + def test_csv_has_correct_headers(self): + """Verify CSV has correct column headers.""" + metrics = { + 'loss_history': [1.0, 0.8], + 'acc_history': [0.5, 0.7] + } + filepath = os.path.join(self.test_dir, "test_history.csv") + + save_training_history_csv(metrics, filepath) + + with open(filepath, 'r') as f: + reader = csv.reader(f) + headers = next(reader) + self.assertEqual(headers, ['epoch', 'loss', 'acc']) + + def test_csv_has_correct_row_count(self): + """Verify CSV has correct number of data rows.""" + metrics = { + 'loss_history': [1.0, 0.8, 0.6], + 'acc_history': [0.5, 0.7, 0.85] + } + filepath = os.path.join(self.test_dir, "test_history.csv") + + save_training_history_csv(metrics, filepath) + + with open(filepath, 'r') as f: + reader = csv.reader(f) + rows = list(reader) + # 1 header + 3 data rows + self.assertEqual(len(rows), 4) + + def test_csv_epoch_numbers_correct(self): + """Verify epoch numbers start at 1 and increment.""" + metrics = { + 'loss_history': [1.0, 0.8, 0.6], + 'acc_history': [0.5, 0.7, 0.85] + } + filepath = os.path.join(self.test_dir, "test_history.csv") + + save_training_history_csv(metrics, filepath) + + with open(filepath, 'r') as f: + reader = csv.reader(f) + next(reader) # skip header + for i, row in enumerate(reader, start=1): + self.assertEqual(int(row[0]), i) + + +class TestSaveSummaryCSV(unittest.TestCase): + """Test the save_summary_csv function.""" + + def setUp(self): + """Create a temporary directory for test files.""" + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + """Remove temporary directory.""" + shutil.rmtree(self.test_dir) + + def test_summary_csv_created(self): + """Verify summary CSV file is created.""" + results = [{ + 'model_type': 'deep_vqc', + 'encoding': 'angle', + 'dataset': 'moons', + 'loss_history': [1.0, 0.5], + 'acc_history': [0.6, 0.8], + 'final_metrics': {'f1': 0.75, 'roc_auc': 0.82} + }] + filepath = os.path.join(self.test_dir, "summary.csv") + + save_summary_csv(results, filepath) + + self.assertTrue(os.path.exists(filepath)) + + def test_summary_csv_headers(self): + """Verify summary CSV has correct headers.""" + results = [{ + 'model_type': 'deep_vqc', + 'encoding': 'angle', + 'dataset': 'moons', + 'loss_history': [0.5], + 'acc_history': [0.8], + 'final_metrics': {'f1': 0.75, 'roc_auc': 0.82} + }] + filepath = os.path.join(self.test_dir, "summary.csv") + + save_summary_csv(results, filepath) + + with open(filepath, 'r') as f: + reader = csv.reader(f) + headers = next(reader) + self.assertEqual(headers, ['model', 'enc', 'data', 'loss', 'acc', 'f1', 'auc']) + + def test_summary_csv_multiple_results(self): + """Verify summary CSV handles multiple results.""" + results = [ + { + 'model_type': 'deep_vqc', + 'encoding': 'angle', + 'dataset': 'moons', + 'loss_history': [0.5], + 'acc_history': [0.8], + 'final_metrics': {'f1': 0.75, 'roc_auc': 0.82} + }, + { + 'model_type': 'noise_aware', + 'encoding': 'amplitude', + 'dataset': 'real', + 'loss_history': [0.6], + 'acc_history': [0.7], + 'final_metrics': {'f1': 0.65, 'roc_auc': 0.72} + } + ] + filepath = os.path.join(self.test_dir, "summary.csv") + + save_summary_csv(results, filepath) + + with open(filepath, 'r') as f: + reader = csv.reader(f) + rows = list(reader) + # 1 header + 2 data rows + self.assertEqual(len(rows), 3) + + +class TestEnsureDataDir(unittest.TestCase): + """Test the ensure_data_dir function.""" + + def test_data_dir_created(self): + """Verify data directory is created if it doesn't exist.""" + data_dir = ensure_data_dir() + self.assertTrue(os.path.isdir(data_dir)) + self.assertTrue(data_dir.endswith("data")) + + +class TestRunAllExperiments(unittest.TestCase): + """Integration tests for run_all_experiments with minimal epochs.""" + + @classmethod + def setUpClass(cls): + """Run experiments once for all tests in this class.""" + # Use minimal settings for speed + import run_experiments + # Temporarily reduce configurations for testing + cls.original_model_types = run_experiments.MODEL_TYPES + cls.original_encodings = run_experiments.ENCODINGS + cls.original_datasets = run_experiments.DATASETS + + # Only test one model, one encoding, one dataset for speed + run_experiments.MODEL_TYPES = ["deep_vqc"] + run_experiments.ENCODINGS = [EncodingType.ANGLE] + run_experiments.DATASETS = ["moons"] + + cls.results = run_all_experiments(epochs=2, T1=100, T2=200) + + @classmethod + def tearDownClass(cls): + """Restore original configurations.""" + import run_experiments + run_experiments.MODEL_TYPES = cls.original_model_types + run_experiments.ENCODINGS = cls.original_encodings + run_experiments.DATASETS = cls.original_datasets + + def test_returns_results_list(self): + """Verify run_all_experiments returns a list.""" + self.assertIsInstance(self.results, list) + + def test_results_not_empty(self): + """Verify at least one result is returned.""" + self.assertGreater(len(self.results), 0) + + def test_result_has_required_keys(self): + """Verify each result has required keys.""" + for result in self.results: + self.assertIn('loss_history', result) + self.assertIn('acc_history', result) + self.assertIn('final_metrics', result) + self.assertIn('model_type', result) + self.assertIn('dataset', result) + self.assertIn('T1', result) + self.assertIn('T2', result) + + def test_csv_files_created(self): + """Verify CSV files are created in data directory.""" + data_dir = ensure_data_dir() + csv_files = [f for f in os.listdir(data_dir) if f.endswith('.csv')] + self.assertGreater(len(csv_files), 0) + + def test_history_csv_format(self): + """Verify history CSV files have correct format.""" + data_dir = ensure_data_dir() + csv_files = [f for f in os.listdir(data_dir) + if f.endswith('.csv') and not f.startswith('summary')] + + if csv_files: + filepath = os.path.join(data_dir, csv_files[0]) + with open(filepath, 'r') as f: + reader = csv.reader(f) + headers = next(reader) + self.assertEqual(headers, ['epoch', 'loss', 'acc']) + + +class TestFileNamingConvention(unittest.TestCase): + """Test that file naming follows the expected convention.""" + + def test_history_filename_format(self): + """Verify history filename contains expected components.""" + data_dir = ensure_data_dir() + csv_files = [f for f in os.listdir(data_dir) + if f.endswith('.csv') and not f.startswith('summary')] + + if csv_files: + filename = csv_files[0] + # Should contain: model_dataset_t1{T1}_t2{T2}_ep{epochs}_{date}.csv + self.assertIn('_t1', filename) + self.assertIn('_t2', filename) + self.assertIn('_ep', filename) + + def test_summary_filename_format(self): + """Verify summary filename contains expected components.""" + data_dir = ensure_data_dir() + summary_files = [f for f in os.listdir(data_dir) + if f.startswith('summary') and f.endswith('.csv')] + + if summary_files: + filename = summary_files[0] + # Should contain: summary_t1{T1}_t2{T2}_ep{epochs}_{date}.csv + self.assertIn('summary_t1', filename) + self.assertIn('_t2', filename) + self.assertIn('_ep', filename) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/test_state_encoding.py b/test_state_encoding.py new file mode 100644 index 0000000..f5e8724 --- /dev/null +++ b/test_state_encoding.py @@ -0,0 +1,193 @@ +""" +Tests for state_encoding and param_gate_layer functions. + +The state_encoding function returns (initial_state, circuit) separately to allow +noise models to apply noise during state preparation. +""" + +import unittest +import torch +import math +from qml_training import state_encoding, EncodingType, param_gate_layer +from quantum_simulator import zero_state, apply_circuit_to_ket, state_to_density + + +class TestHelper(unittest.TestCase): + """Helper class with tensor assertion utilities.""" + def assert_tensors_close(self, actual, expected): + try: + torch.testing.assert_close(actual, expected) + except AssertionError as e: + self.fail(f"\nExpected: {expected}\nReceived: {actual}\nOriginal error: {e}") + + +class TestStateEncoding(TestHelper): + """Tests for state_encoding function.""" + + def test_angle_encoding_returns_tuple(self): + """Test angle encoding returns (state, circuit) tuple.""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + result = state_encoding(x, n, encoding=EncodingType.ANGLE) + + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + state, circuit = result + self.assertIsInstance(circuit, list) + + def test_angle_encoding_initial_state(self): + """Test angle encoding returns |00⟩ initial state.""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + + expected = zero_state(n) + self.assert_tensors_close(state, expected) + + def test_angle_encoding_circuit_structure(self): + """Test angle encoding circuit has correct RY gates.""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + + self.assertEqual(len(circuit), 2) + for i, gate_tuple in enumerate(circuit): + self.assertIsInstance(gate_tuple, tuple) + self.assertEqual(len(gate_tuple), 3) + + gate_name, qubits, angle = gate_tuple + self.assertEqual(gate_name, "RY") + self.assertIsInstance(qubits, list) + self.assertEqual(len(qubits), 1) + self.assertTrue(torch.isclose(torch.tensor(angle), x[i])) + + def test_amplitude_encoding_returns_custom_state(self): + """Test amplitude encoding returns custom state with empty circuit.""" + n = 2 + x = torch.tensor([0.5, 0.5]) + + try: + state, circuit = state_encoding(x, n, encoding=EncodingType.AMPLITUDE) + + self.assertIsInstance(state, torch.Tensor) + self.assertIsInstance(circuit, list) + self.assertEqual(len(circuit), 0) + + norm = torch.linalg.norm(state) + self.assertAlmostEqual(norm.item(), 1.0, places=5) + except (NotImplementedError, Exception): + self.skipTest("custom_state not fully implemented") + + def test_invalid_encoding_type_raises(self): + """Test that invalid encoding type raises error.""" + n = 2 + x = torch.tensor([0.5, 0.5]) + + with self.assertRaises((ValueError, Exception)): + state_encoding(x, n, encoding="invalid") + + def test_angle_encoding_zero_input(self): + """Test angle encoding with zero input.""" + n = 2 + x = torch.tensor([0.0, 0.0]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + + expected = zero_state(n) + self.assert_tensors_close(state, expected) + + self.assertEqual(len(circuit), 2) + self.assertTrue(torch.isclose(torch.tensor(circuit[0][2]), torch.tensor(0.0))) + self.assertTrue(torch.isclose(torch.tensor(circuit[1][2]), torch.tensor(0.0))) + + def test_angle_encoding_pi_input(self): + """Test angle encoding with π input.""" + n = 2 + x = torch.tensor([math.pi, math.pi]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + + expected = zero_state(n) + self.assert_tensors_close(state, expected) + + self.assertTrue(torch.isclose(torch.tensor(circuit[0][2]), torch.tensor(math.pi))) + self.assertTrue(torch.isclose(torch.tensor(circuit[1][2]), torch.tensor(math.pi))) + + def test_angle_encoding_usage_ket_simulation(self): + """Test usage pattern: ket simulation with apply_circuit_to_ket.""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + encoded_state = apply_circuit_to_ket(state, circuit, n) + + self.assertEqual(encoded_state.shape, (4,)) + norm = torch.linalg.norm(encoded_state) + self.assertAlmostEqual(norm.item(), 1.0, places=5) + + def test_angle_encoding_usage_density_simulation(self): + """Test usage pattern: density matrix with state_to_density.""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + state, circuit = state_encoding(x, n, encoding=EncodingType.ANGLE) + density = state_to_density(state) + + self.assertEqual(density.shape, (4, 4)) + trace = torch.trace(density) + self.assertAlmostEqual(trace.real.item(), 1.0, places=5) + self.assertIsInstance(circuit, list) + + +class TestParamGateLayer(TestHelper): + """Tests for param_gate_layer function.""" + + def test_all_qubits(self): + """Test param_gate_layer with specify_qubits=None.""" + x = torch.tensor([0.1, 0.2]) + gate = "RY" + + op_list = param_gate_layer(gate, x, specify_qubits=None) + + self.assertEqual(len(op_list), 2) + self.assertEqual(op_list[0][0], gate) + self.assertEqual(op_list[1][0], gate) + self.assertEqual(op_list[0][1], [0]) + self.assertEqual(op_list[1][1], [1]) + self.assertTrue(torch.isclose(torch.tensor(op_list[0][2]), x[0])) + self.assertTrue(torch.isclose(torch.tensor(op_list[1][2]), x[1])) + + def test_specified_qubits(self): + """Test param_gate_layer with specific qubits.""" + x = torch.tensor([0.1, 0.2, 0.3]) + gate = "RY" + + op_list = param_gate_layer(gate, x, specify_qubits=(0, 2)) + + self.assertEqual(len(op_list), 2) + self.assertEqual(op_list[0][1], [0]) + self.assertEqual(op_list[1][1], [2]) + self.assertTrue(torch.isclose(torch.tensor(op_list[0][2]), x[0])) + self.assertTrue(torch.isclose(torch.tensor(op_list[1][2]), x[2])) + + def test_gate_format(self): + """Test param_gate_layer returns correct gate format.""" + x = torch.tensor([0.1, 0.2]) + gate = "RZ" + + op_list = param_gate_layer(gate, x, specify_qubits=None) + + for gate_tuple in op_list: + self.assertIsInstance(gate_tuple, tuple) + self.assertEqual(len(gate_tuple), 3) + name, qubits, angle = gate_tuple + self.assertIsInstance(name, str) + self.assertIsInstance(qubits, list) + self.assertTrue(all(isinstance(q, int) for q in qubits)) + + +if __name__ == "__main__": + unittest.main() diff --git a/visualizer.py b/visualizer.py index 2a3bfbb..96a736c 100644 --- a/visualizer.py +++ b/visualizer.py @@ -1,5 +1,180 @@ import matplotlib.pyplot as plt import numpy as np +import os +from datetime import datetime + + +def ensure_plots_dir(): + """Create plots directory if it doesn't exist.""" + plots_dir = os.path.join(os.path.dirname(__file__), "plots") + os.makedirs(plots_dir, exist_ok=True) + return plots_dir + + +def plot_training_curves(loss_history, acc_history, dataset="", model_type="", title=None, + save=False, T1=None, T2=None, epochs=None): + """ + Plot training loss and test accuracy curves over epochs. + + Args: + loss_history: List of loss values per epoch + acc_history: List of accuracy values per epoch + dataset: Name of the dataset (for title) + model_type: Name of the model type (for title) + title: Optional custom title (overrides auto-generated title) + """ + epochs = range(1, len(loss_history) + 1) + + plt.figure(figsize=(10, 4)) + + plt.subplot(1, 2, 1) + plt.plot(epochs, loss_history, label="Train Loss", marker='o', markersize=3) + plt.title(f"{dataset.capitalize()} {model_type.upper()} Train Loss" if not title else f"{title} - Loss") + plt.xlabel("Epoch") + plt.ylabel("Loss") + plt.legend() + plt.grid(True, alpha=0.3) + + plt.subplot(1, 2, 2) + plt.plot(epochs, acc_history, label="Test Accuracy", marker='o', markersize=3, color='green') + plt.title(f"{dataset.capitalize()} {model_type.upper()} Test Accuracy" if not title else f"{title} - Accuracy") + plt.xlabel("Epoch") + plt.ylabel("Accuracy") + plt.legend() + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + if save: + plots_dir = ensure_plots_dir() + timestamp = datetime.now().strftime("%d_%H%M") + t1_str = f"_t1{int(T1)}" if T1 is not None else "" + t2_str = f"_t2{int(T2)}" if T2 is not None else "" + ep_str = f"_ep{epochs}" if epochs is not None else "" + filename = f"{model_type}_{dataset}{t1_str}{t2_str}{ep_str}_{timestamp}.png" + filepath = os.path.join(plots_dir, filename) + plt.savefig(filepath, dpi=150, bbox_inches='tight') + print(f" Plot saved: {filename}") + + plt.show() + + +def plot_multiple_training_curves(metrics_list, labels=None, title="Training Comparison", + save=False, dataset="", T1=None, T2=None, epochs=None): + """ + Plot multiple training runs on the same figure for comparison. + + Args: + metrics_list: List of dicts, each containing 'loss_history' and 'acc_history' + labels: List of labels for each run (optional) + title: Title for the plot + """ + if labels is None: + labels = [f"Run {i+1}" for i in range(len(metrics_list))] + + plt.figure(figsize=(12, 5)) + + plt.subplot(1, 2, 1) + for metrics, label in zip(metrics_list, labels): + epochs = range(1, len(metrics['loss_history']) + 1) + plt.plot(epochs, metrics['loss_history'], label=label, marker='o', markersize=2) + plt.title(f"{title} - Train Loss") + plt.xlabel("Epoch") + plt.ylabel("Loss") + plt.legend() + plt.grid(True, alpha=0.3) + + plt.subplot(1, 2, 2) + for metrics, label in zip(metrics_list, labels): + epochs = range(1, len(metrics['acc_history']) + 1) + plt.plot(epochs, metrics['acc_history'], label=label, marker='o', markersize=2) + plt.title(f"{title} - Test Accuracy") + plt.xlabel("Epoch") + plt.ylabel("Accuracy") + plt.legend() + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + if save: + plots_dir = ensure_plots_dir() + timestamp = datetime.now().strftime("%d_%H%M") + t1_str = f"_t1{int(T1)}" if T1 is not None else "" + t2_str = f"_t2{int(T2)}" if T2 is not None else "" + ep_str = f"_ep{epochs}" if epochs is not None else "" + filename = f"comparison_{dataset}{t1_str}{t2_str}{ep_str}_{timestamp}.png" + filepath = os.path.join(plots_dir, filename) + plt.savefig(filepath, dpi=150, bbox_inches='tight') + print(f" Plot saved: {filename}") + + plt.show() + + +def plot_model_comparison(data_dict, dataset_name="", title=None, + save=False, T1=None, T2=None, epochs=None): + """ + Plot accuracy and loss comparison for 4 model configurations. + + Args: + data_dict: Dict mapping model labels to {'loss_history': [...], 'acc_history': [...]} + Expected keys: 'VQC Angle', 'VQC Amplitude', 'QNN Angle', 'QNN Amplitude' + dataset_name: Name of the dataset for title + title: Optional custom title prefix + save: If True, save plot to plots/ directory + T1: T1 noise parameter (for filename) + T2: T2 noise parameter (for filename) + epochs: Number of epochs (for filename) + """ + fig, axes = plt.subplots(1, 2, figsize=(14, 5)) + + colors = { + 'VQC Angle': '#1f77b4', + 'VQC Amplitude': '#ff7f0e', + 'QNN Angle': '#2ca02c', + 'QNN Amplitude': '#d62728' + } + + title_prefix = title if title else f"{dataset_name.capitalize()} Dataset" + + # Loss plot + ax1 = axes[0] + for label, data in data_dict.items(): + epoch_range = range(1, len(data['loss_history']) + 1) + ax1.plot(epoch_range, data['loss_history'], label=label, + color=colors.get(label, None), linewidth=2, marker='o', markersize=3) + ax1.set_title(f"{title_prefix} - Training Loss") + ax1.set_xlabel("Epoch") + ax1.set_ylabel("Loss") + ax1.legend() + ax1.grid(True, alpha=0.3) + + # Accuracy plot + ax2 = axes[1] + for label, data in data_dict.items(): + epoch_range = range(1, len(data['acc_history']) + 1) + ax2.plot(epoch_range, data['acc_history'], label=label, + color=colors.get(label, None), linewidth=2, marker='o', markersize=3) + ax2.set_title(f"{title_prefix} - Test Accuracy") + ax2.set_xlabel("Epoch") + ax2.set_ylabel("Accuracy") + ax2.legend() + ax2.grid(True, alpha=0.3) + + plt.tight_layout() + + if save: + plots_dir = ensure_plots_dir() + timestamp = datetime.now().strftime("%d_%H%M") + t1_str = f"_t1{int(T1)}" if T1 is not None else "" + t2_str = f"_t2{int(T2)}" if T2 is not None else "" + ep_str = f"_ep{epochs}" if epochs is not None else "" + filename = f"models_{dataset_name}{t1_str}{t2_str}{ep_str}_{timestamp}.png" + filepath = os.path.join(plots_dir, filename) + plt.savefig(filepath, dpi=150, bbox_inches='tight') + print(f" Plot saved: {filename}") + + plt.show() + def plot_measurement_comparison(normal_counts, kraus_counts, title="Measurement comparison"): # All bitstrings that appear in either result