migrate router for memory wip

This commit is contained in:
Xi Yan 2024-09-20 12:19:33 -07:00
parent 7d4135d5fd
commit cda61119ce
8 changed files with 213 additions and 45 deletions

View file

@ -280,7 +280,56 @@ async def resolve_impls_with_routing(
stack_run_config: StackRunConfig,
) -> Dict[Api, Any]:
raise NotImplementedError("This is not implemented yet")
all_providers = api_providers()
specs = {}
for api_str in stack_run_config.apis_to_serve:
api = Api(api_str)
providers = all_providers[api]
# check for regular providers without routing
if api_str in stack_run_config.provider_map:
provider_map_entry = stack_run_config.provider_map[api_str]
if provider_map_entry.provider_id not in providers:
raise ValueError(
f"Unknown provider `{provider_id}` is not available for API `{api}`"
)
specs[api] = providers[provider_map_entry.provider_id]
# check for routing table, we need to pass routing table to the router implementation
if api_str in stack_run_config.provider_routing_table:
router_entry = stack_run_config.provider_routing_table[api_str]
inner_specs = []
for rt_entry in router_entry:
if rt_entry.provider_id not in providers:
raise ValueError(
f"Unknown provider `{rt_entry.provider_id}` is not available for API `{api}`"
)
inner_specs.append(providers[rt_entry.provider_id])
specs[api] = RouterProviderSpec(
api=api,
module=f"llama_stack.distribution.routers.{api.value.lower()}",
api_dependencies=[],
inner_specs=inner_specs,
)
sorted_specs = topological_sort(specs.values())
impls = {}
for spec in sorted_specs:
api = spec.api
deps = {api: impls[api] for api in spec.api_dependencies}
if api.value in stack_run_config.provider_map:
provider_config = stack_run_config.provider_map[api.value]
elif api.value in stack_run_config.provider_routing_table:
provider_config = stack_run_config.provider_routing_table[api.value]
else:
raise ValueError(f"Cannot find provider_config for Api {api.value}")
impl = await instantiate_provider(spec, deps, provider_config)
impls[api] = impl
return impls, specs
async def resolve_impls(
@ -345,6 +394,7 @@ def main(yaml_config: str, port: int = 5000, disable_ipv6: bool = False):
if config.provider_routing_table is not None:
impls, specs = asyncio.run(resolve_impls_with_routing(config))
else:
# keeping this for backwards compatibility,could
impls, specs = asyncio.run(resolve_impls(config.provider_map))
if Api.telemetry in impls: