Skip to content

Mcp

McpToolSpec #

Bases: BaseToolSpec, TypeResolutionMixin, TypeCreationMixin, FieldExtractionMixin

MCPToolSpec will get the tools from MCP Client (only need to implement ClientSession) and convert them to LlamaIndex's FunctionTool objects.

Parameters:

Name Type Description Default
client ClientSession

An MCP client instance implementing ClientSession, and it should support the following methods in ClientSession: - list_tools: List all tools. - call_tool: Call a tool. - list_resources: List all resources. - read_resource: Read a resource.

required
allowed_tools Optional[List[str]]

If set, only return tools with the specified names.

None
include_resources bool

Whether to include resources in the tool list.

False
Source code in llama_index/tools/mcp/base.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class McpToolSpec(
    BaseToolSpec, TypeResolutionMixin, TypeCreationMixin, FieldExtractionMixin
):
    """
    MCPToolSpec will get the tools from MCP Client (only need to implement ClientSession) and convert them to LlamaIndex's FunctionTool objects.

    Args:
        client: An MCP client instance implementing ClientSession, and it should support the following methods in ClientSession:
            - list_tools: List all tools.
            - call_tool: Call a tool.
            - list_resources: List all resources.
            - read_resource: Read a resource.
        allowed_tools: If set, only return tools with the specified names.
        include_resources: Whether to include resources in the tool list.

    """

    def __init__(
        self,
        client: ClientSession,
        allowed_tools: Optional[List[str]] = None,
        include_resources: bool = False,
    ) -> None:
        self.client = client
        self.allowed_tools = allowed_tools
        self.include_resources = include_resources
        self.properties_cache = {}

    async def fetch_tools(self) -> List[Any]:
        """
        An asynchronous method to get the tools list from MCP Client. If allowed_tools is set, it will filter the tools.

        Returns:
            A list of tools, each tool object needs to contain name, description, inputSchema properties.

        """
        response = await self.client.list_tools()
        tools = response.tools if hasattr(response, "tools") else []

        if self.allowed_tools is None:
            # get all tools by default
            return tools

        if any(self.allowed_tools):
            return [tool for tool in tools if tool.name in self.allowed_tools]

        logging.warning(
            "Returning an empty tool list due to the empty `allowed_tools` list. Please ensure `allowed_tools` is set appropriately."
        )
        return []

    async def fetch_resources(self) -> List[Resource]:
        """
        An asynchronous method to get the resources list from MCP Client.
        """
        static_response = await self.client.list_resources()
        dynamic_response = await self.client.list_resource_templates()
        static_resources = (
            static_response.resources if hasattr(static_response, "resources") else []
        )
        dynamic_resources = (
            dynamic_response.resourceTemplates
            if hasattr(dynamic_response, "resourceTemplates")
            else []
        )
        resources = static_resources + dynamic_resources
        if self.allowed_tools is None:
            return resources

        if any(self.allowed_tools):
            return [
                resource
                for resource in resources
                if resource.name in self.allowed_tools
            ]

        logging.warning(
            "Returning an empty resource list due to the empty `allowed_tools` list. Please ensure `allowed_tools` is set appropriately."
        )
        return []

    def _create_tool_fn(self, tool_name: str) -> Callable:
        """
        Create a tool call function for a specified MCP tool name. The function internally wraps the call_tool call to the MCP Client.
        """

        async def async_tool_fn(**kwargs):
            return await self.client.call_tool(tool_name, kwargs)

        return async_tool_fn

    def _create_resource_fn(self, resource_uri: str) -> Callable:
        """
        Create a resource call function for a specified MCP resource name. The function internally wraps the read_resource call to the MCP Client.
        """

        async def async_resource_fn():
            return await self.client.read_resource(resource_uri)

        return async_resource_fn

    async def to_tool_list_async(self) -> List[FunctionTool]:
        """
        Asynchronous method to convert MCP tools to FunctionTool objects.

        Returns:
            A list of FunctionTool objects.

        """
        tools_list = await self.fetch_tools()
        function_tool_list: List[FunctionTool] = []
        for tool in tools_list:
            fn = self._create_tool_fn(tool.name)
            # Create a Pydantic model based on the tool inputSchema
            model_schema = self.create_model_from_json_schema(
                tool.inputSchema, model_name=f"{tool.name}_Schema"
            )
            metadata = ToolMetadata(
                name=tool.name,
                description=tool.description,
                fn_schema=model_schema,
            )
            function_tool = FunctionTool.from_defaults(
                async_fn=fn, tool_metadata=metadata
            )
            function_tool_list.append(function_tool)

        if self.include_resources:
            resources_list = await self.fetch_resources()
            for resource in resources_list:
                if hasattr(resource, "uri"):
                    uri = resource.uri
                elif hasattr(resource, "template"):
                    uri = resource.template
                fn = self._create_resource_fn(uri)
                function_tool_list.append(
                    FunctionTool.from_defaults(
                        async_fn=fn,
                        name=resource.name.replace("/", "_"),
                        description=resource.description,
                    )
                )

        return function_tool_list

    def to_tool_list(self) -> List[FunctionTool]:
        """
        Synchronous interface: Convert MCP Client tools to FunctionTool objects.
        Note: This method should not be called in an asynchronous environment, otherwise an exception will be thrown. Use to_tool_list_async instead.

        Returns:
            A list of FunctionTool objects.

        """
        return patch_sync(self.to_tool_list_async)()

    def create_model_from_json_schema(
        self,
        schema: dict[str, Any],
        model_name: str = "DynamicModel",
    ) -> type[BaseModel]:
        """
        To create a Pydantic model from the JSON Schema of MCP tools.

        Args:
            schema: A JSON Schema dictionary containing properties and required fields.
            model_name: The name of the model.

        Returns:
            A Pydantic model class.

        """
        defs = schema.get("$defs", {})

        # Process all type definitions
        for cls_name, cls_schema in defs.items():
            self.properties_cache[cls_name] = self._create_model(
                cls_schema,
                cls_name,
                defs,
            )

        return self._create_model(schema, model_name)

    def _create_model(
        self,
        schema: dict,
        model_name: str,
        defs: dict = {},
    ) -> type[BaseModel]:
        """Create a Pydantic model from a schema."""
        if model_name in self.properties_cache:
            return self.properties_cache[model_name]

        fields = self._extract_fields(schema, defs)
        model = create_model(model_name, **fields)
        self.properties_cache[model_name] = model
        return model

