Skip to content

API Reference

ClientSession

Bases: BaseSession[ClientRequest, ClientNotification, ClientResult, ServerRequest, ServerNotification]

Source code in src/mcp/client/session.py
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
class ClientSession( BaseSession[ types.ClientRequest, types.ClientNotification, types.ClientResult, types.ServerRequest, types.ServerNotification, ] ): def __init__( self, read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], write_stream: MemoryObjectSendStream[SessionMessage], read_timeout_seconds: timedelta | None = None, sampling_callback: SamplingFnT | None = None, elicitation_callback: ElicitationFnT | None = None, list_roots_callback: ListRootsFnT | None = None, logging_callback: LoggingFnT | None = None, message_handler: MessageHandlerFnT | None = None, client_info: types.Implementation | None = None, ) -> None: super().__init__( read_stream, write_stream, types.ServerRequest, types.ServerNotification, read_timeout_seconds=read_timeout_seconds, ) self._client_info = client_info or DEFAULT_CLIENT_INFO self._sampling_callback = sampling_callback or _default_sampling_callback self._elicitation_callback = elicitation_callback or _default_elicitation_callback self._list_roots_callback = list_roots_callback or _default_list_roots_callback self._logging_callback = logging_callback or _default_logging_callback self._message_handler = message_handler or _default_message_handler self._tool_output_schemas: dict[str, dict[str, Any] | None] = {} async def initialize(self) -> types.InitializeResult: sampling = types.SamplingCapability() if self._sampling_callback is not _default_sampling_callback else None elicitation = ( types.ElicitationCapability() if self._elicitation_callback is not _default_elicitation_callback else None ) roots = ( # TODO: Should this be based on whether we # _will_ send notifications, or only whether # they're supported? types.RootsCapability(listChanged=True) if self._list_roots_callback is not _default_list_roots_callback else None ) result = await self.send_request( types.ClientRequest( types.InitializeRequest( params=types.InitializeRequestParams( protocolVersion=types.LATEST_PROTOCOL_VERSION, capabilities=types.ClientCapabilities( sampling=sampling, elicitation=elicitation, experimental=None, roots=roots, ), clientInfo=self._client_info, ), ) ), types.InitializeResult, ) if result.protocolVersion not in SUPPORTED_PROTOCOL_VERSIONS: raise RuntimeError(f"Unsupported protocol version from the server: {result.protocolVersion}") await self.send_notification(types.ClientNotification(types.InitializedNotification())) return result async def send_ping(self) -> types.EmptyResult:  """Send a ping request.""" return await self.send_request( types.ClientRequest(types.PingRequest()), types.EmptyResult, ) async def send_progress_notification( self, progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, ) -> None:  """Send a progress notification.""" await self.send_notification( types.ClientNotification( types.ProgressNotification( params=types.ProgressNotificationParams( progressToken=progress_token, progress=progress, total=total, message=message, ), ), ) ) async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResult:  """Send a logging/setLevel request.""" return await self.send_request( types.ClientRequest( types.SetLevelRequest( params=types.SetLevelRequestParams(level=level), ) ), types.EmptyResult, ) @overload @deprecated("Use list_resources(params=PaginatedRequestParams(...)) instead") async def list_resources(self, cursor: str | None) -> types.ListResourcesResult: ... @overload async def list_resources(self, *, params: types.PaginatedRequestParams | None) -> types.ListResourcesResult: ... @overload async def list_resources(self) -> types.ListResourcesResult: ... async def list_resources( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListResourcesResult:  """Send a resources/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListResourcesRequest(params=request_params)), types.ListResourcesResult, ) @overload @deprecated("Use list_resource_templates(params=PaginatedRequestParams(...)) instead") async def list_resource_templates(self, cursor: str | None) -> types.ListResourceTemplatesResult: ... @overload async def list_resource_templates( self, *, params: types.PaginatedRequestParams | None ) -> types.ListResourceTemplatesResult: ... @overload async def list_resource_templates(self) -> types.ListResourceTemplatesResult: ... async def list_resource_templates( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListResourceTemplatesResult:  """Send a resources/templates/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListResourceTemplatesRequest(params=request_params)), types.ListResourceTemplatesResult, ) async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:  """Send a resources/read request.""" return await self.send_request( types.ClientRequest( types.ReadResourceRequest( params=types.ReadResourceRequestParams(uri=uri), ) ), types.ReadResourceResult, ) async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:  """Send a resources/subscribe request.""" return await self.send_request( types.ClientRequest( types.SubscribeRequest( params=types.SubscribeRequestParams(uri=uri), ) ), types.EmptyResult, ) async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:  """Send a resources/unsubscribe request.""" return await self.send_request( types.ClientRequest( types.UnsubscribeRequest( params=types.UnsubscribeRequestParams(uri=uri), ) ), types.EmptyResult, ) async def call_tool( self, name: str, arguments: dict[str, Any] | None = None, read_timeout_seconds: timedelta | None = None, progress_callback: ProgressFnT | None = None, *, meta: dict[str, Any] | None = None, ) -> types.CallToolResult:  """Send a tools/call request with optional progress callback support.""" _meta: types.RequestParams.Meta | None = None if meta is not None: _meta = types.RequestParams.Meta(**meta) result = await self.send_request( types.ClientRequest( types.CallToolRequest( params=types.CallToolRequestParams(name=name, arguments=arguments, _meta=_meta), ) ), types.CallToolResult, request_read_timeout_seconds=read_timeout_seconds, progress_callback=progress_callback, ) if not result.isError: await self._validate_tool_result(name, result) return result async def _validate_tool_result(self, name: str, result: types.CallToolResult) -> None:  """Validate the structured content of a tool result against its output schema.""" if name not in self._tool_output_schemas: # refresh output schema cache await self.list_tools() output_schema = None if name in self._tool_output_schemas: output_schema = self._tool_output_schemas.get(name) else: logger.warning(f"Tool {name} not listed by server, cannot validate any structured content") if output_schema is not None: if result.structuredContent is None: raise RuntimeError(f"Tool {name} has an output schema but did not return structured content") try: validate(result.structuredContent, output_schema) except ValidationError as e: raise RuntimeError(f"Invalid structured content returned by tool {name}: {e}") except SchemaError as e: raise RuntimeError(f"Invalid schema for tool {name}: {e}") @overload @deprecated("Use list_prompts(params=PaginatedRequestParams(...)) instead") async def list_prompts(self, cursor: str | None) -> types.ListPromptsResult: ... @overload async def list_prompts(self, *, params: types.PaginatedRequestParams | None) -> types.ListPromptsResult: ... @overload async def list_prompts(self) -> types.ListPromptsResult: ... async def list_prompts( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListPromptsResult:  """Send a prompts/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListPromptsRequest(params=request_params)), types.ListPromptsResult, ) async def get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> types.GetPromptResult:  """Send a prompts/get request.""" return await self.send_request( types.ClientRequest( types.GetPromptRequest( params=types.GetPromptRequestParams(name=name, arguments=arguments), ) ), types.GetPromptResult, ) async def complete( self, ref: types.ResourceTemplateReference | types.PromptReference, argument: dict[str, str], context_arguments: dict[str, str] | None = None, ) -> types.CompleteResult:  """Send a completion/complete request.""" context = None if context_arguments is not None: context = types.CompletionContext(arguments=context_arguments) return await self.send_request( types.ClientRequest( types.CompleteRequest( params=types.CompleteRequestParams( ref=ref, argument=types.CompletionArgument(**argument), context=context, ), ) ), types.CompleteResult, ) @overload @deprecated("Use list_tools(params=PaginatedRequestParams(...)) instead") async def list_tools(self, cursor: str | None) -> types.ListToolsResult: ... @overload async def list_tools(self, *, params: types.PaginatedRequestParams | None) -> types.ListToolsResult: ... @overload async def list_tools(self) -> types.ListToolsResult: ... async def list_tools( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListToolsResult:  """Send a tools/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None result = await self.send_request( types.ClientRequest(types.ListToolsRequest(params=request_params)), types.ListToolsResult, ) # Cache tool output schemas for future validation # Note: don't clear the cache, as we may be using a cursor for tool in result.tools: self._tool_output_schemas[tool.name] = tool.outputSchema return result async def send_roots_list_changed(self) -> None:  """Send a roots/list_changed notification.""" await self.send_notification(types.ClientNotification(types.RootsListChangedNotification())) async def _received_request(self, responder: RequestResponder[types.ServerRequest, types.ClientResult]) -> None: ctx = RequestContext[ClientSession, Any]( request_id=responder.request_id, meta=responder.request_meta, session=self, lifespan_context=None, ) match responder.request.root: case types.CreateMessageRequest(params=params): with responder: response = await self._sampling_callback(ctx, params) client_response = ClientResponse.validate_python(response) await responder.respond(client_response) case types.ElicitRequest(params=params): with responder: response = await self._elicitation_callback(ctx, params) client_response = ClientResponse.validate_python(response) await responder.respond(client_response) case types.ListRootsRequest(): with responder: response = await self._list_roots_callback(ctx) client_response = ClientResponse.validate_python(response) await responder.respond(client_response) case types.PingRequest(): with responder: return await responder.respond(types.ClientResult(root=types.EmptyResult())) async def _handle_incoming( self, req: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception, ) -> None:  """Handle incoming messages by forwarding to the message handler.""" await self._message_handler(req) async def _received_notification(self, notification: types.ServerNotification) -> None:  """Handle notifications from the server.""" # Process specific notification types match notification.root: case types.LoggingMessageNotification(params=params): await self._logging_callback(params) case _: pass 

send_ping async

send_ping() -> EmptyResult 

Send a ping request.

Source code in src/mcp/client/session.py
177 178 179 180 181 182
async def send_ping(self) -> types.EmptyResult:  """Send a ping request.""" return await self.send_request( types.ClientRequest(types.PingRequest()), types.EmptyResult, ) 

send_progress_notification async

send_progress_notification( progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, ) -> None 

Send a progress notification.

Source code in src/mcp/client/session.py
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
async def send_progress_notification( self, progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, ) -> None:  """Send a progress notification.""" await self.send_notification( types.ClientNotification( types.ProgressNotification( params=types.ProgressNotificationParams( progressToken=progress_token, progress=progress, total=total, message=message, ), ), ) ) 

set_logging_level async

set_logging_level(level: LoggingLevel) -> EmptyResult 

Send a logging/setLevel request.

Source code in src/mcp/client/session.py
205 206 207 208 209 210 211 212 213 214
async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResult:  """Send a logging/setLevel request.""" return await self.send_request( types.ClientRequest( types.SetLevelRequest( params=types.SetLevelRequestParams(level=level), ) ), types.EmptyResult, ) 

list_resources async

list_resources(cursor: str | None) -> ListResourcesResult 
list_resources( *, params: PaginatedRequestParams | None ) -> ListResourcesResult 
list_resources() -> ListResourcesResult 
list_resources( cursor: str | None = None, *, params: PaginatedRequestParams | None = None ) -> ListResourcesResult 

Send a resources/list request.

Parameters:

Name Type Description Default
cursor str | None

Simple cursor string for pagination (deprecated, use params instead)

None
params PaginatedRequestParams | None

Full pagination parameters including cursor and any future fields

None
Source code in src/mcp/client/session.py
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
async def list_resources( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListResourcesResult:  """Send a resources/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListResourcesRequest(params=request_params)), types.ListResourcesResult, ) 

list_resource_templates async

list_resource_templates( cursor: str | None, ) -> ListResourceTemplatesResult 
list_resource_templates( *, params: PaginatedRequestParams | None ) -> ListResourceTemplatesResult 
list_resource_templates() -> ListResourceTemplatesResult 
list_resource_templates( cursor: str | None = None, *, params: PaginatedRequestParams | None = None ) -> ListResourceTemplatesResult 

Send a resources/templates/list request.

Parameters:

Name Type Description Default
cursor str | None

Simple cursor string for pagination (deprecated, use params instead)

None
params PaginatedRequestParams | None

Full pagination parameters including cursor and any future fields

None
Source code in src/mcp/client/session.py
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
async def list_resource_templates( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListResourceTemplatesResult:  """Send a resources/templates/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListResourceTemplatesRequest(params=request_params)), types.ListResourceTemplatesResult, ) 

read_resource async

read_resource(uri: AnyUrl) -> ReadResourceResult 

Send a resources/read request.

Source code in src/mcp/client/session.py
292 293 294 295 296 297 298 299 300 301
async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:  """Send a resources/read request.""" return await self.send_request( types.ClientRequest( types.ReadResourceRequest( params=types.ReadResourceRequestParams(uri=uri), ) ), types.ReadResourceResult, ) 

subscribe_resource async

subscribe_resource(uri: AnyUrl) -> EmptyResult 

Send a resources/subscribe request.

Source code in src/mcp/client/session.py
303 304 305 306 307 308 309 310 311 312
async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:  """Send a resources/subscribe request.""" return await self.send_request( types.ClientRequest( types.SubscribeRequest( params=types.SubscribeRequestParams(uri=uri), ) ), types.EmptyResult, ) 

unsubscribe_resource async

unsubscribe_resource(uri: AnyUrl) -> EmptyResult 

Send a resources/unsubscribe request.

Source code in src/mcp/client/session.py
314 315 316 317 318 319 320 321 322 323
async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:  """Send a resources/unsubscribe request.""" return await self.send_request( types.ClientRequest( types.UnsubscribeRequest( params=types.UnsubscribeRequestParams(uri=uri), ) ), types.EmptyResult, ) 

call_tool async

call_tool( name: str, arguments: dict[str, Any] | None = None, read_timeout_seconds: timedelta | None = None, progress_callback: ProgressFnT | None = None, *, meta: dict[str, Any] | None = None ) -> CallToolResult 

Send a tools/call request with optional progress callback support.

Source code in src/mcp/client/session.py
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
async def call_tool( self, name: str, arguments: dict[str, Any] | None = None, read_timeout_seconds: timedelta | None = None, progress_callback: ProgressFnT | None = None, *, meta: dict[str, Any] | None = None, ) -> types.CallToolResult:  """Send a tools/call request with optional progress callback support.""" _meta: types.RequestParams.Meta | None = None if meta is not None: _meta = types.RequestParams.Meta(**meta) result = await self.send_request( types.ClientRequest( types.CallToolRequest( params=types.CallToolRequestParams(name=name, arguments=arguments, _meta=_meta), ) ), types.CallToolResult, request_read_timeout_seconds=read_timeout_seconds, progress_callback=progress_callback, ) if not result.isError: await self._validate_tool_result(name, result) return result 

list_prompts async

list_prompts(cursor: str | None) -> ListPromptsResult 
list_prompts( *, params: PaginatedRequestParams | None ) -> ListPromptsResult 
list_prompts() -> ListPromptsResult 
list_prompts( cursor: str | None = None, *, params: PaginatedRequestParams | None = None ) -> ListPromptsResult 

Send a prompts/list request.

Parameters:

Name Type Description Default
cursor str | None

Simple cursor string for pagination (deprecated, use params instead)

None
params PaginatedRequestParams | None

Full pagination parameters including cursor and any future fields

None
Source code in src/mcp/client/session.py
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
async def list_prompts( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListPromptsResult:  """Send a prompts/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None return await self.send_request( types.ClientRequest(types.ListPromptsRequest(params=request_params)), types.ListPromptsResult, ) 

get_prompt async

get_prompt( name: str, arguments: dict[str, str] | None = None ) -> GetPromptResult 

Send a prompts/get request.

Source code in src/mcp/client/session.py
415 416 417 418 419 420 421 422 423 424
async def get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> types.GetPromptResult:  """Send a prompts/get request.""" return await self.send_request( types.ClientRequest( types.GetPromptRequest( params=types.GetPromptRequestParams(name=name, arguments=arguments), ) ), types.GetPromptResult, ) 

complete async

complete( ref: ResourceTemplateReference | PromptReference, argument: dict[str, str], context_arguments: dict[str, str] | None = None, ) -> CompleteResult 

Send a completion/complete request.

Source code in src/mcp/client/session.py
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
async def complete( self, ref: types.ResourceTemplateReference | types.PromptReference, argument: dict[str, str], context_arguments: dict[str, str] | None = None, ) -> types.CompleteResult:  """Send a completion/complete request.""" context = None if context_arguments is not None: context = types.CompletionContext(arguments=context_arguments) return await self.send_request( types.ClientRequest( types.CompleteRequest( params=types.CompleteRequestParams( ref=ref, argument=types.CompletionArgument(**argument), context=context, ), ) ), types.CompleteResult, ) 

list_tools async

list_tools(cursor: str | None) -> ListToolsResult 
list_tools( *, params: PaginatedRequestParams | None ) -> ListToolsResult 
list_tools() -> ListToolsResult 
list_tools( cursor: str | None = None, *, params: PaginatedRequestParams | None = None ) -> ListToolsResult 

Send a tools/list request.

Parameters:

Name Type Description Default
cursor str | None

Simple cursor string for pagination (deprecated, use params instead)

None
params PaginatedRequestParams | None

Full pagination parameters including cursor and any future fields

None
Source code in src/mcp/client/session.py
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
async def list_tools( self, cursor: str | None = None, *, params: types.PaginatedRequestParams | None = None, ) -> types.ListToolsResult:  """Send a tools/list request.  Args:  cursor: Simple cursor string for pagination (deprecated, use params instead)  params: Full pagination parameters including cursor and any future fields  """ if params is not None and cursor is not None: raise ValueError("Cannot specify both cursor and params") if params is not None: request_params = params elif cursor is not None: request_params = types.PaginatedRequestParams(cursor=cursor) else: request_params = None result = await self.send_request( types.ClientRequest(types.ListToolsRequest(params=request_params)), types.ListToolsResult, ) # Cache tool output schemas for future validation # Note: don't clear the cache, as we may be using a cursor for tool in result.tools: self._tool_output_schemas[tool.name] = tool.outputSchema return result 

send_roots_list_changed async

send_roots_list_changed() -> None 

Send a roots/list_changed notification.

Source code in src/mcp/client/session.py
494 495 496
async def send_roots_list_changed(self) -> None:  """Send a roots/list_changed notification.""" await self.send_notification(types.ClientNotification(types.RootsListChangedNotification())) 

ClientSessionGroup

Client for managing connections to multiple MCP servers.

This class is responsible for encapsulating management of server connections. It aggregates tools, resources, and prompts from all connected servers.

For auxiliary handlers, such as resource subscription, this is delegated to the client and can be accessed via the session.

Example Usage

name_fn = lambda name, server_info: f"{(server_info.name)}_{name}" async with ClientSessionGroup(component_name_hook=name_fn) as group: for server_params in server_params: await group.connect_to_server(server_param) ...

Source code in src/mcp/client/session_group.py
 68  69  70  71  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
class ClientSessionGroup:  """Client for managing connections to multiple MCP servers.  This class is responsible for encapsulating management of server connections.  It aggregates tools, resources, and prompts from all connected servers.  For auxiliary handlers, such as resource subscription, this is delegated to  the client and can be accessed via the session.  Example Usage:  name_fn = lambda name, server_info: f"{(server_info.name)}_{name}"  async with ClientSessionGroup(component_name_hook=name_fn) as group:  for server_params in server_params:  await group.connect_to_server(server_param)  ...  """ class _ComponentNames(BaseModel):  """Used for reverse index to find components.""" prompts: set[str] = set() resources: set[str] = set() tools: set[str] = set() # Standard MCP components. _prompts: dict[str, types.Prompt] _resources: dict[str, types.Resource] _tools: dict[str, types.Tool] # Client-server connection management. _sessions: dict[mcp.ClientSession, _ComponentNames] _tool_to_session: dict[str, mcp.ClientSession] _exit_stack: contextlib.AsyncExitStack _session_exit_stacks: dict[mcp.ClientSession, contextlib.AsyncExitStack] # Optional fn consuming (component_name, serverInfo) for custom names. # This is provide a means to mitigate naming conflicts across servers. # Example: (tool_name, serverInfo) => "{result.serverInfo.name}.{tool_name}" _ComponentNameHook: TypeAlias = Callable[[str, types.Implementation], str] _component_name_hook: _ComponentNameHook | None def __init__( self, exit_stack: contextlib.AsyncExitStack | None = None, component_name_hook: _ComponentNameHook | None = None, ) -> None:  """Initializes the MCP client.""" self._tools = {} self._resources = {} self._prompts = {} self._sessions = {} self._tool_to_session = {} if exit_stack is None: self._exit_stack = contextlib.AsyncExitStack() self._owns_exit_stack = True else: self._exit_stack = exit_stack self._owns_exit_stack = False self._session_exit_stacks = {} self._component_name_hook = component_name_hook async def __aenter__(self) -> Self: # Enter the exit stack only if we created it ourselves if self._owns_exit_stack: await self._exit_stack.__aenter__() return self async def __aexit__( self, _exc_type: type[BaseException] | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None, ) -> bool | None:  """Closes session exit stacks and main exit stack upon completion.""" # Only close the main exit stack if we created it if self._owns_exit_stack: await self._exit_stack.aclose() # Concurrently close session stacks. async with anyio.create_task_group() as tg: for exit_stack in self._session_exit_stacks.values(): tg.start_soon(exit_stack.aclose) @property def sessions(self) -> list[mcp.ClientSession]:  """Returns the list of sessions being managed.""" return list(self._sessions.keys()) @property def prompts(self) -> dict[str, types.Prompt]:  """Returns the prompts as a dictionary of names to prompts.""" return self._prompts @property def resources(self) -> dict[str, types.Resource]:  """Returns the resources as a dictionary of names to resources.""" return self._resources @property def tools(self) -> dict[str, types.Tool]:  """Returns the tools as a dictionary of names to tools.""" return self._tools async def call_tool(self, name: str, args: dict[str, Any]) -> types.CallToolResult:  """Executes a tool given its name and arguments.""" session = self._tool_to_session[name] session_tool_name = self.tools[name].name return await session.call_tool(session_tool_name, args) async def disconnect_from_server(self, session: mcp.ClientSession) -> None:  """Disconnects from a single MCP server.""" session_known_for_components = session in self._sessions session_known_for_stack = session in self._session_exit_stacks if not session_known_for_components and not session_known_for_stack: raise McpError( types.ErrorData( code=types.INVALID_PARAMS, message="Provided session is not managed or already disconnected.", ) ) if session_known_for_components: component_names = self._sessions.pop(session) # Pop from _sessions tracking # Remove prompts associated with the session. for name in component_names.prompts: if name in self._prompts: del self._prompts[name] # Remove resources associated with the session. for name in component_names.resources: if name in self._resources: del self._resources[name] # Remove tools associated with the session. for name in component_names.tools: if name in self._tools: del self._tools[name] if name in self._tool_to_session: del self._tool_to_session[name] # Clean up the session's resources via its dedicated exit stack if session_known_for_stack: session_stack_to_close = self._session_exit_stacks.pop(session) await session_stack_to_close.aclose() async def connect_with_session( self, server_info: types.Implementation, session: mcp.ClientSession ) -> mcp.ClientSession:  """Connects to a single MCP server.""" await self._aggregate_components(server_info, session) return session async def connect_to_server( self, server_params: ServerParameters, ) -> mcp.ClientSession:  """Connects to a single MCP server.""" server_info, session = await self._establish_session(server_params) return await self.connect_with_session(server_info, session) async def _establish_session( self, server_params: ServerParameters ) -> tuple[types.Implementation, mcp.ClientSession]:  """Establish a client session to an MCP server.""" session_stack = contextlib.AsyncExitStack() try: # Create read and write streams that facilitate io with the server. if isinstance(server_params, StdioServerParameters): client = mcp.stdio_client(server_params) read, write = await session_stack.enter_async_context(client) elif isinstance(server_params, SseServerParameters): client = sse_client( url=server_params.url, headers=server_params.headers, timeout=server_params.timeout, sse_read_timeout=server_params.sse_read_timeout, ) read, write = await session_stack.enter_async_context(client) else: client = streamablehttp_client( url=server_params.url, headers=server_params.headers, timeout=server_params.timeout, sse_read_timeout=server_params.sse_read_timeout, terminate_on_close=server_params.terminate_on_close, ) read, write, _ = await session_stack.enter_async_context(client) session = await session_stack.enter_async_context(mcp.ClientSession(read, write)) result = await session.initialize() # Session successfully initialized. # Store its stack and register the stack with the main group stack. self._session_exit_stacks[session] = session_stack # session_stack itself becomes a resource managed by the # main _exit_stack. await self._exit_stack.enter_async_context(session_stack) return result.serverInfo, session except Exception: # If anything during this setup fails, ensure the session-specific # stack is closed. await session_stack.aclose() raise async def _aggregate_components(self, server_info: types.Implementation, session: mcp.ClientSession) -> None:  """Aggregates prompts, resources, and tools from a given session.""" # Create a reverse index so we can find all prompts, resources, and # tools belonging to this session. Used for removing components from # the session group via self.disconnect_from_server. component_names = self._ComponentNames() # Temporary components dicts. We do not want to modify the aggregate # lists in case of an intermediate failure. prompts_temp: dict[str, types.Prompt] = {} resources_temp: dict[str, types.Resource] = {} tools_temp: dict[str, types.Tool] = {} tool_to_session_temp: dict[str, mcp.ClientSession] = {} # Query the server for its prompts and aggregate to list. try: prompts = (await session.list_prompts()).prompts for prompt in prompts: name = self._component_name(prompt.name, server_info) prompts_temp[name] = prompt component_names.prompts.add(name) except McpError as err: logging.warning(f"Could not fetch prompts: {err}") # Query the server for its resources and aggregate to list. try: resources = (await session.list_resources()).resources for resource in resources: name = self._component_name(resource.name, server_info) resources_temp[name] = resource component_names.resources.add(name) except McpError as err: logging.warning(f"Could not fetch resources: {err}") # Query the server for its tools and aggregate to list. try: tools = (await session.list_tools()).tools for tool in tools: name = self._component_name(tool.name, server_info) tools_temp[name] = tool tool_to_session_temp[name] = session component_names.tools.add(name) except McpError as err: logging.warning(f"Could not fetch tools: {err}") # Clean up exit stack for session if we couldn't retrieve anything # from the server. if not any((prompts_temp, resources_temp, tools_temp)): del self._session_exit_stacks[session] # Check for duplicates. matching_prompts = prompts_temp.keys() & self._prompts.keys() if matching_prompts: raise McpError( types.ErrorData( code=types.INVALID_PARAMS, message=f"{matching_prompts} already exist in group prompts.", ) ) matching_resources = resources_temp.keys() & self._resources.keys() if matching_resources: raise McpError( types.ErrorData( code=types.INVALID_PARAMS, message=f"{matching_resources} already exist in group resources.", ) ) matching_tools = tools_temp.keys() & self._tools.keys() if matching_tools: raise McpError( types.ErrorData( code=types.INVALID_PARAMS, message=f"{matching_tools} already exist in group tools.", ) ) # Aggregate components. self._sessions[session] = component_names self._prompts.update(prompts_temp) self._resources.update(resources_temp) self._tools.update(tools_temp) self._tool_to_session.update(tool_to_session_temp) def _component_name(self, name: str, server_info: types.Implementation) -> str: if self._component_name_hook: return self._component_name_hook(name, server_info) return name 

__init__

__init__( exit_stack: AsyncExitStack | None = None, component_name_hook: _ComponentNameHook | None = None, ) -> None 

Initializes the MCP client.

Source code in src/mcp/client/session_group.py
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
def __init__( self, exit_stack: contextlib.AsyncExitStack | None = None, component_name_hook: _ComponentNameHook | None = None, ) -> None:  """Initializes the MCP client.""" self._tools = {} self._resources = {} self._prompts = {} self._sessions = {} self._tool_to_session = {} if exit_stack is None: self._exit_stack = contextlib.AsyncExitStack() self._owns_exit_stack = True else: self._exit_stack = exit_stack self._owns_exit_stack = False self._session_exit_stacks = {} self._component_name_hook = component_name_hook 

__aexit__ async

__aexit__( _exc_type: type[BaseException] | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None, ) -> bool | None 

Closes session exit stacks and main exit stack upon completion.

Source code in src/mcp/client/session_group.py
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
async def __aexit__( self, _exc_type: type[BaseException] | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None, ) -> bool | None:  """Closes session exit stacks and main exit stack upon completion.""" # Only close the main exit stack if we created it if self._owns_exit_stack: await self._exit_stack.aclose() # Concurrently close session stacks. async with anyio.create_task_group() as tg: for exit_stack in self._session_exit_stacks.values(): tg.start_soon(exit_stack.aclose) 

sessions property

sessions: list[ClientSession] 

Returns the list of sessions being managed.

prompts property

prompts: dict[str, Prompt] 

Returns the prompts as a dictionary of names to prompts.

resources property

resources: dict[str, Resource] 

Returns the resources as a dictionary of names to resources.

tools property

tools: dict[str, Tool] 

Returns the tools as a dictionary of names to tools.

call_tool async

call_tool( name: str, args: dict[str, Any] ) -> CallToolResult 

Executes a tool given its name and arguments.

Source code in src/mcp/client/session_group.py
175 176 177 178 179
async def call_tool(self, name: str, args: dict[str, Any]) -> types.CallToolResult:  """Executes a tool given its name and arguments.""" session = self._tool_to_session[name] session_tool_name = self.tools[name].name return await session.call_tool(session_tool_name, args) 

disconnect_from_server async

disconnect_from_server(session: ClientSession) -> None 

Disconnects from a single MCP server.

Source code in src/mcp/client/session_group.py
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
async def disconnect_from_server(self, session: mcp.ClientSession) -> None:  """Disconnects from a single MCP server.""" session_known_for_components = session in self._sessions session_known_for_stack = session in self._session_exit_stacks if not session_known_for_components and not session_known_for_stack: raise McpError( types.ErrorData( code=types.INVALID_PARAMS, message="Provided session is not managed or already disconnected.", ) ) if session_known_for_components: component_names = self._sessions.pop(session) # Pop from _sessions tracking # Remove prompts associated with the session. for name in component_names.prompts: if name in self._prompts: del self._prompts[name] # Remove resources associated with the session. for name in component_names.resources: if name in self._resources: del self._resources[name] # Remove tools associated with the session. for name in component_names.tools: if name in self._tools: del self._tools[name] if name in self._tool_to_session: del self._tool_to_session[name] # Clean up the session's resources via its dedicated exit stack if session_known_for_stack: session_stack_to_close = self._session_exit_stacks.pop(session) await session_stack_to_close.aclose() 

connect_with_session async

connect_with_session( server_info: Implementation, session: ClientSession ) -> ClientSession 

Connects to a single MCP server.

Source code in src/mcp/client/session_group.py
218 219 220 221 222 223
async def connect_with_session( self, server_info: types.Implementation, session: mcp.ClientSession ) -> mcp.ClientSession:  """Connects to a single MCP server.""" await self._aggregate_components(server_info, session) return session 

connect_to_server async

connect_to_server( server_params: ServerParameters, ) -> ClientSession 

Connects to a single MCP server.

Source code in src/mcp/client/session_group.py
225 226 227 228 229 230 231
async def connect_to_server( self, server_params: ServerParameters, ) -> mcp.ClientSession:  """Connects to a single MCP server.""" server_info, session = await self._establish_session(server_params) return await self.connect_with_session(server_info, session) 

StdioServerParameters

Bases: BaseModel

Source code in src/mcp/client/stdio/__init__.py
 72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89  90  91  92  93  94  95  96  97  98  99 100 101 102
class StdioServerParameters(BaseModel): command: str  """The executable to run to start the server.""" args: list[str] = Field(default_factory=list)  """Command line arguments to pass to the executable.""" env: dict[str, str] | None = None  """  The environment to use when spawning the process.  If not specified, the result of get_default_environment() will be used.  """ cwd: str | Path | None = None  """The working directory to use when spawning the process.""" encoding: str = "utf-8"  """  The text encoding used when sending/receiving messages to the server  defaults to utf-8  """ encoding_error_handler: Literal["strict", "ignore", "replace"] = "strict"  """  The text encoding error handler.  See https://docs.python.org/3/library/codecs.html#codec-base-classes for  explanations of possible values  """ 

command instance-attribute

command: str 

The executable to run to start the server.

args class-attribute instance-attribute

args: list[str] = Field(default_factory=list) 

Command line arguments to pass to the executable.

env class-attribute instance-attribute

env: dict[str, str] | None = None 

The environment to use when spawning the process.

If not specified, the result of get_default_environment() will be used.

cwd class-attribute instance-attribute

cwd: str | Path | None = None 

The working directory to use when spawning the process.

encoding class-attribute instance-attribute

encoding: str = 'utf-8' 

The text encoding used when sending/receiving messages to the server

defaults to utf-8

encoding_error_handler class-attribute instance-attribute

encoding_error_handler: Literal[ "strict", "ignore", "replace" ] = "strict" 

The text encoding error handler.

See https://docs.python.org/3/library/codecs.html#codec-base-classes for explanations of possible values

stdio_client async

stdio_client( server: StdioServerParameters, errlog: TextIO = stderr ) 

Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout.

Source code in src/mcp/client/stdio/__init__.py
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
@asynccontextmanager async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):  """  Client transport for stdio: this will connect to a server by spawning a  process and communicating with it over stdin/stdout.  """ read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] write_stream: MemoryObjectSendStream[SessionMessage] write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) try: command = _get_executable_command(server.command) # Open process with stderr piped for capture process = await _create_platform_compatible_process( command=command, args=server.args, env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()), errlog=errlog, cwd=server.cwd, ) except OSError: # Clean up streams if process creation fails await read_stream.aclose() await write_stream.aclose() await read_stream_writer.aclose() await write_stream_reader.aclose() raise async def stdout_reader(): assert process.stdout, "Opened process is missing stdout" try: async with read_stream_writer: buffer = "" async for chunk in TextReceiveStream( process.stdout, encoding=server.encoding, errors=server.encoding_error_handler, ): lines = (buffer + chunk).split("\n") buffer = lines.pop() for line in lines: try: message = types.JSONRPCMessage.model_validate_json(line) except Exception as exc: logger.exception("Failed to parse JSONRPC message from server") await read_stream_writer.send(exc) continue session_message = SessionMessage(message) await read_stream_writer.send(session_message) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() async def stdin_writer(): assert process.stdin, "Opened process is missing stdin" try: async with write_stream_reader: async for session_message in write_stream_reader: json = session_message.message.model_dump_json(by_alias=True, exclude_none=True) await process.stdin.send( (json + "\n").encode( encoding=server.encoding, errors=server.encoding_error_handler, ) ) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() async with ( anyio.create_task_group() as tg, process, ): tg.start_soon(stdout_reader) tg.start_soon(stdin_writer) try: yield read_stream, write_stream finally: # MCP spec: stdio shutdown sequence # 1. Close input stream to server # 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time # 3. Send SIGKILL if still not exited if process.stdin: try: await process.stdin.aclose() except Exception: # stdin might already be closed, which is fine pass try: # Give the process time to exit gracefully after stdin closes with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT): await process.wait() except TimeoutError: # Process didn't exit from stdin closure, use platform-specific termination # which handles SIGTERM -> SIGKILL escalation await _terminate_process_tree(process) except ProcessLookupError: # Process already exited, which is fine pass await read_stream.aclose() await write_stream.aclose() await read_stream_writer.aclose() await write_stream_reader.aclose() 

ServerSession

Bases: BaseSession[ServerRequest, ServerNotification, ServerResult, ClientRequest, ClientNotification]

Source code in src/mcp/server/session.py
 71  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
class ServerSession( BaseSession[ types.ServerRequest, types.ServerNotification, types.ServerResult, types.ClientRequest, types.ClientNotification, ] ): _initialized: InitializationState = InitializationState.NotInitialized _client_params: types.InitializeRequestParams | None = None def __init__( self, read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], write_stream: MemoryObjectSendStream[SessionMessage], init_options: InitializationOptions, stateless: bool = False, ) -> None: super().__init__(read_stream, write_stream, types.ClientRequest, types.ClientNotification) self._initialization_state = ( InitializationState.Initialized if stateless else InitializationState.NotInitialized ) self._init_options = init_options self._incoming_message_stream_writer, self._incoming_message_stream_reader = anyio.create_memory_object_stream[ ServerRequestResponder ](0) self._exit_stack.push_async_callback(lambda: self._incoming_message_stream_reader.aclose()) @property def client_params(self) -> types.InitializeRequestParams | None: return self._client_params def check_client_capability(self, capability: types.ClientCapabilities) -> bool:  """Check if the client supports a specific capability.""" if self._client_params is None: return False # Get client capabilities from initialization params client_caps = self._client_params.capabilities # Check each specified capability in the passed in capability object if capability.roots is not None: if client_caps.roots is None: return False if capability.roots.listChanged and not client_caps.roots.listChanged: return False if capability.sampling is not None: if client_caps.sampling is None: return False if capability.elicitation is not None: if client_caps.elicitation is None: return False if capability.experimental is not None: if client_caps.experimental is None: return False # Check each experimental capability for exp_key, exp_value in capability.experimental.items(): if exp_key not in client_caps.experimental or client_caps.experimental[exp_key] != exp_value: return False return True async def _receive_loop(self) -> None: async with self._incoming_message_stream_writer: await super()._receive_loop() async def _received_request(self, responder: RequestResponder[types.ClientRequest, types.ServerResult]): match responder.request.root: case types.InitializeRequest(params=params): requested_version = params.protocolVersion self._initialization_state = InitializationState.Initializing self._client_params = params with responder: await responder.respond( types.ServerResult( types.InitializeResult( protocolVersion=requested_version if requested_version in SUPPORTED_PROTOCOL_VERSIONS else types.LATEST_PROTOCOL_VERSION, capabilities=self._init_options.capabilities, serverInfo=types.Implementation( name=self._init_options.server_name, version=self._init_options.server_version, websiteUrl=self._init_options.website_url, icons=self._init_options.icons, ), instructions=self._init_options.instructions, ) ) ) self._initialization_state = InitializationState.Initialized case types.PingRequest(): # Ping requests are allowed at any time pass case _: if self._initialization_state != InitializationState.Initialized: raise RuntimeError("Received request before initialization was complete") async def _received_notification(self, notification: types.ClientNotification) -> None: # Need this to avoid ASYNC910 await anyio.lowlevel.checkpoint() match notification.root: case types.InitializedNotification(): self._initialization_state = InitializationState.Initialized case _: if self._initialization_state != InitializationState.Initialized: raise RuntimeError("Received notification before initialization was complete") async def send_log_message( self, level: types.LoggingLevel, data: Any, logger: str | None = None, related_request_id: types.RequestId | None = None, ) -> None:  """Send a log message notification.""" await self.send_notification( types.ServerNotification( types.LoggingMessageNotification( params=types.LoggingMessageNotificationParams( level=level, data=data, logger=logger, ), ) ), related_request_id, ) async def send_resource_updated(self, uri: AnyUrl) -> None:  """Send a resource updated notification.""" await self.send_notification( types.ServerNotification( types.ResourceUpdatedNotification( params=types.ResourceUpdatedNotificationParams(uri=uri), ) ) ) async def create_message( self, messages: list[types.SamplingMessage], *, max_tokens: int, system_prompt: str | None = None, include_context: types.IncludeContext | None = None, temperature: float | None = None, stop_sequences: list[str] | None = None, metadata: dict[str, Any] | None = None, model_preferences: types.ModelPreferences | None = None, related_request_id: types.RequestId | None = None, ) -> types.CreateMessageResult:  """Send a sampling/create_message request.""" return await self.send_request( request=types.ServerRequest( types.CreateMessageRequest( params=types.CreateMessageRequestParams( messages=messages, systemPrompt=system_prompt, includeContext=include_context, temperature=temperature, maxTokens=max_tokens, stopSequences=stop_sequences, metadata=metadata, modelPreferences=model_preferences, ), ) ), result_type=types.CreateMessageResult, metadata=ServerMessageMetadata( related_request_id=related_request_id, ), ) async def list_roots(self) -> types.ListRootsResult:  """Send a roots/list request.""" return await self.send_request( types.ServerRequest(types.ListRootsRequest()), types.ListRootsResult, ) async def elicit( self, message: str, requestedSchema: types.ElicitRequestedSchema, related_request_id: types.RequestId | None = None, ) -> types.ElicitResult:  """Send an elicitation/create request.  Args:  message: The message to present to the user  requestedSchema: Schema defining the expected response structure  Returns:  The client's response  """ return await self.send_request( types.ServerRequest( types.ElicitRequest( params=types.ElicitRequestParams( message=message, requestedSchema=requestedSchema, ), ) ), types.ElicitResult, metadata=ServerMessageMetadata(related_request_id=related_request_id), ) async def send_ping(self) -> types.EmptyResult:  """Send a ping request.""" return await self.send_request( types.ServerRequest(types.PingRequest()), types.EmptyResult, ) async def send_progress_notification( self, progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, related_request_id: str | None = None, ) -> None:  """Send a progress notification.""" await self.send_notification( types.ServerNotification( types.ProgressNotification( params=types.ProgressNotificationParams( progressToken=progress_token, progress=progress, total=total, message=message, ), ) ), related_request_id, ) async def send_resource_list_changed(self) -> None:  """Send a resource list changed notification.""" await self.send_notification(types.ServerNotification(types.ResourceListChangedNotification())) async def send_tool_list_changed(self) -> None:  """Send a tool list changed notification.""" await self.send_notification(types.ServerNotification(types.ToolListChangedNotification())) async def send_prompt_list_changed(self) -> None:  """Send a prompt list changed notification.""" await self.send_notification(types.ServerNotification(types.PromptListChangedNotification())) async def _handle_incoming(self, req: ServerRequestResponder) -> None: await self._incoming_message_stream_writer.send(req) @property def incoming_messages( self, ) -> MemoryObjectReceiveStream[ServerRequestResponder]: return self._incoming_message_stream_reader 

