This is a Model Context Protocol (MCP) server that provides real-time cryptocurrency category analysis, portfolio optimization, and market insights.
returnsAndVolatility – Analyzes returns and volatility for top cryptocurrency categories over a
specified period.covarianceMatrix – Calculates the covariance matrix for top categories to understand how they
move together.correlationMatrix – Generates a correlation matrix showing relationships between cryptocurrency
categories.meanVariancePortfolioCalculation – Calculates optimal portfolio allocation using Modern
Portfolio Theory.markowitzMinimumVariancePortfolio – Calculates the mathematically optimal minimum variance
portfolio using Markowitz optimization.get-category-performance – Analyzes performance metrics for a specific cryptocurrency category.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import OpenAI from "openai";
import readline from "readline/promises";
import { config } from "dotenv";
config(); // Load .env file
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
// Create readline interface for user input
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
async function initAndProcessQuery(baseUrl, query) {
// Create StreamableHTTP transport
const transport = new StreamableHTTPClientTransport(new URL(baseUrl));
const client = new Client({
name: "crypto-mcp-client",
version: "1.0.0"
});
try {
// Connect to MCP server
await client.connect(transport);
console.log("Connected to MCP server");
// List available tools
const toolsResponse = await client.listTools();
// Convert MCP tools to OpenAI function format
const functions = toolsResponse.tools.map(tool => ({
name: tool.name,
description: tool.description || "",
parameters: tool.inputSchema || {}
}));
// Ask GPT-4 to process the query
const messages = [
{
role: "system",
content: "Use the available cryptocurrency analysis tools to answer user questions about portfolio optimization and market analysis."
},
{
role: "user",
content: query
}
];
const chatResponse = await openai.chat.completions.create({
model: "gpt-4o",
messages: messages,
functions: functions,
function_call: "auto",
temperature: 0
});
const message = chatResponse.choices[0].message;
// Handle function calls
if (message.function_call) {
const functionName = message.function_call.name;
const functionArgs = JSON.parse(message.function_call.arguments);
// Call the MCP tool
const result = await client.callTool({
name: functionName,
arguments: functionArgs
});
// Get the result text
const toolOutput = result.content[0].text;
// Get final response from OpenAI
const finalResponse = await openai.chat.completions.create({
model: "gpt-4o",
messages: [
...messages,
{
role: "assistant",
content: null,
function_call: message.function_call
},
{
role: "function",
name: functionName,
content: toolOutput
}
]
});
return finalResponse.choices[0].message.content;
}
return message.content || "No response";
} finally {
// Clean up
await client.close();
await transport.close();
}
}
async function chatLoop(mcpUrl) {
console.log("\nCrypto Categories MCP + OpenAI Chat (type 'quit' to exit)\n");
while (true) {
const query = await rl.question("You: ");
if (!query.trim() || query.toLowerCase() === "quit") {
break;
}
try {
const response = await initAndProcessQuery(mcpUrl, query);
console.log("\nAssistant:", response, "\n");
} catch (error) {
console.error("Error during query:", error);
}
}
rl.close();
}
// Run the chat loop
if (import.meta.url === `file://${process.argv[1]}`) {
chatLoop("http://localhost:3000/mcp")
.catch(console.error);
}
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
async function runCryptoAnalysis() {
const transport = new StreamableHTTPClientTransport(
new URL("http://localhost:3000/mcp")
);
const client = new Client({
name: "crypto-direct-client",
version: "1.0.0"
});
try {
// Connect to server
await client.connect(transport);
console.log("Connected to Crypto Categories MCP server");
// List available tools
const tools = await client.listTools();
console.log("Available tools:", tools.tools.map(t => t.name));
// Example 1: Get returns and volatility
console.log("\n1. Analyzing top 10 categories returns and volatility...");
const volatilityResult = await client.callTool({
name: "returnsAndVolatility",
arguments: {
numberOfCategories: 10,
days: 30
}
});
console.log(volatilityResult.content[0].text);
// Example 2: Calculate correlation matrix
console.log("\n2. Calculating correlation matrix...");
const correlationResult = await client.callTool({
name: "correlationMatrix",
arguments: {
numberOfCategories: 5,
days: 30
}
});
console.log(correlationResult.content[0].text);
// Example 3: Optimize portfolio
console.log("\n3. Creating optimized portfolio...");
const portfolioResult = await client.callTool({
name: "meanVariancePortfolioCalculation",
arguments: {
numberOfCategories: 8,
riskTolerance: 0.5
}
});
console.log(portfolioResult.content[0].text);
// Example 4: Get a pre-built prompt
console.log("\n4. Using portfolio creation prompt...");
const prompt = await client.getPrompt({
name: "create-portfolio-10-categories"
});
console.log("Prompt content:", prompt.messages[0].content.text);
} catch (error) {
console.error("Error:", error);
} finally {
await client.close();
await transport.close();
}
}
// Run the analysis
runCryptoAnalysis().catch(console.error);
import asyncio
import os
import json
from typing import Dict, Any, List
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from dotenv import load_dotenv
import pprint
import logging
from litellm import completion, ModelResponse, OpenAIError # Import from litellm
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# --- Ollama Configuration ---
# litellm expects the base URL without /v1 for Ollama usually
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL)
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL")
# Set environment variables for litellm to know about Ollama
os.environ["OLLAMA_API_BASE"] = OLLAMA_BASE_URL
# litellm does not typically need an API key for Ollama, but some models might
# os.environ["OLLAMA_API_KEY"] = "ollama_dummy_key" # If your Ollama setup required one
async def init_and_process_query(mcp_base_url: str, query: str) -> str:
"""
Initialize MCP connection and process a query using litellm for Ollama (with function calling).
Args:
mcp_base_url: The MCP server URL.
query: User query to process.
Returns:
The assistant's response or an error message.
"""
logger.info(f"Connecting to MCP server at: {mcp_base_url}")
logger.info(f"Using Ollama model: {OLLAMA_MODEL} via litellm at {OLLAMA_BASE_URL}")
try:
async with streamablehttp_client(mcp_base_url) as (read_stream, write_stream, _,):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
logger.info("Successfully connected and initialized MCP session.")
tools_response = await session.list_tools()
logger.info(f"Found {len(tools_response.tools)} tools from MCP.")
# Convert MCP tools to litellm/OpenAI 'tools' format
# litellm expects "tools" with "type": "function"
litellm_tools = []
for tool in tools_response.tools:
parameters = tool.inputSchema if tool.inputSchema else {"type": "object", "properties": {}}
litellm_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": parameters
}
})
logger.debug(f"Added tool for litellm: {tool.name} with schema: {parameters}")
# Initial messages for the LLM
messages = [
{
"role": "system",
"content": "You are a web3 AI portfolio manager, use the available tools to answer user questions."
},
{
"role": "user",
"content": query
}
]
# --- First Call to Ollama via litellm for Tool Selection ---
try:
logger.info("Making initial call to Ollama (via litellm) for tool selection...")
chat_response: ModelResponse = completion(
model=f"ollama/{OLLAMA_MODEL}", # litellm's format for Ollama models
messages=messages,
tools=litellm_tools, # Pass tools
tool_choice="auto", # Let litellm/model decide
temperature=0
)
logger.debug(f"Ollama first response (via litellm): {chat_response.model_dump_json(indent=2)}")
except OpenAIError as e: # litellm can wrap errors as OpenAIError or its own custom errors
logger.error(f"LiteLLM/Ollama API error: {e.status_code if hasattr(e, 'status_code') else 'N/A'} - {e}")
return f"LiteLLM/Ollama API error: {e}. Please ensure Ollama is running and accessible at {OLLAMA_BASE_URL} and supports function calling."
except Exception as e:
logger.error(f"An unexpected error occurred during first LiteLLM/Ollama call: {e}", exc_info=True)
return f"An unexpected error occurred during the LLM's initial processing: {e}"
message = chat_response.choices[0].message
# Check for tool_calls in the message
if message.tool_calls:
# LiteLLM/OpenAI API returns a list of tool_calls
# For simplicity, we'll process the first tool call
tool_call = message.tool_calls[0]
function_name = tool_call.function.name
try:
function_args = json.loads(tool_call.function.arguments)
logger.info(f"Ollama requested function call: {function_name} with args: {function_args}")
except json.JSONDecodeError as e:
logger.error(f"Failed to decode function arguments JSON: {tool_call.function.arguments} - {e}")
return "LLM attempted to call a function but provided malformed arguments."
try:
# Call the MCP tool
logger.info(f"Calling MCP tool: {function_name}")
tool_response = await session.call_tool(
name=function_name,
arguments=function_args
)
tool_result = ""
if tool_response.content:
for content_part in tool_response.content:
if hasattr(content_part, 'text'):
tool_result += content_part.text
elif hasattr(content_part, 'data'):
try:
tool_result += str(content_part.data)
except Exception as data_err:
logger.warning(f"Could not convert data part to string: {data_err}")
tool_result += f""
logger.info(f"MCP tool '{function_name}' returned: {tool_result[:200]}...")
# Append original assistant message with tool_calls and then tool response
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call.model_dump()] # Ensure it's in the correct dict format
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id, # Crucial for linking tool response to tool call
"content": tool_result
})
# --- Second Call to Ollama via litellm for Final Response ---
logger.info("Making second call to Ollama (via litellm) for final response based on tool output...")
final_response_chat: ModelResponse = completion(
model=f"ollama/{OLLAMA_MODEL}",
messages=messages, # Send the full conversation hist
)
logger.debug(f"Ollama final response (via litellm): {final_response_chat.model_dump_json(indent=2)}")
final_content = final_response_chat.choices[0].message.content
return final_content or "No final response generated by the AI after tool execution."
except OpenAIError as e:
logger.error(f"LiteLLM/Ollama API error during final call: {e}")
return f"LiteLLM/Ollama API error during final response: {e}."
except Exception as e:
logger.error(f"Error during tool execution or final LLM call: {e}", exc_info=True)
return f"Error processing tool response or generating final answer: {str(e)}"
else:
# If no tool call was made, return the content from the first response
return message.content or "No specific response from AI."
except Exception as e:
logger.error(f"An unexpected error occurred during MCP initialization or query processing: {e}", exc_info=True)
return f"An unhandled error occurred: {str(e)}"
async def main():
mcp_url = "https://portfolio.chainaware.ai/mcp"
user_query = "show me correlation matrix for top 3 categories" # Example query
print(f"Querying: '{user_query}'")
response = await init_and_process_query(mcp_url, user_query)
print("\nAssistant Response:")
pprint.pprint(response)
if __name__ == "__main__":
asyncio.run(main())
MCP Endpoint: /mcp
Post messages via: /chat