fetch_tools async #

fetch_tools() -> List[Any]

An asynchronous method to get the tools list from MCP Client. If allowed_tools is set, it will filter the tools.

Returns:

Type Description
List[Any]

A list of tools, each tool object needs to contain name, description, inputSchema properties.

Source code in llama_index/tools/mcp/base.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
async def fetch_tools(self) -> List[Any]:
    """
    An asynchronous method to get the tools list from MCP Client. If allowed_tools is set, it will filter the tools.

    Returns:
        A list of tools, each tool object needs to contain name, description, inputSchema properties.

    """
    response = await self.client.list_tools()
    tools = response.tools if hasattr(response, "tools") else []

    if self.allowed_tools is None:
        # get all tools by default
        return tools

    if any(self.allowed_tools):
        return [tool for tool in tools if tool.name in self.allowed_tools]

    logging.warning(
        "Returning an empty tool list due to the empty `allowed_tools` list. Please ensure `allowed_tools` is set appropriately."
    )
    return []

fetch_resources async #

fetch_resources() -> List[Resource]

An asynchronous method to get the resources list from MCP Client.

Source code in llama_index/tools/mcp/base.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async def fetch_resources(self) -> List[Resource]:
    """
    An asynchronous method to get the resources list from MCP Client.
    """
    static_response = await self.client.list_resources()
    dynamic_response = await self.client.list_resource_templates()
    static_resources = (
        static_response.resources if hasattr(static_response, "resources") else []
    )
    dynamic_resources = (
        dynamic_response.resourceTemplates
        if hasattr(dynamic_response, "resourceTemplates")
        else []
    )
    resources = static_resources + dynamic_resources
    if self.allowed_tools is None:
        return resources

    if any(self.allowed_tools):
        return [
            resource
            for resource in resources
            if resource.name in self.allowed_tools
        ]

    logging.warning(
        "Returning an empty resource list due to the empty `allowed_tools` list. Please ensure `allowed_tools` is set appropriately."
    )
    return []

to_tool_list_async async #

to_tool_list_async() -> List[FunctionTool]

Asynchronous method to convert MCP tools to FunctionTool objects.

Returns:

Type Description
List[FunctionTool]

A list of FunctionTool objects.