check_client_capability

check_client_capability( capability: ClientCapabilities, ) -> bool 

Check if the client supports a specific capability.

Source code in src/mcp/server/session.py
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
def check_client_capability(self, capability: types.ClientCapabilities) -> bool:  """Check if the client supports a specific capability.""" if self._client_params is None: return False # Get client capabilities from initialization params client_caps = self._client_params.capabilities # Check each specified capability in the passed in capability object if capability.roots is not None: if client_caps.roots is None: return False if capability.roots.listChanged and not client_caps.roots.listChanged: return False if capability.sampling is not None: if client_caps.sampling is None: return False if capability.elicitation is not None: if client_caps.elicitation is None: return False if capability.experimental is not None: if client_caps.experimental is None: return False # Check each experimental capability for exp_key, exp_value in capability.experimental.items(): if exp_key not in client_caps.experimental or client_caps.experimental[exp_key] != exp_value: return False return True 

send_log_message async

send_log_message( level: LoggingLevel, data: Any, logger: str | None = None, related_request_id: RequestId | None = None, ) -> None 

Send a log message notification.

Source code in src/mcp/server/session.py
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
async def send_log_message( self, level: types.LoggingLevel, data: Any, logger: str | None = None, related_request_id: types.RequestId | None = None, ) -> None:  """Send a log message notification.""" await self.send_notification( types.ServerNotification( types.LoggingMessageNotification( params=types.LoggingMessageNotificationParams( level=level, data=data, logger=logger, ), ) ), related_request_id, ) 

send_resource_updated async

send_resource_updated(uri: AnyUrl) -> None 

Send a resource updated notification.

Source code in src/mcp/server/session.py
205 206 207 208 209 210 211 212 213
async def send_resource_updated(self, uri: AnyUrl) -> None:  """Send a resource updated notification.""" await self.send_notification( types.ServerNotification( types.ResourceUpdatedNotification( params=types.ResourceUpdatedNotificationParams(uri=uri), ) ) ) 

create_message async

create_message( messages: list[SamplingMessage], *, max_tokens: int, system_prompt: str | None = None, include_context: IncludeContext | None = None, temperature: float | None = None, stop_sequences: list[str] | None = None, metadata: dict[str, Any] | None = None, model_preferences: ModelPreferences | None = None, related_request_id: RequestId | None = None ) -> CreateMessageResult 

Send a sampling/create_message request.

