llama-stack-mirror/llama_stack/core/utils/context.py
Ashwin Bharambe 39f33f7f12
feat(cherry-pick): fixes for 0.3.1 release (#3998)
## Summary

Cherry-picks 5 critical fixes from main to the release-0.3.x branch for
the v0.3.1 release, plus CI workflow updates.

**Note**: This recreates the cherry-picks from the closed PR #3991, now
targeting the renamed `release-0.3.x` branch (previously
`release-0.3.x-maint`).

## Commits

1. **2c56a8560** - fix(context): prevent provider data leak between
   streaming requests (#3924)
   - **CRITICAL SECURITY FIX**: Prevents provider credentials from
     leaking between requests
   - Fixed import path for 0.3.0 compatibility

2. **ddd32b187** - fix(inference): enable routing of models with
   provider_data alone (#3928)
   - Enables routing for fully qualified model IDs with provider_data
   - Resolved merge conflicts, adapted for 0.3.0 structure

3. **f7c2973aa** - fix: Avoid BadRequestError due to invalid max_tokens
   (#3667)
   - Fixes failures with Gemini and other providers that reject
     max_tokens=0
   - Non-breaking API change

4. **d7f9da616** - fix(responses): sync conversation before yielding
   terminal events in streaming (#3888)
   - Ensures conversation sync executes even when streaming consumers
     break early

5. **0ffa8658b** - fix(logging): ensure logs go to stderr, loggers obey
   levels (#3885)
   - Fixes logging infrastructure

6. **75b49cb3c** - ci: support release branches and match client branch
   (#3990)
   - Updates CI workflows to support release-X.Y.x branches
   - Matches client branch from llama-stack-client-python for release
     testing
   - Fixes artifact name collisions
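
The failure mode behind commit 4 can be shown in isolation: statements placed after a generator's terminal `yield` never run when the consumer breaks out of iteration early, so side effects like conversation sync must execute *before* the terminal event is yielded. A minimal sketch with hypothetical names (`stream_broken`, `stream_fixed`, `synced` are illustrative, not the real llama-stack code):

```python
import asyncio

synced: list[str] = []


async def stream_broken():
    yield "chunk"
    yield "done"            # terminal event; consumer breaks here
    synced.append("late")   # never runs: the generator is closed at the yield above


async def stream_fixed():
    yield "chunk"
    synced.append("early")  # sync happens before the terminal event is yielded
    yield "done"


async def consume(gen):
    async for event in gen:
        if event == "done":
            break  # consumer stops as soon as it sees the terminal event


asyncio.run(consume(stream_broken()))
asyncio.run(consume(stream_fixed()))
```

After both runs, only `"early"` lands in `synced`: breaking out of `async for` raises `GeneratorExit` at the suspended `yield`, so code after it is skipped.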

## Adaptations for 0.3.0

- Fixed import paths: `llama_stack.core.telemetry.tracing` →
`llama_stack.providers.utils.telemetry.tracing`
- Fixed import paths: `llama_stack.core.telemetry.telemetry` →
`llama_stack.apis.telemetry`
- Changed `self.telemetry_enabled` → `self.telemetry` (0.3.0 attribute
name)
- Removed `rerank()` method that doesn't exist in 0.3.0

## Testing

All imports have been verified locally; the test suite should pass once CI is set up for the release branch.
2025-10-30 21:51:42 -07:00

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from collections.abc import AsyncGenerator
from contextvars import ContextVar

from llama_stack.providers.utils.telemetry.tracing import CURRENT_TRACE_CONTEXT

_MISSING = object()


def preserve_contexts_async_generator[T](
    gen: AsyncGenerator[T, None], context_vars: list[ContextVar]
) -> AsyncGenerator[T, None]:
    """
    Wraps an async generator to preserve context variables across iterations.

    This is needed because we start a new asyncio event loop for each streaming request,
    and we need to preserve the context across the event loop boundary.
    """
    # Capture initial context values
    initial_context_values = {context_var.name: context_var.get() for context_var in context_vars}

    async def wrapper() -> AsyncGenerator[T, None]:
        while True:
            previous_values: dict[ContextVar, object] = {}
            tokens: dict[ContextVar, object] = {}
            # Restore ALL context values before any await and capture previous state.
            # This is needed to propagate context across async generator boundaries.
            for context_var in context_vars:
                try:
                    previous_values[context_var] = context_var.get()
                except LookupError:
                    previous_values[context_var] = _MISSING
                tokens[context_var] = context_var.set(initial_context_values[context_var.name])

            def _restore_context_var(context_var: ContextVar, *, _tokens=tokens, _prev=previous_values) -> None:
                token = _tokens.get(context_var)
                previous_value = _prev.get(context_var, _MISSING)
                if token is not None:
                    try:
                        context_var.reset(token)
                        return
                    except (RuntimeError, ValueError):
                        pass
                if previous_value is _MISSING:
                    context_var.set(None)
                else:
                    context_var.set(previous_value)

            try:
                item = await gen.__anext__()
            except StopAsyncIteration:
                # Restore all context vars before exiting to prevent leaks.
                # Use _restore_context_var for all vars to properly restore to previous values.
                for context_var in context_vars:
                    _restore_context_var(context_var)
                break
            except Exception:
                # Restore all context vars on exception
                for context_var in context_vars:
                    _restore_context_var(context_var)
                raise

            try:
                yield item
                # Update our tracked values with any changes made during this iteration.
                # Only for non-trace context vars - trace context must persist across yields
                # to allow nested span tracking for telemetry.
                for context_var in context_vars:
                    if context_var is not CURRENT_TRACE_CONTEXT:
                        initial_context_values[context_var.name] = context_var.get()
            finally:
                # Restore non-trace context vars after each yield to prevent leaks between requests.
                # CURRENT_TRACE_CONTEXT is NOT restored here to preserve the telemetry span stack.
                for context_var in context_vars:
                    if context_var is not CURRENT_TRACE_CONTEXT:
                        _restore_context_var(context_var)

    return wrapper()
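
The problem this wrapper solves can be demonstrated with a stripped-down sketch. A new thread (standing in for the fresh event loop spawned per streaming request) starts with an empty context, so a `ContextVar` set during the request is invisible to the generator unless its value is captured at wrapping time and re-installed on iteration. The `preserve` helper below is a hypothetical, minimal version of the capture-and-restore idea only; the real `preserve_contexts_async_generator` above additionally restores previous values via tokens and special-cases `CURRENT_TRACE_CONTEXT`:

```python
import asyncio
import threading
from contextvars import ContextVar

PROVIDER_DATA: ContextVar = ContextVar("PROVIDER_DATA", default=None)


def preserve(gen, context_vars):
    # Capture the values visible now (same idea as initial_context_values),
    # then re-install them when iteration happens elsewhere.
    saved = {var: var.get() for var in context_vars}

    async def wrapper():
        for var, value in saved.items():
            var.set(value)
        async for item in gen:
            yield item

    return wrapper()


async def emit():
    # Sees whatever PROVIDER_DATA holds in the iterating task's context
    yield PROVIDER_DATA.get()


def collect_on_fresh_thread(gen, out):
    # A new thread has an empty context, mimicking the per-request event loop
    async def consume():
        return [item async for item in gen]

    out.append(asyncio.run(consume()))


PROVIDER_DATA.set("request-creds")
results: list = []
for gen in (emit(), preserve(emit(), [PROVIDER_DATA])):
    t = threading.Thread(target=collect_on_fresh_thread, args=(gen, results))
    t.start()
    t.join()
```

The unwrapped generator sees `None` on the fresh thread, while the wrapped one sees `"request-creds"`, which is exactly the propagation (and, in reverse, the leak-prevention) the cherry-picked fix is about.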