Source code in llama_index/tools/mcp/base.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
async def to_tool_list_async(self) -> List[FunctionTool]:
    """
    Asynchronous method to convert MCP tools to FunctionTool objects.

    Returns:
        A list of FunctionTool objects.

    """
    tools_list = await self.fetch_tools()
    function_tool_list: List[FunctionTool] = []
    for tool in tools_list:
        fn = self._create_tool_fn(tool.name)
        # Create a Pydantic model based on the tool inputSchema
        model_schema = self.create_model_from_json_schema(
            tool.inputSchema, model_name=f"{tool.name}_Schema"
        )
        metadata = ToolMetadata(
            name=tool.name,
            description=tool.description,
            fn_schema=model_schema,
        )
        function_tool = FunctionTool.from_defaults(
            async_fn=fn, tool_metadata=metadata
        )
        function_tool_list.append(function_tool)

    if self.include_resources:
        resources_list = await self.fetch_resources()
        for resource in resources_list:
            if hasattr(resource, "uri"):
                uri = resource.uri
            elif hasattr(resource, "template"):
                uri = resource.template
            fn = self._create_resource_fn(uri)
            function_tool_list.append(
                FunctionTool.from_defaults(
                    async_fn=fn,
                    name=resource.name.replace("/", "_"),
                    description=resource.description,
                )
            )

    return function_tool_list

to_tool_list #

to_tool_list() -> List[FunctionTool]

Synchronous interface: Convert MCP Client tools to FunctionTool objects. Note: This method should not be called in an asynchronous environment, otherwise an exception will be thrown. Use to_tool_list_async instead.

Returns:

Type Description
List[FunctionTool]

A list of FunctionTool objects.

Source code in llama_index/tools/mcp/base.py
164
165
166
167
168
169
170
171
172
173
def to_tool_list(self) -> List[FunctionTool]:
    """
    Synchronous interface: Convert MCP Client tools to FunctionTool objects.
    Note: This method should not be called in an asynchronous environment, otherwise an exception will be thrown. Use to_tool_list_async instead.

    Returns:
        A list of FunctionTool objects.

    """
    return patch_sync(self.to_tool_list_async)()

create_model_from_json_schema #

create_model_from_json_schema(schema: dict[str, Any], model_name: str = 'DynamicModel') -> type[BaseModel]

To create a Pydantic model from the JSON Schema of MCP tools.

Parameters:

Name Type Description Default
schema dict[str, Any]

A JSON Schema dictionary containing properties and required fields.

required
model_name str

The name of the model.

'DynamicModel'

Returns:

Type Description
type[BaseModel]

A Pydantic model class.

Source code in llama_index/tools/mcp/base.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def create_model_from_json_schema(
    self,
    schema: dict[str, Any],
    model_name: str = "DynamicModel",
) -> type[BaseModel]:
    """
    To create a Pydantic model from the JSON Schema of MCP tools.

    Args:
        schema: A JSON Schema dictionary containing properties and required fields.
        model_name: The name of the model.

    Returns:
        A Pydantic model class.

    """
    defs = schema.get("$defs", {})

    # Process all type definitions
    for cls_name, cls_schema in defs.items():
        self.properties_cache[cls_name] = self._create_model(
            cls_schema,
            cls_name,
            defs,
        )

    return self._create_model(schema, model_name)

BasicMCPClient #

Bases: ClientSession

Basic MCP client that can be used to connect to an MCP server.

This is useful for connecting to any MCP server.

Parameters:

Name Type Description Default
command_or_url str

The command to run or the URL to connect to.

required
args Optional[List[str]]

The arguments to pass to StdioServerParameters.

None
env Optional[Dict[str, str]]

The environment variables to set for StdioServerParameters.

None
timeout int

The timeout for the command in seconds.

30
auth Optional[OAuthClientProvider]

Optional OAuth client provider for authentication.

None
sampling_callback Optional[Callable[[CreateMessageRequestParams], Awaitable[CreateMessageResult]]]

Optional callback for handling sampling messages.

None
headers Optional[Dict[str, Any]]

Optional headers to pass by sse client or streamable http client

None
tool_call_logs_callback Optional[Callable[[List[str]], Awaitable[Any]]]

Async function to store the logs deriving from an MCP tool call: logs are provided as a list of strings, representing log messages. Defaults to None.