Source code in src/mcp/server/session.py
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
async def create_message( self, messages: list[types.SamplingMessage], *, max_tokens: int, system_prompt: str | None = None, include_context: types.IncludeContext | None = None, temperature: float | None = None, stop_sequences: list[str] | None = None, metadata: dict[str, Any] | None = None, model_preferences: types.ModelPreferences | None = None, related_request_id: types.RequestId | None = None, ) -> types.CreateMessageResult:  """Send a sampling/create_message request.""" return await self.send_request( request=types.ServerRequest( types.CreateMessageRequest( params=types.CreateMessageRequestParams( messages=messages, systemPrompt=system_prompt, includeContext=include_context, temperature=temperature, maxTokens=max_tokens, stopSequences=stop_sequences, metadata=metadata, modelPreferences=model_preferences, ), ) ), result_type=types.CreateMessageResult, metadata=ServerMessageMetadata( related_request_id=related_request_id, ), ) 

list_roots async

list_roots() -> ListRootsResult 

Send a roots/list request.

Source code in src/mcp/server/session.py
250 251 252 253 254 255
async def list_roots(self) -> types.ListRootsResult:  """Send a roots/list request.""" return await self.send_request( types.ServerRequest(types.ListRootsRequest()), types.ListRootsResult, ) 

elicit async

elicit( message: str, requestedSchema: ElicitRequestedSchema, related_request_id: RequestId | None = None, ) -> ElicitResult 

Send an elicitation/create request.

Parameters:

Name Type Description Default
message str

The message to present to the user

required
requestedSchema ElicitRequestedSchema

Schema defining the expected response structure

required

Returns:

Type Description
ElicitResult

The client's response

Source code in src/mcp/server/session.py
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
async def elicit( self, message: str, requestedSchema: types.ElicitRequestedSchema, related_request_id: types.RequestId | None = None, ) -> types.ElicitResult:  """Send an elicitation/create request.  Args:  message: The message to present to the user  requestedSchema: Schema defining the expected response structure  Returns:  The client's response  """ return await self.send_request( types.ServerRequest( types.ElicitRequest( params=types.ElicitRequestParams( message=message, requestedSchema=requestedSchema, ), ) ), types.ElicitResult, metadata=ServerMessageMetadata(related_request_id=related_request_id), ) 

send_ping async

send_ping() -> EmptyResult 

Send a ping request.

Source code in src/mcp/server/session.py
285 286 287 288 289 290
async def send_ping(self) -> types.EmptyResult:  """Send a ping request.""" return await self.send_request( types.ServerRequest(types.PingRequest()), types.EmptyResult, ) 

send_progress_notification async

send_progress_notification( progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, related_request_id: str | None = None, ) -> None 

Send a progress notification.

Source code in src/mcp/server/session.py
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
async def send_progress_notification( self, progress_token: str | int, progress: float, total: float | None = None, message: str | None = None, related_request_id: str | None = None, ) -> None:  """Send a progress notification.""" await self.send_notification( types.ServerNotification( types.ProgressNotification( params=types.ProgressNotificationParams( progressToken=progress_token, progress=progress, total=total, message=message, ), ) ), related_request_id, ) 

send_resource_list_changed async

send_resource_list_changed() -> None 

Send a resource list changed notification.

Source code in src/mcp/server/session.py
315 316 317
async def send_resource_list_changed(self) -> None:  """Send a resource list changed notification.""" await self.send_notification(types.ServerNotification(types.ResourceListChangedNotification())) 

send_tool_list_changed async

send_tool_list_changed() -> None 

Send a tool list changed notification.

Source code in src/mcp/server/session.py
319 320 321
async def send_tool_list_changed(self) -> None:  """Send a tool list changed notification.""" await self.send_notification(types.ServerNotification(types.ToolListChangedNotification())) 

send_prompt_list_changed async

send_prompt_list_changed() -> None 

Send a prompt list changed notification.

Source code in src/mcp/server/session.py
323 324 325
async def send_prompt_list_changed(self) -> None:  """Send a prompt list changed notification.""" await self.send_notification(types.ServerNotification(types.PromptListChangedNotification())) 

stdio_server async

stdio_server( stdin: AsyncFile[str] | None = None, stdout: AsyncFile[str] | None = None, ) 

Server transport for stdio: this communicates with an MCP client by reading from the current process' stdin and writing to stdout.

Source code in src/mcp/server/stdio.py
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
@asynccontextmanager async def stdio_server( stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None, ):  """  Server transport for stdio: this communicates with an MCP client by reading  from the current process' stdin and writing to stdout.  """ # Purposely not using context managers for these, as we don't want to close # standard process handles. Encoding of stdin/stdout as text streams on # python is platform-dependent (Windows is particularly problematic), so we # re-wrap the underlying binary stream to ensure UTF-8. if not stdin: stdin = anyio.wrap_file(TextIOWrapper(sys.stdin.buffer, encoding="utf-8")) if not stdout: stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] write_stream: MemoryObjectSendStream[SessionMessage] write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) async def stdin_reader(): try: async with read_stream_writer: async for line in stdin: try: message = types.JSONRPCMessage.model_validate_json(line) except Exception as exc: await read_stream_writer.send(exc) continue session_message = SessionMessage(message) await read_stream_writer.send(session_message) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() async def stdout_writer(): try: async with write_stream_reader: async for session_message in write_stream_reader: json = session_message.message.model_dump_json(by_alias=True, exclude_none=True) await stdout.write(json + "\n") await stdout.flush() except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() async with anyio.create_task_group() as tg: tg.start_soon(stdin_reader) tg.start_soon(stdout_writer) yield read_stream, write_stream 

McpError

Bases: Exception

Exception type raised when an error arrives over an MCP connection.

Source code in src/mcp/shared/exceptions.py
 4  5  6  7  8  9 10 11 12 13 14
class McpError(Exception):  """  Exception type raised when an error arrives over an MCP connection.  """ error: ErrorData def __init__(self, error: ErrorData):  """Initialize McpError.""" super().__init__(error.message) self.error = error 

__init__

__init__(error: ErrorData) 

Initialize McpError.

Source code in src/mcp/shared/exceptions.py
11 12 13 14
def __init__(self, error: ErrorData):  """Initialize McpError.""" super().__init__(error.message) self.error = error 

CallToolRequest

Bases: Request[CallToolRequestParams, Literal['tools/call']]

Used by the client to invoke a tool provided by the server.

Source code in src/mcp/types.py
909 910 911 912 913
class CallToolRequest(Request[CallToolRequestParams, Literal["tools/call"]]):  """Used by the client to invoke a tool provided by the server.""" method: Literal["tools/call"] = "tools/call" params: CallToolRequestParams 

ClientCapabilities

Bases: BaseModel

Capabilities a client may support.

Source code in src/mcp/types.py
265 266 267 268 269 270 271 272 273 274 275 276
class ClientCapabilities(BaseModel):  """Capabilities a client may support.""" experimental: dict[str, dict[str, Any]] | None = None  """Experimental, non-standard capabilities that the client supports.""" sampling: SamplingCapability | None = None  """Present if the client supports sampling from an LLM.""" elicitation: ElicitationCapability | None = None  """Present if the client supports elicitation from the user.""" roots: RootsCapability | None = None  """Present if the client supports listing roots.""" model_config = ConfigDict(extra="allow") 

experimental class-attribute instance-attribute

experimental: dict[str, dict[str, Any]] | None = None 

Experimental, non-standard capabilities that the client supports.

sampling class-attribute instance-attribute

sampling: SamplingCapability | None = None 

Present if the client supports sampling from an LLM.

elicitation class-attribute instance-attribute

elicitation: ElicitationCapability | None = None 

Present if the client supports elicitation from the user.

roots class-attribute instance-attribute

roots: RootsCapability | None = None 

Present if the client supports listing roots.

CompleteRequest

Bases: Request[CompleteRequestParams, Literal['completion/complete']]

A request from the client to the server, to ask for completion options.

Source code in src/mcp/types.py
1135 1136 1137 1138 1139
class CompleteRequest(Request[CompleteRequestParams, Literal["completion/complete"]]):  """A request from the client to the server, to ask for completion options.""" method: Literal["completion/complete"] = "completion/complete" params: CompleteRequestParams 

CreateMessageRequest

Bases: Request[CreateMessageRequestParams, Literal['sampling/createMessage']]

A request from the server to sample an LLM via the client.

Source code in src/mcp/types.py
1063 1064 1065 1066 1067
class CreateMessageRequest(Request[CreateMessageRequestParams, Literal["sampling/createMessage"]]):  """A request from the server to sample an LLM via the client.""" method: Literal["sampling/createMessage"] = "sampling/createMessage" params: CreateMessageRequestParams 

CreateMessageResult

Bases: Result

The client's response to a sampling/create_message request from the server.

Source code in src/mcp/types.py
1073 1074 1075 1076 1077 1078 1079 1080 1081
class CreateMessageResult(Result):  """The client's response to a sampling/create_message request from the server.""" role: Role content: TextContent | ImageContent | AudioContent model: str  """The name of the model that generated the message.""" stopReason: StopReason | None = None  """The reason why sampling stopped, if known.""" 

model instance-attribute

model: str 

The name of the model that generated the message.

stopReason class-attribute instance-attribute

stopReason: StopReason | None = None 

The reason why sampling stopped, if known.

ErrorData

Bases: BaseModel

Error information for JSON-RPC error responses.

