import os
import base64
import requests
from dotenv import load_dotenv
from crewai import Agent, Task, Crew
# Load environment variables
load_dotenv()
# MCP Server configuration
MCP_SERVER_URL = os.getenv('MCP_SERVER_URL', 'https://mcp.cloud.cdata.com/mcp')
MCP_USERNAME = os.getenv('MCP_USERNAME', '')
MCP_PASSWORD = os.getenv('MCP_PASSWORD', '')
# Create Basic Auth header
auth_header = {}
if MCP_USERNAME and MCP_PASSWORD:
credentials = f"{MCP_USERNAME}:{MCP_PASSWORD}"
auth_header = {
"Authorization": f"Basic {base64.b64encode(credentials.encode()).decode()}"
}
# MCP tool invocation function
def call_mcp_tool(tool_name, input_data):
payload = {
"tool": tool_name,
"input": input_data
}
try:
response = requests.post(
MCP_SERVER_URL,
json=payload,
headers=auth_header,
timeout=10
)
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": str(e)}
# Define CrewAI agent
class MCPQueryAgent(Agent):
def __init__(self):
super().__init__(
name="cdata_query_assistant",
role="Data Query Assistant",
goal="Help users explore and query CData Connect Cloud databases",
backstory="You are a helpful assistant trained to interact with CData Connect Cloud via MCP tools. You understand databases, schemas, and SQL queries, and you guide users through their data exploration journey."
)
def get_catalogs(self):
return call_mcp_tool("getCatalogs", {})
def get_schemas(self, catalog):
return call_mcp_tool("getSchemas", {"catalog": catalog})
def get_tables(self, catalog, schema):
return call_mcp_tool("getTables", {"catalog": catalog, "schema": schema})
def get_columns(self, catalog, schema, table):
return call_mcp_tool("getColumns", {
"catalog": catalog,
"schema": schema,
"table": table
})
def query_data(self, catalog, query):
return call_mcp_tool("queryData", {
"catalog": catalog,
"query": query
})
# Instantiate agent
agent = MCPQueryAgent()
# Define tasks
task1 = Task(
agent=agent,
description="List top 10 available catalogs in the CData Connect Cloud",
expected_output="Catalog list",
output_function=lambda agent: agent.get_catalogs()
)
# Create and run crew
crew = Crew(
agents=[agent],
tasks=[task1]
)
if __name__ == "__main__":
results = crew.kickoff()
print("\n=== Final Output ===")
for result in results:
print(result)