All Products
Search
Document Center

OpenAPI Explorer:Integrate an OpenAPI MCP Server into an agent

Last Updated:Sep 17, 2025

This topic describes how to implement an OAuth 2.0 authorization flow based on the official Model Context Protocol (MCP) source code, and how to integrate an OpenAPI MCP Server into an agent.

Customize the OAuth authorization flow

The sample code implements the OAuth authorization flow based on the official MCP source code. To customize the OAuth authorization flow, you can modify the code in handle_redirect and handle_callback.

Important

In a production environment, you must store the OAuth token securely. The sample code uses InMemoryTokenStorage for demonstration purposes only: tokens are kept in process memory and are lost when the program exits.

# oauth_handler.py
"""OAuth helpers for MCP clients.

Provides a demo in-memory token store, a small local HTTP server that
receives the OAuth redirect, and the ``handle_redirect`` / ``handle_callback``
coroutines that drive the browser-based authorization step.
"""
import asyncio
import html
import threading
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull


# Page shown in the browser after a successful authorization. The
# "countdown" span is the element the inline script updates every second.
_SUCCESS_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Authorization Successful</title>
</head>
<body>
    <h1>Authorization Successful</h1>
    <p>Authorization successful. You can now return to your application.</p>
    <p>This window will automatically close in <span id="countdown">3</span> seconds.</p>
    <button onclick="window.close()">Close Now</button>
    <script>
        let count = 3;
        const el = document.getElementById('countdown');
        const timer = setInterval(() => {
            count--;
            el.textContent = count;
            if (count <= 0) {
                clearInterval(timer);
                window.close();
            }
        }, 1000);
    </script>
</body>
</html>
"""

# Page returned for requests that carry neither "code" nor "error".
_BAD_REQUEST_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Bad Request</title>
</head>
<body>
    <h1>Bad Request</h1>
    <p>This endpoint only handles OAuth callbacks.</p>
</body>
</html>
"""


class InMemoryTokenStorage(TokenStorage):
    """Demo in-memory token storage implementation.

    Tokens and client information live only in instance attributes, so they
    are lost when the process exits. Not suitable for production use.
    """

    def __init__(self):
        self.tokens: OAuthToken | None = None
        self.client_info: OAuthClientInformationFull | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """Return the stored tokens, or None if nothing was stored yet."""
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Store tokens, replacing any previous value."""
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Return the stored client information, or None if absent."""
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Store client information, replacing any previous value."""
        self.client_info = client_info


class CallbackHandler(BaseHTTPRequestHandler):
    """HTTP handler for the OAuth callback.

    Records the outcome of the authorization (code/state or error) on the
    owning ``CallbackServer`` and renders a result page for the browser.
    """

    def __init__(self, callback_server, *args, **kwargs):
        # The base-class constructor processes the request immediately, so the
        # server reference must be in place before calling it.
        self.callback_server = callback_server
        super().__init__(*args, **kwargs)

    def _send_html(self, status: int, body: str) -> None:
        """Send ``body`` as a UTF-8 HTML response with the given status."""
        self.send_response(status)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(body.encode('utf-8'))

    def do_GET(self):
        """Handle the GET request issued by the OAuth redirect."""
        try:
            # Parse parameters from the callback URL
            params = parse_qs(urlparse(self.path).query)

            if 'code' in params:
                # Store the result; auth_received is set last because another
                # thread polls it to decide when the other fields are ready.
                self.callback_server.auth_code = params['code'][0]
                self.callback_server.auth_state = params.get('state', [None])[0]
                self.callback_server.auth_received = True
                self._send_html(200, _SUCCESS_HTML)
            elif 'error' in params:
                error = params['error'][0]
                error_description = params.get('error_description', ['Unknown error'])[0]
                self.callback_server.auth_error = f"{error}: {error_description}"
                self.callback_server.auth_received = True
                # Both values come straight from the query string: escape them
                # before embedding them in HTML.
                self._send_html(400, f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Authorization Failed</title>
</head>
<body>
    <h1>Authorization Failed</h1>
    <p>An error occurred during the authorization process.</p>
    <p>{html.escape(error)}</p>
    <p>{html.escape(error_description)}</p>
    <button onclick="window.close()">Close Window</button>
</body>
</html>
""")
            else:
                # Not an OAuth callback (for example a favicon request):
                # answer explicitly and leave the pending authorization alone.
                self._send_html(400, _BAD_REQUEST_HTML)
        except Exception as e:
            # Keep an outcome that was already recorded: failing to render the
            # result page must not turn a completed authorization into an error.
            if not self.callback_server.auth_received:
                self.callback_server.auth_error = str(e)
                self.callback_server.auth_received = True
            try:
                self._send_html(500, f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Server Error</title>
</head>
<body>
    <h1>Internal Server Error</h1>
    <p>The server encountered an internal error and could not complete your request.</p>
    <pre>{html.escape(str(e))}</pre>
    <button onclick="window.close()">Close Window</button>
</body>
</html>
""")
            except OSError:
                # The browser has already dropped the connection; there is
                # nobody left to show the error page to.
                pass

    def log_message(self, format, *args):
        """Suppress log output"""
        pass


class CallbackServer:
    """Local HTTP server that receives the OAuth callback.

    The server runs in a daemon thread. The handler writes the outcome into
    ``auth_code`` / ``auth_state`` / ``auth_error`` and then sets
    ``auth_received``, which ``wait_for_callback`` polls.
    """

    def __init__(self, port=3000):
        self.port = port
        self.server = None
        self.thread = None
        self.auth_code = None
        self.auth_state = None
        self.auth_error = None
        self.auth_received = False

    def start(self):
        """Start the callback server in a background thread.

        Raises:
            OSError: If the port cannot be bound (for example, already in use).
        """
        def make_handler(*args, **kwargs):
            # Bind this server instance into every handler that is created.
            return CallbackHandler(self, *args, **kwargs)

        self.server = HTTPServer(('localhost', self.port), make_handler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        print(f"OAuth callback server started, listening on port {self.port}")

    def stop(self):
        """Stop the callback server and release its socket."""
        if self.server:
            self.server.shutdown()
            self.server.server_close()
            self.server = None
        if self.thread:
            self.thread.join(timeout=1)
            self.thread = None
        print("OAuth callback server stopped")

    async def wait_for_callback(self, timeout=300):
        """Wait until the OAuth callback has been received.

        Args:
            timeout: Maximum number of seconds to wait.

        Returns:
            A ``(code, state)`` tuple taken from the callback request.

        Raises:
            TimeoutError: If no callback arrives within ``timeout`` seconds.
            Exception: If the callback reported an authorization error.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not self.auth_received:
            if loop.time() > deadline:
                raise TimeoutError("Timed out waiting for OAuth callback")
            await asyncio.sleep(0.1)

        if self.auth_error:
            raise Exception(f"OAuth authorization failed: {self.auth_error}")
        return self.auth_code, self.auth_state


# Global callback server instance
_callback_server = None


async def handle_redirect(auth_url: str) -> None:
    """Start the callback server if needed and open the browser at ``auth_url``."""
    global _callback_server

    if _callback_server is None:
        server = CallbackServer(port=3000)
        server.start()
        # Publish the instance only once it is actually listening, so a failed
        # bind is retried on the next call instead of leaving a dead server.
        _callback_server = server

    print("Opening browser for OAuth authorization...")
    print(f"Authorization URL: {auth_url}")

    # Automatically open the browser
    if not webbrowser.open(auth_url):
        print("Could not open a browser automatically. Open the URL above manually.")


async def handle_callback() -> tuple[str, str | None]:
    """Wait for the OAuth callback and return ``(code, state)``.

    Raises:
        Exception: If the callback server was never started, if the wait times
            out, or if the authorization failed.
    """
    global _callback_server
    if _callback_server is None:
        raise Exception("Callback server is not started")

    print("Waiting for OAuth authorization to complete...")
    try:
        # Wait for the callback
        code, state = await _callback_server.wait_for_callback()
        print("OAuth authorization successful!")
        return code, state
    except Exception as e:
        print(f"OAuth authorization failed: {e}")
        raise
    finally:
        # Clean up the server state, but keep the server running for reuse
        _callback_server.auth_code = None
        _callback_server.auth_state = None
        _callback_server.auth_error = None
        _callback_server.auth_received = False

Integrate MCP into an agent

This section describes how to use mainstream agent frameworks to connect to an OpenAPI MCP Server with OAuth authentication, and how to use large language models together with MCP tools to manage Alibaba Cloud resources.

AgentScope

AgentScope is an open source agent framework from Alibaba. It supports features such as agent tool management, agent long-term memory control, and intelligent retrieval-augmented generation (RAG).

# -*- coding: utf-8 -*-
"""The main entry point of the ReAct agent example."""
import asyncio
import os

from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
    Toolkit,
    execute_shell_command,
    execute_python_code,
    view_text_file,
)
from agentscope.mcp import HttpStatelessClient
from dotenv import load_dotenv
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl

from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback

# Read settings such as DASHSCOPE_API_KEY from a local .env file
load_dotenv()

server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"

memory_token_storage = InMemoryTokenStorage()

# The redirect URI must match the port the local callback server in
# oauth_handler listens on (3000).
oauth_provider = OAuthClientProvider(
    server_url=server_url,
    client_metadata=OAuthClientMetadata(
        client_name="AgentScopeExampleClient",
        redirect_uris=[AnyUrl("http://localhost:3000/callback")],
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        scope=None,
    ),
    storage=memory_token_storage,
    redirect_handler=handle_redirect,
    callback_handler=handle_callback,
)

stateless_client = HttpStatelessClient(
    # The name used to identify MCP
    name="mcp_services_stateless",
    transport="streamable_http",
    url=server_url,
    auth=oauth_provider,
)


async def main() -> None:
    """The main entry point for the ReAct agent example.

    Registers the MCP client's tools, builds the agent, and then alternates
    between user input and agent replies until the user types ``exit``.
    """
    toolkit = Toolkit()
    # Optional local tools; uncomment to make them available to the agent.
    # toolkit.register_tool_function(execute_shell_command)
    # toolkit.register_tool_function(execute_python_code)
    # toolkit.register_tool_function(view_text_file)
    await toolkit.register_mcp_client(stateless_client)

    agent = ReActAgent(
        name="AlibabaCloudOpsAgent",
        sys_prompt=(
            "You are an Alibaba Cloud O&M Assistant. "
            "You are skilled at using various Alibaba Cloud products, "
            "such as ECS, RDS, and VPC, to fulfill my requests."
        ),
        model=DashScopeChatModel(
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model_name="qwen3-max-preview",
            enable_thinking=False,
            stream=True,
        ),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
        memory=InMemoryMemory(),
    )
    user = UserAgent("User")

    msg = None
    while True:
        msg = await user(msg)
        if msg.get_text_content() == "exit":
            break
        msg = await agent(msg)


if __name__ == "__main__":
    asyncio.run(main())

LangGraph

LangGraph is a low-level orchestration framework for building, managing, and deploying long-running, stateful agents.

"""Interactive LangGraph ReAct agent that uses tools from an OpenAPI MCP Server."""
import asyncio
import sys
from dotenv import load_dotenv
import os

from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl

from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect

# Read OPENAI_MODEL, OPENAI_API_KEY and OPENAI_BASE_URL from a local .env file
load_dotenv()


async def make_agent():
    """Build a ReAct agent whose tools come from the MCP server.

    Returns:
        The agent created by ``create_react_agent``.
    """
    model = init_chat_model(
        model=os.getenv("OPENAI_MODEL"),
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("OPENAI_BASE_URL"),
        model_provider='openai',
    )

    # Use the same server URL as the MCP service
    server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"

    # The redirect URI must match the port the local callback server in
    # oauth_handler listens on (3000).
    oauth_provider = OAuthClientProvider(
        server_url=server_url,
        client_metadata=OAuthClientMetadata(
            client_name="Example MCP Client",
            redirect_uris=[AnyUrl("http://localhost:3000/callback")],
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
            scope=None,
        ),
        storage=InMemoryTokenStorage(),
        redirect_handler=handle_redirect,
        callback_handler=handle_callback,
    )

    mcp_client = MultiServerMCPClient(
        {
            "resourcecenter": {
                "url": server_url,
                "transport": "streamable_http",
                "auth": oauth_provider,
            }
        }
    )

    tools = await mcp_client.get_tools()
    agent = create_react_agent(
        model=model,
        tools=tools,
        prompt="You are a helpful assistant",
    )
    return agent


async def chat_loop():
    """Run an interactive chat session until the user quits.

    The loop ends on ``quit``/``exit``, on Ctrl+C, or when standard input is
    closed. Any other error is reported and the user may try again.
    """
    # Create the agent
    print("Initializing AI assistant...")
    agent = await make_agent()
    print("AI assistant is ready! Enter 'quit' or 'exit' to leave\n")

    # Chat history
    messages = []

    while True:
        try:
            # Get user input
            user_input = input("User: ").strip()

            # Check for exit command
            if user_input.lower() in ['quit', 'exit']:
                print("Goodbye!")
                break

            # Skip empty input
            if not user_input:
                continue

            # Add user message to history
            messages.append({"role": "user", "content": user_input})

            print("AI: ", end="", flush=True)

            # Invoke the agent
            response = await agent.ainvoke(
                {"messages": messages},
                {"recursion_limit": 50}
            )

            # Extract the AI response
            ai_response = response["messages"][-1].content
            print(ai_response)

            # Add AI response to history
            messages.append({"role": "assistant", "content": ai_response})

            print()  # Separate with a blank line

        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin was closed; retrying input() would fail
            # the same way forever, so treat it like an interrupt.
            print("\n\nGoodbye!")
            break
        except Exception as e:
            # Drop the unanswered user turn so a retry starts from a
            # consistent history.
            if messages and messages[-1]["role"] == "user":
                messages.pop()
            print(f"Error: {e}")
            print("Please try again...\n")


async def main():
    """Entry point: run the chat loop."""
    await chat_loop()


# Run the main function
if __name__ == "__main__":
    asyncio.run(main())