Source code in src/mcp/types.py
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
class ErrorData(BaseModel):  """Error information for JSON-RPC error responses.""" code: int  """The error type that occurred.""" message: str  """  A short description of the error. The message SHOULD be limited to a concise single  sentence.  """ data: Any | None = None  """  Additional information about the error. The value of this member is defined by the  sender (e.g. detailed error information, nested errors etc.).  """ model_config = ConfigDict(extra="allow") 

code instance-attribute

code: int 

The error type that occurred.

message instance-attribute

message: str 

A short description of the error. The message SHOULD be limited to a concise single sentence.

data class-attribute instance-attribute

data: Any | None = None 

Additional information about the error. The value of this member is defined by the sender (e.g. detailed error information, nested errors etc.).

GetPromptRequest

Bases: Request[GetPromptRequestParams, Literal['prompts/get']]

Used by the client to get a prompt provided by the server.

Source code in src/mcp/types.py
683 684 685 686 687
class GetPromptRequest(Request[GetPromptRequestParams, Literal["prompts/get"]]):  """Used by the client to get a prompt provided by the server.""" method: Literal["prompts/get"] = "prompts/get" params: GetPromptRequestParams 

GetPromptResult

Bases: Result

The server's response to a prompts/get request from the client.

Source code in src/mcp/types.py
797 798 799 800 801 802
class GetPromptResult(Result):  """The server's response to a prompts/get request from the client.""" description: str | None = None  """An optional description for the prompt.""" messages: list[PromptMessage] 

description class-attribute instance-attribute

description: str | None = None 

An optional description for the prompt.

Implementation

Bases: BaseMetadata

Describes the name and version of an MCP implementation.

Source code in src/mcp/types.py
231 232 233 234 235 236 237 238 239 240 241 242
class Implementation(BaseMetadata):  """Describes the name and version of an MCP implementation.""" version: str websiteUrl: str | None = None  """An optional URL of the website for this implementation.""" icons: list[Icon] | None = None  """An optional list of icons for this implementation.""" model_config = ConfigDict(extra="allow") 

websiteUrl class-attribute instance-attribute

websiteUrl: str | None = None 

An optional URL of the website for this implementation.

icons class-attribute instance-attribute

icons: list[Icon] | None = None 

An optional list of icons for this implementation.

InitializedNotification

Bases: Notification[NotificationParams | None, Literal['notifications/initialized']]

This notification is sent from the client to the server after initialization has finished.

Source code in src/mcp/types.py
366 367 368 369 370 371 372 373
class InitializedNotification(Notification[NotificationParams | None, Literal["notifications/initialized"]]):  """  This notification is sent from the client to the server after initialization has  finished.  """ method: Literal["notifications/initialized"] = "notifications/initialized" params: NotificationParams | None = None 

InitializeRequest

Bases: Request[InitializeRequestParams, Literal['initialize']]

This request is sent from the client to the server when it first connects, asking it to begin initialization.

Source code in src/mcp/types.py
345 346 347 348 349 350 351 352
class InitializeRequest(Request[InitializeRequestParams, Literal["initialize"]]):  """  This request is sent from the client to the server when it first connects, asking it  to begin initialization.  """ method: Literal["initialize"] = "initialize" params: InitializeRequestParams 

InitializeResult

Bases: Result

After receiving an initialize request from the client, the server sends this.

Source code in src/mcp/types.py
355 356 357 358 359 360 361 362 363
class InitializeResult(Result):  """After receiving an initialize request from the client, the server sends this.""" protocolVersion: str | int  """The version of the Model Context Protocol that the server wants to use.""" capabilities: ServerCapabilities serverInfo: Implementation instructions: str | None = None  """Instructions describing how to use the server and its features.""" 

protocolVersion instance-attribute

protocolVersion: str | int 

The version of the Model Context Protocol that the server wants to use.

instructions class-attribute instance-attribute

instructions: str | None = None 

Instructions describing how to use the server and its features.

JSONRPCError

Bases: BaseModel

A response to a request that indicates an error occurred.

Source code in src/mcp/types.py
182 183 184 185 186 187 188
class JSONRPCError(BaseModel):  """A response to a request that indicates an error occurred.""" jsonrpc: Literal["2.0"] id: str | int error: ErrorData model_config = ConfigDict(extra="allow") 

JSONRPCRequest

Bases: Request[dict[str, Any] | None, str]

A request that expects a response.

Source code in src/mcp/types.py
124 125 126 127 128 129 130
class JSONRPCRequest(Request[dict[str, Any] | None, str]):  """A request that expects a response.""" jsonrpc: Literal["2.0"] id: RequestId method: str params: dict[str, Any] | None = None 

JSONRPCResponse

Bases: BaseModel

A successful (non-error) response to a request.

Source code in src/mcp/types.py
140 141 142 143 144 145 146
class JSONRPCResponse(BaseModel):  """A successful (non-error) response to a request.""" jsonrpc: Literal["2.0"] id: RequestId result: dict[str, Any] model_config = ConfigDict(extra="allow") 

ListPromptsRequest

Bases: PaginatedRequest[Literal['prompts/list']]

Sent from the client to request a list of prompts and prompt templates.

Source code in src/mcp/types.py
632 633 634 635
class ListPromptsRequest(PaginatedRequest[Literal["prompts/list"]]):  """Sent from the client to request a list of prompts and prompt templates.""" method: Literal["prompts/list"] = "prompts/list" 

ListPromptsResult

Bases: PaginatedResult

The server's response to a prompts/list request from the client.

Source code in src/mcp/types.py
667 668 669 670
class ListPromptsResult(PaginatedResult):  """The server's response to a prompts/list request from the client.""" prompts: list[Prompt] 

ListResourcesRequest

Bases: PaginatedRequest[Literal['resources/list']]

Sent from the client to request a list of resources the server has.

Source code in src/mcp/types.py
419 420 421 422
class ListResourcesRequest(PaginatedRequest[Literal["resources/list"]]):  """Sent from the client to request a list of resources the server has.""" method: Literal["resources/list"] = "resources/list" 

ListResourcesResult

Bases: PaginatedResult

The server's response to a resources/list request from the client.

Source code in src/mcp/types.py
484 485 486 487
class ListResourcesResult(PaginatedResult):  """The server's response to a resources/list request from the client.""" resources: list[Resource] 

ListToolsResult

Bases: PaginatedResult

The server's response to a tools/list request from the client.

Source code in src/mcp/types.py
895 896 897 898
class ListToolsResult(PaginatedResult):  """The server's response to a tools/list request from the client.""" tools: list[Tool] 

LoggingMessageNotification

Bases: Notification[LoggingMessageNotificationParams, Literal['notifications/message']]

Notification of a log message passed from server to client.

Source code in src/mcp/types.py
968 969 970 971 972
class LoggingMessageNotification(Notification[LoggingMessageNotificationParams, Literal["notifications/message"]]):  """Notification of a log message passed from server to client.""" method: Literal["notifications/message"] = "notifications/message" params: LoggingMessageNotificationParams 

Notification

Bases: BaseModel, Generic[NotificationParamsT, MethodT]

Base class for JSON-RPC notifications.

Source code in src/mcp/types.py
 97  98  99 100 101 102
class Notification(BaseModel, Generic[NotificationParamsT, MethodT]):  """Base class for JSON-RPC notifications.""" method: MethodT params: NotificationParamsT model_config = ConfigDict(extra="allow") 

PingRequest

Bases: Request[RequestParams | None, Literal['ping']]

A ping, issued by either the server or the client, to check that the other party is still alive.

Source code in src/mcp/types.py
376 377 378 379 380 381 382 383
class PingRequest(Request[RequestParams | None, Literal["ping"]]):  """  A ping, issued by either the server or the client, to check that the other party is  still alive.  """ method: Literal["ping"] = "ping" params: RequestParams | None = None 

ProgressNotification

Bases: Notification[ProgressNotificationParams, Literal['notifications/progress']]

An out-of-band notification used to inform the receiver of a progress update for a long-running request.

Source code in src/mcp/types.py
409 410 411 412 413 414 415 416
class ProgressNotification(Notification[ProgressNotificationParams, Literal["notifications/progress"]]):  """  An out-of-band notification used to inform the receiver of a progress update for a  long-running request.  """ method: Literal["notifications/progress"] = "notifications/progress" params: ProgressNotificationParams 

PromptsCapability

Bases: BaseModel

Capability for prompts operations.

Source code in src/mcp/types.py
279 280 281 282 283 284
class PromptsCapability(BaseModel):  """Capability for prompts operations.""" listChanged: bool | None = None  """Whether this server supports notifications for changes to the prompt list.""" model_config = ConfigDict(extra="allow") 

listChanged class-attribute instance-attribute

listChanged: bool | None = None 

Whether this server supports notifications for changes to the prompt list.

ReadResourceRequest

Bases: Request[ReadResourceRequestParams, Literal['resources/read']]

Sent from the client to the server, to read a specific resource URI.

Source code in src/mcp/types.py
513 514 515 516 517
class ReadResourceRequest(Request[ReadResourceRequestParams, Literal["resources/read"]]):  """Sent from the client to the server, to read a specific resource URI.""" method: Literal["resources/read"] = "resources/read" params: ReadResourceRequestParams 

ReadResourceResult

Bases: Result

The server's response to a resources/read request from the client.

Source code in src/mcp/types.py
552 553 554 555
class ReadResourceResult(Result):  """The server's response to a resources/read request from the client.""" contents: list[TextResourceContents | BlobResourceContents] 

Resource

Bases: BaseMetadata

A known resource that the server is capable of reading.

Source code in src/mcp/types.py
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
class Resource(BaseMetadata):  """A known resource that the server is capable of reading.""" uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]  """The URI of this resource.""" description: str | None = None  """A description of what this resource represents.""" mimeType: str | None = None  """The MIME type of this resource, if known.""" size: int | None = None  """  The size of the raw resource content, in bytes (i.e., before base64 encoding  or any tokenization), if known.  This can be used by Hosts to display file sizes and estimate context window usage.  """ icons: list[Icon] | None = None  """An optional list of icons for this resource.""" annotations: Annotations | None = None meta: dict[str, Any] | None = Field(alias="_meta", default=None)  """  See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)  for notes on _meta usage.  """ model_config = ConfigDict(extra="allow") 

uri instance-attribute

uri: Annotated[AnyUrl, UrlConstraints(host_required=False)] 

The URI of this resource.

description class-attribute instance-attribute

description: str | None = None 

A description of what this resource represents.

mimeType class-attribute instance-attribute

mimeType: str | None = None 

The MIME type of this resource, if known.

size class-attribute instance-attribute

size: int | None = None 

The size of the raw resource content, in bytes (i.e., before base64 encoding or any tokenization), if known.

This can be used by Hosts to display file sizes and estimate context window usage.

icons class-attribute instance-attribute

icons: list[Icon] | None = None 

An optional list of icons for this resource.

meta class-attribute instance-attribute

meta: dict[str, Any] | None = Field( alias="_meta", default=None ) 

See MCP specification for notes on _meta usage.

ResourcesCapability

Bases: BaseModel

Capability for resources operations.

Source code in src/mcp/types.py
287 288 289 290 291 292 293 294
class ResourcesCapability(BaseModel):  """Capability for resources operations.""" subscribe: bool | None = None  """Whether this server supports subscribing to resource updates.""" listChanged: bool | None = None  """Whether this server supports notifications for changes to the resource list.""" model_config = ConfigDict(extra="allow") 

subscribe class-attribute instance-attribute

subscribe: bool | None = None 

Whether this server supports subscribing to resource updates.

listChanged class-attribute instance-attribute

listChanged: bool | None = None 

Whether this server supports notifications for changes to the resource list.

ResourceUpdatedNotification

Bases: Notification[ResourceUpdatedNotificationParams, Literal['notifications/resources/updated']]

A notification from the server to the client, informing it that a resource has changed and may need to be read again.

Source code in src/mcp/types.py
620 621 622 623 624 625 626 627 628 629
class ResourceUpdatedNotification( Notification[ResourceUpdatedNotificationParams, Literal["notifications/resources/updated"]] ):  """  A notification from the server to the client, informing it that a resource has  changed and may need to be read again.  """ method: Literal["notifications/resources/updated"] = "notifications/resources/updated" params: ResourceUpdatedNotificationParams 

RootsCapability

Bases: BaseModel

Capability for root operations.

Source code in src/mcp/types.py
245 246 247 248 249 250
class RootsCapability(BaseModel):  """Capability for root operations.""" listChanged: bool | None = None  """Whether the client supports notifications for changes to the roots list.""" model_config = ConfigDict(extra="allow") 

listChanged class-attribute instance-attribute

listChanged: bool | None = None 

Whether the client supports notifications for changes to the roots list.

SamplingMessage

Bases: BaseModel

Describes a message issued to or received from an LLM API.

Source code in src/mcp/types.py
745 746 747 748 749 750
class SamplingMessage(BaseModel):  """Describes a message issued to or received from an LLM API.""" role: Role content: TextContent | ImageContent | AudioContent model_config = ConfigDict(extra="allow") 

ServerCapabilities

Bases: BaseModel

Capabilities that a server may support.

Source code in src/mcp/types.py
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
class ServerCapabilities(BaseModel):  """Capabilities that a server may support.""" experimental: dict[str, dict[str, Any]] | None = None  """Experimental, non-standard capabilities that the server supports.""" logging: LoggingCapability | None = None  """Present if the server supports sending log messages to the client.""" prompts: PromptsCapability | None = None  """Present if the server offers any prompt templates.""" resources: ResourcesCapability | None = None  """Present if the server offers any resources to read.""" tools: ToolsCapability | None = None  """Present if the server offers any tools to call.""" completions: CompletionsCapability | None = None  """Present if the server offers autocompletion suggestions for prompts and resources.""" model_config = ConfigDict(extra="allow") 

experimental class-attribute instance-attribute

experimental: dict[str, dict[str, Any]] | None = None 

Experimental, non-standard capabilities that the server supports.

logging class-attribute instance-attribute

logging: LoggingCapability | None = None 

Present if the server supports sending log messages to the client.

prompts class-attribute instance-attribute

prompts: PromptsCapability | None = None 

Present if the server offers any prompt templates.

resources class-attribute instance-attribute

resources: ResourcesCapability | None = None 

Present if the server offers any resources to read.

tools class-attribute instance-attribute

tools: ToolsCapability | None = None 

Present if the server offers any tools to call.

completions class-attribute instance-attribute

completions: CompletionsCapability | None = None 

Present if the server offers autocompletion suggestions for prompts and resources.

SetLevelRequest

Bases: Request[SetLevelRequestParams, Literal['logging/setLevel']]

A request from the client to the server, to enable or adjust logging.

Source code in src/mcp/types.py
946 947 948 949 950
class SetLevelRequest(Request[SetLevelRequestParams, Literal["logging/setLevel"]]):  """A request from the client to the server, to enable or adjust logging.""" method: Literal["logging/setLevel"] = "logging/setLevel" params: SetLevelRequestParams 

SubscribeRequest

Bases: Request[SubscribeRequestParams, Literal['resources/subscribe']]

Sent from the client to request resources/updated notifications from the server whenever a particular resource changes.

Source code in src/mcp/types.py
581 582 583 584 585 586 587 588
class SubscribeRequest(Request[SubscribeRequestParams, Literal["resources/subscribe"]]):  """  Sent from the client to request resources/updated notifications from the server  whenever a particular resource changes.  """ method: Literal["resources/subscribe"] = "resources/subscribe" params: SubscribeRequestParams 

Tool

Bases: BaseMetadata

Definition for a tool the client can call.

Source code in src/mcp/types.py
871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
class Tool(BaseMetadata):  """Definition for a tool the client can call.""" description: str | None = None  """A human-readable description of the tool.""" inputSchema: dict[str, Any]  """A JSON Schema object defining the expected parameters for the tool.""" outputSchema: dict[str, Any] | None = None  """  An optional JSON Schema object defining the structure of the tool's output  returned in the structuredContent field of a CallToolResult.  """ icons: list[Icon] | None = None  """An optional list of icons for this tool.""" annotations: ToolAnnotations | None = None  """Optional additional tool information.""" meta: dict[str, Any] | None = Field(alias="_meta", default=None)  """  See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)  for notes on _meta usage.  """ model_config = ConfigDict(extra="allow") 

description class-attribute instance-attribute

description: str | None = None 

A human-readable description of the tool.

inputSchema instance-attribute

inputSchema: dict[str, Any] 

A JSON Schema object defining the expected parameters for the tool.

outputSchema class-attribute instance-attribute

outputSchema: dict[str, Any] | None = None 

An optional JSON Schema object defining the structure of the tool's output returned in the structuredContent field of a CallToolResult.

icons class-attribute instance-attribute

icons: list[Icon] | None = None 

An optional list of icons for this tool.

annotations class-attribute instance-attribute

annotations: ToolAnnotations | None = None 

Optional additional tool information.

meta class-attribute instance-attribute

meta: dict[str, Any] | None = Field( alias="_meta", default=None ) 

See MCP specification for notes on _meta usage.

ToolsCapability

Bases: BaseModel

Capability for tools operations.

Source code in src/mcp/types.py
297 298 299 300 301 302
class ToolsCapability(BaseModel):  """Capability for tools operations.""" listChanged: bool | None = None  """Whether this server supports notifications for changes to the tool list.""" model_config = ConfigDict(extra="allow") 

listChanged class-attribute instance-attribute

listChanged: bool | None = None 

Whether this server supports notifications for changes to the tool list.

UnsubscribeRequest

Bases: Request[UnsubscribeRequestParams, Literal['resources/unsubscribe']]

Sent from the client to request cancellation of resources/updated notifications from the server.

Source code in src/mcp/types.py
599 600 601 602 603 604 605 606
class UnsubscribeRequest(Request[UnsubscribeRequestParams, Literal["resources/unsubscribe"]]):  """  Sent from the client to request cancellation of resources/updated notifications from  the server.  """ method: Literal["resources/unsubscribe"] = "resources/unsubscribe" params: UnsubscribeRequestParams