None
Source code in llama_index/tools/mcp/client.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class BasicMCPClient(ClientSession):
    """
    Basic MCP client that can be used to connect to an MCP server.

    This is useful for connecting to any MCP server.

    Args:
        command_or_url: The command to run or the URL to connect to.
        args: The arguments to pass to StdioServerParameters.
        env: The environment variables to set for StdioServerParameters.
        timeout: The timeout for the command in seconds.
        auth: Optional OAuth client provider for authentication.
        sampling_callback: Optional callback for handling sampling messages.
        headers: Optional headers to pass by sse client or streamable http client
        tool_call_logs_callback: Async function to store the logs deriving from an MCP tool call: logs are provided as a list of strings, representing log messages. Defaults to None.

    """

    def __init__(
        self,
        command_or_url: str,
        args: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        timeout: int = 30,
        auth: Optional[OAuthClientProvider] = None,
        sampling_callback: Optional[
            Callable[
                [types.CreateMessageRequestParams], Awaitable[types.CreateMessageResult]
            ]
        ] = None,
        headers: Optional[Dict[str, Any]] = None,
        tool_call_logs_callback: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
    ):
        self.command_or_url = command_or_url
        self.args = args or []
        self.env = env or {}
        self.timeout = timeout
        self.auth = auth
        self.sampling_callback = sampling_callback
        self.headers = headers
        self.tool_call_logs_callback = tool_call_logs_callback

    @classmethod
    def with_oauth(
        cls,
        command_or_url: str,
        client_name: str,
        redirect_uris: List[str],
        redirect_handler: Callable[[str], None],
        callback_handler: Callable[[], Tuple[str, Optional[str]]],
        args: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        timeout: int = 30,
        token_storage: Optional[TokenStorage] = None,
        tool_call_logs_callback: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
    ) -> "BasicMCPClient":
        """
        Create a client with OAuth authentication.

        Args:
            command_or_url: The command to run or the URL to connect to
            client_name: The name of the OAuth client
            redirect_uris: The redirect URIs for the OAuth flow
            redirect_handler: Function that handles the redirect URL
            callback_handler: Function that returns the auth code and state
            token_storage: Optional token storage for OAuth client. If not provided,
                           a default in-memory storage is used (tokens will be lost on restart).
            args: The arguments to pass to StdioServerParameters.
            env: The environment variables to set for StdioServerParameters.
            timeout: The timeout for the command in seconds.
            tool_call_logs_callback: Async function to store the logs deriving from an MCP tool call: logs are provided as a list of strings, representing log messages. Defaults to None.

        Returns:
            An authenticated MCP client

        """
        # Use default in-memory storage if none provided
        if token_storage is None:
            token_storage = DefaultInMemoryTokenStorage()
            warnings.warn(
                "Using default in-memory token storage. Tokens will be lost on restart.",
                UserWarning,
            )

        oauth_auth = OAuthClientProvider(
            server_url=command_or_url if urlparse(command_or_url).scheme else None,
            client_metadata=OAuthClientMetadata(
                client_name=client_name,
                redirect_uris=redirect_uris,
                grant_types=["authorization_code", "refresh_token"],
                response_types=["code"],
            ),
            redirect_handler=redirect_handler,
            callback_handler=callback_handler,
            storage=token_storage,
        )

        return cls(
            command_or_url,
            auth=oauth_auth,
            args=args,
            env=env,
            timeout=timeout,
            tool_call_logs_callback=tool_call_logs_callback,
        )

    @asynccontextmanager
    async def _run_session(self) -> AsyncIterator[ClientSession]:
        """Create and initialize a session with the MCP server."""
        url = urlparse(self.command_or_url)
        scheme = url.scheme

        if scheme in ("http", "https"):
            # Check if this is a streamable HTTP endpoint (default) or SSE
            if enable_sse(self.command_or_url):
                # SSE transport
                async with sse_client(
                    self.command_or_url, auth=self.auth, headers=self.headers
                ) as streams:
                    async with ClientSession(
                        *streams,
                        read_timeout_seconds=timedelta(seconds=self.timeout),
                        sampling_callback=self.sampling_callback,
                    ) as session:
                        await session.initialize()
                        yield session
            else:
                # Streamable HTTP transport (recommended)
                async with streamablehttp_client(
                    self.command_or_url, auth=self.auth, headers=self.headers
                ) as (read, write, _):
                    async with ClientSession(
                        read,
                        write,
                        read_timeout_seconds=timedelta(seconds=self.timeout),
                        sampling_callback=self.sampling_callback,
                    ) as session:
                        await session.initialize()
                        yield session
        else:
            # stdio transport
            server_parameters = StdioServerParameters(
                command=self.command_or_url, args=self.args, env=self.env
            )
            async with stdio_client(server_parameters) as streams:
                async with ClientSession(
                    *streams,
                    read_timeout_seconds=timedelta(seconds=self.timeout),
                    sampling_callback=self.sampling_callback,
                ) as session:
                    await session.initialize()
                    yield session

    def _configure_tool_call_logs_callback(self) -> io.StringIO:
        handler = io.StringIO()
        stream_handler = logging.StreamHandler(handler)

        # Configure logging to capture all events
        logging.basicConfig(
            level=logging.DEBUG,  # Capture all log levels
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s\n",
            handlers=[
                stream_handler,
            ],
        )
        # Also enable logging for specific FastMCP components
        fastmcp_logger = logging.getLogger("fastmcp")
        fastmcp_logger.setLevel(logging.DEBUG)

        # Enable HTTP transport logging to see network details
        http_logger = logging.getLogger("httpx")
        http_logger.setLevel(logging.DEBUG)

        return handler

    # Tool methods
    async def call_tool(
        self,
        tool_name: str,
        arguments: Optional[dict] = None,
        progress_callback: Optional[ProgressFnT] = None,
    ) -> types.CallToolResult:
        """Call a tool on the MCP server."""
        if self.tool_call_logs_callback is not None:
            # we use a string stream so that we can recover all logs at the end of the session
            handler = self._configure_tool_call_logs_callback()

            async with self._run_session() as session:
                result = await session.call_tool(
                    tool_name, arguments=arguments, progress_callback=progress_callback
                )

                # get all logs by dividing the string with \n, since the format of the log has an \n at the end of the log message
                extra_values = handler.getvalue().split("\n")

                # pipe the logs list into tool_call_logs_callback
                await self.tool_call_logs_callback(extra_values)

                return result
        else:
            async with self._run_session() as session:
                return await session.call_tool(
                    tool_name, arguments=arguments, progress_callback=progress_callback
                )

    async def list_tools(self) -> types.ListToolsResult:
        """List all available tools on the MCP server."""
        async with self._run_session() as session:
            return await session.list_tools()

    # Resource methods
    async def list_resources(self) -> types.ListToolsResult:
        """List all available resources on the MCP server."""
        async with self._run_session() as session:
            return await session.list_resources()

    async def list_resource_templates(self) -> types.ListToolsResult:
        """List all dynamic available resources on the MCP server."""
        async with self._run_session() as session:
            return await session.list_resource_templates()

    async def read_resource(self, resource_uri: AnyUrl) -> types.ReadResourceResult:
        """
        Read a resource from the MCP server.

        Returns:
            Tuple containing the resource content as bytes and the MIME type

        """
        async with self._run_session() as session:
            return await session.read_resource(resource_uri)

    ## ----- Prompt methods -----

    async def list_prompts(self) -> List[types.Prompt]:
        """List all available prompts on the MCP server."""
        async with self._run_session() as session:
            return await session.list_prompts()

    async def get_prompt(
        self, prompt_name: str, arguments: Optional[Dict[str, str]] = None
    ) -> List[ChatMessage]:
        """
        Get a prompt from the MCP server.

        Args:
            prompt_name: The name of the prompt to get
            arguments: Optional arguments to pass to the prompt

        Returns:
            The prompt as a list of llama-index ChatMessage objects

        """
        async with self._run_session() as session:
            prompt = await session.get_prompt(prompt_name, arguments)
            llama_messages = []
            for message in prompt.messages:
                if isinstance(message.content, types.TextContent):
                    llama_messages.append(
                        ChatMessage(
                            role=message.role,
                            blocks=[TextBlock(text=message.content.text)],
                        )
                    )
                elif isinstance(message.content, types.ImageContent):
                    llama_messages.append(
                        ChatMessage(
                            role=message.role,
                            blocks=[
                                ImageBlock(
                                    image=message.content.data,
                                    image_mimetype=message.content.mimeType,
                                )
                            ],
                        )
                    )
                elif isinstance(message.content, types.EmbeddedResource):
                    raise NotImplementedError(
                        "Embedded resources are not supported yet"
                    )
                else:
                    raise ValueError(
                        f"Unsupported content type: {type(message.content)}"
                    )

            return llama_messages

