chore: move benchmarking related code (#3406)

# What does this PR do?
- Moves the benchmarking code from `docs/source/distributions/k8s-benchmark/` to a top-level `benchmarking/k8s-benchmark/` directory and applies some formatting cleanup (a rough sketch of the move follows below).
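
For orientation, the change amounts to relocating the `k8s-benchmark` directory out of the docs tree and fixing up paths that referenced it. A minimal sketch of the equivalent commands, where the directory names come from the diff below but the exact set of files and references touched is assumed:

```bash
# Sketch only: relocate the benchmark assets out of the docs tree.
# Directory names come from the diff below; the exact files and references
# updated by the PR are assumed, not verified.
git mv docs/source/distributions/k8s-benchmark benchmarking/k8s-benchmark
# Then update references to the old location, e.g. the docs include:
#   {include} ../../../benchmarking/k8s-benchmark/README.md
```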


## Test Plan
Commit c04f1c1e8c (parent d2f88a10fb)
Authored by ehhuang on 2025-09-10 13:19:44 -07:00; committed by GitHub
10 changed files with 156 additions and 149 deletions


@@ -34,13 +34,12 @@ This data enables data-driven architectural decisions and performance optimization
**1. Deploy base k8s infrastructure:**
```bash
cd ../k8s
cd ../../docs/source/distributions/k8s
./apply.sh
```
**2. Deploy benchmark components:**
```bash
cd ../k8s-benchmark
./apply.sh
```
@@ -56,7 +55,6 @@ kubectl get pods
**Benchmark Llama Stack (default):**
```bash
cd docs/source/distributions/k8s-benchmark/
./run-benchmark.sh
```
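
The wrapper script's options are not shown in this hunk, but the `argparse` section in the benchmark script diff further down defines the underlying CLI. A direct invocation might look like the sketch below; the script name `benchmark.py` and the endpoint/model values are assumptions, while the flags and environment variables come from that diff.

```bash
# Sketch: call the benchmark script directly instead of run-benchmark.sh.
# Flags and env vars (BENCHMARK_BASE_URL, INFERENCE_MODEL, --duration, --concurrent)
# are taken from the argparse definitions below; the script name and the
# concrete URL/model values are assumed.
export BENCHMARK_BASE_URL="http://localhost:8000/v1/openai/v1"
export INFERENCE_MODEL="meta-llama/Llama-3.2-3B-Instruct"
python benchmark.py --duration 60 --concurrent 10

# Or pass everything explicitly:
python benchmark.py \
  --base-url "$BENCHMARK_BASE_URL" \
  --model "$INFERENCE_MODEL" \
  --duration 120 \
  --concurrent 20
```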


@@ -14,7 +14,7 @@ import os
import random
import statistics
import time
from typing import Tuple
import aiohttp
@@ -55,50 +55,50 @@ class BenchmarkStats:
total_time = self.end_time - self.start_time
success_rate = (self.success_count / self.total_requests) * 100
print(f"\n{'='*60}")
print(f"BENCHMARK RESULTS")
print(f"\nResponse Time Statistics:")
print(f"\n{'=' * 60}")
print("BENCHMARK RESULTS")
print("\nResponse Time Statistics:")
print(f" Mean: {statistics.mean(self.response_times):.3f}s")
print(f" Median: {statistics.median(self.response_times):.3f}s")
print(f" Min: {min(self.response_times):.3f}s")
print(f" Max: {max(self.response_times):.3f}s")
if len(self.response_times) > 1:
print(f" Std Dev: {statistics.stdev(self.response_times):.3f}s")
percentiles = [50, 90, 95, 99]
sorted_times = sorted(self.response_times)
print(f"\nPercentiles:")
print("\nPercentiles:")
for p in percentiles:
idx = int(len(sorted_times) * p / 100) - 1
idx = max(0, min(idx, len(sorted_times) - 1))
print(f" P{p}: {sorted_times[idx]:.3f}s")
if self.ttft_times:
print(f"\nTime to First Token (TTFT) Statistics:")
print("\nTime to First Token (TTFT) Statistics:")
print(f" Mean: {statistics.mean(self.ttft_times):.3f}s")
print(f" Median: {statistics.median(self.ttft_times):.3f}s")
print(f" Min: {min(self.ttft_times):.3f}s")
print(f" Max: {max(self.ttft_times):.3f}s")
if len(self.ttft_times) > 1:
print(f" Std Dev: {statistics.stdev(self.ttft_times):.3f}s")
sorted_ttft = sorted(self.ttft_times)
print(f"\nTTFT Percentiles:")
print("\nTTFT Percentiles:")
for p in percentiles:
idx = int(len(sorted_ttft) * p / 100) - 1
idx = max(0, min(idx, len(sorted_ttft) - 1))
print(f" P{p}: {sorted_ttft[idx]:.3f}s")
if self.chunks_received:
print(f"\nStreaming Statistics:")
print("\nStreaming Statistics:")
print(f" Mean chunks per response: {statistics.mean(self.chunks_received):.1f}")
print(f" Total chunks received: {sum(self.chunks_received)}")
print(f"{'='*60}")
print(f"{'=' * 60}")
print(f"Total time: {total_time:.2f}s")
print(f"Concurrent users: {self.concurrent_users}")
print(f"Total requests: {self.total_requests}")
@@ -106,16 +106,16 @@ class BenchmarkStats:
print(f"Failed requests: {len(self.errors)}")
print(f"Success rate: {success_rate:.1f}%")
print(f"Requests per second: {self.success_count / total_time:.2f}")
if self.errors:
print(f"\nErrors (showing first 5):")
print("\nErrors (showing first 5):")
for error in self.errors[:5]:
print(f" {error}")
class LlamaStackBenchmark:
def __init__(self, base_url: str, model_id: str):
self.base_url = base_url.rstrip('/')
self.base_url = base_url.rstrip("/")
self.model_id = model_id
self.headers = {"Content-Type": "application/json"}
self.test_messages = [
@@ -126,74 +126,67 @@ class LlamaStackBenchmark:
[
{"role": "user", "content": "What is machine learning?"},
{"role": "assistant", "content": "Machine learning is a subset of AI..."},
{"role": "user", "content": "Can you give me a practical example?"}
]
{"role": "user", "content": "Can you give me a practical example?"},
],
]
async def make_async_streaming_request(self) -> Tuple[float, int, float | None, str | None]:
async def make_async_streaming_request(self) -> tuple[float, int, float | None, str | None]:
"""Make a single async streaming chat completion request."""
messages = random.choice(self.test_messages)
payload = {
"model": self.model_id,
"messages": messages,
"stream": True,
"max_tokens": 100
}
payload = {"model": self.model_id, "messages": messages, "stream": True, "max_tokens": 100}
start_time = time.time()
chunks_received = 0
ttft = None
error = None
session = aiohttp.ClientSession()
try:
async with session.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
timeout=aiohttp.ClientTimeout(total=30),
) as response:
if response.status == 200:
async for line in response.content:
if line:
line_str = line.decode('utf-8').strip()
if line_str.startswith('data: '):
line_str = line.decode("utf-8").strip()
if line_str.startswith("data: "):
chunks_received += 1
if ttft is None:
ttft = time.time() - start_time
if line_str == 'data: [DONE]':
if line_str == "data: [DONE]":
break
if chunks_received == 0:
error = "No streaming chunks received"
else:
text = await response.text()
error = f"HTTP {response.status}: {text[:100]}"
except Exception as e:
error = f"Request error: {str(e)}"
finally:
await session.close()
response_time = time.time() - start_time
return response_time, chunks_received, ttft, error
async def run_benchmark(self, duration: int, concurrent_users: int) -> BenchmarkStats:
"""Run benchmark using async requests for specified duration."""
stats = BenchmarkStats()
stats.concurrent_users = concurrent_users
stats.start_time = time.time()
print(f"Starting benchmark: {duration}s duration, {concurrent_users} concurrent users")
print(f"Target URL: {self.base_url}/chat/completions")
print(f"Model: {self.model_id}")
connector = aiohttp.TCPConnector(limit=concurrent_users)
async with aiohttp.ClientSession(connector=connector) as session:
async with aiohttp.ClientSession(connector=connector):
async def worker(worker_id: int):
"""Worker that sends requests sequentially until canceled."""
request_count = 0
@@ -202,12 +195,12 @@ class LlamaStackBenchmark:
response_time, chunks, ttft, error = await self.make_async_streaming_request()
await stats.add_result(response_time, chunks, ttft, error)
request_count += 1
except asyncio.CancelledError:
break
except Exception as e:
await stats.add_result(0, 0, None, f"Worker {worker_id} error: {str(e)}")
# Progress reporting task
async def progress_reporter():
last_report_time = time.time()
@@ -216,48 +209,52 @@ class LlamaStackBenchmark:
await asyncio.sleep(1) # Report every second
if time.time() >= last_report_time + 10: # Report every 10 seconds
elapsed = time.time() - stats.start_time
print(f"Completed: {stats.total_requests} requests in {elapsed:.1f}s, RPS: {stats.total_requests / elapsed:.1f}")
print(
f"Completed: {stats.total_requests} requests in {elapsed:.1f}s, RPS: {stats.total_requests / elapsed:.1f}"
)
last_report_time = time.time()
except asyncio.CancelledError:
break
# Spawn concurrent workers
tasks = [asyncio.create_task(worker(i)) for i in range(concurrent_users)]
progress_task = asyncio.create_task(progress_reporter())
tasks.append(progress_task)
# Wait for duration then cancel all tasks
await asyncio.sleep(duration)
for task in tasks:
task.cancel()
# Wait for all tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
stats.end_time = time.time()
return stats
def main():
parser = argparse.ArgumentParser(description="Llama Stack Benchmark Tool")
parser.add_argument("--base-url", default=os.getenv("BENCHMARK_BASE_URL", "http://localhost:8000/v1/openai/v1"),
help="Base URL for the API (default: http://localhost:8000/v1/openai/v1)")
parser.add_argument("--model", default=os.getenv("INFERENCE_MODEL", "test-model"),
help="Model ID to use for requests")
parser.add_argument("--duration", type=int, default=60,
help="Duration in seconds to run benchmark (default: 60)")
parser.add_argument("--concurrent", type=int, default=10,
help="Number of concurrent users (default: 10)")
parser.add_argument(
"--base-url",
default=os.getenv("BENCHMARK_BASE_URL", "http://localhost:8000/v1/openai/v1"),
help="Base URL for the API (default: http://localhost:8000/v1/openai/v1)",
)
parser.add_argument(
"--model", default=os.getenv("INFERENCE_MODEL", "test-model"), help="Model ID to use for requests"
)
parser.add_argument("--duration", type=int, default=60, help="Duration in seconds to run benchmark (default: 60)")
parser.add_argument("--concurrent", type=int, default=10, help="Number of concurrent users (default: 10)")
args = parser.parse_args()
benchmark = LlamaStackBenchmark(args.base_url, args.model)
try:
stats = asyncio.run(benchmark.run_benchmark(args.duration, args.concurrent))
stats.print_summary()
except KeyboardInterrupt:
print("\nBenchmark interrupted by user")
except Exception as e:


@@ -11,180 +11,192 @@ OpenAI-compatible mock server that returns:
- Valid OpenAI-formatted chat completion responses with dynamic content
"""
from flask import Flask, request, jsonify, Response
import time
import random
import uuid
import json
import argparse
import json
import os
import random
import time
import uuid
from flask import Flask, Response, jsonify, request
app = Flask(__name__)
# Models from environment variables
def get_models():
models_str = os.getenv("MOCK_MODELS", "meta-llama/Llama-3.2-3B-Instruct")
model_ids = [m.strip() for m in models_str.split(",") if m.strip()]
return {
"object": "list",
"data": [
{
"id": model_id,
"object": "model",
"created": 1234567890,
"owned_by": "vllm"
}
for model_id in model_ids
]
{"id": model_id, "object": "model", "created": 1234567890, "owned_by": "vllm"} for model_id in model_ids
],
}
def generate_random_text(length=50):
"""Generate random but coherent text for responses."""
words = [
"Hello", "there", "I'm", "an", "AI", "assistant", "ready", "to", "help", "you",
"with", "your", "questions", "and", "tasks", "today", "Let", "me","know", "what",
"you'd", "like", "to", "discuss", "or", "explore", "together", "I", "can", "assist",
"with", "various", "topics", "including", "coding", "writing", "analysis", "and", "more"
"Hello",
"there",
"I'm",
"an",
"AI",
"assistant",
"ready",
"to",
"help",
"you",
"with",
"your",
"questions",
"and",
"tasks",
"today",
"Let",
"me",
"know",
"what",
"you'd",
"like",
"to",
"discuss",
"or",
"explore",
"together",
"I",
"can",
"assist",
"with",
"various",
"topics",
"including",
"coding",
"writing",
"analysis",
"and",
"more",
]
return " ".join(random.choices(words, k=length))
@app.route('/v1/models', methods=['GET'])
@app.route("/v1/models", methods=["GET"])
def list_models():
models = get_models()
print(f"[MOCK] Returning models: {[m['id'] for m in models['data']]}")
return jsonify(models)
@app.route('/v1/chat/completions', methods=['POST'])
@app.route("/v1/chat/completions", methods=["POST"])
def chat_completions():
"""Return OpenAI-formatted chat completion responses."""
data = request.get_json()
default_model = get_models()['data'][0]['id']
model = data.get('model', default_model)
messages = data.get('messages', [])
stream = data.get('stream', False)
default_model = get_models()["data"][0]["id"]
model = data.get("model", default_model)
messages = data.get("messages", [])
stream = data.get("stream", False)
print(f"[MOCK] Chat completion request - model: {model}, stream: {stream}")
if stream:
return handle_streaming_completion(model, messages)
else:
return handle_non_streaming_completion(model, messages)
def handle_non_streaming_completion(model, messages):
response_text = generate_random_text(random.randint(20, 80))
# Calculate realistic token counts
prompt_tokens = sum(len(str(msg.get('content', '')).split()) for msg in messages)
prompt_tokens = sum(len(str(msg.get("content", "")).split()) for msg in messages)
completion_tokens = len(response_text.split())
response = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response_text
},
"finish_reason": "stop"
}
],
"choices": [{"index": 0, "message": {"role": "assistant", "content": response_text}, "finish_reason": "stop"}],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
"total_tokens": prompt_tokens + completion_tokens,
},
}
return jsonify(response)
def handle_streaming_completion(model, messages):
def generate_stream():
# Generate response text
full_response = generate_random_text(random.randint(30, 100))
words = full_response.split()
# Send initial chunk
initial_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": ""}
}
]
"choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}}],
}
yield f"data: {json.dumps(initial_chunk)}\n\n"
# Send word by word
for i, word in enumerate(words):
chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": f"{word} " if i < len(words) - 1 else word}
}
]
"choices": [{"index": 0, "delta": {"content": f"{word} " if i < len(words) - 1 else word}}],
}
yield f"data: {json.dumps(chunk)}\n\n"
# Configurable delay to simulate realistic streaming
stream_delay = float(os.getenv("STREAM_DELAY_SECONDS", "0.005"))
time.sleep(stream_delay)
# Send final chunk
final_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": ""},
"finish_reason": "stop"
}
]
"choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
return Response(
generate_stream(),
mimetype='text/event-stream',
mimetype="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
}
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
},
)
@app.route('/health', methods=['GET'])
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "healthy", "type": "openai-mock"})
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='OpenAI-compatible mock server')
parser.add_argument('--port', type=int, default=8081,
help='Port to run the server on (default: 8081)')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="OpenAI-compatible mock server")
parser.add_argument("--port", type=int, default=8081, help="Port to run the server on (default: 8081)")
args = parser.parse_args()
port = args.port
models = get_models()
print("Starting OpenAI-compatible mock server...")
print(f"- /models endpoint with: {[m['id'] for m in models['data']]}")
print("- OpenAI-formatted chat/completion responses with dynamic content")
print("- Streaming support with valid SSE format")
print(f"- Listening on: http://0.0.0.0:{port}")
app.run(host='0.0.0.0', port=port, debug=False)
app.run(host="0.0.0.0", port=port, debug=False)
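
Since the benchmark client and the mock server both appear in this change, a local smoke test is a natural way to exercise them together. The sketch below assumes the script filenames (`openai-mock-server.py`, `benchmark.py`); the port, routes, and default model id are taken from the code above.

```bash
# Sketch: run the benchmark against the mock server on localhost.
# Script filenames are assumptions; port 8081, the /v1 routes, /health, and the
# default model id come from the diffs above.
python openai-mock-server.py --port 8081 &
sleep 1
curl -s http://localhost:8081/health   # expect {"status": "healthy", "type": "openai-mock"}
python benchmark.py \
  --base-url http://localhost:8081/v1 \
  --model meta-llama/Llama-3.2-3B-Instruct \
  --duration 30 \
  --concurrent 5
```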


@@ -35,5 +35,5 @@ testing/record-replay
### Benchmarking
```{include} ../../../docs/source/distributions/k8s-benchmark/README.md
```{include} ../../../benchmarking/k8s-benchmark/README.md
```