Added Streamable HTTP support

This adds a new CLI argument, --transport, with the following values: http-first (the default), http-only, sse-first, and sse-only. Any of the -first tags attempts to connect to the URL as either an HTTP or SSE server and falls back to the other.
This commit is contained in:
Glen Maddern 2025-04-16 16:59:36 +10:00 committed by Glen Maddern
parent 504aa26761
commit 04e3d255b1
6 changed files with 373 additions and 231 deletions

View file

@ -11,25 +11,36 @@
import { EventEmitter } from 'events'
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'
import { ListResourcesResultSchema, ListToolsResultSchema } from '@modelcontextprotocol/sdk/types.js'
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
import { NodeOAuthClientProvider } from './lib/node-oauth-client-provider'
import { parseCommandLineArgs, setupSignalHandlers, log, MCP_REMOTE_VERSION, getServerUrlHash } from './lib/utils'
import { coordinateAuth } from './lib/coordination'
import {
parseCommandLineArgs,
setupSignalHandlers,
log,
MCP_REMOTE_VERSION,
getServerUrlHash,
connectToRemoteServer,
TransportStrategy,
} from './lib/utils'
import { createLazyAuthCoordinator } from './lib/coordination'
/**
* Main function to run the client
*/
async function runClient(serverUrl: string, callbackPort: number, headers: Record<string, string>) {
async function runClient(
serverUrl: string,
callbackPort: number,
headers: Record<string, string>,
transportStrategy: TransportStrategy = 'http-first',
) {
// Set up event emitter for auth flow
const events = new EventEmitter()
// Get the server URL hash for lockfile operations
const serverUrlHash = getServerUrlHash(serverUrl)
// Coordinate authentication with other instances
const { server, waitForAuthCode, skipBrowserAuth } = await coordinateAuth(serverUrlHash, callbackPort, events)
// Create a lazy auth coordinator
const authCoordinator = createLazyAuthCoordinator(serverUrlHash, callbackPort, events)
// Create the OAuth client provider
const authProvider = new NodeOAuthClientProvider({
@ -38,14 +49,6 @@ async function runClient(serverUrl: string, callbackPort: number, headers: Recor
clientName: 'MCP CLI Client',
})
// If auth was completed by another instance, just log that we'll use the auth from disk
if (skipBrowserAuth) {
log('Authentication was completed by another instance - will use tokens from disk...')
// TODO: remove, the callback is happening before the tokens are exchanged
// so we're slightly too early
await new Promise((res) => setTimeout(res, 1_000))
}
// Create the client
const client = new Client(
{
@ -57,10 +60,33 @@ async function runClient(serverUrl: string, callbackPort: number, headers: Recor
},
)
// Create the transport factory
const url = new URL(serverUrl)
function initTransport() {
const transport = new SSEClientTransport(url, { authProvider, requestInit: { headers } })
// Keep track of the server instance for cleanup
let server: any = null
// Define an auth initializer function
const authInitializer = async () => {
const authState = await authCoordinator.initializeAuth()
// Store server in outer scope for cleanup
server = authState.server
// If auth was completed by another instance, just log that we'll use the auth from disk
if (authState.skipBrowserAuth) {
log('Authentication was completed by another instance - will use tokens from disk...')
// TODO: remove, the callback is happening before the tokens are exchanged
// so we're slightly too early
await new Promise((res) => setTimeout(res, 1_000))
}
return {
waitForAuthCode: authState.waitForAuthCode,
skipBrowserAuth: authState.skipBrowserAuth,
}
}
try {
// Connect to remote server with lazy authentication
const transport = await connectToRemoteServer(client, serverUrl, authProvider, headers, authInitializer, transportStrategy)
// Set up message and error handlers
transport.onmessage = (message) => {
@ -75,89 +101,59 @@ async function runClient(serverUrl: string, callbackPort: number, headers: Recor
log('Connection closed.')
process.exit(0)
}
return transport
}
const transport = initTransport()
// Set up cleanup handler
const cleanup = async () => {
log('\nClosing connection...')
await client.close()
server.close()
}
setupSignalHandlers(cleanup)
// Try to connect
try {
log('Connecting to server...')
await client.connect(transport)
log('Connected successfully!')
} catch (error) {
if (error instanceof UnauthorizedError || (error instanceof Error && error.message.includes('Unauthorized'))) {
log('Authentication required. Waiting for authorization...')
// Wait for the authorization code from the callback or another instance
const code = await waitForAuthCode()
try {
log('Completing authorization...')
await transport.finishAuth(code)
// Reconnect after authorization with a new transport
log('Connecting after authorization...')
await client.connect(initTransport())
log('Connected successfully!')
// Request tools list after auth
log('Requesting tools list...')
const tools = await client.request({ method: 'tools/list' }, ListToolsResultSchema)
log('Tools:', JSON.stringify(tools, null, 2))
// Request resources list after auth
log('Requesting resource list...')
const resources = await client.request({ method: 'resources/list' }, ListResourcesResultSchema)
log('Resources:', JSON.stringify(resources, null, 2))
log('Listening for messages. Press Ctrl+C to exit.')
} catch (authError) {
log('Authorization error:', authError)
// Set up cleanup handler
const cleanup = async () => {
log('\nClosing connection...')
await client.close()
// If auth was initialized and server was created, close it
if (server) {
server.close()
process.exit(1)
}
} else {
log('Connection error:', error)
server.close()
process.exit(1)
}
}
setupSignalHandlers(cleanup)
try {
// Request tools list
log('Requesting tools list...')
const tools = await client.request({ method: 'tools/list' }, ListToolsResultSchema)
log('Tools:', JSON.stringify(tools, null, 2))
} catch (e) {
log('Error requesting tools list:', e)
}
log('Connected successfully!')
try {
// Request resources list
log('Requesting resource list...')
const resources = await client.request({ method: 'resources/list' }, ListResourcesResultSchema)
log('Resources:', JSON.stringify(resources, null, 2))
} catch (e) {
log('Error requesting resources list:', e)
}
try {
// Request tools list
log('Requesting tools list...')
const tools = await client.request({ method: 'tools/list' }, ListToolsResultSchema)
log('Tools:', JSON.stringify(tools, null, 2))
} catch (e) {
log('Error requesting tools list:', e)
}
log('Listening for messages. Press Ctrl+C to exit.')
try {
// Request resources list
log('Requesting resource list...')
const resources = await client.request({ method: 'resources/list' }, ListResourcesResultSchema)
log('Resources:', JSON.stringify(resources, null, 2))
} catch (e) {
log('Error requesting resources list:', e)
}
// log('Listening for messages. Press Ctrl+C to exit.')
log('Exiting OK...')
// Only close the server if it was initialized
if (server) {
server.close()
}
process.exit(0)
} catch (error) {
log('Fatal error:', error)
// Only close the server if it was initialized
if (server) {
server.close()
}
process.exit(1)
}
}
// Parse command-line arguments and run the client
parseCommandLineArgs(process.argv.slice(2), 3333, 'Usage: npx tsx client.ts <https://server-url> [callback-port]')
.then(({ serverUrl, callbackPort, headers }) => {
return runClient(serverUrl, callbackPort, headers)
.then(({ serverUrl, callbackPort, headers, transportStrategy }) => {
return runClient(serverUrl, callbackPort, headers, transportStrategy)
})
.catch((error) => {
console.error('Fatal error:', error)

View file

@ -5,6 +5,10 @@ import express from 'express'
import { AddressInfo } from 'net'
import { log, setupOAuthCallbackServerWithLongPoll } from './utils'
export type AuthCoordinator = {
initializeAuth: () => Promise<{ server: Server; waitForAuthCode: () => Promise<string>; skipBrowserAuth: boolean }>
}
/**
* Checks if a process with the given PID is running
* @param pid The process ID to check
@ -88,6 +92,36 @@ export async function waitForAuthentication(port: number): Promise<boolean> {
}
}
/**
* Creates a lazy auth coordinator that will only initiate auth when needed
* @param serverUrlHash The hash of the server URL
* @param callbackPort The port to use for the callback server
* @param events The event emitter to use for signaling
* @returns An AuthCoordinator object with an initializeAuth method
*/
export function createLazyAuthCoordinator(
serverUrlHash: string,
callbackPort: number,
events: EventEmitter
): AuthCoordinator {
let authState: { server: Server; waitForAuthCode: () => Promise<string>; skipBrowserAuth: boolean } | null = null
return {
initializeAuth: async () => {
// If auth has already been initialized, return the existing state
if (authState) {
return authState
}
log('Initializing auth coordination on-demand')
// Initialize auth using the existing coordinateAuth logic
authState = await coordinateAuth(serverUrlHash, callbackPort, events)
return authState
}
}
}
/**
* Coordinates authentication between multiple instances of the client/proxy
* @param serverUrlHash The hash of the server URL

View file

@ -1,6 +1,15 @@
import { OAuthClientProvider, UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js'
// Connection constants
export const REASON_AUTH_NEEDED = 'authentication-needed'
export const REASON_TRANSPORT_FALLBACK = 'falling-back-to-alternate-transport'
// Transport strategy types
export type TransportStrategy = 'sse-only' | 'http-only' | 'sse-first' | 'http-first'
import { OAuthCallbackServerOptions } from './types'
import express from 'express'
import net from 'net'
@ -65,21 +74,33 @@ export function mcpProxy({ transportToClient, transportToServer }: { transportTo
}
/**
* Creates and connects to a remote SSE server with OAuth authentication
* Type for the auth initialization function
*/
export type AuthInitializer = () => Promise<{
waitForAuthCode: () => Promise<string>
skipBrowserAuth: boolean
}>
/**
* Creates and connects to a remote server with OAuth authentication
* @param client The client to connect with
* @param serverUrl The URL of the remote server
* @param authProvider The OAuth client provider
* @param headers Additional headers to send with the request
* @param waitForAuthCode Function to wait for the auth code
* @param skipBrowserAuth Whether to skip browser auth and use shared auth
* @returns The connected SSE client transport
* @param authInitializer Function to initialize authentication when needed
* @param transportStrategy Strategy for selecting transport type ('sse-only', 'http-only', 'sse-first', 'http-first')
* @param recursionReasons Set of reasons for recursive calls (internal use)
* @returns The connected transport
*/
export async function connectToRemoteServer(
client: Client | null,
serverUrl: string,
authProvider: OAuthClientProvider,
headers: Record<string, string>,
waitForAuthCode: () => Promise<string>,
skipBrowserAuth: boolean = false,
): Promise<SSEClientTransport> {
authInitializer: AuthInitializer,
transportStrategy: TransportStrategy = 'http-first',
recursionReasons: Set<string> = new Set(),
): Promise<Transport> {
log(`[${pid}] Connecting to remote server: ${serverUrl}`)
const url = new URL(serverUrl)
@ -93,25 +114,88 @@ export async function connectToRemoteServer(
...(init?.headers as Record<string, string> | undefined),
...headers,
...(tokens?.access_token ? { Authorization: `Bearer ${tokens.access_token}` } : {}),
Accept: "text/event-stream",
Accept: 'text/event-stream',
} as Record<string, string>,
})
);
}),
)
},
};
}
const transport = new SSEClientTransport(url, {
authProvider,
requestInit: { headers },
eventSourceInit,
})
log(`Using transport strategy: ${transportStrategy}`)
// Determine if we should attempt to fallback on error
// Choose transport based on user strategy and recursion history
const shouldAttemptFallback = transportStrategy === 'http-first' || transportStrategy === 'sse-first'
// Create transport instance based on the strategy
const sseTransport = transportStrategy === 'sse-only' || transportStrategy === 'sse-first'
const transport = sseTransport
? new SSEClientTransport(url, {
authProvider,
requestInit: { headers },
eventSourceInit,
})
: new StreamableHTTPClientTransport(url, {
authProvider,
requestInit: { headers },
})
try {
await transport.start()
log('Connected to remote server')
if (client) {
await client.connect(transport)
} else {
await transport.start()
if (!sseTransport) {
// Extremely hacky, but we didn't actually send a request when calling transport.start() above, so we don't
// know if we're even talking to an HTTP server. But if we forced that now we'd get an error later saying that
// the client is already connected. So let's just create a one-off client to make a single request and figure
// out if we're actually talking to an HTTP server or not.
const testTransport = new StreamableHTTPClientTransport(url, { authProvider, requestInit: { headers } })
const testClient = new Client({ name: 'mcp-remote-fallback-test', version: '0.0.0' }, { capabilities: {} })
await testClient.connect(testTransport)
}
}
log(`Connected to remote server using ${transport.constructor.name}`)
return transport
} catch (error) {
if (error instanceof UnauthorizedError || (error instanceof Error && error.message.includes('Unauthorized'))) {
// Check if it's a protocol error and we should attempt fallback
if (
error instanceof Error &&
shouldAttemptFallback &&
(sseTransport
? error.message.includes('405') || error.message.includes('Method Not Allowed')
: error.message.includes('404') || error.message.includes('Not Found'))
) {
log(`Received error: ${error.message}`)
// If we've already tried falling back once, throw an error
if (recursionReasons.has(REASON_TRANSPORT_FALLBACK)) {
const errorMessage = `Already attempted transport fallback. Giving up.`
log(errorMessage)
throw new Error(errorMessage)
}
log(`Recursively reconnecting for reason: ${REASON_TRANSPORT_FALLBACK}`)
// Add to recursion reasons set
recursionReasons.add(REASON_TRANSPORT_FALLBACK)
// Recursively call connectToRemoteServer with the updated recursion tracking
return connectToRemoteServer(
client,
serverUrl,
authProvider,
headers,
authInitializer,
sseTransport ? 'http-only' : 'sse-only',
recursionReasons,
)
} else if (error instanceof UnauthorizedError || (error instanceof Error && error.message.includes('Unauthorized'))) {
log('Authentication required. Initializing auth...')
// Initialize authentication on-demand
const { waitForAuthCode, skipBrowserAuth } = await authInitializer()
if (skipBrowserAuth) {
log('Authentication required but skipping browser auth - using shared auth')
} else {
@ -125,11 +209,18 @@ export async function connectToRemoteServer(
log('Completing authorization...')
await transport.finishAuth(code)
// Create a new transport after auth
const newTransport = new SSEClientTransport(url, { authProvider, requestInit: { headers } })
await newTransport.start()
log('Connected to remote server after authentication')
return newTransport
if (recursionReasons.has(REASON_AUTH_NEEDED)) {
const errorMessage = `Already attempted reconnection for reason: ${REASON_AUTH_NEEDED}. Giving up.`
log(errorMessage)
throw new Error(errorMessage)
}
// Track this reason for recursion
recursionReasons.add(REASON_AUTH_NEEDED)
log(`Recursively reconnecting for reason: ${REASON_AUTH_NEEDED}`)
// Recursively call connectToRemoteServer with the updated recursion tracking
return connectToRemoteServer(client, serverUrl, authProvider, headers, authInitializer, transportStrategy, recursionReasons)
} catch (authError) {
log('Authorization error:', authError)
throw authError
@ -301,6 +392,19 @@ export async function parseCommandLineArgs(args: string[], defaultPort: number,
const specifiedPort = args[1] ? parseInt(args[1]) : undefined
const allowHttp = args.includes('--allow-http')
// Parse transport strategy
let transportStrategy: TransportStrategy = 'http-first' // Default
const transportIndex = args.indexOf('--transport')
if (transportIndex !== -1 && transportIndex < args.length - 1) {
const strategy = args[transportIndex + 1]
if (strategy === 'sse-only' || strategy === 'http-only' || strategy === 'sse-first' || strategy === 'http-first') {
transportStrategy = strategy as TransportStrategy
log(`Using transport strategy: ${transportStrategy}`)
} else {
log(`Warning: Ignoring invalid transport strategy: ${strategy}. Valid values are: sse-only, http-only, sse-first, http-first`)
}
}
if (!serverUrl) {
log(usage)
process.exit(1)
@ -343,7 +447,7 @@ export async function parseCommandLineArgs(args: string[], defaultPort: number,
})
}
return { serverUrl, callbackPort, headers }
return { serverUrl, callbackPort, headers, transportStrategy }
}
/**

View file

@ -11,22 +11,36 @@
import { EventEmitter } from 'events'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import { connectToRemoteServer, log, mcpProxy, parseCommandLineArgs, setupSignalHandlers, getServerUrlHash } from './lib/utils'
import {
connectToRemoteServer,
log,
mcpProxy,
parseCommandLineArgs,
setupSignalHandlers,
getServerUrlHash,
MCP_REMOTE_VERSION,
TransportStrategy,
} from './lib/utils'
import { NodeOAuthClientProvider } from './lib/node-oauth-client-provider'
import { coordinateAuth } from './lib/coordination'
import { createLazyAuthCoordinator } from './lib/coordination'
/**
* Main function to run the proxy
*/
async function runProxy(serverUrl: string, callbackPort: number, headers: Record<string, string>) {
async function runProxy(
serverUrl: string,
callbackPort: number,
headers: Record<string, string>,
transportStrategy: TransportStrategy = 'http-first',
) {
// Set up event emitter for auth flow
const events = new EventEmitter()
// Get the server URL hash for lockfile operations
const serverUrlHash = getServerUrlHash(serverUrl)
// Coordinate authentication with other instances
const { server, waitForAuthCode, skipBrowserAuth } = await coordinateAuth(serverUrlHash, callbackPort, events)
// Create a lazy auth coordinator
const authCoordinator = createLazyAuthCoordinator(serverUrlHash, callbackPort, events)
// Create the OAuth client provider
const authProvider = new NodeOAuthClientProvider({
@ -35,20 +49,36 @@ async function runProxy(serverUrl: string, callbackPort: number, headers: Record
clientName: 'MCP CLI Proxy',
})
// If auth was completed by another instance, just log that we'll use the auth from disk
if (skipBrowserAuth) {
log('Authentication was completed by another instance - will use tokens from disk')
// TODO: remove, the callback is happening before the tokens are exchanged
// so we're slightly too early
await new Promise((res) => setTimeout(res, 1_000))
}
// Create the STDIO transport for local connections
const localTransport = new StdioServerTransport()
// Keep track of the server instance for cleanup
let server: any = null
// Define an auth initializer function
const authInitializer = async () => {
const authState = await authCoordinator.initializeAuth()
// Store server in outer scope for cleanup
server = authState.server
// If auth was completed by another instance, just log that we'll use the auth from disk
if (authState.skipBrowserAuth) {
log('Authentication was completed by another instance - will use tokens from disk')
// TODO: remove, the callback is happening before the tokens are exchanged
// so we're slightly too early
await new Promise((res) => setTimeout(res, 1_000))
}
return {
waitForAuthCode: authState.waitForAuthCode,
skipBrowserAuth: authState.skipBrowserAuth,
}
}
try {
// Connect to remote server with authentication
const remoteTransport = await connectToRemoteServer(serverUrl, authProvider, headers, waitForAuthCode, skipBrowserAuth)
// Connect to remote server with lazy authentication
const remoteTransport = await connectToRemoteServer(null, serverUrl, authProvider, headers, authInitializer, transportStrategy)
// Set up bidirectional proxy between local and remote transports
mcpProxy({
@ -59,14 +89,17 @@ async function runProxy(serverUrl: string, callbackPort: number, headers: Record
// Start the local STDIO server
await localTransport.start()
log('Local STDIO server running')
log('Proxy established successfully between local STDIO and remote SSE')
log(`Proxy established successfully between local STDIO and remote ${remoteTransport.constructor.name}`)
log('Press Ctrl+C to exit')
// Setup cleanup handler
const cleanup = async () => {
await remoteTransport.close()
await localTransport.close()
server.close()
// Only close the server if it was initialized
if (server) {
server.close()
}
}
setupSignalHandlers(cleanup)
} catch (error) {
@ -93,15 +126,18 @@ to the CA certificate file. If using claude_desktop_config.json, this might look
}
`)
}
server.close()
// Only close the server if it was initialized
if (server) {
server.close()
}
process.exit(1)
}
}
// Parse command-line arguments and run the proxy
parseCommandLineArgs(process.argv.slice(2), 3334, 'Usage: npx tsx proxy.ts <https://server-url> [callback-port]')
.then(({ serverUrl, callbackPort, headers }) => {
return runProxy(serverUrl, callbackPort, headers)
.then(({ serverUrl, callbackPort, headers, transportStrategy }) => {
return runProxy(serverUrl, callbackPort, headers, transportStrategy)
})
.catch((error) => {
log('Fatal error:', error)