diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a26616c..a8da215 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -12,33 +12,37 @@ jobs:
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
-
+
steps:
- name: Check out code
uses: actions/checkout@v4
-
+
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
-
+
- name: Install dependencies
run: |
python -m pip install --upgrade pip
- pip install -e .
-
+ pip install -e ".[dev]"
+
- name: Test import
- run: python -c "import ttperf; print('โ
ttperf imports successfully')"
-
+ run: python -c "import ttperf; print('ttperf imports successfully')"
+
+ - name: Run unit tests with coverage
+ run: |
+ pytest tests/ -v --cov=ttperf --cov-report=term-missing --cov-report=xml
+
- name: Test CLI help
run: |
- ttperf 2>&1 | grep -q "Usage:" && echo "โ
CLI help works" || echo "โ CLI help failed"
-
+ ttperf --help 2>&1 | grep -q "Usage:" && echo "CLI help works" || echo "CLI help failed"
+
- name: Build package
run: |
pip install build
python -m build
-
+
- name: Check package
run: |
pip install twine
diff --git a/README.md b/README.md
index 4c17d38..b4eb5eb 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,10 @@
-# ๐ ttperf - TT-Metal Performance Profiler
+# ttperf - TT-Metal Performance Profiler


-
+[](https://pypi.org/project/ttperf/)
[](https://github.com/Aswincloud/ttperf/issues)
[](https://github.com/Aswincloud/ttperf/stargazers)
@@ -12,34 +12,36 @@
-## โจ Features
+## Features
-- ๐ **Automated Profiling**: Seamlessly runs Tenstorrent's TT-Metal profiler with pytest
-- ๐ **CSV Analysis**: Automatically extracts and parses performance CSV files
-- โก **Real-time Output**: Shows profiling progress in real-time
-- ๐ **Performance Metrics**: Calculates total DEVICE KERNEL DURATION
-- ๐ฏ **Simple CLI**: Easy-to-use command-line interface
-- ๐ ๏ธ **Flexible**: Supports named profiles and various test paths
-- ๐ **Operation-based Profiling**: Profile specific operations by name (e.g., `ttperf add`)
-- โ๏ธ **Dynamic Configuration**: Customize tensor shape, dtype, and layout for operations
+- **Automated Profiling**: Seamlessly runs Tenstorrent's TT-Metal profiler with pytest
+- **CSV Analysis**: Automatically extracts and parses performance CSV files
+- **Real-time Output**: Shows profiling progress in real-time
+- **Performance Metrics**: Calculates total DEVICE KERNEL DURATION
+- **Simple CLI**: Easy-to-use command-line interface
+- **Flexible**: Supports named profiles and various test paths
+- **Operation-based Profiling**: Profile specific operations by name (e.g., `ttperf add`)
+- **Dynamic Configuration**: Customize tensor shape, dtype, and layout for operations
+- **Config File Support**: Set defaults via `~/.ttperf.yaml` or `./.ttperf.yaml`
+- **CI-friendly**: `--quiet` flag suppresses decorative output; `--verbose` enables debug logging
-## ๐ Quick Start
+## Quick Start
### Installation
```bash
# Install from PyPI (recommended)
pip install ttperf
+
+# With YAML config file support
+pip install "ttperf[yaml]"
```
**Or install from source:**
```bash
-# Clone the repository
git clone https://github.com/Aswincloud/ttperf.git
cd ttperf
-
-# Install the package
pip install -e .
```
@@ -55,9 +57,6 @@ ttperf add
# Option 2: Run from within tt-metal directory (or any subdirectory)
cd /path/to/your/tt-metal
ttperf relu
-# Or from a subdirectory
-cd /path/to/your/tt-metal/tests
-ttperf matmul
```
**tt-metal Path Search Order:**
@@ -81,21 +80,61 @@ ttperf add
ttperf relu
ttperf matmul
-# Profile operations with custom profile names
-ttperf my_add_profile add
-ttperf my_relu_profile relu
-
-# Profile operations with custom configuration
+# Custom tensor configuration
ttperf add --shape 1,1,32,32 --dtype bfloat16 --layout tile
ttperf relu --shape 1,1,64,64 --dtype float32 --layout row_major
-# Profile operations with memory configuration
-ttperf add --dram # Use DRAM memory (default)
-ttperf relu --l1 # Use L1 memory
-ttperf add --shape 1,1,64,64 --l1 # Combined options
+# Memory options
+ttperf add --dram # Use DRAM memory (default)
+ttperf relu --l1 # Use L1 memory
+
+# CI-friendly (no emoji/decorative output)
+ttperf --quiet add
+
+# Copy CSV output to a directory
+ttperf add --output-dir ./results/
+
+# Enable verbose debug logging
+ttperf --verbose add
+```
+
+## CLI Reference
+
+```
+ttperf [OPTIONS] [PROFILE_NAME] [pytest]
+
+Options:
+ --version Show version information
+ --help, -h Show this help message
+ --list-ops, -l List all supported operations
+ --debug, -d Show real-time profiler output
+ --verbose, -v Enable verbose logging (debug messages)
+ --quiet, -q Suppress decorative/emoji output (for CI)
+ --shape SHAPE Tensor shape (e.g., 1,1,32,32)
+ --dtype DTYPE Data type: bfloat16/bf16, float32/fp32/f32, int32/i32
+ --layout LAYOUT Memory layout: tile, row_major/rm
+ --memory-config CONFIG Memory configuration: dram, l1
+ --dram Use DRAM memory (default)
+ --l1 Use L1 memory
+ --output-dir DIR Copy generated CSV to this directory
```
-## ๐ Usage Examples
+## Config File
+
+Create `~/.ttperf.yaml` (global) or `./.ttperf.yaml` (project-local) to set defaults:
+
+```yaml
+# ~/.ttperf.yaml
+shape: 1,1,32,32
+dtype: bfloat16
+layout: tile
+memory_config: dram
+output_dir: ./results
+```
+
+CLI flags always override config file values.
+
+## Usage Examples
### Test File Profiling
```bash
@@ -118,7 +157,6 @@ ttperf tests/ops/test_matmul.py::test_basic_matmul
ttperf add
ttperf subtract
ttperf multiply
-ttperf divide
# Activation functions
ttperf relu
@@ -130,50 +168,31 @@ ttperf gelu
ttperf sqrt
ttperf exp
ttperf log
-ttperf sin
-ttperf cos
# Comparison operations
ttperf gt
ttperf lt
ttperf eq
-ttperf ne
# Reduction operations
ttperf max
ttperf min
-ttperf mean
ttperf sum
# Backward operations
ttperf add_bw
ttperf relu_bw
-ttperf sigmoid_bw
```
### Dynamic Configuration
```bash
-# Custom tensor shape
ttperf add --shape 1,1,32,32
ttperf relu --shape 2,3,64,128
-
-# Custom data type
ttperf add --dtype float32
-ttperf multiply --dtype int32
-
-# Custom memory layout
ttperf add --layout row_major
-ttperf relu --layout tile
-
-# Combined configuration
ttperf add --shape 1,1,64,64 --dtype float32 --layout row_major
-ttperf gelu --shape 2,1,32,32 --dtype bfloat16 --layout tile
-
-# Memory configuration options
-ttperf add --memory-config dram # Explicit DRAM
-ttperf relu --memory-config l1 # Explicit L1
-ttperf add --dram --shape 1,1,128,128 # DRAM with custom shape
-ttperf relu --l1 --dtype float32 # L1 with custom dtype
+ttperf add --dram --shape 1,1,128,128
+ttperf relu --l1 --dtype float32
```
### List All Supported Operations
@@ -185,38 +204,41 @@ ttperf -l
### Output Example
```
-๐ง Using custom configuration:
- Shape: (1, 1, 32, 32)
- Dtype: bfloat16
- Layout: tile
-๐ท๏ธ Auto-generated profile name: temp_test_add
-โถ๏ธ Running: ./tools/tracy/profile_this.py -n temp_test_add -c "pytest temp_test_add.py"
-
-... (profiling output) ...
-
-๐ Found CSV path: /path/to/profile_results.csv
-โฑ๏ธ DEVICE KERNEL DURATION [ns] total: 1234567.89 ns
+Auto-generated profile name: temp_test_add
+Running test...
+
+============================================================
+TEST SUMMARY
+============================================================
+Test: add
+Status: PASSED
+Configuration: shape=(1, 1, 32, 32), dtype=bfloat16, layout=tile, memory_config=dram (custom)
+CSV Path: /path/to/profile_results.csv
+DEVICE KERNEL DURATION [ns] total: 1234567.89 ns
+============================================================
```
-## ๐ ๏ธ How It Works
+## How It Works
1. **Command Parsing**: Analyzes input arguments to determine profile name and test path/operation
-2. **Operation Detection**: If an operation name is provided, maps it to the corresponding test method
-3. **Dynamic Configuration**: If custom configuration is provided, generates a temporary test file with the specified parameters
-4. **Profile Execution**: Runs the Tenstorrent's TT-Metal profiler with the specified test
-5. **Output Monitoring**: Streams profiling output in real-time
-6. **CSV Extraction**: Parses the output to find the generated CSV file path
-7. **Performance Analysis**: Reads the CSV and calculates total device kernel duration
+2. **Config Loading**: Reads `~/.ttperf.yaml` or `./.ttperf.yaml` for defaults (CLI flags take priority)
+3. **Operation Detection**: If an operation name is provided, maps it to the corresponding test method
+4. **Dynamic Configuration**: If custom configuration is provided, sets environment variables for the test
+5. **Profile Execution**: Runs the Tenstorrent's TT-Metal profiler with the specified test
+6. **Output Monitoring**: Streams profiling output in real-time (with `--debug`)
+7. **CSV Extraction**: Parses the output to find the generated CSV file path, verifies it exists
+8. **Performance Analysis**: Reads the CSV and calculates total device kernel duration
+9. **Output Copy**: Optionally copies the CSV to `--output-dir` if specified
-## ๐ Performance Metrics
+## Performance Metrics
The tool extracts the following key metrics:
- **DEVICE KERNEL DURATION [ns]**: Total time spent in device kernels
- **CSV Path**: Location of the detailed profiling results
-- **Real-time Progress**: Live output during profiling
+- **Real-time Progress**: Live output during profiling (with `--debug`)
-## โ๏ธ Configuration Options
+## Configuration Options
### Shape Configuration
- **Format**: Comma-separated integers (e.g., `1,1,32,32`)
@@ -224,33 +246,41 @@ The tool extracts the following key metrics:
- **Example**: `--shape 2,3,64,128`
### Data Type Configuration
-- **Valid Options**: `bfloat16`, `float32`, `int32`
+- **Valid Options**: `bfloat16` (or `bf16`), `float32` (or `fp32`/`f32`), `int32` (or `i32`)
- **Default**: `bfloat16`
- **Example**: `--dtype float32`
### Layout Configuration
-- **Valid Options**: `tile`, `row_major`
+- **Valid Options**: `tile`, `row_major` (or `rm`)
- **Default**: `tile`
- **Example**: `--layout row_major`
-## ๐ง Requirements
+## Requirements
- Python 3.8+
- pandas
- Tenstorrent's TT-Metal development environment
- pytest
+- PyYAML (optional, for config file support)
-## ๐ Project Structure
+## Project Structure
```
ttperf/
-โโโ ttperf.py # Main CLI implementation
-โโโ pyproject.toml # Project configuration
-โโโ README.md # This file
-โโโ .gitignore # Git ignore rules
+โโโ ttperf/
+โ โโโ __init__.py
+โ โโโ ttperf.py # Main CLI implementation
+โ โโโ data/
+โ โโโ operation_configs.json
+โ โโโ test_eltwise_operations.py
+โโโ tests/
+โ โโโ test_ttperf.py # Unit tests
+โโโ pyproject.toml
+โโโ README.md
+โโโ .gitignore
```
-## ๐ค Contributing
+## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
@@ -260,25 +290,25 @@ Contributions are welcome! Please feel free to submit a Pull Request.
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
-## ๐ License
+## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
-## โ ๏ธ Disclaimer
+## Disclaimer
This tool is an independent utility that interfaces with Tenstorrent's TT-Metal profiling tools. It is not affiliated with or endorsed by Tenstorrent Inc. The tool serves as a convenience wrapper around existing TT-Metal profiling infrastructure.
-## ๐ Issues
+## Issues
If you encounter any issues, please [create an issue](https://github.com/Aswincloud/ttperf/issues) on GitHub.
-## ๐จโ๐ป Author
+## Author
**Aswin Z**
- GitHub: [@Aswincloud](https://github.com/Aswincloud)
- Portfolio: [aswincloud.com](https://aswincloud.com)
-## ๐ Acknowledgments
+## Acknowledgments
- Tenstorrent's TT-Metal development team for the profiling tools
- Python community for excellent libraries like pandas
@@ -286,5 +316,5 @@ If you encounter any issues, please [create an issue](https://github.com/Aswincl
---
-Made with โค๏ธ for the Tenstorrent TT-Metal community
-
\ No newline at end of file
+Made with care for the Tenstorrent TT-Metal community
+
diff --git a/pyproject.toml b/pyproject.toml
index c299728..e292fb3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,15 @@ classifiers = [
"Topic :: System :: Benchmark",
]
+[project.optional-dependencies]
+dev = [
+ "pytest>=7.0",
+ "pytest-cov>=4.0",
+]
+yaml = [
+ "PyYAML>=6.0",
+]
+
[project.urls]
Homepage = "https://github.com/Aswincloud/ttperf"
Repository = "https://github.com/Aswincloud/ttperf"
@@ -44,4 +53,12 @@ packages = ["ttperf"]
ttperf = [
"data/test_eltwise_operations.py",
"data/operation_configs.json"
-]
\ No newline at end of file
+]
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+addopts = "--tb=short"
+
+[tool.coverage.run]
+source = ["ttperf"]
+omit = ["ttperf/data/*"]
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_ttperf.py b/tests/test_ttperf.py
new file mode 100644
index 0000000..ff1a11f
--- /dev/null
+++ b/tests/test_ttperf.py
@@ -0,0 +1,244 @@
+"""
+Unit tests for ttperf core functions.
+
+Run with:
+ pytest tests/test_ttperf.py -v --cov=ttperf
+"""
+
+import io
+import os
+import sys
+import csv
+import tempfile
+import unittest
+from unittest.mock import patch, MagicMock
+
+# ---------------------------------------------------------------------------
+# Helpers โ make ttperf importable without a full TT-Metal environment
+# ---------------------------------------------------------------------------
+
+# Ensure the package root is on the path when running from the repo root
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+import ttperf.ttperf as t
+
+
+# ---------------------------------------------------------------------------
+# extract_csv_path
+# ---------------------------------------------------------------------------
+
+class TestExtractCsvPath(unittest.TestCase):
+
+ def _make_csv(self):
+ """Create a real temporary CSV file and return its path."""
+ f = tempfile.NamedTemporaryFile(suffix='.csv', delete=False,
+ mode='w', newline='')
+ writer = csv.writer(f)
+ writer.writerow(["DEVICE KERNEL DURATION [ns]"])
+ writer.writerow([100.0])
+ f.close()
+ return f.name
+
+ def test_extracts_valid_path(self):
+ csv_path = self._make_csv()
+ try:
+ output = f"OPs csv generated at: {csv_path}\nsome other line\n"
+ result = t.extract_csv_path(output)
+ self.assertEqual(result, csv_path)
+ finally:
+ os.unlink(csv_path)
+
+ def test_exits_when_not_found(self):
+ with self.assertRaises(SystemExit):
+ t.extract_csv_path("no csv here")
+
+ def test_exits_when_file_missing(self):
+ output = "OPs csv generated at: /nonexistent/path/result.csv\n"
+ with self.assertRaises(SystemExit):
+ t.extract_csv_path(output)
+
+
+# ---------------------------------------------------------------------------
+# get_device_kernel_duration
+# ---------------------------------------------------------------------------
+
+class TestGetDeviceKernelDuration(unittest.TestCase):
+
+ def _make_csv(self, rows):
+ f = tempfile.NamedTemporaryFile(suffix='.csv', delete=False,
+ mode='w', newline='')
+ writer = csv.writer(f)
+ writer.writerow(["DEVICE KERNEL DURATION [ns]"])
+ for val in rows:
+ writer.writerow([val])
+ f.close()
+ return f.name
+
+ def test_sums_durations(self):
+ path = self._make_csv([100.0, 200.5, 300.0])
+ try:
+ result = t.get_device_kernel_duration(path)
+ self.assertAlmostEqual(result, 600.5)
+ finally:
+ os.unlink(path)
+
+ def test_exits_on_missing_column(self):
+ f = tempfile.NamedTemporaryFile(suffix='.csv', delete=False,
+ mode='w', newline='')
+ writer = csv.writer(f)
+ writer.writerow(["SOME_OTHER_COL"])
+ writer.writerow([42])
+ f.close()
+ try:
+ with self.assertRaises(SystemExit):
+ t.get_device_kernel_duration(f.name)
+ finally:
+ os.unlink(f.name)
+
+
+# ---------------------------------------------------------------------------
+# parse_shape
+# ---------------------------------------------------------------------------
+
+class TestParseShape(unittest.TestCase):
+
+ def test_valid_4d(self):
+ self.assertEqual(t.parse_shape("1,1,32,32"), (1, 1, 32, 32))
+
+ def test_valid_with_spaces(self):
+ self.assertEqual(t.parse_shape("2, 3, 64, 128"), (2, 3, 64, 128))
+
+ def test_invalid_exits(self):
+ with self.assertRaises(SystemExit):
+ t.parse_shape("a,b,c")
+
+
+# ---------------------------------------------------------------------------
+# validate_dtype
+# ---------------------------------------------------------------------------
+
+class TestValidateDtype(unittest.TestCase):
+
+ def test_canonical(self):
+ self.assertEqual(t.validate_dtype("bfloat16"), "bfloat16")
+ self.assertEqual(t.validate_dtype("float32"), "float32")
+ self.assertEqual(t.validate_dtype("int32"), "int32")
+
+ def test_aliases(self):
+ self.assertEqual(t.validate_dtype("bf16"), "bfloat16")
+ self.assertEqual(t.validate_dtype("fp32"), "float32")
+ self.assertEqual(t.validate_dtype("f32"), "float32")
+ self.assertEqual(t.validate_dtype("i32"), "int32")
+
+ def test_invalid_exits(self):
+ with self.assertRaises(SystemExit):
+ t.validate_dtype("float16")
+
+
+# ---------------------------------------------------------------------------
+# validate_layout
+# ---------------------------------------------------------------------------
+
+class TestValidateLayout(unittest.TestCase):
+
+ def test_canonical(self):
+ self.assertEqual(t.validate_layout("tile"), "tile")
+ self.assertEqual(t.validate_layout("row_major"), "row_major")
+
+ def test_aliases(self):
+ self.assertEqual(t.validate_layout("rm"), "row_major")
+ self.assertEqual(t.validate_layout("rowmajor"), "row_major")
+
+ def test_invalid_exits(self):
+ with self.assertRaises(SystemExit):
+ t.validate_layout("strided")
+
+
+# ---------------------------------------------------------------------------
+# generate_profile_name
+# ---------------------------------------------------------------------------
+
+class TestGenerateProfileName(unittest.TestCase):
+
+ def test_from_double_colon(self):
+ self.assertEqual(t.generate_profile_name("file.py::test_foo"), "test_foo")
+
+ def test_from_py_file(self):
+ self.assertEqual(t.generate_profile_name("tests/test_conv.py"), "test_conv")
+
+ def test_fallback(self):
+ result = t.generate_profile_name("some_dir")
+ self.assertEqual(result, "some_dir")
+
+
+# ---------------------------------------------------------------------------
+# load_config_file
+# ---------------------------------------------------------------------------
+
+class TestLoadConfigFile(unittest.TestCase):
+
+ def test_returns_empty_dict_when_no_file(self):
+ with patch('os.path.exists', return_value=False):
+ result = t.load_config_file()
+ self.assertEqual(result, {})
+
+ def test_loads_yaml_when_present(self):
+ yaml_content = "shape: 1,1,64,64\ndtype: float32\n"
+ with tempfile.NamedTemporaryFile(suffix='.yaml', delete=False, mode='w') as f:
+ f.write(yaml_content)
+ f.flush()
+ fname = f.name
+
+ try:
+ # Temporarily redirect config search to our temp file
+ with patch.object(t, 'load_config_file', wraps=t.load_config_file):
+ with patch('os.path.exists') as mock_exists:
+ mock_exists.side_effect = lambda p: p == fname or os.path.exists(p)
+ # Directly call yaml loading logic
+ try:
+ import yaml
+ with open(fname) as yf:
+ loaded = yaml.safe_load(yf)
+ self.assertIn('shape', loaded)
+ self.assertEqual(loaded['dtype'], 'float32')
+ except ImportError:
+ self.skipTest("PyYAML not installed")
+ finally:
+ os.unlink(fname)
+
+
+# ---------------------------------------------------------------------------
+# extract_test_config_and_status
+# ---------------------------------------------------------------------------
+
+class TestExtractTestConfigAndStatus(unittest.TestCase):
+
+ def test_passed_status(self):
+ output = "tests/test_eltwise_operations.py::TestEltwiseOperations::test_add PASSED\n1 passed in 1.23s"
+ result = t.extract_test_config_and_status(output)
+ self.assertEqual(result['status'], 'PASSED')
+ self.assertEqual(result['test_name'], 'add')
+
+ def test_failed_status(self):
+ output = "FAILED test_something.py - AssertionError\n1 failed"
+ result = t.extract_test_config_and_status(output)
+ self.assertEqual(result['status'], 'FAILED')
+
+ def test_unknown_status(self):
+ result = t.extract_test_config_and_status("no status here")
+ self.assertEqual(result['status'], 'unknown')
+
+
+# ---------------------------------------------------------------------------
+# __version__
+# ---------------------------------------------------------------------------
+
+class TestVersion(unittest.TestCase):
+
+ def test_version_is_string(self):
+ self.assertIsInstance(t.__version__, str)
+ self.assertTrue(len(t.__version__) > 0)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/ttperf/ttperf.py b/ttperf/ttperf.py
index 76f1d8d..f81ab9c 100644
--- a/ttperf/ttperf.py
+++ b/ttperf/ttperf.py
@@ -3,49 +3,63 @@
import sys
import os
import subprocess
+import shutil
import pandas as pd
import re
import ast
import json
import argparse
-import pkg_resources
+import logging
from typing import Dict, List, Optional, Tuple
+try:
+ from importlib.metadata import version, PackageNotFoundError
+ try:
+ __version__ = version("ttperf")
+ except PackageNotFoundError:
+ __version__ = "dev"
+except ImportError:
+ # Python < 3.8 fallback
+ __version__ = "dev"
+
+logger = logging.getLogger(__name__)
+
+
def load_operation_configs() -> Dict:
"""Load operation configurations from JSON file."""
try:
- # Try to load from package data first
+ from importlib.resources import files
+ config_text = files('ttperf').joinpath('data/operation_configs.json').read_text()
+ return json.loads(config_text)
+ except Exception:
+ pass
+ try:
+ import pkg_resources
config_path = pkg_resources.resource_filename('ttperf', 'data/operation_configs.json')
with open(config_path, 'r') as f:
return json.load(f)
- except (FileNotFoundError, pkg_resources.DistributionNotFound):
- # Fallback to local file
- local_path = os.path.join(os.path.dirname(__file__), 'data', 'operation_configs.json')
- with open(local_path, 'r') as f:
- return json.load(f)
+ except Exception:
+ pass
+ local_path = os.path.join(os.path.dirname(__file__), 'data', 'operation_configs.json')
+ with open(local_path, 'r') as f:
+ return json.load(f)
+
def get_operation_config(operation_name: str) -> Dict:
"""Get configuration for a specific operation from JSON."""
configs = load_operation_configs()
-
- # Get operation-specific config or fall back to defaults
op_config = configs['operations'].get(operation_name, {})
defaults = configs['defaults'].copy()
-
- # For bitwise operations, use int32 as default dtype
if operation_name.startswith('bitwise_'):
defaults['dtype'] = 'int32'
-
- # Merge with defaults
result = defaults.copy()
result.update(op_config)
-
return result
+
def get_expected_config_for_operation(operation_name: str) -> dict:
"""Get expected configuration for specific operations based on JSON config."""
config = get_operation_config(operation_name)
-
return {
'shape': str(tuple(config['shape'])),
'dtype': config['dtype'],
@@ -55,15 +69,14 @@ def get_expected_config_for_operation(operation_name: str) -> dict:
def get_test_file_path() -> str:
"""Get the path to the test_eltwise_operations.py file."""
- # Try to find the test file in the package data
try:
+ import pkg_resources
test_file = pkg_resources.resource_filename('ttperf', 'data/test_eltwise_operations.py')
if os.path.exists(test_file):
return test_file
- except:
+ except Exception:
pass
-
- # Fallback: look in current directory and common locations
+
possible_paths = [
"test_eltwise_operations.py",
"ttperf/data/test_eltwise_operations.py",
@@ -71,28 +84,39 @@ def get_test_file_path() -> str:
os.path.join(os.getcwd(), "test_eltwise_operations.py"),
os.path.join(os.path.expanduser("~"), "ttperf", "test_eltwise_operations.py")
]
-
+
for path in possible_paths:
if os.path.exists(path):
return path
-
- return "test_eltwise_operations.py" # Default fallback
+
+ return "test_eltwise_operations.py"
def extract_csv_path(output: str) -> str:
+ """Extract the CSV file path from profiler output."""
match = re.search(r"OPs csv generated at: (.+?\.csv)", output)
if not match:
+ logger.debug("Full output:\n%s", output)
print("โ CSV path not found in output.")
sys.exit(1)
- return match.group(1)
+ csv_path = match.group(1).strip()
+ if not os.path.exists(csv_path):
+ print(f"โ CSV file not found at path: {csv_path}")
+ print(f" Please verify the profiler completed successfully.")
+ sys.exit(1)
+ return csv_path
def get_device_kernel_duration(csv_path: str) -> float:
+ """Read the CSV and return the total DEVICE KERNEL DURATION."""
df = pd.read_csv(csv_path)
- if "DEVICE KERNEL DURATION [ns]" not in df.columns:
- print("โ 'DEVICE KERNEL DURATION [ns]' column not found.")
+ target_col = "DEVICE KERNEL DURATION [ns]"
+ if target_col not in df.columns:
+ available = ", ".join(df.columns.tolist())
+ print(f"โ '{target_col}' column not found in CSV.")
+ print(f" Available columns: {available}")
sys.exit(1)
- return df["DEVICE KERNEL DURATION [ns]"].sum()
+ return df[target_col].sum()
def extract_test_methods_from_file(file_path: str) -> dict:
@@ -100,42 +124,36 @@ def extract_test_methods_from_file(file_path: str) -> dict:
try:
with open(file_path, 'r') as f:
content = f.read()
-
- # Parse the Python file
+
tree = ast.parse(content)
-
- # Find the TestEltwiseOperations class
test_class = None
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == 'TestEltwiseOperations':
test_class = node
break
-
+
if not test_class:
return {}
-
- # Extract test method names
+
operation_mapping = {}
for node in test_class.body:
if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'):
- # Convert test_method_name to operation_name
- operation_name = node.name[5:] # Remove 'test_' prefix
+ operation_name = node.name[5:]
operation_mapping[operation_name] = node.name
-
+
return operation_mapping
except Exception as e:
- print(f"Warning: Could not parse test file: {e}")
+ logger.warning("Could not parse test file: %s", e)
return {}
-def get_operation_test_mapping():
- """Get mapping of operation names to test methods in test_eltwise_operations.py"""
+def get_operation_test_mapping() -> dict:
+ """Get mapping of operation names to test methods in test_eltwise_operations.py."""
test_file_path = get_test_file_path()
-
+
if os.path.exists(test_file_path):
return extract_test_methods_from_file(test_file_path)
-
- # Fallback to a minimal mapping if file doesn't exist
+
return {
"add": "test_add",
"relu": "test_relu",
@@ -156,7 +174,7 @@ def is_operation_name(arg: str) -> bool:
return arg.lower() in operation_mapping
-def get_test_method_for_operation(operation_name: str) -> str:
+def get_test_method_for_operation(operation_name: str) -> Optional[str]:
"""Get the test method name for a given operation."""
operation_mapping = get_operation_test_mapping()
return operation_mapping.get(operation_name.lower())
@@ -173,7 +191,6 @@ def parse_shape(shape_str: str) -> tuple:
def validate_dtype(dtype_str: str) -> str:
"""Validate and return dtype string."""
- # Map aliases to canonical names
dtype_aliases = {
'bfloat16': 'bfloat16',
'bf16': 'bfloat16',
@@ -183,131 +200,158 @@ def validate_dtype(dtype_str: str) -> str:
'int32': 'int32',
'i32': 'int32'
}
-
dtype_lower = dtype_str.lower()
if dtype_lower in dtype_aliases:
return dtype_aliases[dtype_lower]
-
- valid_options = list(set(dtype_aliases.keys()))
- print(f"โ Invalid dtype: {dtype_str}. Valid options: {', '.join(sorted(valid_options))}")
+ valid_options = sorted(set(dtype_aliases.keys()))
+ print(f"โ Invalid dtype: {dtype_str}. Valid options: {', '.join(valid_options)}")
sys.exit(1)
def validate_layout(layout_str: str) -> str:
"""Validate and return layout string."""
- # Map aliases to canonical names
layout_aliases = {
'tile': 'tile',
'row_major': 'row_major',
'rm': 'row_major',
'rowmajor': 'row_major'
}
-
layout_lower = layout_str.lower()
if layout_lower in layout_aliases:
return layout_aliases[layout_lower]
-
- valid_options = list(set(layout_aliases.keys()))
- print(f"โ Invalid layout: {layout_str}. Valid options: {', '.join(sorted(valid_options))}")
+ valid_options = sorted(set(layout_aliases.keys()))
+ print(f"โ Invalid layout: {layout_str}. Valid options: {', '.join(valid_options)}")
sys.exit(1)
def validate_memory_config(memory_config_str: str) -> str:
"""Validate and return memory configuration string."""
- # Map aliases to canonical names
memory_config_aliases = {
'dram': 'dram',
'l1': 'l1',
'dram_interleaved': 'dram',
'l1_memory': 'l1'
}
-
memory_config_lower = memory_config_str.lower()
if memory_config_lower in memory_config_aliases:
return memory_config_aliases[memory_config_lower]
-
- valid_options = list(set(memory_config_aliases.keys()))
- print(f"โ Invalid memory config: {memory_config_str}. Valid options: {', '.join(sorted(valid_options))}")
+ valid_options = sorted(set(memory_config_aliases.keys()))
+ print(f"โ Invalid memory config: {memory_config_str}. Valid options: {', '.join(valid_options)}")
sys.exit(1)
-def set_test_configuration(shape: tuple, dtype: str, layout: str, memory_config: str = None, operation_name: str = None):
+def set_test_configuration(
+ shape: tuple,
+ dtype: str,
+ layout: str,
+ memory_config: Optional[str] = None,
+ operation_name: Optional[str] = None,
+ quiet: bool = False
+) -> None:
"""Set environment variables for test configuration."""
- # For bitwise operations, always use int32 regardless of what's specified
if operation_name and operation_name.startswith('bitwise_'):
dtype = 'int32'
-
+
os.environ['TTPERF_CUSTOM_SHAPE'] = str(shape)
os.environ['TTPERF_CUSTOM_DTYPE'] = dtype
os.environ['TTPERF_CUSTOM_LAYOUT'] = layout
if memory_config:
os.environ['TTPERF_CUSTOM_MEMORY_CONFIG'] = memory_config
- print(f"๐ง Using custom configuration:")
- print(f" Shape: {shape}")
- print(f" Dtype: {dtype}")
- print(f" Layout: {layout}")
- if memory_config:
- print(f" Memory Config: {memory_config}")
+
+ if not quiet:
+ print(f"๐ง Using custom configuration:")
+ print(f" Shape: {shape}")
+ print(f" Dtype: {dtype}")
+ print(f" Layout: {layout}")
+ if memory_config:
+ print(f" Memory Config: {memory_config}")
+
+
+def load_config_file() -> dict:
+ """Load defaults from ~/.ttperf.yaml or ./.ttperf.yaml if present."""
+ config = {}
+ candidates = [
+ os.path.join(os.getcwd(), '.ttperf.yaml'),
+ os.path.expanduser('~/.ttperf.yaml'),
+ ]
+ for path in candidates:
+ if os.path.exists(path):
+ try:
+ import yaml
+ with open(path, 'r') as f:
+ loaded = yaml.safe_load(f)
+ if isinstance(loaded, dict):
+ config = loaded
+ logger.debug("Loaded config from %s", path)
+ except ImportError:
+ # PyYAML not available; try configparser as fallback
+ import configparser
+ cp = configparser.ConfigParser()
+ cp.read(path)
+ if 'defaults' in cp:
+ config = dict(cp['defaults'])
+ logger.debug("Loaded config (ini) from %s", path)
+ except Exception as e:
+ logger.warning("Could not load config file %s: %s", path, e)
+ break
+ return config
-def print_help():
- print("""๐ ttperf - TT-Metal Performance Profiler
+def print_help(quiet: bool = False) -> None:
+ print("""ttperf - TT-Metal Performance Profiler
Usage: ttperf [OPTIONS] [PROFILE_NAME] [pytest]
Examples:
ttperf test_performance.py # Auto-generated profile: test_performance
- ttperf my_profile pytest test_performance.py # Custom profile name: my_profile
- ttperf tests/test_ops.py::test_matmul # Auto-generated profile: test_matmul
+ ttperf my_profile pytest test_performance.py # Custom profile name: my_profile
+ ttperf tests/test_ops.py::test_matmul # Auto-generated profile: test_matmul
ttperf add # Profile specific operation: add
- ttperf my_profile add # Custom profile name for operation: my_profile
- ttperf add --shape 1,1,32,32 --dtype bf16 --layout tile # Custom configuration
- ttperf relu --dtype fp32 --layout rm # Using aliases
- ttperf add --dram # Use DRAM memory (default)
- ttperf relu --l1 # Use L1 memory
+ ttperf my_profile add # Custom profile name for operation
+ ttperf add --shape 1,1,32,32 --dtype bf16 --layout tile
+ ttperf relu --dtype fp32 --layout rm
+ ttperf add --dram
+ ttperf relu --l1
Options:
- --version, -v Show version information
+ --version Show version information
--help, -h Show this help message
--list-ops, -l List all supported operations
--debug, -d Show real-time debug output
+ --verbose, -v Enable verbose logging (logger.debug messages)
+ --quiet, -q Suppress decorative/emoji output (useful for CI)
--shape SHAPE Tensor shape (e.g., 1,1,32,32)
--dtype DTYPE Data type (bfloat16/bf16, float32/fp32/f32, int32/i32)
--layout LAYOUT Memory layout (tile, row_major/rm)
--memory-config CONFIG Memory configuration (dram, l1)
--dram Use DRAM memory (default)
--l1 Use L1 memory
+ --output-dir DIR Copy generated CSV to this directory after profiling
Arguments:
PROFILE_NAME Optional name for the profiling session
test_path Path to test file or specific test method
operation Operation name to profile (e.g., add, relu, matmul)
+Config File:
+ ttperf reads defaults from ~/.ttperf.yaml or ./.ttperf.yaml (local takes priority).
+ CLI flags always override config file values.
+
Environment Variables:
PYTHONPATH Path to tt-metal installation (optional)
-Configuration:
- ttperf searches for tt-metal in the following order:
- 1. PYTHONPATH environment variable (if specified)
- 2. Current working directory (walks up to find tt-metal root)
-
- Examples:
- export PYTHONPATH=/path/to/tt-metal # Use specific tt-metal location
- cd /path/to/tt-metal && ttperf add # Run from within tt-metal
-
For more information, visit: https://github.com/Aswincloud/ttperf""")
-def print_supported_operations():
+def print_supported_operations(quiet: bool = False) -> None:
"""Print all supported operations."""
operation_mapping = get_operation_test_mapping()
operations = sorted(operation_mapping.keys())
-
- print("๐ Supported Operations:")
+
+ print("Supported Operations:")
print("=" * 50)
-
- # Group operations by category
- categories = {
+
+ categories: Dict[str, List[str]] = {
"Unary": [],
"Binary": [],
"Ternary": [],
@@ -315,7 +359,7 @@ def print_supported_operations():
"Complex": [],
"Backward": []
}
-
+
for op in operations:
if op.endswith("_bw"):
categories["Backward"].append(op)
@@ -325,16 +369,16 @@ def print_supported_operations():
categories["Reduction"].append(op)
elif op in ["complex_tensor", "real", "imag", "angle", "conj", "polar", "complex_recip"]:
categories["Complex"].append(op)
- elif op in ["add", "subtract", "multiply", "divide", "gt", "lt", "eq", "ne", "ge", "le",
- "logical_and", "logical_or", "logical_xor", "atan2", "hypot", "logaddexp",
- "logaddexp2", "maximum", "minimum", "pow", "fmod", "remainder",
- "squared_difference", "bitwise_and", "bitwise_or", "bitwise_xor",
- "mul", "sub", "rpow", "rdiv", "ldexp", "xlogy", "nextafter", "bias_gelu",
- "addalpha", "subalpha", "isclose"] or op.endswith("_"):
+ elif op in ["add", "subtract", "multiply", "divide", "gt", "lt", "eq", "ne", "ge", "le",
+ "logical_and", "logical_or", "logical_xor", "atan2", "hypot", "logaddexp",
+ "logaddexp2", "maximum", "minimum", "pow", "fmod", "remainder",
+ "squared_difference", "bitwise_and", "bitwise_or", "bitwise_xor",
+ "mul", "sub", "rpow", "rdiv", "ldexp", "xlogy", "nextafter", "bias_gelu",
+ "addalpha", "subalpha", "isclose"] or op.endswith("_"):
categories["Binary"].append(op)
else:
categories["Unary"].append(op)
-
+
for category, ops in categories.items():
if ops:
print(f"\n{category} Operations ({len(ops)}):")
@@ -345,53 +389,58 @@ def print_supported_operations():
print()
if len(ops) % 3 != 0:
print()
-
+
print(f"\n\nTotal: {len(operations)} operations supported")
def generate_profile_name(test_cmd: str) -> str:
"""Generate a profile name from the test command/path."""
- # Handle specific test method (e.g., test_ops.py::test_matmul -> test_matmul)
if "::" in test_cmd:
return test_cmd.split("::")[-1]
-
- # Handle file path (e.g., tests/test_conv.py -> test_conv)
if test_cmd.endswith(".py"):
- filename = os.path.splitext(os.path.basename(test_cmd))[0] # Gets filename without extension
+ filename = os.path.splitext(os.path.basename(test_cmd))[0]
return filename
-
- # Handle directory or other cases
return os.path.basename(test_cmd) or "profile"
-def parse_args(argv):
- # Handle version and help flags
- if "--version" in argv or "-v" in argv:
- print("ttperf version 0.1.7")
+def parse_args(argv: List[str]) -> Tuple:
+ """Parse CLI arguments, applying config file defaults first."""
+ # Load config file defaults
+ file_config = load_config_file()
+
+ if "--version" in argv:
+ print(f"ttperf version {__version__}")
sys.exit(0)
-
+
if "--help" in argv or "-h" in argv:
print_help()
sys.exit(1)
-
+
if "--list-ops" in argv or "-l" in argv:
print_supported_operations()
sys.exit(0)
-
- # Parse arguments
+
parser = argparse.ArgumentParser(add_help=False)
- parser.add_argument('--debug', '-d', action='store_true', help='Show real-time debug output')
- parser.add_argument('--shape', type=str, help='Tensor shape (e.g., 1,1,32,32)')
- parser.add_argument('--dtype', type=str, help='Data type (bfloat16/bf16, float32/fp32/f32, int32/i32)')
- parser.add_argument('--layout', type=str, help='Memory layout (tile, row_major/rm)')
- parser.add_argument('--memory-config', type=str, choices=['dram', 'l1'], default='dram', help='Memory configuration (dram, l1)')
- parser.add_argument('--dram', action='store_const', const='dram', dest='memory_config', help='Use DRAM memory (default)')
- parser.add_argument('--l1', action='store_const', const='l1', dest='memory_config', help='Use L1 memory')
-
- # Parse known args to extract configuration options
+ parser.add_argument('--debug', '-d', action='store_true')
+ parser.add_argument('--verbose', '-v', action='store_true')
+ parser.add_argument('--quiet', '-q', action='store_true')
+ parser.add_argument('--shape', type=str, default=file_config.get('shape'))
+ parser.add_argument('--dtype', type=str, default=file_config.get('dtype'))
+ parser.add_argument('--layout', type=str, default=file_config.get('layout'))
+ parser.add_argument('--memory-config', type=str, choices=['dram', 'l1'],
+ default=file_config.get('memory_config', 'dram'))
+ parser.add_argument('--dram', action='store_const', const='dram', dest='memory_config')
+ parser.add_argument('--l1', action='store_const', const='l1', dest='memory_config')
+ parser.add_argument('--output-dir', type=str, default=file_config.get('output_dir'))
+
args, remaining = parser.parse_known_args(argv)
-
- # Default values
+
+ # Configure logging based on --verbose
+ if args.verbose:
+ logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
+ else:
+ logging.basicConfig(level=logging.WARNING, format='%(levelname)s: %(message)s')
+
name = None
test_cmd = None
custom_config = None
@@ -403,7 +452,6 @@ def parse_args(argv):
elif arg.lower() == "pytest":
continue
elif is_operation_name(arg):
- # This is an operation name, construct the test command
operation_name = arg
test_method = get_test_method_for_operation(operation_name)
test_file_path = get_test_file_path()
@@ -416,291 +464,220 @@ def parse_args(argv):
print_help()
sys.exit(1)
- # Process custom configuration
if args.shape or args.dtype or args.layout or args.memory_config:
- # Check if we're profiling an operation (not a test file)
if test_cmd and "test_eltwise_operations.py" in test_cmd:
- # Parse configuration
shape = parse_shape(args.shape) if args.shape else (1, 1, 32, 32)
dtype = validate_dtype(args.dtype) if args.dtype else "bfloat16"
layout = validate_layout(args.layout) if args.layout else "tile"
memory_config = validate_memory_config(args.memory_config) if args.memory_config else "dram"
-
- # For bitwise operations, always use int32 regardless of what's specified
+
if operation_name and operation_name.startswith('bitwise_'):
dtype = 'int32'
-
- # Store custom configuration
+
custom_config = {
'shape': shape,
'dtype': dtype,
'layout': layout,
'memory_config': memory_config
}
-
- # Set environment variables for the test
- set_test_configuration(shape, dtype, layout, memory_config, operation_name)
+
+ set_test_configuration(shape, dtype, layout, memory_config, operation_name, quiet=args.quiet)
else:
- print("โ ๏ธ Custom configuration options (--shape, --dtype, --layout, --memory-config) only work with operation names, not test files.")
+ if not args.quiet:
+ print("Warning: Custom configuration options only work with operation names, not test files.")
- # Auto-generate profile name if not provided
if not name:
name = generate_profile_name(test_cmd)
- print(f"๐ท๏ธ Auto-generated profile name: {name}")
+ if not args.quiet:
+ print(f"Auto-generated profile name: {name}")
- return name, test_cmd, args.debug, custom_config
+ return name, test_cmd, args.debug, custom_config, args.quiet, args.output_dir
def find_tt_metal_path() -> str:
- """Find tt-metal directory path in order of preference.
-
- Search order:
- 1. PYTHONPATH environment variable (if specified)
- 2. Current working directory (walk up to find tt-metal root)
- """
- # 1. Check PYTHONPATH environment variable
+ """Find tt-metal directory path in order of preference."""
pythonpath = os.environ.get('PYTHONPATH', '')
if pythonpath:
for path in pythonpath.split(':'):
- # Check if this path is tt-metal or contains tt-metal
if 'tt-metal' in path:
- # If path ends with tt-metal, use it directly
if path.endswith('tt-metal') or os.path.basename(path) == 'tt-metal':
if os.path.exists(path) and os.path.isdir(path):
return path
- # Otherwise, check if parent is tt-metal
parent = os.path.dirname(path)
if os.path.basename(parent) == 'tt-metal' and os.path.isdir(parent):
return parent
-
- # 2. Use current working directory - walk up to find tt-metal root
+
cwd = os.getcwd()
current = cwd
-
- # Walk up the directory tree to find tt-metal
while current != '/':
if os.path.basename(current) == 'tt-metal':
return current
current = os.path.dirname(current)
-
- # If not found, return current directory (will fail with helpful error)
+
return cwd
-def build_profile_command(name, test_cmd):
+def build_profile_command(name: str, test_cmd: str) -> str:
+ """Build the tracy profile command string."""
name_arg = f"-n {name}" if name else ""
tt_metal_path = find_tt_metal_path()
-
- # Check if the path exists and has the tracy tool
+
tracy_tool = os.path.join(tt_metal_path, "tools", "tracy", "profile_this.py")
if not os.path.exists(tracy_tool):
- print(f"โ ๏ธ Warning: Tracy tool not found at {tracy_tool}")
+ print(f"Warning: Tracy tool not found at {tracy_tool}")
print(f" Detected tt-metal path: {tt_metal_path}")
print(f" Please ensure:")
print(f" 1. tt-metal is installed correctly, and")
print(f" 2. Either:")
print(f" - Add tt-metal to PYTHONPATH: export PYTHONPATH=/path/to/tt-metal")
print(f" - Run from within tt-metal directory: cd /path/to/tt-metal")
-
+
return f"{tracy_tool} {name_arg} -c \"pytest {test_cmd}\""
def extract_config_from_csv(csv_path: str) -> dict:
"""Extract test configuration from the CSV file."""
- config = {}
-
+ config: dict = {}
try:
df = pd.read_csv(csv_path)
if len(df) > 0:
- # Get the first row (assuming single operation)
row = df.iloc[0]
-
- # Extract shape from input dimensions
- # Format: INPUT_0_W_PAD[LOGICAL], INPUT_0_Z_PAD[LOGICAL], INPUT_0_Y_PAD[LOGICAL], INPUT_0_X_PAD[LOGICAL]
- w = row.get('INPUT_0_W_PAD[LOGICAL]', '1')
- z = row.get('INPUT_0_Z_PAD[LOGICAL]', '1')
- y = row.get('INPUT_0_Y_PAD[LOGICAL]', '32')
- x = row.get('INPUT_0_X_PAD[LOGICAL]', '32')
-
- # Parse dimensions (they may be in format like "32[32]")
- def parse_dim(dim_str):
+
+ def parse_dim(dim_str: str) -> str:
if isinstance(dim_str, str) and '[' in dim_str:
return dim_str.split('[')[0]
return str(dim_str)
-
- w_val = parse_dim(w)
- z_val = parse_dim(z)
- y_val = parse_dim(y)
- x_val = parse_dim(x)
-
- config['shape'] = f"{w_val}, {z_val}, {y_val}, {x_val}"
-
- # Extract dtype (prefer output datatype, fallback to input)
+
+ w = parse_dim(row.get('INPUT_0_W_PAD[LOGICAL]', '1'))
+ z = parse_dim(row.get('INPUT_0_Z_PAD[LOGICAL]', '1'))
+ y = parse_dim(row.get('INPUT_0_Y_PAD[LOGICAL]', '32'))
+ x = parse_dim(row.get('INPUT_0_X_PAD[LOGICAL]', '32'))
+ config['shape'] = f"{w}, {z}, {y}, {x}"
+
output_dtype = row.get('OUTPUT_0_DATATYPE', row.get('INPUT_0_DATATYPE', 'BFLOAT16'))
- # Convert to lowercase for consistency
config['dtype'] = output_dtype.lower() if isinstance(output_dtype, str) else 'bfloat16'
-
- # Extract layout (prefer output layout, fallback to input)
+
output_layout = row.get('OUTPUT_0_LAYOUT', row.get('INPUT_0_LAYOUT', 'TILE'))
- # Convert to lowercase for consistency
config['layout'] = output_layout.lower() if isinstance(output_layout, str) else 'tile'
-
- # Extract memory configuration from memory columns
+
output_memory = row.get('OUTPUT_0_MEMORY', row.get('INPUT_0_MEMORY', 'DEV_1_DRAM_INTERLEAVED'))
if isinstance(output_memory, str):
if 'L1' in output_memory.upper():
config['memory_config'] = 'l1'
- elif 'DRAM' in output_memory.upper():
- config['memory_config'] = 'dram'
else:
- config['memory_config'] = 'dram' # Default fallback
+ config['memory_config'] = 'dram'
else:
- config['memory_config'] = 'dram' # Default fallback
-
+ config['memory_config'] = 'dram'
+
except Exception as e:
- print(f"โ ๏ธ Warning: Could not extract config from CSV: {e}")
- # Return empty config - will fall back to other methods
-
+ logger.warning("Could not extract config from CSV: %s", e)
+
return config
-def extract_test_config_and_status(output: str, csv_path: str = None) -> dict:
+def extract_test_config_and_status(output: str, csv_path: Optional[str] = None) -> dict:
"""Extract test configuration and pass/fail status from output and CSV."""
- result = {
+ result: dict = {
'config': {},
'status': 'unknown',
'test_name': 'unknown'
}
-
- # Extract test name - just the operation name, not the full class.method
- # Look for patterns like "test_eltwise_operations.py::TestEltwiseOperations::test_add"
+
test_match = re.search(r'::([^:]+)::test_([a-zA-Z_]+)', output)
if test_match:
- method_name = test_match.group(2)
- # Remove 'test_' prefix to get just the operation name
- result['test_name'] = method_name
+ result['test_name'] = test_match.group(2)
else:
- # Fallback: look for test method names
test_method_match = re.search(r'test_([a-zA-Z_]+)', output)
if test_method_match:
result['test_name'] = test_method_match.group(1)
-
- # Try to extract configuration from CSV first (most reliable)
+
if csv_path and os.path.exists(csv_path):
csv_config = extract_config_from_csv(csv_path)
if csv_config:
result['config'] = csv_config
-
- # If no CSV config available, try to extract from output (fallback)
+
if not result['config']:
- # Look for custom configuration patterns in output
- shape_match = re.search(r'๐ง.*?Using.*?configuration.*?Shape:\s*\(([^)]+)\)', output, re.IGNORECASE)
+ shape_match = re.search(r'Using.*?configuration.*?Shape:\s*\(([^)]+)\)', output, re.IGNORECASE)
if shape_match:
result['config']['shape'] = shape_match.group(1)
-
- dtype_match = re.search(r'๐ง.*?Using.*?configuration.*?Dtype:\s*(bfloat16|float32|int32)', output, re.IGNORECASE)
+
+ dtype_match = re.search(r'Using.*?configuration.*?Dtype:\s*(bfloat16|float32|int32)', output, re.IGNORECASE)
if dtype_match:
result['config']['dtype'] = dtype_match.group(1)
-
- layout_match = re.search(r'๐ง.*?Using.*?configuration.*?Layout:\s*(tile|row_major)', output, re.IGNORECASE)
+
+ layout_match = re.search(r'Using.*?configuration.*?Layout:\s*(tile|row_major)', output, re.IGNORECASE)
if layout_match:
result['config']['layout'] = layout_match.group(1).lower()
-
- memory_config_match = re.search(r'๐ง.*?Using.*?configuration.*?Memory Config:\s*(L1|DRAM|dram|l1)', output, re.IGNORECASE)
- if memory_config_match:
- result['config']['memory_config'] = memory_config_match.group(1).lower()
-
- # For bitwise operations, ensure int32 dtype if not already set from CSV
+
if result['test_name'].startswith('bitwise_') and not result['config'].get('dtype'):
result['config']['dtype'] = 'int32'
-
- # Determine test status from output
- if 'PASSED' in output or 'passed' in output:
+
+ if 'PASSED' in output or '1 passed' in output:
result['status'] = 'PASSED'
- elif 'FAILED' in output or 'failed' in output:
+ elif 'FAILED' in output or '1 failed' in output:
result['status'] = 'FAILED'
elif 'ERROR' in output or 'error' in output:
result['status'] = 'ERROR'
- elif 'collected' in output and 'passed' in output:
- result['status'] = 'PASSED'
- elif 'collected' in output and 'failed' in output:
- result['status'] = 'FAILED'
- elif '1 passed' in output:
- result['status'] = 'PASSED'
- elif '1 failed' in output:
- result['status'] = 'FAILED'
-
+
return result
-def print_test_summary(test_info: dict, csv_path: str, duration: float, custom_config: dict = None):
+def print_test_summary(
+ test_info: dict,
+ csv_path: str,
+ duration: float,
+ custom_config: Optional[dict] = None,
+ quiet: bool = False
+) -> None:
"""Print a comprehensive test summary."""
- print("\n" + "="*60)
- print("๐ TEST SUMMARY")
- print("="*60)
-
- # Test information
- print(f"๐งช Test: {test_info['test_name']}")
- print(f"๐ Status: {test_info['status']}")
-
- # Configuration - prefer custom config if available
+ print("\n" + "=" * 60)
+ print("TEST SUMMARY")
+ print("=" * 60)
+ print(f"Test: {test_info['test_name']}")
+ print(f"Status: {test_info['status']}")
+
if custom_config:
config_str = []
- if 'shape' in custom_config:
- config_str.append(f"shape={custom_config['shape']}")
- if 'dtype' in custom_config:
- config_str.append(f"dtype={custom_config['dtype']}")
- if 'layout' in custom_config:
- config_str.append(f"layout={custom_config['layout']}")
- if 'memory_config' in custom_config:
- config_str.append(f"memory_config={custom_config['memory_config']}")
- print(f"โ๏ธ Configuration: {', '.join(config_str)} (custom)")
+ for key in ('shape', 'dtype', 'layout', 'memory_config'):
+ if key in custom_config:
+ config_str.append(f"{key}={custom_config[key]}")
+ print(f"Configuration: {', '.join(config_str)} (custom)")
elif test_info['config']:
config_str = []
- if 'shape' in test_info['config']:
- config_str.append(f"shape={test_info['config']['shape']}")
- if 'dtype' in test_info['config']:
- config_str.append(f"dtype={test_info['config']['dtype']}")
- if 'layout' in test_info['config']:
- config_str.append(f"layout={test_info['config']['layout']}")
- if 'memory_config' in test_info['config']:
- config_str.append(f"memory_config={test_info['config']['memory_config']}")
- print(f"โ๏ธ Configuration: {', '.join(config_str)}")
+ for key in ('shape', 'dtype', 'layout', 'memory_config'):
+ if key in test_info['config']:
+ config_str.append(f"{key}={test_info['config'][key]}")
+ print(f"Configuration: {', '.join(config_str)}")
else:
- # Try to show expected configuration based on operation name
expected_config = get_expected_config_for_operation(test_info['test_name'])
if expected_config:
config_str = []
- if 'shape' in expected_config:
- config_str.append(f"shape={expected_config['shape']}")
- if 'dtype' in expected_config:
- config_str.append(f"dtype={expected_config['dtype']}")
- if 'layout' in expected_config:
- config_str.append(f"layout={expected_config['layout']}")
- if 'memory_config' in expected_config:
- config_str.append(f"memory_config={expected_config['memory_config']}")
- print(f"โ๏ธ Configuration: {', '.join(config_str)} (expected)")
+ for key in ('shape', 'dtype', 'layout', 'memory_config'):
+ if key in expected_config:
+ config_str.append(f"{key}={expected_config[key]}")
+ print(f"Configuration: {', '.join(config_str)} (expected)")
else:
- print("โ๏ธ Configuration: Not detected")
-
- # Performance metrics
- print(f"๐ CSV Path: {csv_path}")
- print(f"โฑ๏ธ DEVICE KERNEL DURATION [ns] total: {duration:.2f} ns")
- print("="*60)
+ print("Configuration: Not detected")
+ print(f"CSV Path: {csv_path}")
+ print(f"DEVICE KERNEL DURATION [ns] total: {duration:.2f} ns")
+ print("=" * 60)
-def main():
+
+def main() -> None:
if len(sys.argv) < 2:
print_help()
sys.exit(1)
- name, test_cmd, debug, custom_config = parse_args(sys.argv[1:])
+ name, test_cmd, debug, custom_config, quiet, output_dir = parse_args(sys.argv[1:])
profile_cmd = build_profile_command(name, test_cmd)
+ logger.debug("Profile command: %s", profile_cmd)
+
if debug:
- print(f"โถ๏ธ Running: {profile_cmd}\n")
+ print(f"Running: {profile_cmd}\n")
else:
- print(f"โถ๏ธ Running test...")
+ print(f"Running test...")
process = subprocess.Popen(
profile_cmd,
@@ -711,11 +688,11 @@ def main():
bufsize=1,
)
- output_lines = []
+ output_lines: List[str] = []
try:
for line in process.stdout:
if debug:
- print(line, end="") # Real-time output only in debug mode
+ print(line, end="")
output_lines.append(line)
except KeyboardInterrupt:
process.terminate()
@@ -723,22 +700,22 @@ def main():
sys.exit(1)
process.wait()
-
- # Combine all output for post-analysis
full_output = "".join(output_lines)
- # Extract CSV path and duration
try:
csv_path = extract_csv_path(full_output)
duration = get_device_kernel_duration(csv_path)
-
- # Extract test configuration and status
test_info = extract_test_config_and_status(full_output, csv_path)
-
- # Print comprehensive summary
- print_test_summary(test_info, csv_path, duration, custom_config)
-
+ print_test_summary(test_info, csv_path, duration, custom_config, quiet=quiet)
+
+ if output_dir:
+ os.makedirs(output_dir, exist_ok=True)
+ dest = os.path.join(output_dir, os.path.basename(csv_path))
+ shutil.copy2(csv_path, dest)
+ print(f"CSV copied to: {dest}")
+
except Exception as e:
+ logger.debug("Exception during result processing", exc_info=True)
print(f"\nโ Error processing results: {e}")
print("Raw output:")
print(full_output)
@@ -746,4 +723,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()