with_oauth classmethod #

with_oauth(command_or_url: str, client_name: str, redirect_uris: List[str], redirect_handler: Callable[[str], None], callback_handler: Callable[[], Tuple[str, Optional[str]]], args: Optional[List[str]] = None, env: Optional[Dict[str, str]] = None, timeout: int = 30, token_storage: Optional[TokenStorage] = None, tool_call_logs_callback: Optional[Callable[[List[str]], Awaitable[Any]]] = None) -> BasicMCPClient

Create a client with OAuth authentication.

Parameters:

Name Type Description Default
command_or_url str

The command to run or the URL to connect to

required
client_name str

The name of the OAuth client

required
redirect_uris List[str]

The redirect URIs for the OAuth flow

required
redirect_handler Callable[[str], None]

Function that handles the redirect URL

required
callback_handler Callable[[], Tuple[str, Optional[str]]]

Function that returns the auth code and state

required
token_storage Optional[TokenStorage]

Optional token storage for OAuth client. If not provided, a default in-memory storage is used (tokens will be lost on restart).

None
args Optional[List[str]]

The arguments to pass to StdioServerParameters.

None
env Optional[Dict[str, str]]

The environment variables to set for StdioServerParameters.

None
timeout int

The timeout for the command in seconds.

30
tool_call_logs_callback Optional[Callable[[List[str]], Awaitable[Any]]]

Async function to store the logs deriving from an MCP tool call: logs are provided as a list of strings, representing log messages. Defaults to None.

None

Returns:

Type Description
BasicMCPClient

An authenticated MCP client

Source code in llama_index/tools/mcp/client.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@classmethod
def with_oauth(
    cls,
    command_or_url: str,
    client_name: str,
    redirect_uris: List[str],
    redirect_handler: Callable[[str], None],
    callback_handler: Callable[[], Tuple[str, Optional[str]]],
    args: Optional[List[str]] = None,
    env: Optional[Dict[str, str]] = None,
    timeout: int = 30,
    token_storage: Optional[TokenStorage] = None,
    tool_call_logs_callback: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
) -> "BasicMCPClient":
    """
    Create a client with OAuth authentication.

    Args:
        command_or_url: The command to run or the URL to connect to
        client_name: The name of the OAuth client
        redirect_uris: The redirect URIs for the OAuth flow
        redirect_handler: Function that handles the redirect URL
        callback_handler: Function that returns the auth code and state
        token_storage: Optional token storage for OAuth client. If not provided,
                       a default in-memory storage is used (tokens will be lost on restart).
        args: The arguments to pass to StdioServerParameters.
        env: The environment variables to set for StdioServerParameters.
        timeout: The timeout for the command in seconds.
        tool_call_logs_callback: Async function to store the logs deriving from an MCP tool call: logs are provided as a list of strings, representing log messages. Defaults to None.

    Returns:
        An authenticated MCP client

    """
    # Use default in-memory storage if none provided
    if token_storage is None:
        token_storage = DefaultInMemoryTokenStorage()
        warnings.warn(
            "Using default in-memory token storage. Tokens will be lost on restart.",
            UserWarning,
        )

    oauth_auth = OAuthClientProvider(
        server_url=command_or_url if urlparse(command_or_url).scheme else None,
        client_metadata=OAuthClientMetadata(
            client_name=client_name,
            redirect_uris=redirect_uris,
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
        ),
        redirect_handler=redirect_handler,
        callback_handler=callback_handler,
        storage=token_storage,
    )

    return cls(
        command_or_url,
        auth=oauth_auth,
        args=args,
        env=env,
        timeout=timeout,
        tool_call_logs_callback=tool_call_logs_callback,
    )

call_tool async #

call_tool(tool_name: str, arguments: Optional[dict] = None, progress_callback: Optional[ProgressFnT] = None) -> CallToolResult

Call a tool on the MCP server.

Source code in llama_index/tools/mcp/client.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
async def call_tool(
    self,
    tool_name: str,
    arguments: Optional[dict] = None,
    progress_callback: Optional[ProgressFnT] = None,
) -> types.CallToolResult:
    """Call a tool on the MCP server."""
    if self.tool_call_logs_callback is not None:
        # we use a string stream so that we can recover all logs at the end of the session
        handler = self._configure_tool_call_logs_callback()

        async with self._run_session() as session:
            result = await session.call_tool(
                tool_name, arguments=arguments, progress_callback=progress_callback
            )

            # get all logs by dividing the string with \n, since the format of the log has an \n at the end of the log message
            extra_values = handler.getvalue().split("\n")

            # pipe the logs list into tool_call_logs_callback
            await self.tool_call_logs_callback(extra_values)

            return result
    else:
        async with self._run_session() as session:
            return await session.call_tool(
                tool_name, arguments=arguments, progress_callback=progress_callback
            )

list_tools async #

list_tools() -> ListToolsResult

List all available tools on the MCP server.

Source code in llama_index/tools/mcp/client.py
302
303
304
305
async def list_tools(self) -> types.ListToolsResult:
    """List all available tools on the MCP server."""
    async with self._run_session() as session:
        return await session.list_tools()

list_resources async #

list_resources() -> ListToolsResult

List all available resources on the MCP server.

Source code in llama_index/tools/mcp/client.py
308
309
310
311
async def list_resources(self) -> types.ListToolsResult:
    """List all available resources on the MCP server."""
    async with self._run_session() as session:
        return await session.list_resources()

list_resource_templates async #

list_resource_templates() -> ListToolsResult

List all dynamic available resources on the MCP server.

Source code in llama_index/tools/mcp/client.py
313
314
315
316
async def list_resource_templates(self) -> types.ListToolsResult:
    """List all dynamic available resources on the MCP server."""
    async with self._run_session() as session:
        return await session.list_resource_templates()

read_resource async #

read_resource(resource_uri: AnyUrl) -> ReadResourceResult

Read a resource from the MCP server.

Returns:

Type Description
ReadResourceResult

Tuple containing the resource content as bytes and the MIME type

Source code in llama_index/tools/mcp/client.py
318
319
320
321
322
323
324
325
326
327
async def read_resource(self, resource_uri: AnyUrl) -> types.ReadResourceResult:
    """
    Read a resource from the MCP server.

    Returns:
        Tuple containing the resource content as bytes and the MIME type

    """
    async with self._run_session() as session:
        return await session.read_resource(resource_uri)

list_prompts async #

list_prompts() -> List[Prompt]

List all available prompts on the MCP server.

Source code in llama_index/tools/mcp/client.py
331
332
333
334
async def list_prompts(self) -> List[types.Prompt]:
    """List all available prompts on the MCP server."""
    async with self._run_session() as session:
        return await session.list_prompts()

get_prompt async #

get_prompt(prompt_name: str, arguments: Optional[Dict[str, str]] = None) -> List[ChatMessage]

Get a prompt from the MCP server.

Parameters:

Name Type Description Default
prompt_name str

The name of the prompt to get

required
arguments Optional[Dict[str, str]]

Optional arguments to pass to the prompt

None

Returns:

Type Description
List[ChatMessage]

The prompt as a list of llama-index ChatMessage objects

Source code in llama_index/tools/mcp/client.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
async def get_prompt(
    self, prompt_name: str, arguments: Optional[Dict[str, str]] = None
) -> List[ChatMessage]:
    """
    Get a prompt from the MCP server.

    Args:
        prompt_name: The name of the prompt to get
        arguments: Optional arguments to pass to the prompt

    Returns:
        The prompt as a list of llama-index ChatMessage objects

    """
    async with self._run_session() as session:
        prompt = await session.get_prompt(prompt_name, arguments)
        llama_messages = []
        for message in prompt.messages:
            if isinstance(message.content, types.TextContent):
                llama_messages.append(
                    ChatMessage(
                        role=message.role,
                        blocks=[TextBlock(text=message.content.text)],
                    )
                )
            elif isinstance(message.content, types.ImageContent):
                llama_messages.append(
                    ChatMessage(
                        role=message.role,
                        blocks=[
                            ImageBlock(
                                image=message.content.data,
                                image_mimetype=message.content.mimeType,
                            )
                        ],
                    )
                )
            elif isinstance(message.content, types.EmbeddedResource):
                raise NotImplementedError(
                    "Embedded resources are not supported yet"
                )
            else:
                raise ValueError(
                    f"Unsupported content type: {type(message.content)}"
                )

        return llama_messages

workflow_as_mcp #

workflow_as_mcp(workflow: Workflow, workflow_name: Optional[str] = None, workflow_description: Optional[str] = None, start_event_model: Optional[BaseModel] = None, **fastmcp_init_kwargs: Any) -> FastMCP

Convert a workflow to an MCP app.

This will convert any Workflow to an MCP app. It will expose the workflow as a tool within MCP, which will

Parameters:

Name Type Description Default
workflow Workflow

The workflow to convert.

required
workflow_name optional

The name of the workflow. Defaults to the workflow class name.

None
workflow_description optional

The description of the workflow. Defaults to the workflow docstring.

None
start_event_model optional

The start event model of the workflow. Can be a BaseModel or a StartEvent class. Defaults to the workflow's custom StartEvent class.

None
**fastmcp_init_kwargs Any

Additional keyword arguments to pass to the FastMCP constructor.

{}

Returns:

Type Description
FastMCP

The MCP app object.

Source code in llama_index/tools/mcp/utils.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def workflow_as_mcp(
    workflow: Workflow,
    workflow_name: Optional[str] = None,
    workflow_description: Optional[str] = None,
    start_event_model: Optional[BaseModel] = None,
    **fastmcp_init_kwargs: Any,
) -> FastMCP:
    """
    Convert a workflow to an MCP app.

    This will convert any `Workflow` to an MCP app. It will expose the workflow as a tool
    within MCP, which will

    Args:
        workflow:
            The workflow to convert.
        workflow_name (optional):
            The name of the workflow. Defaults to the workflow class name.
        workflow_description (optional):
            The description of the workflow. Defaults to the workflow docstring.
        start_event_model (optional):
            The start event model of the workflow. Can be a `BaseModel` or a `StartEvent` class.
            Defaults to the workflow's custom `StartEvent` class.
        **fastmcp_init_kwargs:
            Additional keyword arguments to pass to the FastMCP constructor.

    Returns:
        The MCP app object.

    """
    app = FastMCP(**fastmcp_init_kwargs)

    # Dynamically get the start event class -- this is a bit of a hack
    StartEventCLS = start_event_model or workflow._start_event_class
    if StartEventCLS == StartEvent:
        raise ValueError(
            "Must declare a custom StartEvent class in your workflow or provide a start_event_model."
        )

    # Get the workflow name and description
    workflow_name = workflow_name or workflow.__class__.__name__
    workflow_description = workflow_description or workflow.__doc__

    @app.tool(name=workflow_name, description=workflow_description)
    async def _workflow_tool(run_args: StartEventCLS, context: Context) -> Any:
        # Handle edge cases where the start event is an Event or a BaseModel
        # If the workflow does not have a custom StartEvent class, then we need to handle the event differently

        if isinstance(run_args, Event) and workflow._start_event_class != StartEvent:
            handler = workflow.run(start_event=run_args)
        elif isinstance(run_args, BaseModel):
            handler = workflow.run(**run_args.model_dump())
        elif isinstance(run_args, dict):
            start_event = StartEventCLS.model_validate(run_args)
            handler = workflow.run(start_event=start_event)
        else:
            raise ValueError(f"Invalid start event type: {type(run_args)}")

        async for event in handler.stream_events():
            if not isinstance(event, StopEvent):
                await context.log("info", message=event.model_dump_json())

        return await handler

    return app

get_tools_from_mcp_url #

get_tools_from_mcp_url(command_or_url: str, client: Optional[ClientSession] = None, allowed_tools: Optional[List[str]] = None, include_resources: bool = False) -> List[FunctionTool]

Get tools from an MCP server or command.

Parameters:

Name Type Description Default
command_or_url str

The command to run or the URL to connect to.

required
client optional

The client to use to connect to the MCP server.

None
allowed_tools optional

The tool names to allow from the MCP server.

None
include_resources optional

Whether to include resources in the tool list.

False
Source code in llama_index/tools/mcp/utils.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_tools_from_mcp_url(
    command_or_url: str,
    client: Optional[ClientSession] = None,
    allowed_tools: Optional[List[str]] = None,
    include_resources: bool = False,
) -> List[FunctionTool]:
    """
    Get tools from an MCP server or command.

    Args:
        command_or_url: The command to run or the URL to connect to.
        client (optional): The client to use to connect to the MCP server.
        allowed_tools (optional): The tool names to allow from the MCP server.
        include_resources (optional): Whether to include resources in the tool list.

    """
    client = client or BasicMCPClient(command_or_url)
    tool_spec = McpToolSpec(
        client, allowed_tools=allowed_tools, include_resources=include_resources
    )
    return tool_spec.to_tool_list()

aget_tools_from_mcp_url async #

aget_tools_from_mcp_url(command_or_url: str, client: Optional[ClientSession] = None, allowed_tools: Optional[List[str]] = None, include_resources: bool = False) -> List[FunctionTool]

Get tools from an MCP server or command.

Parameters:

Name Type Description Default
command_or_url str

The command to run or the URL to connect to.

required
client optional

The client to use to connect to the MCP server.

None
allowed_tools optional

The tool names to allow from the MCP server.

None
include_resources optional

Whether to include resources in the tool list.

False
Source code in llama_index/tools/mcp/utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def aget_tools_from_mcp_url(
    command_or_url: str,
    client: Optional[ClientSession] = None,
    allowed_tools: Optional[List[str]] = None,
    include_resources: bool = False,
) -> List[FunctionTool]:
    """
    Get tools from an MCP server or command.

    Args:
        command_or_url: The command to run or the URL to connect to.
        client (optional): The client to use to connect to the MCP server.
        allowed_tools (optional): The tool names to allow from the MCP server.
        include_resources (optional): Whether to include resources in the tool list.

    """
    client = client or BasicMCPClient(command_or_url)
    tool_spec = McpToolSpec(
        client, allowed_tools=allowed_tools, include_resources=include_resources
    )
    return await tool_spec.to_tool_list_async()

options: members: - McpToolSpec