diff --git a/.github/workflows/compare_providers_streaming.yml b/.github/workflows/compare_providers_streaming.yml index 21798c80..24e8cfd3 100644 --- a/.github/workflows/compare_providers_streaming.yml +++ b/.github/workflows/compare_providers_streaming.yml @@ -30,6 +30,7 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.AWS_REGION }} MISTRAL_LARGE_API: ${{ secrets.MISTRAL_LARGE_API }} + AZURE_API_KEY: ${{ secrets.AZURE_API_KEY }} steps: - name: Check out repository diff --git a/.gitignore b/.gitignore index 8c56d88f..e0ffce2b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ dynamodb/docker/docker/dynamodb/shared-local-instance.db .coverage benchmark_graph/* .pytest_cache/* - +*.ipynb # Testing and coverage coverage.xml @@ -17,6 +17,9 @@ htmlcov # Notebooks notebooks/.ipynb_checkpoints/ +.ipynb_checkpoints/ +exp_dir/ +experiments/ azure-test.py \ No newline at end of file diff --git a/benchmarking/benchmark_main.py b/benchmarking/benchmark_main.py index c7a09c9a..c6a27872 100644 --- a/benchmarking/benchmark_main.py +++ b/benchmarking/benchmark_main.py @@ -4,6 +4,9 @@ import time from datetime import datetime from matplotlib.ticker import LogLocator, FormatStrFormatter +import random +from utils.db_utils import save_flattened_metrics_to_csv + class Benchmark: """ @@ -25,11 +28,12 @@ def __init__( providers, num_requests, models, - max_output, - prompt, + outputs, + prompts, streaming=False, verbosity=False, - vllm_ip=None + vllm_ip=None, + exp_dir=None ): """ Initializes the Benchmark instance with provided parameters. @@ -46,11 +50,15 @@ def __init__( self.providers = providers self.num_requests = num_requests self.models = models - self.prompt = prompt + self.prompts = prompts self.streaming = streaming - self.max_output = max_output + self.outputs = outputs self.verbosity = verbosity self.vllm_ip = vllm_ip + self.exp_dir = exp_dir + # New parameters for retry mechanism + self.max_retries = 3 + self.base_timeout = 10 base_dir = "streaming" if streaming else "end_to_end" @@ -77,33 +85,40 @@ def plot_metrics(self, metric, filename_suffix): for provider in self.providers: provider_name = provider.__class__.__name__ - for model, latencies in provider.metrics[metric].items(): - # Convert to milliseconds and sort for CDF - latencies_sorted = np.sort(latencies) * 1000 - cdf = np.arange(1, len(latencies_sorted) + 1) / len(latencies_sorted) - model_name = provider.get_model_name(model) + save_flattened_metrics_to_csv(provider, metric, f"{self.exp_dir}/{metric}_logs.csv") + print(provider.metrics[metric].items()) + for model, input_dict in provider.metrics[metric].items(): + for input_size, output_dict in input_dict.items(): + for max_output, latencies in output_dict.items(): + # Convert to milliseconds and sort for CDF + print(model, input_size, len(latencies)) + latencies_sorted = np.sort(latencies) * 1000 + cdf = np.arange(1, len(latencies_sorted) + 1) / len(latencies_sorted) + model_name = provider.get_model_name(model) - if provider_name.lower() == "vllm": - plt.plot( - latencies_sorted, - cdf, - marker="o", - linestyle="-", - markersize=6, # Slightly larger marker size - color="black", # Black color for the marker - label=f"{provider_name} - {model_name}", - linewidth=2, # Bold line - ) - else: - plt.plot( - latencies_sorted, - cdf, - marker="o", - linestyle="-", - markersize=5, - label=f"{provider_name} - {model_name}", - ) - + if provider_name.lower() == "vllm": + plt.plot( + latencies_sorted, + cdf, + marker="o", + linestyle="-", + markersize=6, # Slightly larger marker size + color="black", # Black color for the marker + # label=f"{provider_name} - {model_name}", + label=f"{provider_name}", + linewidth=2, # Bold line + ) + else: + plt.plot( + latencies_sorted, + cdf, + marker="o", + linestyle="-", + markersize=5, + # label=f"{provider_name} - {model_name}", + label=f"{provider_name} - {max_output}", + ) + plt.xlabel("Latency (ms)", fontsize=12) plt.ylabel("Portion of requests", fontsize=12) plt.grid(True) @@ -147,38 +162,94 @@ def run(self): for provider in self.providers: provider_name = provider.__class__.__name__ # logging.debug(f"{provider_name}") - print(f"{provider_name}") + # print(f"{provider_name}") for model in self.models: model_name = provider.get_model_name(model) - print(f"Model: {model_name}\nPrompt: {self.prompt}") + # print(f"Model: {model_name}") + + for prompt in self.prompts: + # print(f"Prompt: {prompt}") + for max_output in self.outputs: + for i in range(self.num_requests): + if self.verbosity: + print(f"Request {i + 1}/{self.num_requests}") + print(f"{provider_name}" + f" Model: {model_name}") - for i in range(self.num_requests): - if self.verbosity: - print(f"Request {i + 1}/{self.num_requests}") + # if ((i+1) % 10) == 0: + # # print("[DEBUG] Sleeping for 2 mins to bypass rate limit...") + # time.sleep(120) - if i % 20 == 0: - # print("[DEBUG] Sleeping for 2 mins to bypass rate limit...") - time.sleep(120) + if self.streaming: + if provider_name == "vLLM": + provider.perform_inference_streaming( + model, prompt, self.vllm_ip, max_output, self.verbosity + ) + else: + provider.perform_inference_streaming( + model, prompt, max_output, self.verbosity + ) + else: + if provider_name == "vLLM": + provider.perform_inference( + model, prompt, self.vllm_ip, max_output, self.verbosity + ) + else: + provider.perform_inference( + model, prompt, max_output, self.verbosity + ) - if self.streaming: - if provider_name == "vLLM": - provider.perform_inference_streaming( - model, self.prompt, self.vllm_ip, self.max_output, self.verbosity - ) - else: - provider.perform_inference_streaming( - model, self.prompt, self.max_output, self.verbosity - ) - else: - if provider_name == "vLLM": - provider.perform_inference( - model, self.prompt, self.vllm_ip, self.max_output, self.verbosity - ) - else: - provider.perform_inference( - model, self.prompt, self.max_output, self.verbosity - ) + # # Simple retry mechanism + + # success = False + # attempts = 0 + + # while not success and attempts < self.max_retries: + # try: + # # Calculate timeout with exponential backoff + # timeout = self.base_timeout * (2 ** attempts) + # print(timeout) + + # if self.streaming: + # if provider_name == "vLLM": + # provider.perform_inference_streaming( + # model, self.prompt, self.vllm_ip, + # self.max_output, self.verbosity, timeout=timeout + # ) + # else: + # provider.perform_inference_streaming( + # model, self.prompt, timeout, self.max_output, + # self.verbosity + # ) + # else: + # if provider_name == "vLLM": + # provider.perform_inference( + # model, self.prompt, self.vllm_ip, + # self.max_output, self.verbosity, timeout=timeout + # ) + # else: + # provider.perform_inference( + # model, self.prompt, self.max_output, + # self.verbosity, timeout=timeout + # ) + + # # If we get here, the request was successful + # success = True + + # except Exception as e: + # attempts += 1 + # if self.verbosity: + # print(f"Request failed (attempt {attempts}/{self.max_retries}): {str(e)}") + + # # Add some jitter to avoid request storms on retry + # if attempts < self.max_retries: + # jitter = random.uniform(0.5, 1.5) + # wait_time = min(30, (2 ** attempts) * jitter) + # time.sleep(wait_time) + + # # Record failed request if all attempts failed + # if not success: + # print(f"Request {i+1} failed after {self.max_retries} attempts") if not self.streaming: self.plot_metrics("response_times", "response_times") else: @@ -188,3 +259,4 @@ def run(self): self.plot_metrics("timebetweentokens", "timebetweentokens") self.plot_metrics("timebetweentokens_median", "timebetweentokens_median") self.plot_metrics("timebetweentokens_p95", "timebetweentokens_p95") + self.plot_metrics("totaltokens", "totaltokens") diff --git a/benchmarking/dynamo_bench.py b/benchmarking/dynamo_bench.py index 53dffb4c..8f605fa5 100644 --- a/benchmarking/dynamo_bench.py +++ b/benchmarking/dynamo_bench.py @@ -234,9 +234,10 @@ def run(self): if self.verbosity: print(f"Request {i + 1}/{self.num_requests}") - if i % 20 == 0: + if i % 10 == 0: # print("[DEBUG] Sleeping for 2 mins to bypass rate limit...") time.sleep(120) + print("finished sleeping") if self.streaming: if provider_name == "vLLM": diff --git a/benchmarking/experiments/compare_providers_streaming.json b/benchmarking/experiments/compare_providers_streaming.json index 9f90ccec..f38ce34e 100644 --- a/benchmarking/experiments/compare_providers_streaming.json +++ b/benchmarking/experiments/compare_providers_streaming.json @@ -1,23 +1,22 @@ { "providers": [ - "Anthropic", + "Groq", + "Hyperbolic", "AWSBedrock", "Azure", "Cloudflare", - "Google", - "Groq", - "Hyperbolic", "OpenAI", - "PerplexityAI", - "TogetherAI" + "TogetherAI", + "Anthropic", + "Google", + "PerplexityAI" ], "models": [ "common-model" ], - "num_requests": 100, + "num_requests": 1, "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/multiple_models/aws.json b/benchmarking/experiments/providers_streaming/multiple_models/aws.json index 438cb711..11b5879a 100644 --- a/benchmarking/experiments/providers_streaming/multiple_models/aws.json +++ b/benchmarking/experiments/providers_streaming/multiple_models/aws.json @@ -3,12 +3,11 @@ "AWSBedrock" ], "models": [ - "common-model" + "meta-llama-3-8b-instruct", "mistral-23b-instruct-v0.1", "mistral-48b-instruct-v0.1", "meta-llama-3-70b-instruct", "mistral-124b-instruct-v0.1" ], - "num_requests": 100, + "num_requests": 1, "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/multiple_models/azure.json b/benchmarking/experiments/providers_streaming/multiple_models/azure.json index 53643191..f3d2d5ad 100644 --- a/benchmarking/experiments/providers_streaming/multiple_models/azure.json +++ b/benchmarking/experiments/providers_streaming/multiple_models/azure.json @@ -4,12 +4,13 @@ ], "models": [ "meta-llama-3.1-8b-instruct", - "meta-llama-3.1-70b-instruct" + "mistral-23b-instruct-v0.1", + "meta-llama-3.3-70b-instruct", + "meta-llama-3.1-405b-instruct" ], - "num_requests": 100, + "num_requests": 1, "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/multiple_models/cloudflare.json b/benchmarking/experiments/providers_streaming/multiple_models/cloudflare.json index 6894e9ae..2f5d4354 100644 --- a/benchmarking/experiments/providers_streaming/multiple_models/cloudflare.json +++ b/benchmarking/experiments/providers_streaming/multiple_models/cloudflare.json @@ -3,16 +3,13 @@ "Cloudflare" ], "models": [ - "google-gemma-2b-it", - "phi-2", - "meta-llama-3.2-3b-instruct", - "meta-llama-3.1-70b-instruct", - "mistral-7b-instruct-v0.1" + "meta-llama-2-7b", + "deepseek-ai-R1-32b", + "meta-llama-3.1-70b-instruct" ], "num_requests": 100, "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/multiple_models/open_ai.json b/benchmarking/experiments/providers_streaming/multiple_models/open_ai.json index 15925217..252dc718 100644 --- a/benchmarking/experiments/providers_streaming/multiple_models/open_ai.json +++ b/benchmarking/experiments/providers_streaming/multiple_models/open_ai.json @@ -11,6 +11,5 @@ "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/multiple_models/together_ai.json b/benchmarking/experiments/providers_streaming/multiple_models/together_ai.json index d66a61c8..97bc38a1 100644 --- a/benchmarking/experiments/providers_streaming/multiple_models/together_ai.json +++ b/benchmarking/experiments/providers_streaming/multiple_models/together_ai.json @@ -3,8 +3,6 @@ "TogetherAI" ], "models": [ - "google-gemma-2b-it", - "meta-llama-3.2-3b-instruct", "mistral-7b-instruct-v0.1", "meta-llama-3.1-70b-instruct", "meta-llama-3.1-405b-instruct" @@ -13,6 +11,5 @@ "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/providers_streaming/single_model/groq.json b/benchmarking/experiments/providers_streaming/single_model/groq.json index 8c3f28f7..276f96e2 100644 --- a/benchmarking/experiments/providers_streaming/single_model/groq.json +++ b/benchmarking/experiments/providers_streaming/single_model/groq.json @@ -3,12 +3,11 @@ "Groq" ], "models": [ - "google-gemma-7b-it" + "common-model" ], - "num_requests": 100, + "num_requests": 3, "input_tokens": 10, "streaming": true, "max_output": 100, - "verbose": true, - "backend": true + "verbose": true } \ No newline at end of file diff --git a/benchmarking/experiments/sample_config.json b/benchmarking/experiments/sample_config.json index 244f8f2d..20b7c41c 100644 --- a/benchmarking/experiments/sample_config.json +++ b/benchmarking/experiments/sample_config.json @@ -3,11 +3,11 @@ "Azure" ], "models": [ - "common-model" + "common-model-small" ], - "num_requests": 3, - "input_tokens": 10, + "num_requests": 10, + "input_tokens": [100], "streaming": true, - "max_output": 100, + "max_output": [10000], "verbose": true } \ No newline at end of file diff --git a/main.py b/main.py index f69cc95a..c7caa3f8 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ vLLM ) from utils.prompt_generator import get_prompt +from utils.db_utils import create_experiment_folder, save_config # Load environment variables load_dotenv() @@ -41,7 +42,7 @@ input_sizes = [10, 100, 1000, 10000, 100000] # Define possible max output tokens -OUTPUT_SIZE_UPPER_LIMIT = 5000 +OUTPUT_SIZE_UPPER_LIMIT = 100000 OUTPUT_SIZE_LOWER_LIMIT = 100 @@ -149,15 +150,17 @@ def validate_selected_models(selected_models, common_models, selected_providers) # Main function to run the benchmark def run_benchmark(config, vllm_ip=None): + exp_id, exp_dir = create_experiment_folder() + save_config(config, exp_dir) """Runs the benchmark based on the given configuration.""" providers = config.get("providers", []) num_requests = config.get("num_requests", 1) models = config.get("models", []) - input_tokens = config.get("input_tokens", 10) - # input_tokens = config.get("input_tokens", [10]) + # input_tokens = config.get("input_tokens", 10) + input_tokens = config.get("input_tokens", [10]) streaming = config.get("streaming", False) - max_output = config.get("max_output", 100) - # max_output = config.get("max_output", [100]) + # max_output = config.get("max_output", 100) + outputs = config.get("max_output", [100]) verbose = config.get("verbose", False) backend = config.get("backend", False) # Select Benchmark class based on backend flag @@ -193,30 +196,36 @@ def run_benchmark(config, vllm_ip=None): print(f"Selected Models: {valid_models}") # handling input tokens - if input_tokens not in input_sizes: - print(f"Please enter an input token from the following choices: {input_sizes}") - return - - prompt = get_prompt(input_tokens) + for input_token_size in input_tokens: + if input_token_size not in input_sizes: + print(f"Please enter an input token from the following choices: {input_sizes}") + return + + prompts = [] + for input_token_size in input_tokens: + prompt = get_prompt(input_token_size) + prompts.append(prompt) # print(f"Prompt: {prompt}") - if max_output < OUTPUT_SIZE_LOWER_LIMIT or max_output > OUTPUT_SIZE_UPPER_LIMIT: - print( - f"Please enter an output token length between \ - {OUTPUT_SIZE_LOWER_LIMIT} and {OUTPUT_SIZE_UPPER_LIMIT}." - ) - return + for max_output in outputs: + if max_output < OUTPUT_SIZE_LOWER_LIMIT or max_output > OUTPUT_SIZE_UPPER_LIMIT: + print( + f"Please enter an output token length between \ + {OUTPUT_SIZE_LOWER_LIMIT} and {OUTPUT_SIZE_UPPER_LIMIT}." + ) + return print("\nRunning benchmark...") benchmark = Benchmark( selected_providers, num_requests, valid_models, - max_output, - prompt=prompt, + outputs, + prompts=prompts, streaming=streaming, verbosity=verbose, vllm_ip=vllm_ip, + exp_dir=exp_dir ) benchmark.run() diff --git a/providers/aws_provider.py b/providers/aws_provider.py index d522d93b..e4e3b42b 100644 --- a/providers/aws_provider.py +++ b/providers/aws_provider.py @@ -5,6 +5,7 @@ import numpy as np from dotenv import load_dotenv from providers.provider_interface import ProviderInterface +import math class AWSBedrock(ProviderInterface): @@ -25,19 +26,27 @@ def __init__(self): # model names self.model_map = { "meta-llama-3-70b-instruct": "meta.llama3-70b-instruct-v1:0", + "meta-llama-3-8b-instruct": "meta.llama3-8b-instruct-v1:0", + "mistral-48b-instruct-v0.1": "mistral.mixtral-8x7b-instruct-v0:1", + "mistral-23b-instruct-v0.1": "mistral.mistral-small-2402-v1:0", + "mistral-124b-instruct-v0.1": "mistral.mistral-large-2402-v1:0", "common-model": "meta.llama3-70b-instruct-v1:0", + "common-model-small": "meta.llama3-1-8b-instruct-v1:0" } def get_model_name(self, model): return self.model_map.get(model, None) # or model + def get_model_provider(self, model_id): + return model_id[:model_id.find('.')] + def format_prompt(self, user_prompt): """ Combines the system prompt and user prompt into a single formatted prompt. """ return f""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> - {self.system_prompt} + You are an obedient assistant. <|start_header_id|>user<|end_header_id|> {user_prompt} <|eot_id|> @@ -53,6 +62,7 @@ def perform_inference(self, model, prompt, max_output=100, verbosity=True): model_id = self.get_model_name(model) formatted_prompt = self.format_prompt(prompt) print(formatted_prompt) + print(max_output) # Prepare the request payload native_request = { "prompt": formatted_prompt, @@ -92,19 +102,27 @@ def perform_inference_streaming( print("[INFO] Performing streaming inference...") model_id = self.get_model_name(model) + model_provider = self.get_model_provider(model_id) # Prepare the request payload formatted_prompt = self.format_prompt(prompt) + if model_provider == "meta": + max_tokens_config = 'max_gen_len' + else: + max_tokens_config = 'max_tokens' native_request = { "prompt": formatted_prompt, - "max_gen_len": max_output, + max_tokens_config: max_output, } + print(max_output) request_body = json.dumps(native_request) inter_token_latencies = [] first_token_time = None ttft = None + total_tokens = 0 + print(prompt) start_time = time.perf_counter() try: streaming_response = self.bedrock_client.invoke_model_with_response_stream( @@ -114,6 +132,7 @@ def perform_inference_streaming( # Process the streaming response for event in streaming_response["body"]: if event: + # print(event) try: # print(f"[DEBUG] {event}") chunk = json.loads(event["chunk"]["bytes"].decode("utf-8")) @@ -122,14 +141,19 @@ def perform_inference_streaming( # print(f"[DEBUG] Failed to decode chunk: {e}") continue - if chunk["stop_reason"] == 'length': + if chunk.get("stop_reason") == "length" or chunk.get("outputs", [{}])[0].get("stop_reason") == "length": total_time = time.perf_counter() - start_time - print(chunk) + # print(chunk) + total_tokens = chunk['generation_token_count'] + print(f"Total tokens (real): {chunk['generation_token_count']}") break - - if "generation" in chunk: - current_token = chunk["generation"] - + + if "outputs" in chunk or 'generation' in chunk: + if "outputs" in chunk : + current_token = chunk["outputs"][0]['text'] + if 'generation' in chunk: + current_token = chunk["generation"] + # print(current_token) # Calculate timing current_time = time.perf_counter() if first_token_time is None: @@ -146,11 +170,14 @@ def perform_inference_streaming( inter_token_latency = time_to_next_token - prev_token_time prev_token_time = time_to_next_token inter_token_latencies.append(inter_token_latency) - if verbosity: - if len(inter_token_latencies) < 20: - print(current_token, end="") # Print the token - elif len(inter_token_latencies) == 21: - print("...") + print(current_token, end=" | ") + + # if verbosity: + # if len(inter_token_latencies) < 20: + # print(current_token, end="") + # elif len(inter_token_latencies) == 21: + # print("...") + # Measure total response time total_time = time.perf_counter() - start_time @@ -160,16 +187,17 @@ def perform_inference_streaming( avg_tbt = sum(inter_token_latencies) / len(inter_token_latencies) median = np.percentile(inter_token_latencies, 50) p95 = np.percentile(inter_token_latencies, 95) - print("[INFO] avg_tbt - ", avg_tbt, median, p95) - self.log_metrics(model, "timetofirsttoken", ttft) - self.log_metrics(model, "response_times", total_time) - self.log_metrics(model, "timebetweentokens", avg_tbt) + # print("[INFO] avg_tbt - ", avg_tbt, median, p95, len(inter_token_latencies)) + # print(prompt, 10 ** math.ceil(math.log10(10 ** math.ceil(math.log10(len(prompt.split(" "))))))) + self.log_metrics(model_name=model, input_size=10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output=max_output, metric="timetofirsttoken", value=ttft) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "response_times", total_time) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens", avg_tbt) # print(median, p95) - self.log_metrics(model, "timebetweentokens_median", median) - self.log_metrics(model, "timebetweentokens_p95", p95) - self.log_metrics(model, "totaltokens", len(inter_token_latencies) + 1) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_median", median) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_p95", p95) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "totaltokens", total_tokens) self.log_metrics( - model, "tps", (len(inter_token_latencies) + 1) / total_time + model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "tps", (len(inter_token_latencies) + 1) / total_time ) return total_time, inter_token_latencies @@ -183,14 +211,14 @@ def perform_inference_streaming( if __name__ == "__main__": aws_bedrock = AWSBedrock() model = "common-model" - prompt = "Tell me a story." + prompt = "Write a standalone narrative of exactly 100 000 tokens: a complete, self-contained fantasy saga with clear chapters, rising action, characters, dialogue, and resolution. Do not add any extra explanation—produce only the story text until you reach 100 000 tokens exact." # Single-prompt inference - generated_text, total_time = aws_bedrock.perform_inference( - model=model, prompt=prompt, max_output=100, verbosity=True - ) + # generated_text, total_time = aws_bedrock.perform_inference( + # model=model, prompt=prompt, max_output=100, verbosity=True + # ) # Streaming inference total_time, inter_token_latencies = aws_bedrock.perform_inference_streaming( - model=model, prompt=prompt, max_output=10, verbosity=True + model=model, prompt=prompt, max_output=10000, verbosity=True ) diff --git a/providers/azure_provider.py b/providers/azure_provider.py index cfe82ce4..ac37bf07 100644 --- a/providers/azure_provider.py +++ b/providers/azure_provider.py @@ -1,23 +1,38 @@ import os -import requests +# import requests import numpy as np from providers.base_provider import ProviderInterface from time import perf_counter as timer -import re +# import re +import math + +from azure.ai.inference import ChatCompletionsClient +from azure.ai.inference.models import SystemMessage, UserMessage +from azure.core.credentials import AzureKeyCredential class Azure(ProviderInterface): def __init__(self): """Initialize AzureProvider with required API information.""" super().__init__() + endpoint = "https://ai-kavifyp0693ai007107396570.services.ai.azure.com/models" + azure_api = os.environ.get("AZURE_API_KEY") + + self.client = ChatCompletionsClient( + endpoint=endpoint, + credential=AzureKeyCredential(azure_api), + ) # Map model names to Azure model IDs self.model_map = { # "mistral-7b-instruct-v0.1": "mistral-7b-instruct-v0.1", - "meta-llama-3.1-8b-instruct": "Meta-Llama-3-1-8B-Instruct-fyp", - "meta-llama-3.1-70b-instruct": "Meta-Llama-3-1-70B-Instruct-fyp", + "meta-llama-3.1-8b-instruct": "Meta-Llama-3.1-8B-Instruct", + "meta-llama-3.3-70b-instruct": "Llama-3.3-70B-Instruct", "mistral-large": "Mistral-Large-2411-yatcd", - "common-model": "Mistral-Large-2411-yatcd", + "mistral-23b-instruct-v0.1": "Mistral-small", + "meta-llama-3.1-405b-instruct": "Meta-Llama-3.1-405B-Instruct", + "common-model": "Llama-3.3-70B-Instruct", + "common-model-small": "Meta-Llama-3.1-8B-Instruct" } # Define API keys for each model @@ -25,7 +40,9 @@ def __init__(self): # "mistral-7b-instruct-v0.1": os.environ.get("MISTRAL_API_KEY"), "meta-llama-3.1-8b-instruct": os.environ.get("AZURE_LLAMA_8B_API"), "meta-llama-3.1-70b-instruct": os.environ.get("AZURE_LLAMA_3.1_70B_API"), + "meta-llama-3.1-405b-instruct": os.environ.get("AZURE_LLAMA_3.1_405B_API"), "mistral-large": os.environ.get("MISTRAL_LARGE_API"), + "mistral-small": os.environ.get("MISTRAL_SMALL_API"), "common-model": os.environ.get("MISTRAL_LARGE_API") } @@ -46,38 +63,32 @@ def perform_inference(self, model, prompt, max_output=100, verbosity=True): """Performs non-streaming inference request to Azure.""" try: model_id = self.get_model_name(model) - api_key = self.get_model_api_key(model) + # api_key = self.get_model_api_key(model) if model_id is None: print(f"Model {model} not available.") return None start_time = timer() - endpoint = f"https://{model_id}.eastus.models.ai.azure.com/chat/completions" - response = requests.post( - f"{endpoint}", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - }, - json={ - "messages": [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": prompt}, - ], - "max_tokens": max_output, - }, - timeout=500, + + response = self.client.complete( + messages=[ + SystemMessage(content=self.system_prompt), + UserMessage(content=prompt) + ], + max_tokens=max_output, + model=model_id ) elapsed = timer() - start_time - if response.status_code != 200: - print(f"Error: {response.status_code} - {response.text}") - return None + # print(response) + # if response.status_code != 200: + # print(f"Error: {response.status_code} - {response.text}") + # return None # Parse and display response - inference = response.json() + # inference = response.json() self.log_metrics(model, "response_times", elapsed) if verbosity: - print(f"Response: {inference['choices'][0]['message']['content']}") - return inference + print(f"Response: {response['choices'][0]['message']['content']}") + return response except Exception as e: print(f"[ERROR] Inference failed for model '{model}': {e}") @@ -88,90 +99,74 @@ def perform_inference_streaming( ): """Performs streaming inference request to Azure.""" model_id = self.get_model_name(model) - api_key = self.get_model_api_key(model) + # api_key = self.get_model_api_key(model) + # api_key = os.environ.get("AZURE_API_KEY") if model_id is None: print(f"Model {model} not available.") return None inter_token_latencies = [] - endpoint = f"https://{model_id}.eastus.models.ai.azure.com/chat/completions" + # endpoint = f"https://{model_id}.eastus.models.ai.azure.com/chat/completions" + # endpoint = f"https://ai-kavifyp0693ai007107396570.services.ai.azure.com/models" + # print(model_id, endpoint) + total_time = 0 start_time = timer() try: - response = requests.post( - f"{endpoint}", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - }, - json={ - "messages": [ - # {"role": "system", "content": self.system_prompt + "\nThe number appended at the end is not important."}, - # {"role": "user", "content": prompt + " " + str(timer())}, - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": prompt}, - ], - "max_tokens": max_output, - "stream": True, - }, + response = self.client.complete( stream=True, - timeout=500, + messages=[ + SystemMessage(content=self.system_prompt), + UserMessage(content=prompt) + ], + max_tokens=max_output, + model=model_id, + temperature=1.5 # try 0.7 also ) first_token_time = None - for line in response.iter_lines(): - if line: - # print(line) + for update in response: + if update.choices: if first_token_time is None: - # print(line) first_token_time = timer() ttft = first_token_time - start_time prev_token_time = first_token_time if verbosity: print(f"##### Time to First Token (TTFT): {ttft:.4f} seconds\n") - - line_str = line.decode("utf-8").strip() - if line_str == "data: [DONE]": - # print(line_str) - # print("here") + if update.choices[0].finish_reason: total_time = timer() - start_time - break + print() + print(update) - # Capture token timing time_to_next_token = timer() inter_token_latency = time_to_next_token - prev_token_time prev_token_time = time_to_next_token inter_token_latencies.append(inter_token_latency) - - # Display token if verbosity is enabled - match = re.search(r'"content"\s*:\s*"(.*?)"', line_str) - if match: - print(match.group(1), end="") - # if verbosity: - # if len(inter_token_latencies) < 20: - # print(line_str[19:].split('"')[5], end="") - # elif len(inter_token_latencies) == 20: - # print("...") + current_token = update.choices[0].delta.content + print(current_token, end=" ") + # if len(inter_token_latencies) < 20: + # print(current_token, end="") + # elif len(inter_token_latencies) == 21: + # print("...") # Calculate total metrics - if verbosity: print(f"\nTotal Response Time: {total_time:.4f} seconds") - print(len(inter_token_latencies)) + # print(inter_token_latencies) # Log metrics avg_tbt = sum(inter_token_latencies) / len(inter_token_latencies) print(f"{avg_tbt:.4f}, {len(inter_token_latencies)}") - self.log_metrics(model, "timetofirsttoken", ttft) - self.log_metrics(model, "response_times", total_time) - self.log_metrics(model, "timebetweentokens", avg_tbt) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timetofirsttoken", ttft) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "response_times", total_time) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens", avg_tbt) self.log_metrics( - model, "timebetweentokens_median", np.median(inter_token_latencies) + model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_median", np.median(inter_token_latencies) ) self.log_metrics( - model, "timebetweentokens_p95", np.percentile(inter_token_latencies, 95) + model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_p95", np.percentile(inter_token_latencies, 95) ) - self.log_metrics(model, "totaltokens", len(inter_token_latencies) + 1) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "totaltokens", len(inter_token_latencies) + 1) except Exception as e: print(f"[ERROR] Streaming inference failed for model '{model}': {e}") diff --git a/providers/base_provider.py b/providers/base_provider.py index 88ed152a..9daf79e2 100644 --- a/providers/base_provider.py +++ b/providers/base_provider.py @@ -1,8 +1,9 @@ # base_provider.py for chat completions api from timeit import default_timer as timer +import time import numpy as np from providers.provider_interface import ProviderInterface - +import math class BaseProvider(ProviderInterface): def __init__(self, api_key, client_class, base_url=None): @@ -20,7 +21,7 @@ def __init__(self, api_key, client_class, base_url=None): def get_model_name(self, model): return self.model_map.get(model, None) - def perform_inference(self, model, prompt, max_output=100, verbosity=True): + def perform_inference(self, model, prompt, timeout, max_output=100, verbosity=True): try: model_id = self.get_model_name(model) @@ -34,7 +35,7 @@ def perform_inference(self, model, prompt, max_output=100, verbosity=True): {"role": "user", "content": prompt}, ], max_tokens=max_output, - timeout=(1, 2) + # timeout=(10, 100) ) elapsed = timer() - start self.log_metrics(model, "response_times", elapsed) @@ -49,13 +50,13 @@ def perform_inference(self, model, prompt, max_output=100, verbosity=True): def perform_inference_streaming( self, model, prompt, max_output=100, verbosity=True ): + print(model, prompt, max_output, verbosity) try: model_id = self.get_model_name(model) if model_id is None: raise ValueError(f"Model {model} not available for provider.") first_token_time = None inter_token_latencies = [] - start = timer() response = self.client.chat.completions.create( model=model_id, @@ -65,9 +66,11 @@ def perform_inference_streaming( ], stream=True, max_tokens=max_output, - timeout=(1, 2) + # timeout=(10, 100) ) + # print(response) + for chunk in response: if first_token_time is None: first_token_time = timer() @@ -100,15 +103,15 @@ def perform_inference_streaming( print( f"\nNumber of output tokens/chunks: {len(inter_token_latencies) + 1}, Avg TBT: {avg_tbt:.4f}, Time to First Token (TTFT): {ttft:.4f} seconds, Total Response Time: {elapsed:.4f} seconds" ) - self.log_metrics(model, "timetofirsttoken", ttft) - self.log_metrics(model, "response_times", elapsed) - self.log_metrics(model, "timebetweentokens", avg_tbt) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timetofirsttoken", ttft) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "response_times", elapsed) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens", avg_tbt) median = np.percentile(inter_token_latencies, 50) p95 = np.percentile(inter_token_latencies, 95) - self.log_metrics(model, "timebetweentokens_median", median) - self.log_metrics(model, "timebetweentokens_p95", p95) - self.log_metrics(model, "totaltokens", len(inter_token_latencies) + 1) - self.log_metrics(model, "tps", (len(inter_token_latencies) + 1) / elapsed) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_median", median) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "timebetweentokens_p95", p95) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "totaltokens", len(inter_token_latencies) + 1) + self.log_metrics(model, 10 ** math.ceil(math.log10(len(prompt.split(" ")))), max_output, "tps", (len(inter_token_latencies) + 1) / elapsed) except Exception as e: print(f"[ERROR] Streaming inference failed for model '{model}': {e}") diff --git a/providers/cloudflare_provider.py b/providers/cloudflare_provider.py index 46e57ccf..e42d2b7f 100644 --- a/providers/cloudflare_provider.py +++ b/providers/cloudflare_provider.py @@ -34,6 +34,8 @@ def __init__(self): "meta-llama-3.2-3b-instruct": "@cf/meta/llama-3.2-3b-instruct", "mistral-7b-instruct-v0.1": "@cf/mistral/mistral-7b-instruct-v0.1", "meta-llama-3.1-70b-instruct": "@cf/meta/llama-3.1-70b-instruct", + "meta-llama-2-7b": "@cf/meta/llama-2-7b-chat-fp16", + "deepseek-ai-R1-32b": "@cf/deepseek-ai/deepseek-r1-distill-qwen-32b", "common-model": "@cf/meta/llama-3.3-70b-instruct-fp8-fast", } diff --git a/providers/provider_interface.py b/providers/provider_interface.py index 8e3351ff..51cddc89 100644 --- a/providers/provider_interface.py +++ b/providers/provider_interface.py @@ -9,9 +9,9 @@ def __init__(self): Initializes the Provider with the necessary API key and client. """ # experiment constants - self.min_tokens = 10000 + self.min_tokens = 100000 self.system_prompt = ( - f"Please provide a detailed response of MORE THAN {self.min_tokens} words" + f"Please provide a detailed response of STRICTLY MORE THAN {self.min_tokens} words" ) # metrics @@ -25,16 +25,34 @@ def __init__(self): "timebetweentokens_p95": {}, } - def log_metrics(self, model_name, metric, value): + # def log_metrics(self, model_name, input_size, metric, value): + # """ + # Logs metrics + # """ + # if metric not in self.metrics: + # raise ValueError(f"Metric type '{metric}' is not defined.") + # if model_name not in self.metrics[metric]: + # self.metrics[metric][model_name] = [] + + # self.metrics[metric][model_name].append(value) + + def log_metrics(self, model_name, input_size, max_output, metric, value): """ - Logs metrics + Logs metrics in a nested structure: metrics[metric][model_name][input_size] -> list of values """ if metric not in self.metrics: raise ValueError(f"Metric type '{metric}' is not defined.") + if model_name not in self.metrics[metric]: - self.metrics[metric][model_name] = [] + self.metrics[metric][model_name] = {} + + if input_size not in self.metrics[metric][model_name]: + self.metrics[metric][model_name][input_size] = {} + + if max_output not in self.metrics[metric][model_name][input_size]: + self.metrics[metric][model_name][input_size][max_output] = [] - self.metrics[metric][model_name].append(value) + self.metrics[metric][model_name][input_size][max_output].append(value) @abstractmethod def perform_inference(self, model, prompt): diff --git a/providers/together_ai_provider.py b/providers/together_ai_provider.py index f06f405e..44d52e55 100644 --- a/providers/together_ai_provider.py +++ b/providers/together_ai_provider.py @@ -18,5 +18,6 @@ def __init__(self): "mistral-7b-instruct-v0.1": "mistralai/Mistral-7B-Instruct-v0.1", "meta-llama-3.1-70b-instruct": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo", "meta-llama-3.1-405b-instruct": "meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo", - "common-model": "meta-llama/Llama-3.3-70B-Instruct-Turbo" + "common-model": "meta-llama/Llama-3.3-70B-Instruct-Turbo", + "common-model-small": "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo" } diff --git a/requirements.txt b/requirements.txt index 4e1e54db..a8818d7b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,4 +18,5 @@ groq==0.13.0 google-generativeai==0.8.3 fastapi==0.115.6 uvicorn==0.32.1 -pytest-asyncio==0.25.0 \ No newline at end of file +pytest-asyncio==0.25.0 +azure-ai-inference \ No newline at end of file diff --git a/test_files/main_test.py b/test_files/main_test.py index 47d7f29d..9bb34a39 100644 --- a/test_files/main_test.py +++ b/test_files/main_test.py @@ -38,6 +38,7 @@ def setUp(self): "AWS_BEDROCK_ACCESS_KEY_ID": "aws_test_id", "AWS_BEDROCK_SECRET_ACCESS_KEY": "aws_test_key", "AWS_BEDROCK_REGION": "us-east-1", + "AZURE_API_KEY": "test_azure_api", }, ) diff --git a/test_files/providers/base_provider_test.py b/test_files/providers/base_provider_test.py index d2eccd26..f08f503f 100644 --- a/test_files/providers/base_provider_test.py +++ b/test_files/providers/base_provider_test.py @@ -37,7 +37,7 @@ def test_perform_inference( {"role": "user", "content": "What is the test prompt?"}, ], max_tokens=100, - timeout=(1,2) + timeout=(10,100) ) mock_log_metrics.assert_called_with("test-model", "response_times", 1.0) @@ -86,7 +86,7 @@ def test_perform_inference_streaming( ], stream=True, max_tokens=100, - timeout=(1,2) + timeout=(10,100) ) avg_tbt = sum([0.5, 0.5]) / len([0.5, 0.5]) diff --git a/test_files/providers/cloudflare_provider_test.py b/test_files/providers/cloudflare_provider_test.py index 1454e023..17d43237 100644 --- a/test_files/providers/cloudflare_provider_test.py +++ b/test_files/providers/cloudflare_provider_test.py @@ -28,6 +28,8 @@ def test_cloudflare_provider_initialization(setup_cloudflare_provider): "meta-llama-3.2-3b-instruct": "@cf/meta/llama-3.2-3b-instruct", "mistral-7b-instruct-v0.1": "@cf/mistral/mistral-7b-instruct-v0.1", "meta-llama-3.1-70b-instruct": "@cf/meta/llama-3.1-70b-instruct", + "meta-llama-2-7b":"@cf/meta/llama-2-7b-chat-fp16", + "deepseek-ai-R1-32b": "@cf/deepseek-ai/deepseek-r1-distill-qwen-32b", "common-model": "@cf/meta/llama-3.3-70b-instruct-fp8-fast", } diff --git a/utils/prompt_generator.py b/utils/prompt_generator.py index 6b736559..6de4b37b 100644 --- a/utils/prompt_generator.py +++ b/utils/prompt_generator.py @@ -1,9 +1,16 @@ -PROMPT_100_TOKENS = "Tell me a long story based on the following story description: \ -In a future world where emotions are regulated by technology, \ -a girl discovers a hidden garden that awakens real feelings. \ -She embarks on a journey to understand these new emotions, challenging the societal \ -norms and the oppressive tech that controls them. Alongside a group of outcasts, \ -she fights to bring authentic emotions back to a world that has forgotten how to feel." +# PROMPT_100_TOKENS = "Tell me a long story based on the following story description: \ +# In a future world where emotions are regulated by technology, \ +# a girl discovers a hidden garden that awakens real feelings. \ +# She embarks on a journey to understand these new emotions, challenging the societal \ +# norms and the oppressive tech that controls them. Alongside a group of outcasts, \ +# she fights to bring authentic emotions back to a world that has forgotten how to feel." + +PROMPT_100_TOKENS = """Compose a science-fiction saga divided into 200 numbered sections. +• Each section must be at least 500 words (so 200×500 = 100 000 words). +• Begin each section with “Section 1: