class conv2d_divide_leaky_relu(Problem):
    """KernelBench Level 2 port: Conv2d, then divide by a scalar, then LeakyReLU.

    The convolution weight and bias are taken from a seeded ``torch.nn.Conv2d``
    and passed to the reference solution as explicit tensors, next to the
    input ``x`` and the two scalars ``divisor`` and ``negative_slope``.
    """

    is_exact = True

    # Solution signature: three const tensors, two const scalars, the output
    # buffer, then the six size values in the order used by get_extra_params().
    parameters = [
        {"name": "x", "type": "float", "pointer": True, "const": True},
        {"name": "weight", "type": "float", "pointer": True, "const": True},
        {"name": "bias", "type": "float", "pointer": True, "const": True},
        {"name": "divisor", "type": "float", "pointer": False, "const": True},
        {"name": "negative_slope", "type": "float", "pointer": False, "const": True},
        {"name": "output", "type": "float", "pointer": True, "const": False},
        {"name": "batch_size", "type": "size_t", "pointer": False, "const": False},
        {"name": "in_channels", "type": "size_t", "pointer": False, "const": False},
        {"name": "height", "type": "size_t", "pointer": False, "const": False},
        {"name": "width", "type": "size_t", "pointer": False, "const": False},
        {"name": "out_channels", "type": "size_t", "pointer": False, "const": False},
        {"name": "kernel_size", "type": "size_t", "pointer": False, "const": False},
    ]

    def __init__(self):
        super().__init__(name="conv2d-divide-leaky-relu")

    @staticmethod
    def _make_input(
        batch_size: int,
        in_channels: int,
        height: int,
        width: int,
        seed: int,
        dtype: torch.dtype,
    ) -> torch.Tensor:
        """Create the seeded ``(batch, channels, height, width)`` input tensor.

        Values are drawn with ``torch.rand`` in float32 from a private
        generator seeded with ``seed``; the result is then moved to CUDA and
        cast to ``dtype``.
        """
        shape = (batch_size, in_channels, height, width)
        rng = torch.Generator()
        rng.manual_seed(seed)
        host_values = torch.rand(shape, generator=rng, dtype=torch.float32)
        return host_values.to(device="cuda", dtype=dtype)

    @staticmethod
    def _make_conv_state(
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        seed: int,
        dtype: torch.dtype,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(weight, bias)`` of a freshly seeded ``nn.Conv2d``.

        The global RNG is seeded inside ``fork_rng`` so that the caller's
        random state is restored once the layer has been initialised.
        """
        with torch.random.fork_rng():
            torch.manual_seed(seed)
            layer = torch.nn.Conv2d(in_channels, out_channels, kernel_size, bias=True)

        def to_device(param: torch.Tensor) -> torch.Tensor:
            # Detach from autograd, then move/cast and force a dense layout.
            return param.detach().to(device="cuda", dtype=dtype).contiguous()

        return to_device(layer.weight), to_device(layer.bias)

    def reference_solution(
        self,
        x: torch.Tensor,
        weight: torch.Tensor,
        bias: torch.Tensor,
        divisor: float,
        negative_slope: float,
    ) -> torch.Tensor:
        """Compute ``leaky_relu(conv2d(x, weight, bias) / divisor)``.

        Runs without gradient tracking and with CUDA autocast disabled.
        """
        with torch.no_grad(), torch.autocast("cuda", enabled=False):
            scaled = F.conv2d(x, weight, bias) / divisor
            return F.leaky_relu(scaled, negative_slope=negative_slope)

    def generate_test_cases(self) -> List[Dict[str, Any]]:
        """Build the four benchmark cases.

        Every case uses ``divisor=2.0`` and ``negative_slope=0.01``. Its
        ``create_inputs`` callable returns
        ``(x, weight, bias, divisor, negative_slope)``.
        """
        dtype = self.param_dtype(0)
        divisor = 2.0
        negative_slope = 0.01
        size_keys = (
            "batch_size",
            "in_channels",
            "height",
            "width",
            "out_channels",
            "kernel_size",
        )
        # (batch_size, in_channels, height, width, out_channels, kernel_size)
        configs = [
            (4, 8, 32, 32, 16, 3),
            (8, 8, 48, 40, 24, 3),
            (4, 16, 64, 64, 32, 5),
            (2, 32, 72, 60, 48, 3),
        ]

        def build_case(config):
            batch_size, in_channels, height, width, out_channels, kernel_size = config
            case_name = (
                f"B={batch_size}, Cin={in_channels}, H={height}, "
                f"W={width}, Cout={out_channels}, K={kernel_size}"
            )
            input_seed = Problem.get_seed(f"{self.name}_{case_name}_input")
            init_seed = Problem.get_seed(f"{self.name}_{case_name}_init")

            # Defaults freeze this case's values so the callable needs no arguments.
            def create_inputs(
                b=batch_size,
                cin=in_channels,
                h=height,
                w=width,
                cout=out_channels,
                k=kernel_size,
                d=divisor,
                slope=negative_slope,
                input_seed=input_seed,
                init_seed=init_seed,
                dtype=dtype,
            ):
                x = self._make_input(b, cin, h, w, input_seed, dtype)
                weight, bias = self._make_conv_state(cin, cout, k, init_seed, dtype)
                return (x, weight, bias, d, slope)

            case = {"name": case_name}
            case.update(zip(size_keys, config))
            case["divisor"] = divisor
            case["negative_slope"] = negative_slope
            case["create_inputs"] = create_inputs
            return case

        return [build_case(config) for config in configs]

    def generate_sample(self) -> Dict[str, Any]:
        """Return the small hand-written sample case (1x2x4x4 input, 3x3 kernel)."""
        dtype = self.param_dtype(0)

        def create_inputs(d=dtype):
            x = torch.tensor(
                [
                    [
                        [[1.0, -1.0, 0.5, 2.0], [0.0, 1.5, -0.5, 1.0], [1.0, 0.5, -1.5, 0.0], [2.0, -0.5, 1.0, -1.0]],
                        [[-1.0, 0.5, 1.0, -0.5], [1.5, -1.0, 0.0, 0.5], [0.5, 1.0, -0.5, -1.5], [0.0, -1.0, 2.0, 1.0]],
                    ]
                ],
                device="cuda",
                dtype=d,
            )
            weight = torch.tensor(
                [
                    [
                        [[0.5, -0.25, 0.0], [1.0, -0.5, 0.25], [-0.75, 0.5, 0.25]],
                        [[-0.5, 0.25, 0.75], [0.0, -1.0, 0.5], [0.25, 0.5, -0.25]],
                    ],
                    [
                        [[-0.25, 0.5, -0.75], [0.5, 0.25, -0.5], [1.0, -0.25, 0.0]],
                        [[0.75, -0.5, 0.25], [-0.25, 0.5, -1.0], [0.5, 0.0, 0.25]],
                    ],
                ],
                device="cuda",
                dtype=d,
            )
            bias = torch.tensor([0.25, -0.5], device="cuda", dtype=d)
            return (x, weight, bias, 2.0, 0.01)

        return {
            "name": "sample",
            "batch_size": 1,
            "in_channels": 2,
            "height": 4,
            "width": 4,
            "out_channels": 2,
            "kernel_size": 3,
            "divisor": 2.0,
            "negative_slope": 0.01,
            "create_inputs": create_inputs,
        }

    def verify_result(
        self, expected_output: torch.Tensor, actual_output: torch.Tensor
    ) -> Tuple[bool, Dict[str, Any]]:
        """Compare a candidate output with the reference output.

        Returns ``(True, {})`` when shapes match and every element is within
        ``rtol=3e-4`` / ``atol=4e-5``. Otherwise returns ``False`` with either
        a shape-mismatch message or summary statistics plus the (up to) five
        largest element-wise differences.
        """
        if expected_output.shape != actual_output.shape:
            mismatch = {
                "message": f"Shape mismatch: expected {tuple(expected_output.shape)}, got {tuple(actual_output.shape)}"
            }
            return False, mismatch

        if torch.allclose(actual_output, expected_output, rtol=3e-4, atol=4e-5):
            return True, {}

        diff = actual_output - expected_output
        flat_abs = torch.abs(diff.flatten())
        worst = torch.topk(flat_abs, min(5, flat_abs.numel())).indices

        out_channels = expected_output.shape[1]
        out_height = expected_output.shape[-2]
        out_width = expected_output.shape[-1]
        plane = out_height * out_width

        sample_diffs = {}
        for idx in worst.tolist():
            # Unravel the flat position into (batch, channel, row, col).
            batch, within_batch = divmod(idx, out_channels * plane)
            channel, spatial = divmod(within_batch, plane)
            row, col = divmod(spatial, out_width)
            where = (batch, channel, row, col)
            sample_diffs[f"(b={batch}, c={channel}, y={row}, x={col})"] = {
                "expected": expected_output[where].item(),
                "actual": actual_output[where].item(),
                "diff": diff[where].item(),
            }

        abs_diff = torch.abs(diff)
        return False, {
            "max_difference": torch.max(abs_diff).item(),
            "mean_difference": torch.mean(abs_diff).item(),
            "expected_negative": int((expected_output < 0).sum().item()),
            "actual_negative": int((actual_output < 0).sum().item()),
            "sample_differences": sample_diffs,
        }

    def get_flops(self, test_case: Dict[str, Any]) -> int:
        """Estimate the FLOPs of one test case.

        Counts two FLOPs per multiply-accumulate of the valid (no padding,
        stride 1) convolution plus two pointwise FLOPs per output element.
        The bias addition is not included in the count.
        """
        kernel_size = test_case["kernel_size"]
        out_height = test_case["height"] - kernel_size + 1
        out_width = test_case["width"] - kernel_size + 1
        outputs = test_case["batch_size"] * test_case["out_channels"] * out_height * out_width

        conv_flops = 2 * outputs * test_case["in_channels"] * kernel_size * kernel_size
        pointwise_flops = 2 * outputs
        return conv_flops + pointwise_flops

    def get_extra_params(self, test_case: Dict[str, Any]) -> List[Any]:
        """Return the size arguments appended after the tensor parameters."""
        ordered_keys = (
            "batch_size",
            "in_channels",
            "height",
            "width",
            "out_channels",
            "kernel_size",
        )
        return [test_case[key] for key in ordered_keys]
class conv2d_hardswish_relu(Problem):
    """KernelBench Level 2 port: Conv2d, then HardSwish, then ReLU.

    The convolution weight and bias come from a seeded ``torch.nn.Conv2d`` and
    are passed to the reference solution as explicit tensors next to ``x``.
    """

    is_exact = True

    # Solution signature: three const tensors, the output buffer, then the six
    # size values in the order used by get_extra_params().
    parameters = [
        {"name": "x", "type": "float", "pointer": True, "const": True},
        {"name": "weight", "type": "float", "pointer": True, "const": True},
        {"name": "bias", "type": "float", "pointer": True, "const": True},
        {"name": "output", "type": "float", "pointer": True, "const": False},
        {"name": "batch_size", "type": "size_t", "pointer": False, "const": False},
        {"name": "in_channels", "type": "size_t", "pointer": False, "const": False},
        {"name": "height", "type": "size_t", "pointer": False, "const": False},
        {"name": "width", "type": "size_t", "pointer": False, "const": False},
        {"name": "out_channels", "type": "size_t", "pointer": False, "const": False},
        {"name": "kernel_size", "type": "size_t", "pointer": False, "const": False},
    ]

    def __init__(self):
        super().__init__(name="conv2d-hardswish-relu")

    @staticmethod
    def _make_input(
        batch_size: int,
        in_channels: int,
        height: int,
        width: int,
        seed: int,
        dtype: torch.dtype,
    ) -> torch.Tensor:
        """Create the seeded ``(batch, channels, height, width)`` input tensor.

        Values are drawn with ``torch.rand`` in float32 from a private
        generator seeded with ``seed``; the result is then moved to CUDA and
        cast to ``dtype``.
        """
        shape = (batch_size, in_channels, height, width)
        rng = torch.Generator()
        rng.manual_seed(seed)
        host_values = torch.rand(shape, generator=rng, dtype=torch.float32)
        return host_values.to(device="cuda", dtype=dtype)

    @staticmethod
    def _make_conv_state(
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        seed: int,
        dtype: torch.dtype,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(weight, bias)`` of a freshly seeded ``nn.Conv2d``.

        The global RNG is seeded inside ``fork_rng`` so that the caller's
        random state is restored once the layer has been initialised.
        """
        with torch.random.fork_rng():
            torch.manual_seed(seed)
            layer = torch.nn.Conv2d(in_channels, out_channels, kernel_size, bias=True)

        def to_device(param: torch.Tensor) -> torch.Tensor:
            # Detach from autograd, then move/cast and force a dense layout.
            return param.detach().to(device="cuda", dtype=dtype).contiguous()

        return to_device(layer.weight), to_device(layer.bias)

    def reference_solution(
        self, x: torch.Tensor, weight: torch.Tensor, bias: torch.Tensor
    ) -> torch.Tensor:
        """Compute ``relu(hardswish(conv2d(x, weight, bias)))``.

        Runs without gradient tracking and with CUDA autocast disabled.
        """
        with torch.no_grad(), torch.autocast("cuda", enabled=False):
            activated = F.hardswish(F.conv2d(x, weight, bias))
            return F.relu(activated)

    def generate_test_cases(self) -> List[Dict[str, Any]]:
        """Build the four benchmark cases.

        Each case's ``create_inputs`` callable returns ``(x, weight, bias)``.
        """
        dtype = self.param_dtype(0)
        size_keys = (
            "batch_size",
            "in_channels",
            "height",
            "width",
            "out_channels",
            "kernel_size",
        )
        # (batch_size, in_channels, height, width, out_channels, kernel_size)
        configs = [
            (4, 8, 32, 32, 16, 3),
            (8, 8, 48, 48, 24, 3),
            (4, 16, 56, 40, 32, 5),
            (2, 32, 64, 64, 48, 3),
        ]

        def build_case(config):
            batch_size, in_channels, height, width, out_channels, kernel_size = config
            case_name = (
                f"B={batch_size}, Cin={in_channels}, H={height}, "
                f"W={width}, Cout={out_channels}, K={kernel_size}"
            )
            input_seed = Problem.get_seed(f"{self.name}_{case_name}_input")
            init_seed = Problem.get_seed(f"{self.name}_{case_name}_init")

            # Defaults freeze this case's values so the callable needs no arguments.
            def create_inputs(
                b=batch_size,
                cin=in_channels,
                h=height,
                w=width,
                cout=out_channels,
                k=kernel_size,
                input_seed=input_seed,
                init_seed=init_seed,
                dtype=dtype,
            ):
                x = self._make_input(b, cin, h, w, input_seed, dtype)
                weight, bias = self._make_conv_state(cin, cout, k, init_seed, dtype)
                return (x, weight, bias)

            case = {"name": case_name}
            case.update(zip(size_keys, config))
            case["create_inputs"] = create_inputs
            return case

        return [build_case(config) for config in configs]

    def generate_sample(self) -> Dict[str, Any]:
        """Return the small hand-written sample case (1x2x4x4 input, 3x3 kernel)."""
        dtype = self.param_dtype(0)

        def create_inputs(d=dtype):
            x = torch.tensor(
                [
                    [
                        [[0.5, -1.0, 1.5, 0.0], [1.0, -0.5, 0.5, -1.5], [0.25, 1.25, -0.75, 0.5], [-1.0, 0.0, 1.0, 1.5]],
                        [[-0.5, 0.25, 1.0, -1.0], [1.5, -1.5, 0.0, 0.5], [0.5, 1.0, -0.25, -0.5], [1.0, -0.75, 0.25, 0.0]],
                    ]
                ],
                device="cuda",
                dtype=d,
            )
            weight = torch.tensor(
                [
                    [
                        [[0.25, -0.5, 0.75], [1.0, -1.0, 0.25], [-0.5, 0.5, -0.25]],
                        [[-0.25, 0.75, -0.5], [0.5, -0.25, 0.25], [0.75, -0.5, 0.0]],
                    ],
                    [
                        [[-0.75, 0.25, 0.5], [0.5, -0.5, 0.75], [0.0, 0.25, -1.0]],
                        [[0.25, -0.75, 0.5], [-0.5, 1.0, -0.25], [0.5, 0.25, -0.5]],
                    ],
                ],
                device="cuda",
                dtype=d,
            )
            bias = torch.tensor([-0.25, 0.5], device="cuda", dtype=d)
            return (x, weight, bias)

        return {
            "name": "sample",
            "batch_size": 1,
            "in_channels": 2,
            "height": 4,
            "width": 4,
            "out_channels": 2,
            "kernel_size": 3,
            "create_inputs": create_inputs,
        }

    def verify_result(
        self, expected_output: torch.Tensor, actual_output: torch.Tensor
    ) -> Tuple[bool, Dict[str, Any]]:
        """Compare a candidate output with the reference output.

        Returns ``(True, {})`` when shapes match and every element is within
        ``rtol=3e-4`` / ``atol=4e-5``. Otherwise returns ``False`` with either
        a shape-mismatch message or summary statistics plus the (up to) five
        largest element-wise differences.
        """
        if expected_output.shape != actual_output.shape:
            mismatch = {
                "message": f"Shape mismatch: expected {tuple(expected_output.shape)}, got {tuple(actual_output.shape)}"
            }
            return False, mismatch

        if torch.allclose(actual_output, expected_output, rtol=3e-4, atol=4e-5):
            return True, {}

        diff = actual_output - expected_output
        flat_abs = torch.abs(diff.flatten())
        worst = torch.topk(flat_abs, min(5, flat_abs.numel())).indices

        out_channels = expected_output.shape[1]
        out_height = expected_output.shape[-2]
        out_width = expected_output.shape[-1]
        plane = out_height * out_width

        sample_diffs = {}
        for idx in worst.tolist():
            # Unravel the flat position into (batch, channel, row, col).
            batch, within_batch = divmod(idx, out_channels * plane)
            channel, spatial = divmod(within_batch, plane)
            row, col = divmod(spatial, out_width)
            where = (batch, channel, row, col)
            sample_diffs[f"(b={batch}, c={channel}, y={row}, x={col})"] = {
                "expected": expected_output[where].item(),
                "actual": actual_output[where].item(),
                "diff": diff[where].item(),
            }

        abs_diff = torch.abs(diff)
        return False, {
            "max_difference": torch.max(abs_diff).item(),
            "mean_difference": torch.mean(abs_diff).item(),
            "expected_positive": int((expected_output > 0).sum().item()),
            "actual_positive": int((actual_output > 0).sum().item()),
            "sample_differences": sample_diffs,
        }

    def get_flops(self, test_case: Dict[str, Any]) -> int:
        """Estimate the FLOPs of one test case.

        Counts two FLOPs per multiply-accumulate of the valid (no padding,
        stride 1) convolution, five per output element for HardSwish and one
        per output element for ReLU. The bias addition is not included.
        """
        kernel_size = test_case["kernel_size"]
        out_height = test_case["height"] - kernel_size + 1
        out_width = test_case["width"] - kernel_size + 1
        outputs = test_case["batch_size"] * test_case["out_channels"] * out_height * out_width

        conv_flops = 2 * outputs * test_case["in_channels"] * kernel_size * kernel_size
        hardswish_flops = 5 * outputs
        relu_flops = outputs
        return conv_flops + hardswish_flops + relu_flops

    def get_extra_params(self, test_case: Dict[str, Any]) -> List[Any]:
        """Return the size arguments appended after the tensor parameters."""
        ordered_keys = (
            "batch_size",
            "in_channels",
            "height",
            "width",
            "out_channels",
            "kernel_size",
        )
        return [test_case[key] for key in ordered_keys]
class gemm_relu_divide(Problem):
    """KernelBench Level 2 port: linear layer (GEMM + bias), then ReLU, then divide.

    The weight and bias come from a seeded ``torch.nn.Linear`` and are passed
    to the reference solution as explicit tensors next to ``x`` and the scalar
    ``divisor``.
    """

    is_exact = True

    # Solution signature: three const tensors, one const scalar, the output
    # buffer, then the three size values in the order used by get_extra_params().
    parameters = [
        {"name": "x", "type": "float", "pointer": True, "const": True},
        {"name": "weight", "type": "float", "pointer": True, "const": True},
        {"name": "bias", "type": "float", "pointer": True, "const": True},
        {"name": "divisor", "type": "float", "pointer": False, "const": True},
        {"name": "output", "type": "float", "pointer": True, "const": False},
        {"name": "batch_size", "type": "size_t", "pointer": False, "const": False},
        {"name": "in_features", "type": "size_t", "pointer": False, "const": False},
        {"name": "out_features", "type": "size_t", "pointer": False, "const": False},
    ]

    def __init__(self):
        super().__init__(name="gemm-relu-divide")

    @staticmethod
    def _make_input(
        batch_size: int, in_features: int, seed: int, dtype: torch.dtype
    ) -> torch.Tensor:
        """Create the seeded ``(batch_size, in_features)`` input matrix.

        Values are drawn with ``torch.rand`` in float32 from a private
        generator seeded with ``seed``; the result is then moved to CUDA and
        cast to ``dtype``.
        """
        rng = torch.Generator()
        rng.manual_seed(seed)
        host_values = torch.rand((batch_size, in_features), generator=rng, dtype=torch.float32)
        return host_values.to(device="cuda", dtype=dtype)

    @staticmethod
    def _make_linear_state(
        in_features: int, out_features: int, seed: int, dtype: torch.dtype
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(weight, bias)`` of a freshly seeded ``nn.Linear``.

        The global RNG is seeded inside ``fork_rng`` so that the caller's
        random state is restored once the layer has been initialised.
        """
        with torch.random.fork_rng():
            torch.manual_seed(seed)
            layer = torch.nn.Linear(in_features, out_features, bias=True)

        def to_device(param: torch.Tensor) -> torch.Tensor:
            # Detach from autograd, then move/cast and force a dense layout.
            return param.detach().to(device="cuda", dtype=dtype).contiguous()

        return to_device(layer.weight), to_device(layer.bias)

    def reference_solution(
        self,
        x: torch.Tensor,
        weight: torch.Tensor,
        bias: torch.Tensor,
        divisor: float,
    ) -> torch.Tensor:
        """Compute ``relu(linear(x, weight, bias)) / divisor``.

        Runs without gradient tracking and with CUDA autocast disabled.
        """
        with torch.no_grad(), torch.autocast("cuda", enabled=False):
            rectified = F.relu(F.linear(x, weight, bias))
            return rectified / divisor

    def generate_test_cases(self) -> List[Dict[str, Any]]:
        """Build the four benchmark cases, all with ``divisor=2.0``.

        Each case's ``create_inputs`` callable returns
        ``(x, weight, bias, divisor)``.
        """
        dtype = self.param_dtype(0)
        divisor = 2.0
        # (batch_size, in_features, out_features)
        configs = [
            (64, 512, 384),
            (128, 1024, 768),
            (192, 1536, 1024),
            (256, 2048, 1536),
        ]

        def build_case(batch_size, in_features, out_features):
            case_name = f"B={batch_size}, I={in_features}, O={out_features}"
            input_seed = Problem.get_seed(f"{self.name}_{case_name}_input")
            init_seed = Problem.get_seed(f"{self.name}_{case_name}_init")

            # Defaults freeze this case's values so the callable needs no arguments.
            def create_inputs(
                b=batch_size,
                i=in_features,
                o=out_features,
                d=divisor,
                input_seed=input_seed,
                init_seed=init_seed,
                dtype=dtype,
            ):
                x = self._make_input(b, i, input_seed, dtype)
                weight, bias = self._make_linear_state(i, o, init_seed, dtype)
                return (x, weight, bias, d)

            return {
                "name": case_name,
                "batch_size": batch_size,
                "in_features": in_features,
                "out_features": out_features,
                "divisor": divisor,
                "create_inputs": create_inputs,
            }

        return [build_case(*config) for config in configs]

    def generate_sample(self) -> Dict[str, Any]:
        """Return the small hand-written sample case (2x4 input, 3 outputs)."""
        dtype = self.param_dtype(0)

        def create_inputs(d=dtype):
            x = torch.tensor(
                [[1.0, 0.5, -1.0, 2.0], [-0.5, 1.5, 0.25, -2.0]],
                device="cuda",
                dtype=d,
            )
            weight = torch.tensor(
                [
                    [0.5, -1.0, 0.75, 1.5],
                    [-0.25, 0.5, 1.0, -0.75],
                    [1.25, -0.5, -1.0, 0.25],
                ],
                device="cuda",
                dtype=d,
            )
            bias = torch.tensor([0.5, -0.75, 0.25], device="cuda", dtype=d)
            return (x, weight, bias, 2.0)

        return {
            "name": "sample",
            "batch_size": 2,
            "in_features": 4,
            "out_features": 3,
            "divisor": 2.0,
            "create_inputs": create_inputs,
        }

    def verify_result(
        self, expected_output: torch.Tensor, actual_output: torch.Tensor
    ) -> Tuple[bool, Dict[str, Any]]:
        """Compare a candidate output with the reference output.

        Returns ``(True, {})`` when shapes match and every element is within
        ``rtol=2e-4`` / ``atol=2e-5``. Otherwise returns ``False`` with either
        a shape-mismatch message or summary statistics plus the (up to) five
        largest element-wise differences.
        """
        if expected_output.shape != actual_output.shape:
            mismatch = {
                "message": f"Shape mismatch: expected {tuple(expected_output.shape)}, got {tuple(actual_output.shape)}"
            }
            return False, mismatch

        if torch.allclose(actual_output, expected_output, rtol=2e-4, atol=2e-5):
            return True, {}

        diff = actual_output - expected_output
        flat_abs = torch.abs(diff.flatten())
        worst = torch.topk(flat_abs, min(5, flat_abs.numel())).indices

        # The output is a 2-D matrix; only the column count is needed to unravel.
        _, cols = expected_output.shape
        sample_diffs = {}
        for idx in worst.tolist():
            row, col = divmod(idx, cols)
            sample_diffs[f"({row}, {col})"] = {
                "expected": expected_output[row, col].item(),
                "actual": actual_output[row, col].item(),
                "diff": diff[row, col].item(),
            }

        abs_diff = torch.abs(diff)
        return False, {
            "max_difference": torch.max(abs_diff).item(),
            "mean_difference": torch.mean(abs_diff).item(),
            "expected_nonzero": int((expected_output > 0).sum().item()),
            "actual_nonzero": int((actual_output > 0).sum().item()),
            "sample_differences": sample_diffs,
        }

    def get_flops(self, test_case: Dict[str, Any]) -> int:
        """Estimate the FLOPs of one test case.

        ``2 * B * I * O`` for the matrix multiply plus one FLOP per output
        element for each of the bias add, the ReLU and the divide.
        """
        outputs = test_case["batch_size"] * test_case["out_features"]
        matmul_flops = 2 * outputs * test_case["in_features"]
        pointwise_flops = 3 * outputs
        return matmul_flops + pointwise_flops

    def get_extra_params(self, test_case: Dict[str, Any]) -> List[Any]:
        """Return the size arguments appended after the tensor parameters."""
        ordered_keys = ("batch_size", "in_features", "out_features")
        return [test_case[key] for key in ordered_keys]
class matmul_mish_mish(Problem):
    """KernelBench Level 2 port: linear layer followed by two Mish activations.

    The weight and bias come from a seeded ``torch.nn.Linear`` and are passed
    to the reference solution as explicit tensors next to ``x``.
    """

    is_exact = True

    # Solution signature: three const tensors, the output buffer, then the
    # three size values in the order used by get_extra_params().
    parameters = [
        {"name": "x", "type": "float", "pointer": True, "const": True},
        {"name": "weight", "type": "float", "pointer": True, "const": True},
        {"name": "bias", "type": "float", "pointer": True, "const": True},
        {"name": "output", "type": "float", "pointer": True, "const": False},
        {"name": "batch_size", "type": "size_t", "pointer": False, "const": False},
        {"name": "in_features", "type": "size_t", "pointer": False, "const": False},
        {"name": "out_features", "type": "size_t", "pointer": False, "const": False},
    ]

    def __init__(self):
        super().__init__(name="matmul-mish-mish")

    @staticmethod
    def _make_input(
        batch_size: int, in_features: int, seed: int, dtype: torch.dtype
    ) -> torch.Tensor:
        """Create the seeded ``(batch_size, in_features)`` input matrix.

        Values are drawn with ``torch.rand`` in float32 from a private
        generator seeded with ``seed``; the result is then moved to CUDA and
        cast to ``dtype``.
        """
        rng = torch.Generator()
        rng.manual_seed(seed)
        host_values = torch.rand((batch_size, in_features), generator=rng, dtype=torch.float32)
        return host_values.to(device="cuda", dtype=dtype)

    @staticmethod
    def _make_linear_state(
        in_features: int, out_features: int, seed: int, dtype: torch.dtype
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(weight, bias)`` of a freshly seeded ``nn.Linear``.

        The global RNG is seeded inside ``fork_rng`` so that the caller's
        random state is restored once the layer has been initialised.
        """
        with torch.random.fork_rng():
            torch.manual_seed(seed)
            layer = torch.nn.Linear(in_features, out_features, bias=True)

        def to_device(param: torch.Tensor) -> torch.Tensor:
            # Detach from autograd, then move/cast and force a dense layout.
            return param.detach().to(device="cuda", dtype=dtype).contiguous()

        return to_device(layer.weight), to_device(layer.bias)

    def reference_solution(
        self, x: torch.Tensor, weight: torch.Tensor, bias: torch.Tensor
    ) -> torch.Tensor:
        """Compute ``mish(mish(linear(x, weight, bias)))``.

        Runs without gradient tracking and with CUDA autocast disabled.
        """
        with torch.no_grad(), torch.autocast("cuda", enabled=False):
            once = F.mish(F.linear(x, weight, bias))
            return F.mish(once)

    def generate_test_cases(self) -> List[Dict[str, Any]]:
        """Build the four benchmark cases.

        Each case's ``create_inputs`` callable returns ``(x, weight, bias)``.
        """
        dtype = self.param_dtype(0)
        # (batch_size, in_features, out_features)
        configs = [
            (64, 384, 256),
            (128, 768, 512),
            (192, 1024, 768),
            (96, 1536, 1024),
        ]

        def build_case(batch_size, in_features, out_features):
            case_name = f"B={batch_size}, I={in_features}, O={out_features}"
            input_seed = Problem.get_seed(f"{self.name}_{case_name}_input")
            init_seed = Problem.get_seed(f"{self.name}_{case_name}_init")

            # Defaults freeze this case's values so the callable needs no arguments.
            def create_inputs(
                b=batch_size,
                i=in_features,
                o=out_features,
                input_seed=input_seed,
                init_seed=init_seed,
                dtype=dtype,
            ):
                x = self._make_input(b, i, input_seed, dtype)
                weight, bias = self._make_linear_state(i, o, init_seed, dtype)
                return (x, weight, bias)

            return {
                "name": case_name,
                "batch_size": batch_size,
                "in_features": in_features,
                "out_features": out_features,
                "create_inputs": create_inputs,
            }

        return [build_case(*config) for config in configs]

    def generate_sample(self) -> Dict[str, Any]:
        """Return the small hand-written sample case (2x4 input, 3 outputs)."""
        dtype = self.param_dtype(0)

        def create_inputs(d=dtype):
            x = torch.tensor(
                [[-1.0, 0.5, 1.5, -0.25], [0.25, -0.75, 1.0, 2.0]],
                device="cuda",
                dtype=d,
            )
            weight = torch.tensor(
                [
                    [0.5, -1.0, 0.25, 0.75],
                    [-0.5, 0.75, -1.25, 0.5],
                    [1.0, 0.25, -0.75, -0.5],
                ],
                device="cuda",
                dtype=d,
            )
            bias = torch.tensor([0.25, -0.5, 0.75], device="cuda", dtype=d)
            return (x, weight, bias)

        return {
            "name": "sample",
            "batch_size": 2,
            "in_features": 4,
            "out_features": 3,
            "create_inputs": create_inputs,
        }

    def verify_result(
        self, expected_output: torch.Tensor, actual_output: torch.Tensor
    ) -> Tuple[bool, Dict[str, Any]]:
        """Compare a candidate output with the reference output.

        Returns ``(True, {})`` when shapes match and every element is within
        ``rtol=3e-4`` / ``atol=3e-5``. Otherwise returns ``False`` with either
        a shape-mismatch message or summary statistics plus the (up to) five
        largest element-wise differences.
        """
        if expected_output.shape != actual_output.shape:
            mismatch = {
                "message": f"Shape mismatch: expected {tuple(expected_output.shape)}, got {tuple(actual_output.shape)}"
            }
            return False, mismatch

        if torch.allclose(actual_output, expected_output, rtol=3e-4, atol=3e-5):
            return True, {}

        diff = actual_output - expected_output
        flat_abs = torch.abs(diff.flatten())
        worst = torch.topk(flat_abs, min(5, flat_abs.numel())).indices

        # The output is a 2-D matrix; only the column count is needed to unravel.
        _, cols = expected_output.shape
        sample_diffs = {}
        for idx in worst.tolist():
            row, col = divmod(idx, cols)
            sample_diffs[f"({row}, {col})"] = {
                "expected": expected_output[row, col].item(),
                "actual": actual_output[row, col].item(),
                "diff": diff[row, col].item(),
            }

        abs_diff = torch.abs(diff)
        return False, {
            "max_difference": torch.max(abs_diff).item(),
            "mean_difference": torch.mean(abs_diff).item(),
            "sample_differences": sample_diffs,
        }

    def get_flops(self, test_case: Dict[str, Any]) -> int:
        """Estimate the FLOPs of one test case.

        ``2 * B * I * O`` for the matrix multiply, one FLOP per output element
        for the bias add, and twelve per output element for the two Mish
        applications (approximated at six FLOPs each).
        """
        outputs = test_case["batch_size"] * test_case["out_features"]
        matmul_flops = 2 * outputs * test_case["in_features"]
        bias_flops = outputs
        mish_flops = 12 * outputs
        return matmul_flops + bias_flops + mish_flops

    def get_extra_params(self, test_case: Dict[str, Any]) -> List[Any]:
        """Return the size arguments appended after the tensor parameters."""
        ordered_keys = ("batch_size", "in_features", "out_features")
        return [test_case[key] for key in ordered_keys]
#!/usr/bin/env python3
"""Smoke-test locally ported KernelBench Level 2 problem definitions.

For each problem slug the script loads ``problems/<slug>/def.py``, runs the
reference solution on the sample and on generated test cases, and checks that
the problem's own verifier accepts the reference output, rejects a perturbed
copy of it, and reports a positive FLOP count.
"""
import argparse
import importlib.util
import sys
from pathlib import Path


# Problems validated when no slug is given on the command line.
DEFAULT_SLUGS = [
    "gemm-relu-divide",
    "conv2d-divide-leaky-relu",
    "conv2d-hardswish-relu",
    "matmul-mish-mish",
]


def convert_slug_to_module_name(slug: str) -> str:
    """Map a dash-separated problem slug to its Python module/class name."""
    return slug.replace("-", "_")


def load_problem(slug: str):
    """Import ``problems/<slug>/def.py`` and return an instance of its problem class.

    The sibling ``tensara/engine`` directory is put on ``sys.path`` first so
    that the definition's ``from problem import Problem`` can resolve.

    Raises:
        FileNotFoundError: if the definition file does not exist.
        RuntimeError: if no import spec/loader can be created for it.
    """
    repo_root = Path(__file__).resolve().parents[1]
    tensara_engine = repo_root.parents[0] / "tensara" / "engine"
    if str(tensara_engine) not in sys.path:
        sys.path.insert(0, str(tensara_engine))

    problem_path = repo_root / "problems" / slug / "def.py"
    if not problem_path.exists():
        raise FileNotFoundError(f"Problem definition not found: {problem_path}")

    module_name = convert_slug_to_module_name(slug)
    spec = importlib.util.spec_from_file_location(module_name, problem_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Could not create import spec for {problem_path}")

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # By convention the problem class carries the same name as the module.
    problem_class = getattr(module, module_name)
    return problem_class()


def perturb_tensor(tensor):
    """Return a copy of ``tensor`` whose first element is increased by one.

    The element is addressed with a multi-dimensional index instead of going
    through ``reshape(-1)``: ``clone()`` preserves the source strides, and
    for a non-contiguous layout ``reshape`` returns a copy, so a write through
    the flattened tensor would silently leave the result unperturbed.

    Raises:
        ValueError: if ``tensor`` has no elements to perturb.
    """
    if tensor.numel() == 0:
        raise ValueError("Cannot perturb an empty tensor")

    bad = tensor.clone()
    first = (0,) * bad.dim()  # () for a 0-dim tensor, which indexes its only element
    delta = 1.0 if tensor.dtype.is_floating_point else 1
    bad[first] = bad[first] + delta
    return bad


def validate_case(problem, case_name: str, case: dict, reject_wrong: bool) -> None:
    """Run one case through the reference solution and the problem's verifier.

    Args:
        problem: problem instance exposing ``reference_solution``,
            ``verify_result``, ``get_flops`` and ``name``.
        case_name: label used in error messages.
        case: test-case dict; ``case["create_inputs"]()`` must return the
            positional arguments of ``reference_solution``.
        reject_wrong: also require the verifier to reject a perturbed output.

    Raises:
        AssertionError: if the verifier rejects the reference output, accepts
            the perturbed output, or the FLOP count is not positive.
    """
    inputs = case["create_inputs"]()
    expected = problem.reference_solution(*inputs)

    correct_ok, correct_info = problem.verify_result(expected, expected.clone())
    if not correct_ok:
        raise AssertionError(
            f"{problem.name} {case_name}: verifier rejected reference output: {correct_info}"
        )

    if reject_wrong:
        wrong = perturb_tensor(expected)
        wrong_ok, wrong_info = problem.verify_result(expected, wrong)
        if wrong_ok:
            raise AssertionError(
                f"{problem.name} {case_name}: verifier accepted intentionally wrong output: {wrong_info}"
            )

    flops = problem.get_flops(case)
    if flops is not None and flops <= 0:
        raise AssertionError(f"{problem.name} {case_name}: non-positive FLOPs: {flops}")


def main() -> int:
    """Validate the requested problems; return a process exit code.

    Returns 0 on success and 2 when CUDA is unavailable. Validation failures
    propagate as exceptions.
    """
    parser = argparse.ArgumentParser(description="Validate local KernelBench Level 2 Tensara ports")
    parser.add_argument("slugs", nargs="*", default=DEFAULT_SLUGS)
    parser.add_argument(
        "--all",
        action="store_true",
        help="Run all generated test cases instead of only the first one plus sample",
    )
    args = parser.parse_args()

    # Imported lazily so that --help works without torch installed.
    import torch

    if not torch.cuda.is_available():
        print("CUDA is not available in this Python environment.", file=sys.stderr)
        return 2

    for slug in args.slugs:
        problem = load_problem(slug)
        print(f"[validate] {slug}")

        sample = problem.generate_sample()
        validate_case(problem, "sample", sample, reject_wrong=True)
        print(" sample: ok")

        test_cases = problem.generate_test_cases()
        selected_cases = test_cases if args.all else test_cases[:1]
        for index, case in enumerate(selected_cases, start=1):
            # The wrong-output check is only run on the first generated case.
            validate_case(problem, f"test#{index}", case, reject_wrong=(index == 1))
            print(f" test#{index}: ok ({case['name']})")

        torch.cuda.empty_cache()

    return 0


if __name__ == "__main__":
    raise SystemExit(main())