WIP Python implementation using FastMCP

This commit is contained in:
Luiz Felipe Costa 2025-09-23 01:38:48 -03:00
parent 8cd790d2a6
commit 004d5614e2
63 changed files with 3074 additions and 1098 deletions

View file

@ -1,56 +1,16 @@
# Ghost MCP Development Environment Configuration # Ghost MCP Server Configuration
# Copy this file to .env and update the values as needed
# ============================================================================= # Ghost instance configuration
# Database Configuration
# =============================================================================
MYSQL_ROOT_PASSWORD=your_strong_root_password_here
MYSQL_DATABASE=ghost_dev
MYSQL_USER=ghost
MYSQL_PASSWORD=your_strong_ghost_db_password_here
# =============================================================================
# Ghost Configuration
# =============================================================================
# The URL where Ghost will be accessible
GHOST_URL=http://localhost:2368 GHOST_URL=http://localhost:2368
GHOST_CONTENT_API_KEY=your_content_api_key_here
GHOST_ADMIN_API_KEY=your_admin_api_key_here
GHOST_VERSION=v5.0
GHOST_MODE=auto
GHOST_TIMEOUT=30
GHOST_MAX_RETRIES=3
GHOST_RETRY_BACKOFF_FACTOR=2.0
# ============================================================================= # Logging configuration
# Email Configuration (Optional - for testing email features) LOG_LEVEL=info
# ============================================================================= LOG_STRUCTURED=true
# From address for Ghost emails LOG_REQUEST_ID=true
GHOST_MAIL_FROM=noreply@localhost
# SMTP Configuration (uncomment and configure if needed)
# MAIL_SERVICE=Gmail
# MAIL_USER=your-email@gmail.com
# MAIL_PASSWORD=your-app-password
# =============================================================================
# MCP Development Configuration (for future use)
# =============================================================================
# These will be used by the MCP server once implemented
# Ghost API Keys (obtain these after Ghost setup)
# GHOST_CONTENT_API_KEY=your_content_api_key_here
# GHOST_ADMIN_API_KEY=your_admin_api_key_here
# Ghost API Version
# GHOST_VERSION=v5.0
# MCP Operation Mode
# MCP_GHOST_MODE=auto
# =============================================================================
# Development Settings
# =============================================================================
# Set to 'development' for verbose logging
NODE_ENV=development
# =============================================================================
# Security Notes
# =============================================================================
# - Never commit the actual .env file to version control
# - Use strong, unique passwords for production
# - Keep API keys secure and never log them
# - For production, use proper SMTP configuration

View file

@ -1,34 +0,0 @@
{
"root": true,
"parser": "@typescript-eslint/parser",
"plugins": ["@typescript-eslint", "prettier"],
"extends": [
"eslint:recommended",
"@typescript-eslint/recommended",
"prettier"
],
"parserOptions": {
"ecmaVersion": 2022,
"sourceType": "module",
"project": "./tsconfig.json"
},
"env": {
"node": true,
"es2022": true
},
"rules": {
"prettier/prettier": "error",
"@typescript-eslint/no-unused-vars": "error",
"@typescript-eslint/explicit-function-return-type": "warn",
"@typescript-eslint/no-explicit-any": "warn",
"@typescript-eslint/no-non-null-assertion": "warn",
"@typescript-eslint/prefer-const": "error",
"@typescript-eslint/no-var-requires": "error",
"prefer-const": "error",
"no-var": "error",
"no-console": "warn",
"eqeqeq": "error",
"curly": "error"
},
"ignorePatterns": ["dist/", "node_modules/", "*.js"]
}

55
.gitignore vendored Normal file
View file

@ -0,0 +1,55 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Logs
*.log
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/

View file

@ -1,11 +0,0 @@
{
"semi": true,
"trailingComma": "es5",
"singleQuote": true,
"printWidth": 80,
"tabWidth": 2,
"useTabs": false,
"bracketSpacing": true,
"arrowParens": "avoid",
"endOfLine": "lf"
}

7
CLAUDE.md Normal file
View file

@ -0,0 +1,7 @@
# !!!IMPORTANT!!!
- Always use `docker compose` instead of `docker-compose`

208
Makefile Normal file
View file

@ -0,0 +1,208 @@
# Ghost MCP Development Makefile
.PHONY: help install install-uv start-ghost stop-ghost setup-tokens test test-connection run dev clean logs status check-deps
# Default target
help: ## Show this help message
@echo "Ghost MCP Development Commands"
@echo "============================="
@echo ""
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
@echo ""
@echo "Quick start:"
@echo " make install-uv # Install uv package manager (if not installed)"
@echo " make install # Install Python dependencies"
@echo " make start-ghost # Start Ghost and database containers"
@echo " make setup-tokens # Extract API keys and create .env file"
@echo " make test # Test the implementation"
@echo " make run # Run the MCP server"
# Python environment setup
install-uv: ## Install uv package manager
@echo "📦 Installing uv package manager..."
@if command -v uv >/dev/null 2>&1; then \
echo "✅ uv is already installed"; \
uv --version; \
else \
echo "Installing uv..."; \
curl -LsSf https://astral.sh/uv/install.sh | sh; \
echo "✅ uv installed successfully"; \
fi
install: ## Install Python dependencies using uv
@echo "📦 Installing Python dependencies with uv..."
@if ! command -v uv >/dev/null 2>&1; then \
echo "❌ uv not found. Run 'make install-uv' first"; \
exit 1; \
fi
uv sync
@echo "✅ Dependencies installed successfully"
install-pip: ## Install Python dependencies using pip (fallback)
@echo "📦 Installing Python dependencies with pip..."
python -m pip install -e .
@echo "✅ Dependencies installed successfully"
# Docker environment
start-ghost: ## Start Ghost and database containers
@echo "🐳 Starting Ghost development environment..."
docker-compose up -d
@echo "⏳ Waiting for containers to be healthy..."
@timeout=60; \
while [ $$timeout -gt 0 ]; do \
if docker-compose ps | grep -q "ghost-mcp-dev.*Up" && docker-compose ps | grep -q "ghost-db-dev.*Up.*healthy"; then \
echo "✅ Ghost containers are running and healthy"; \
break; \
fi; \
echo " Waiting for containers... ($$timeout seconds remaining)"; \
sleep 2; \
timeout=$$((timeout - 2)); \
done; \
if [ $$timeout -le 0 ]; then \
echo "❌ Containers did not start properly"; \
make logs; \
exit 1; \
fi
@echo ""
@echo "🎉 Ghost is ready!"
@echo " Ghost admin: http://localhost:2368/ghost/"
@echo " Ghost site: http://localhost:2368/"
@echo " Database: Available on port 3306"
stop-ghost: ## Stop Ghost and database containers
@echo "🛑 Stopping Ghost development environment..."
docker-compose down
@echo "✅ Ghost containers stopped"
restart-ghost: ## Restart Ghost and database containers
@echo "🔄 Restarting Ghost development environment..."
docker-compose restart
@echo "✅ Ghost containers restarted"
# API token setup
setup-tokens: ## Extract API keys from Ghost database and create .env file
@echo "🔑 Setting up API tokens..."
./scripts/setup-tokens.sh
# Testing
test-connection: ## Test Ghost API connectivity
@if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \
exit 1; \
fi
@python scripts/test-connection.py
test: check-deps test-connection ## Run all tests
@echo "🧪 Running comprehensive tests..."
@echo "Testing MCP tools registration..."
@python -c "\
import sys; \
sys.path.insert(0, 'src'); \
from ghost_mcp.server import mcp; \
print(f'✅ FastMCP server initialized'); \
print(f' Tools registered: {len([attr for attr in dir(mcp) if not attr.startswith(\"_\")])}+')"
@echo "✅ All tests passed!"
# Running the server
run: check-deps ## Run the Ghost MCP server
@echo "🚀 Starting Ghost MCP server..."
@if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \
exit 1; \
fi
python -m ghost_mcp.server
dev: check-deps ## Run the Ghost MCP server in development mode with auto-reload
@echo "🚀 Starting Ghost MCP server in development mode..."
@if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \
exit 1; \
fi
python -m ghost_mcp.server --dev
# Utilities
logs: ## Show Docker container logs
@echo "📋 Showing container logs..."
docker-compose logs -f
status: ## Show status of all components
@echo "📊 Ghost MCP Status"
@echo "=================="
@echo ""
@echo "🐳 Docker Containers:"
@docker-compose ps || echo " No containers running"
@echo ""
@echo "📁 Configuration:"
@if [ -f .env ]; then \
echo " ✅ .env file exists"; \
echo " 📍 Ghost URL: $$(grep GHOST_URL .env | cut -d= -f2)"; \
echo " 🔑 Content API: $$(grep GHOST_CONTENT_API_KEY .env | cut -d= -f2 | cut -c1-10)..."; \
echo " 🔑 Admin API: $$(grep GHOST_ADMIN_API_KEY .env | cut -d= -f2 | cut -c1-10)..."; \
else \
echo " ❌ .env file missing"; \
fi
@echo ""
@echo "🐍 Python Environment:"
@if command -v uv >/dev/null 2>&1; then \
echo " ✅ uv: $$(uv --version)"; \
else \
echo " ❌ uv not installed"; \
fi
@echo " 🐍 Python: $$(python --version)"
@if python -c "import ghost_mcp" 2>/dev/null; then \
echo " ✅ ghost_mcp package installed"; \
else \
echo " ❌ ghost_mcp package not installed"; \
fi
check-deps: ## Check if all dependencies are available
@if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \
exit 1; \
fi
@if ! python -c "import ghost_mcp" 2>/dev/null; then \
echo "❌ ghost_mcp package not installed. Run 'make install' first"; \
exit 1; \
fi
clean: ## Clean up development environment
@echo "🧹 Cleaning up development environment..."
docker-compose down -v
rm -f .env
@if command -v uv >/dev/null 2>&1; then \
uv clean; \
fi
@echo "✅ Cleanup complete"
# Development workflow
setup: install-uv install start-ghost setup-tokens ## Complete setup from scratch
@echo ""
@echo "🎉 Ghost MCP setup complete!"
@echo ""
@echo "Ready to use:"
@echo " make test # Test the implementation"
@echo " make run # Run the MCP server"
@echo " make status # Check system status"
# Documentation
docs: ## Show important URLs and information
@echo "📚 Ghost MCP Documentation"
@echo "========================="
@echo ""
@echo "🌐 Web Interfaces:"
@echo " Ghost Admin: http://localhost:2368/ghost/"
@echo " Ghost Site: http://localhost:2368/"
@echo " phpMyAdmin: http://localhost:8080/ (if enabled)"
@echo ""
@echo "📁 Important Files:"
@echo " Configuration: .env"
@echo " Project: pyproject.toml"
@echo " Docker: docker-compose.yml"
@echo " Setup Script: scripts/setup-tokens.sh"
@echo ""
@echo "🔧 Development Commands:"
@echo " make setup # Complete initial setup"
@echo " make test # Test functionality"
@echo " make run # Run MCP server"
@echo " make logs # View container logs"
@echo " make status # Check system status"

View file

@ -1,27 +0,0 @@
export default {
preset: 'ts-jest/presets/default-esm',
testEnvironment: 'node',
extensionsToTreatAsEsm: ['.ts'],
moduleNameMapping: {
'^(\\.{1,2}/.*)\\.js$': '$1',
},
transform: {
'^.+\\.ts$': ['ts-jest', {
useESM: true,
}],
},
collectCoverageFrom: [
'src/**/*.ts',
'!src/**/*.test.ts',
'!src/**/*.spec.ts',
'!src/index.ts',
],
coverageDirectory: 'coverage',
coverageReporters: ['text', 'lcov', 'html'],
testMatch: [
'**/__tests__/**/*.ts',
'**/?(*.)+(spec|test).ts',
],
moduleFileExtensions: ['ts', 'js', 'json'],
verbose: true,
};

View file

@ -1,65 +0,0 @@
{
"name": "ghost-mcp",
"version": "0.1.0",
"description": "Ghost MCP Server - Complete Ghost CMS functionality through Model Context Protocol",
"main": "dist/index.js",
"type": "module",
"scripts": {
"build": "tsc",
"dev": "tsx watch src/index.ts",
"start": "node dist/index.js",
"test": "jest",
"test:watch": "jest --watch",
"lint": "eslint src --ext .ts",
"lint:fix": "eslint src --ext .ts --fix",
"type-check": "tsc --noEmit",
"clean": "rimraf dist"
},
"keywords": [
"ghost",
"mcp",
"model-context-protocol",
"cms",
"api",
"blog"
],
"author": "Ghost MCP Development Team",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"axios": "^1.7.0",
"dotenv": "^16.4.0",
"jsonwebtoken": "^9.0.0",
"uuid": "^10.0.0",
"winston": "^3.11.0",
"zod": "^3.23.0"
},
"devDependencies": {
"@types/jest": "^29.5.0",
"@types/jsonwebtoken": "^9.0.0",
"@types/node": "^20.11.0",
"@types/uuid": "^10.0.0",
"@typescript-eslint/eslint-plugin": "^7.0.0",
"@typescript-eslint/parser": "^7.0.0",
"eslint": "^8.57.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.0",
"jest": "^29.7.0",
"prettier": "^3.2.0",
"rimraf": "^5.0.0",
"ts-jest": "^29.1.0",
"tsx": "^4.7.0",
"typescript": "^5.4.0"
},
"engines": {
"node": ">=18.0.0"
},
"repository": {
"type": "git",
"url": "https://github.com/your-org/ghost-mcp.git"
},
"bugs": {
"url": "https://github.com/your-org/ghost-mcp/issues"
},
"homepage": "https://github.com/your-org/ghost-mcp#readme"
}

76
pyproject.toml Normal file
View file

@ -0,0 +1,76 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "ghost-mcp"
version = "0.1.0"
description = "Ghost CMS MCP server providing comprehensive Ghost API access"
authors = [{name = "Ghost MCP Team"}]
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
dependencies = [
"fastmcp>=0.2.0",
"requests>=2.31.0",
"pyjwt>=2.8.0",
"pydantic>=2.5.0",
"python-dotenv>=1.0.0",
"structlog>=23.2.0",
"httpx>=0.25.0",
"typing-extensions>=4.8.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
"black>=23.0.0",
"ruff>=0.1.0",
"mypy>=1.7.0",
"pre-commit>=3.5.0",
"hatchling>=1.21.0",
]
[project.scripts]
ghost-mcp = "ghost_mcp.server:main"
# Package discovery
[tool.hatch.build.targets.wheel]
packages = ["src/ghost_mcp"]
[tool.black]
line-length = 88
target-version = ['py310']
[tool.ruff]
target-version = "py310"
line-length = 88
select = ["E", "F", "W", "I", "N", "UP", "ANN", "S", "BLE", "FBT", "B", "A", "COM", "C4", "DTZ", "T10", "EM", "EXE", "FA", "ISC", "ICN", "G", "INP", "PIE", "T20", "PYI", "PT", "Q", "RSE", "RET", "SLF", "SLOT", "SIM", "TID", "TCH", "INT", "ARG", "PTH", "TD", "FIX", "ERA", "PD", "PGH", "PL", "TRY", "FLY", "NPY", "AIR", "PERF", "FURB", "LOG", "RUF"]
ignore = ["ANN101", "ANN102", "S101", "PLR0913", "PLR0915", "PLR2004"]
[tool.mypy]
python_version = "3.10"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --cov=ghost_mcp --cov-report=html --cov-report=term-missing"
asyncio_mode = "auto"

135
scripts/setup-tokens.sh Executable file
View file

@ -0,0 +1,135 @@
#!/bin/bash
set -e
# Ghost MCP API Token Setup Script
echo "🔑 Setting up Ghost MCP API tokens..."
# Check if Docker containers are running
if ! docker-compose ps | grep -q "ghost-mcp-dev.*Up"; then
echo "❌ Error: Ghost container is not running"
echo "Please run: make start-ghost"
exit 1
fi
if ! docker-compose ps | grep -q "ghost-db-dev.*Up"; then
echo "❌ Error: Ghost database container is not running"
echo "Please run: make start-ghost"
exit 1
fi
echo "✅ Ghost containers are running"
# Wait for Ghost to be fully ready
echo "⏳ Waiting for Ghost API to be ready..."
max_attempts=30
attempt=0
while [ $attempt -lt $max_attempts ]; do
if curl -s "http://localhost:2368/ghost/api/content/" >/dev/null 2>&1; then
break
fi
echo " Attempt $((attempt + 1))/$max_attempts - waiting for Ghost..."
sleep 2
attempt=$((attempt + 1))
done
if [ $attempt -eq $max_attempts ]; then
echo "❌ Error: Ghost API did not become ready in time"
exit 1
fi
echo "✅ Ghost API is ready"
# Extract API keys from database
echo "🔍 Extracting API keys from Ghost database..."
CONTENT_API_KEY=$(docker exec ghost-db-dev mysql -u root -prootpassword ghost_dev -e "SELECT secret FROM api_keys WHERE type='content' LIMIT 1;" 2>/dev/null | tail -n 1)
ADMIN_API_DATA=$(docker exec ghost-db-dev mysql -u root -prootpassword ghost_dev -e "SELECT id, secret FROM api_keys WHERE type='admin' LIMIT 1;" 2>/dev/null | tail -n 1)
if [ -z "$CONTENT_API_KEY" ] || [ "$CONTENT_API_KEY" = "secret" ]; then
echo "❌ Error: Could not retrieve Content API key"
exit 1
fi
if [ -z "$ADMIN_API_DATA" ] || [ "$ADMIN_API_DATA" = "id secret" ]; then
echo "❌ Error: Could not retrieve Admin API key"
exit 1
fi
# Parse admin API data (format: "id secret")
ADMIN_API_ID=$(echo "$ADMIN_API_DATA" | cut -f1)
ADMIN_API_SECRET=$(echo "$ADMIN_API_DATA" | cut -f2)
ADMIN_API_KEY="${ADMIN_API_ID}:${ADMIN_API_SECRET}"
echo "✅ API keys extracted successfully"
echo " Content API Key: ${CONTENT_API_KEY:0:10}..."
echo " Admin API Key: ${ADMIN_API_ID:0:10}:${ADMIN_API_SECRET:0:10}..."
# Create .env file
echo "📝 Creating .env file..."
cat > .env << EOF
# Ghost MCP Server Configuration
# Generated on $(date)
# Ghost instance configuration
GHOST_URL=http://localhost:2368
GHOST_CONTENT_API_KEY=$CONTENT_API_KEY
GHOST_ADMIN_API_KEY=$ADMIN_API_KEY
GHOST_VERSION=v5.0
GHOST_MODE=auto
GHOST_TIMEOUT=30
GHOST_MAX_RETRIES=3
GHOST_RETRY_BACKOFF_FACTOR=2.0
# Logging configuration
LOG_LEVEL=info
LOG_STRUCTURED=true
LOG_REQUEST_ID=true
EOF
echo "✅ .env file created successfully"
# Test the configuration
echo "🧪 Testing API connectivity..."
# Test Content API
echo " Testing Content API..."
CONTENT_TEST=$(curl -s "http://localhost:2368/ghost/api/content/settings/?key=$CONTENT_API_KEY" | grep -o '"title"' || echo "")
if [ -n "$CONTENT_TEST" ]; then
echo " ✅ Content API: Working"
else
echo " ❌ Content API: Failed"
fi
# Test Admin API (using Python to generate JWT and test)
echo " Testing Admin API..."
python3 -c "
import asyncio
import sys
import os
sys.path.insert(0, 'src')
async def test_admin():
try:
from ghost_mcp.client import GhostClient
async with GhostClient() as client:
result = await client._make_request('GET', 'site/', api_type='admin')
print(' ✅ Admin API: Working')
except Exception as e:
print(f' ❌ Admin API: Failed - {e}')
asyncio.run(test_admin())
" 2>/dev/null || echo " ⚠️ Admin API: Could not test (install dependencies first)"
echo ""
echo "🎉 Ghost MCP API tokens setup complete!"
echo ""
echo "Next steps:"
echo "1. Install dependencies: make install"
echo "2. Test the implementation: make test"
echo "3. Run the MCP server: make run"
echo ""
echo "Configuration file: .env"
echo "Ghost admin interface: http://localhost:2368/ghost/"
echo "Ghost public site: http://localhost:2368/"

44
scripts/test-connection.py Executable file
View file

@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""Test Ghost API connectivity script."""
import asyncio
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from ghost_mcp.client import GhostClient
async def test_connection():
"""Test Ghost API connection."""
print("🧪 Testing Ghost API connection...")
async with GhostClient() as client:
# Test Content API
try:
result = await client._make_request("GET", "settings/", api_type="content")
title = result.get("settings", {}).get("title", "Unknown")
print(f"✅ Content API: Connected to '{title}'")
except Exception as e:
print(f"❌ Content API: {e}")
# Test Admin API
try:
result = await client._make_request("GET", "site/", api_type="admin")
print("✅ Admin API: Connected successfully")
except Exception as e:
print(f"❌ Admin API: {e}")
# Test posts endpoint
try:
result = await client.get_posts(limit=1)
posts_count = len(result.get("posts", []))
print(f"✅ Posts: Found {posts_count} posts")
except Exception as e:
print(f"❌ Posts: {e}")
if __name__ == "__main__":
asyncio.run(test_connection())

67
scripts/test-mcp-tools.py Executable file
View file

@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""Test Ghost MCP tools functionality."""
import asyncio
import json
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from ghost_mcp.tools.content.posts import get_posts
from ghost_mcp.tools.content.settings import get_settings
from ghost_mcp.server import check_ghost_connection
async def test_mcp_tools():
"""Test MCP tools functionality."""
print("🧪 Testing Ghost MCP tools...")
# Test connection check tool
print("\n1. Testing connection check tool...")
try:
result = await check_ghost_connection()
data = json.loads(result)
print(f"✅ Connection check: {data.get('connection_test', 'unknown')}")
print(f" Ghost URL: {data.get('ghost_url')}")
print(f" Content API: {'' if data.get('content_api_configured') else ''}")
print(f" Admin API: {'' if data.get('admin_api_configured') else ''}")
except Exception as e:
print(f"❌ Connection check failed: {e}")
# Test get settings
print("\n2. Testing get_settings tool...")
try:
result = await get_settings()
data = json.loads(result)
if "settings" in data:
settings = data["settings"]
print(f"✅ Settings: Site title is '{settings.get('title')}'")
print(f" Description: {settings.get('description')}")
print(f" URL: {settings.get('url')}")
else:
print(f"❌ Settings: {data}")
except Exception as e:
print(f"❌ Settings failed: {e}")
# Test get posts
print("\n3. Testing get_posts tool...")
try:
result = await get_posts(limit=2)
data = json.loads(result)
if "posts" in data:
posts = data["posts"]
print(f"✅ Posts: Found {len(posts)} posts")
for post in posts:
print(f" - '{post.get('title')}' (status: {post.get('status')})")
else:
print(f"❌ Posts: {data}")
except Exception as e:
print(f"❌ Posts failed: {e}")
print("\n🎉 MCP tools test completed!")
if __name__ == "__main__":
asyncio.run(test_mcp_tools())

View file

@ -1,44 +0,0 @@
/**
* Ghost Admin API JWT Authentication
*
* Handles JWT token generation and management for Ghost Admin API.
* Implements the authentication flow documented in contracts/admin-api-auth.md
*/
import jwt from 'jsonwebtoken';
// TODO: Phase 1 - Implement Admin API JWT authentication
// - JWT token generation
// - Token caching and refresh
// - Authorization header management
export interface AdminAuthConfig {
apiKey: string; // format: id:secret
ghostUrl: string;
apiVersion: string;
}
export class AdminApiAuth {
private token: string | null = null;
private tokenExpiry: number | null = null;
constructor(private config: AdminAuthConfig) {
// TODO: Phase 1 - Initialize authentication
}
generateToken(): string {
// TODO: Phase 1 - Implement JWT token generation
// Based on contracts/admin-api-auth.md specifications
throw new Error('Not implemented - Phase 1');
}
getValidToken(): string {
// TODO: Phase 1 - Get valid token (generate if expired)
throw new Error('Not implemented - Phase 1');
}
getAuthHeaders(): Record<string, string> {
// TODO: Phase 1 - Return Authorization and other required headers
throw new Error('Not implemented - Phase 1');
}
}

View file

@ -1,33 +0,0 @@
/**
* Ghost Content API Authentication
*
* Handles authentication for Ghost Content API using query parameter method.
* Implements the authentication flow documented in contracts/content-api-auth.md
*/
// TODO: Phase 1 - Implement Content API authentication
// - Query parameter authentication
// - URL building with API key
// - Request header management
export interface ContentAuthConfig {
apiKey: string;
ghostUrl: string;
apiVersion: string;
}
export class ContentApiAuth {
constructor(private config: ContentAuthConfig) {
// TODO: Phase 1 - Initialize authentication
}
buildUrl(endpoint: string, params?: Record<string, unknown>): string {
// TODO: Phase 1 - Build URL with API key and parameters
throw new Error('Not implemented - Phase 1');
}
getHeaders(): Record<string, string> {
// TODO: Phase 1 - Return required headers
throw new Error('Not implemented - Phase 1');
}
}

View file

@ -1,62 +0,0 @@
/**
* Ghost API Client
*
* Unified client for both Ghost Content API (read-only) and Admin API (read/write).
* Handles authentication, request/response processing, and error handling.
*/
// TODO: Phase 1 - Implement this client
// - Content API authentication (query parameter)
// - Admin API JWT authentication
// - Request/response handling
// - Error mapping
// - Retry logic
export interface GhostClientConfig {
ghostUrl: string;
contentApiKey?: string;
adminApiKey?: string;
apiVersion: string;
}
export interface GhostApiResponse<T> {
data: T;
meta?: {
pagination?: {
page: number;
limit: number;
pages: number;
total: number;
next?: number;
prev?: number;
};
};
}
export class GhostClient {
constructor(config: GhostClientConfig) {
// TODO: Phase 1 - Initialize client with configuration
}
// TODO: Phase 1 - Content API methods
async contentGet<T>(endpoint: string, params?: Record<string, unknown>): Promise<GhostApiResponse<T>> {
throw new Error('Not implemented - Phase 1');
}
// TODO: Phase 1 - Admin API methods
async adminGet<T>(endpoint: string, params?: Record<string, unknown>): Promise<GhostApiResponse<T>> {
throw new Error('Not implemented - Phase 1');
}
async adminPost<T>(endpoint: string, data: unknown): Promise<GhostApiResponse<T>> {
throw new Error('Not implemented - Phase 1');
}
async adminPut<T>(endpoint: string, data: unknown): Promise<GhostApiResponse<T>> {
throw new Error('Not implemented - Phase 1');
}
async adminDelete<T>(endpoint: string): Promise<GhostApiResponse<T>> {
throw new Error('Not implemented - Phase 1');
}
}

View file

@ -0,0 +1,5 @@
"""Ghost MCP Server - Comprehensive Ghost CMS integration for MCP."""
__version__ = "0.1.0"
__author__ = "Ghost MCP Team"
__description__ = "Ghost CMS MCP server providing comprehensive Ghost API access"

View file

@ -0,0 +1,6 @@
"""Authentication modules for Ghost API."""
from .admin_auth import AdminAuth
from .content_auth import ContentAuth
__all__ = ["AdminAuth", "ContentAuth"]

View file

@ -0,0 +1,134 @@
"""Admin API authentication using JWT tokens."""
import time
from datetime import datetime, timedelta
from typing import Dict, Optional
import jwt
from ..config import config
from ..types.errors import AuthenticationError
from ..utils.logging import get_logger
logger = get_logger(__name__)
class AdminAuth:
"""Admin API JWT authentication handler."""
def __init__(self, api_key: Optional[str] = None) -> None:
"""Initialize with optional API key override."""
self.api_key = api_key or config.ghost.admin_api_key
self._cached_token: Optional[str] = None
self._token_expires_at: Optional[datetime] = None
if not self.api_key:
logger.warning("No Admin API key configured")
def get_auth_headers(self, request_id: Optional[str] = None) -> Dict[str, str]:
"""Get authentication headers for Admin API requests."""
token = self._get_jwt_token(request_id)
return {"Authorization": f"Ghost {token}"}
def _get_jwt_token(self, request_id: Optional[str] = None) -> str:
"""Get or generate JWT token for Admin API."""
if not self.api_key:
raise AuthenticationError(
"Admin API key not configured",
context="Admin API requires an API key for JWT generation",
request_id=request_id,
)
# Check if we have a valid cached token
if self._cached_token and self._token_expires_at:
if datetime.now() < self._token_expires_at - timedelta(seconds=30):
logger.debug("Using cached JWT token", request_id=request_id)
return self._cached_token
# Generate new token
token = self._generate_jwt_token(request_id)
self._cached_token = token
# JWT tokens expire after 5 minutes
self._token_expires_at = datetime.now() + timedelta(minutes=5)
logger.debug("Generated new JWT token", request_id=request_id)
return token
def _generate_jwt_token(self, request_id: Optional[str] = None) -> str:
"""Generate JWT token for Admin API authentication."""
try:
# Split the admin key (id:secret format)
if ":" not in self.api_key:
raise AuthenticationError(
"Invalid Admin API key format",
context="Admin API key must be in 'id:secret' format",
request_id=request_id,
)
key_id, secret = self.api_key.split(":", 1)
# Current timestamp
now = int(time.time())
# JWT payload
payload = {
"iat": now,
"exp": now + 300, # 5 minutes from now
"aud": "/admin/",
}
# JWT header
header = {"alg": "HS256", "typ": "JWT", "kid": key_id}
# Generate token
token = jwt.encode(
payload,
bytes.fromhex(secret),
algorithm="HS256",
headers=header,
)
return token
except Exception as e:
raise AuthenticationError(
f"Failed to generate JWT token: {e}",
context="JWT token generation failed",
request_id=request_id,
) from e
def is_configured(self) -> bool:
"""Check if Admin API authentication is properly configured."""
return bool(self.api_key and ":" in self.api_key)
def validate_key_format(self) -> bool:
"""Validate the Admin API key format."""
if not self.api_key:
return False
try:
# Admin keys should be in 'id:secret' format
if ":" not in self.api_key:
return False
key_id, secret = self.api_key.split(":", 1)
# Key ID should be 24 character hex string
if len(key_id) != 24 or not all(c in "0123456789abcdef" for c in key_id.lower()):
return False
# Secret should be 64 character hex string
if len(secret) != 64 or not all(c in "0123456789abcdef" for c in secret.lower()):
return False
return True
except Exception:
return False
def invalidate_cache(self) -> None:
"""Invalidate cached JWT token to force regeneration."""
self._cached_token = None
self._token_expires_at = None
logger.debug("JWT token cache invalidated")

View file

@ -0,0 +1,44 @@
"""Content API authentication using query parameter keys."""
from typing import Dict, Optional
from ..config import config
from ..types.errors import AuthenticationError
from ..utils.logging import get_logger
logger = get_logger(__name__)
class ContentAuth:
"""Content API authentication handler."""
def __init__(self, api_key: Optional[str] = None) -> None:
"""Initialize with optional API key override."""
self.api_key = api_key or config.ghost.content_api_key
if not self.api_key:
logger.warning("No Content API key configured")
def get_auth_params(self, request_id: Optional[str] = None) -> Dict[str, str]:
"""Get authentication parameters for Content API requests."""
if not self.api_key:
raise AuthenticationError(
"Content API key not configured",
context="Content API requires an API key",
request_id=request_id,
)
return {"key": self.api_key}
def is_configured(self) -> bool:
"""Check if Content API authentication is properly configured."""
return bool(self.api_key)
def validate_key_format(self) -> bool:
"""Validate the API key format."""
if not self.api_key:
return False
# Content API keys are typically 26 character hex strings
return len(self.api_key) == 26 and all(
c in "0123456789abcdef" for c in self.api_key.lower()
)

300
src/ghost_mcp/client.py Normal file
View file

@ -0,0 +1,300 @@
"""Ghost API client with unified interface for Content and Admin APIs."""
import uuid
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse
import httpx
from httpx import Response
from .auth import AdminAuth, ContentAuth
from .config import config
from .types.errors import AuthenticationError, GhostApiError, NetworkError
from .types.ghost import GhostApiResponse, GhostErrorResponse
from .utils.logging import get_logger
from .utils.retry import RetryConfig, with_retry
logger = get_logger(__name__)
class GhostClient:
"""Unified Ghost API client for both Content and Admin APIs."""
def __init__(
self,
base_url: Optional[str] = None,
content_api_key: Optional[str] = None,
admin_api_key: Optional[str] = None,
timeout: Optional[int] = None,
) -> None:
"""Initialize Ghost client with optional configuration overrides."""
self.base_url = base_url or str(config.ghost.url)
self.timeout = timeout or config.ghost.timeout
# Ensure base URL has proper format
if not self.base_url.endswith("/"):
self.base_url += "/"
# Initialize authentication handlers
self.content_auth = ContentAuth(content_api_key)
self.admin_auth = AdminAuth(admin_api_key)
# HTTP client configuration
self.client = httpx.AsyncClient(
timeout=self.timeout,
limits=httpx.Limits(max_keepalive_connections=10, max_connections=100),
)
# Retry configuration
self.retry_config = RetryConfig(
max_retries=config.ghost.max_retries,
base_delay=1.0,
exponential_base=config.ghost.retry_backoff_factor,
)
async def __aenter__(self) -> "GhostClient":
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Async context manager exit."""
await self.close()
async def close(self) -> None:
"""Close the HTTP client."""
await self.client.aclose()
def _build_url(self, endpoint: str, api_type: str = "content") -> str:
"""Build full URL for API endpoint."""
api_base = f"ghost/api/{api_type}/"
return urljoin(self.base_url, api_base + endpoint)
async def _make_request(
self,
method: str,
endpoint: str,
api_type: str = "content",
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Any]] = None,
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Make HTTP request to Ghost API with error handling and retries."""
if request_id is None:
request_id = str(uuid.uuid4())
url = self._build_url(endpoint, api_type)
headers = {"User-Agent": "Ghost-MCP/0.1.0"}
# Add authentication
if api_type == "content":
if not self.content_auth.is_configured():
raise AuthenticationError(
"Content API key not configured",
request_id=request_id,
)
if params is None:
params = {}
params.update(self.content_auth.get_auth_params(request_id))
elif api_type == "admin":
if not self.admin_auth.is_configured():
raise AuthenticationError(
"Admin API key not configured",
request_id=request_id,
)
headers.update(self.admin_auth.get_auth_headers(request_id))
# Prepare request data
request_kwargs: Dict[str, Any] = {
"method": method,
"url": url,
"headers": headers,
"params": params,
}
if json_data is not None:
request_kwargs["json"] = json_data
if files is not None:
request_kwargs["files"] = files
logger.info(
"Making Ghost API request",
method=method,
url=url,
api_type=api_type,
request_id=request_id,
)
async def _request() -> Dict[str, Any]:
try:
response: Response = await self.client.request(**request_kwargs)
return await self._handle_response(response, request_id)
except httpx.TimeoutException as e:
raise NetworkError(
f"Request timeout: {e}",
context=f"Timeout after {self.timeout}s",
request_id=request_id,
) from e
except httpx.ConnectError as e:
raise NetworkError(
f"Connection error: {e}",
context=f"Failed to connect to {url}",
request_id=request_id,
) from e
except httpx.HTTPError as e:
raise NetworkError(
f"HTTP error: {e}",
context=f"Request to {url} failed",
request_id=request_id,
) from e
return await with_retry(_request, self.retry_config, request_id)
async def _handle_response(self, response: Response, request_id: str) -> Dict[str, Any]:
"""Handle HTTP response and convert to appropriate format or raise errors."""
logger.debug(
"Received Ghost API response",
status_code=response.status_code,
headers=dict(response.headers),
request_id=request_id,
)
# Check for successful response
if response.status_code < 400:
try:
data = response.json()
logger.debug("Successfully parsed response JSON", request_id=request_id)
return data
except Exception as e:
raise GhostApiError(
f"Failed to parse response JSON: {e}",
context="Invalid JSON response from Ghost API",
request_id=request_id,
) from e
# Handle error responses
try:
error_data = response.json()
error_response = GhostErrorResponse(**error_data)
# Get first error for primary error info
first_error = error_response.errors[0] if error_response.errors else None
error_message = first_error.message if first_error else "Unknown Ghost API error"
error_code = first_error.code if first_error else None
raise GhostApiError(
error_message,
code=error_code,
context=f"HTTP {response.status_code}",
request_id=request_id,
)
except Exception as e:
if isinstance(e, GhostApiError):
raise
# Fallback for non-JSON error responses
raise GhostApiError(
f"HTTP {response.status_code}: {response.text}",
context="Non-JSON error response",
request_id=request_id,
) from e
# Content API methods
async def get_posts(
self,
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Get posts from Content API."""
params = {}
if limit is not None:
params["limit"] = limit
if page is not None:
params["page"] = page
if filter is not None:
params["filter"] = filter
if include is not None:
params["include"] = include
if fields is not None:
params["fields"] = fields
if order is not None:
params["order"] = order
return await self._make_request(
"GET", "posts/", api_type="content", params=params, request_id=request_id
)
async def get_post_by_id(
self,
post_id: str,
include: Optional[str] = None,
fields: Optional[str] = None,
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Get single post by ID from Content API."""
params = {}
if include is not None:
params["include"] = include
if fields is not None:
params["fields"] = fields
return await self._make_request(
"GET", f"posts/{post_id}/", api_type="content", params=params, request_id=request_id
)
async def get_post_by_slug(
self,
slug: str,
include: Optional[str] = None,
fields: Optional[str] = None,
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Get single post by slug from Content API."""
params = {}
if include is not None:
params["include"] = include
if fields is not None:
params["fields"] = fields
return await self._make_request(
"GET", f"posts/slug/{slug}/", api_type="content", params=params, request_id=request_id
)
# Admin API methods (examples)
async def create_post(
self,
post_data: Dict[str, Any],
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Create new post via Admin API."""
return await self._make_request(
"POST", "posts/", api_type="admin", json_data=post_data, request_id=request_id
)
async def update_post(
self,
post_id: str,
post_data: Dict[str, Any],
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Update existing post via Admin API."""
return await self._make_request(
"PUT", f"posts/{post_id}/", api_type="admin", json_data=post_data, request_id=request_id
)
async def delete_post(
self,
post_id: str,
request_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Delete post via Admin API."""
return await self._make_request(
"DELETE", f"posts/{post_id}/", api_type="admin", request_id=request_id
)

81
src/ghost_mcp/config.py Normal file
View file

@ -0,0 +1,81 @@
"""Configuration management with environment variable precedence."""
import os
from enum import Enum
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from pydantic import BaseModel, Field, HttpUrl
class LogLevel(str, Enum):
"""Logging levels."""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
class GhostMode(str, Enum):
"""Ghost API mode."""
READONLY = "readonly"
READWRITE = "readwrite"
AUTO = "auto"
class LoggingConfig(BaseModel):
"""Logging configuration."""
level: LogLevel = LogLevel.INFO
structured: bool = True
request_id: bool = True
class GhostConfig(BaseModel):
"""Ghost API configuration."""
url: HttpUrl = Field(default="http://localhost:2368")
content_api_key: Optional[str] = None
admin_api_key: Optional[str] = None
version: str = "v5.0"
mode: GhostMode = GhostMode.AUTO
timeout: int = 30
max_retries: int = 3
retry_backoff_factor: float = 2.0
class Config(BaseModel):
"""Main configuration class."""
ghost: GhostConfig = Field(default_factory=GhostConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
@classmethod
def load(cls, env_file: Optional[str] = None) -> "Config":
"""Load configuration with precedence: env vars > .env file > defaults."""
# Load .env file if specified or if .env exists
env_path = Path(env_file) if env_file else Path(".env")
if env_path.exists():
load_dotenv(env_path)
# Create config with environment variables taking precedence
ghost_config = GhostConfig(
url=os.getenv("GHOST_URL", "http://localhost:2368"),
content_api_key=os.getenv("GHOST_CONTENT_API_KEY"),
admin_api_key=os.getenv("GHOST_ADMIN_API_KEY"),
version=os.getenv("GHOST_VERSION", "v5.0"),
mode=GhostMode(os.getenv("GHOST_MODE", "auto")),
timeout=int(os.getenv("GHOST_TIMEOUT", "30")),
max_retries=int(os.getenv("GHOST_MAX_RETRIES", "3")),
retry_backoff_factor=float(os.getenv("GHOST_RETRY_BACKOFF_FACTOR", "2.0")),
)
logging_config = LoggingConfig(
level=LogLevel(os.getenv("LOG_LEVEL", "info")),
structured=os.getenv("LOG_STRUCTURED", "true").lower() == "true",
request_id=os.getenv("LOG_REQUEST_ID", "true").lower() == "true",
)
return cls(ghost=ghost_config, logging=logging_config)
# Global configuration instance
config = Config.load()

107
src/ghost_mcp/server.py Normal file
View file

@ -0,0 +1,107 @@
"""Ghost MCP server entry point."""
import asyncio
from typing import Any
from fastmcp import FastMCP
from .config import config
from .tools import register_admin_tools, register_content_tools
from .utils.logging import setup_logging, get_logger
# Set up logging
setup_logging()
logger = get_logger(__name__)
# Create FastMCP server
mcp = FastMCP("Ghost MCP Server")
@mcp.tool()
async def check_ghost_connection() -> str:
"""
Check connection to Ghost instance and API key configuration.
Returns:
JSON string with connection status and configuration info
"""
import json
from .client import GhostClient
status = {
"ghost_url": str(config.ghost.url),
"content_api_configured": bool(config.ghost.content_api_key),
"admin_api_configured": bool(config.ghost.admin_api_key),
"mode": config.ghost.mode.value,
"connection_test": "pending",
}
try:
async with GhostClient() as client:
# Test Content API if configured
if client.content_auth.is_configured():
try:
await client._make_request("GET", "settings/", api_type="content")
status["content_api_status"] = "connected"
except Exception as e:
status["content_api_status"] = f"error: {e}"
# Test Admin API if configured
if client.admin_auth.is_configured():
try:
await client._make_request("GET", "site/", api_type="admin")
status["admin_api_status"] = "connected"
except Exception as e:
status["admin_api_status"] = f"error: {e}"
status["connection_test"] = "completed"
except Exception as e:
status["connection_test"] = f"failed: {e}"
return json.dumps(status, indent=2)
def register_tools() -> None:
"""Register all MCP tools based on configuration."""
logger.info("Registering MCP tools", mode=config.ghost.mode.value)
# Always register Content API tools (read-only)
if config.ghost.content_api_key:
logger.info("Registering Content API tools")
register_content_tools(mcp)
else:
logger.warning("Content API key not configured - Content tools not available")
# Register Admin API tools based on mode and configuration
if config.ghost.mode.value in ["readwrite", "auto"]:
if config.ghost.admin_api_key:
logger.info("Registering Admin API tools")
register_admin_tools(mcp)
elif config.ghost.mode.value == "readwrite":
logger.warning("Admin API key not configured - Admin tools not available in readwrite mode")
else:
logger.info("Admin API key not configured - running in read-only mode")
else:
logger.info("Running in read-only mode - Admin tools not registered")
def main() -> None:
"""Main entry point for the Ghost MCP server."""
logger.info(
"Starting Ghost MCP server",
ghost_url=str(config.ghost.url),
mode=config.ghost.mode.value,
content_api_configured=bool(config.ghost.content_api_key),
admin_api_configured=bool(config.ghost.admin_api_key),
)
# Register tools
register_tools()
# Run the MCP server
mcp.run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,11 @@
"""MCP tools for Ghost API access."""
from .content import *
from .admin import *
__all__ = [
# Content API tools
"register_content_tools",
# Admin API tools
"register_admin_tools",
]

View file

@ -0,0 +1,25 @@
"""Admin API tools for read/write access to Ghost content."""
from fastmcp import FastMCP
from .posts import register_admin_post_tools
from .pages import register_admin_page_tools
from .tags import register_admin_tag_tools
from .authors import register_admin_author_tools
from .members import register_admin_member_tools
from .settings import register_admin_settings_tools
from .media import register_admin_media_tools
def register_admin_tools(mcp: FastMCP) -> None:
"""Register all Admin API tools."""
register_admin_post_tools(mcp)
register_admin_page_tools(mcp)
register_admin_tag_tools(mcp)
register_admin_author_tools(mcp)
register_admin_member_tools(mcp)
register_admin_settings_tools(mcp)
register_admin_media_tools(mcp)
__all__ = ["register_admin_tools"]

View file

@ -0,0 +1,10 @@
"""Admin API tools for authors management."""
from fastmcp import FastMCP
def register_admin_author_tools(mcp: FastMCP) -> None:
"""Register author management Admin API tools."""
# Author management tools would go here
# These are typically more complex due to user permissions
pass

View file

@ -0,0 +1,9 @@
"""Admin API tools for media management."""
from fastmcp import FastMCP
def register_admin_media_tools(mcp: FastMCP) -> None:
"""Register media management Admin API tools."""
# Media upload and management tools would go here
pass

View file

@ -0,0 +1,9 @@
"""Admin API tools for members management."""
from fastmcp import FastMCP
def register_admin_member_tools(mcp: FastMCP) -> None:
"""Register member management Admin API tools."""
# Member management tools would go here
pass

View file

@ -0,0 +1,56 @@
"""Admin API tools for pages management."""
import json
from typing import Any, Dict, Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_id_parameter
def register_admin_page_tools(mcp: FastMCP) -> None:
"""Register page management Admin API tools."""
@mcp.tool()
async def create_page(
title: str,
content: Optional[str] = None,
content_format: str = "lexical",
status: str = "draft",
slug: Optional[str] = None,
) -> str:
"""Create a new page via Ghost Admin API."""
if not title or not title.strip():
return json.dumps({"error": "Title is required"})
try:
page_data: Dict[str, Any] = {
"pages": [{
"title": title.strip(),
"status": status,
}]
}
page = page_data["pages"][0]
if content:
if content_format == "html":
page["html"] = content
elif content_format == "lexical":
page["lexical"] = content
if slug:
page["slug"] = slug
async with GhostClient() as client:
result = await client._make_request(
method="POST",
endpoint="pages/",
api_type="admin",
json_data=page_data,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,247 @@
"""Admin API tools for posts management."""
import json
from typing import Any, Dict, Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_id_parameter
def register_admin_post_tools(mcp: FastMCP) -> None:
"""Register post management Admin API tools."""
@mcp.tool()
async def create_post(
title: str,
content: Optional[str] = None,
content_format: str = "lexical",
status: str = "draft",
slug: Optional[str] = None,
excerpt: Optional[str] = None,
featured: bool = False,
tags: Optional[str] = None,
authors: Optional[str] = None,
published_at: Optional[str] = None,
meta_title: Optional[str] = None,
meta_description: Optional[str] = None,
) -> str:
"""
Create a new post via Ghost Admin API.
Args:
title: Post title (required)
content: Post content (HTML or Lexical JSON)
content_format: Content format ('html', 'lexical', default: 'lexical')
status: Post status ('draft', 'published', 'scheduled', default: 'draft')
slug: Post slug (auto-generated if not provided)
excerpt: Custom excerpt
featured: Whether post is featured (default: False)
tags: Comma-separated tag names or IDs
authors: Comma-separated author names or IDs
published_at: Publish date (ISO format, for scheduled posts)
meta_title: SEO meta title
meta_description: SEO meta description
Returns:
JSON string containing created post data
"""
if not title or not title.strip():
return json.dumps({"error": "Title is required"})
try:
# Build post data
post_data: Dict[str, Any] = {
"posts": [{
"title": title.strip(),
"status": status,
"featured": featured,
}]
}
post = post_data["posts"][0]
# Add content in appropriate format
if content:
if content_format == "html":
post["html"] = content
elif content_format == "lexical":
# Assume content is already Lexical JSON string
post["lexical"] = content
else:
return json.dumps({"error": "Content format must be 'html' or 'lexical'"})
# Add optional fields
if slug:
post["slug"] = slug
if excerpt:
post["custom_excerpt"] = excerpt
if published_at:
post["published_at"] = published_at
if meta_title:
post["meta_title"] = meta_title
if meta_description:
post["meta_description"] = meta_description
# Handle tags (simplified - in real implementation would resolve tag names to IDs)
if tags:
tag_list = [{"name": tag.strip()} for tag in tags.split(",") if tag.strip()]
if tag_list:
post["tags"] = tag_list
# Handle authors (simplified)
if authors:
author_list = [{"name": author.strip()} for author in authors.split(",") if author.strip()]
if author_list:
post["authors"] = author_list
async with GhostClient() as client:
result = await client.create_post(post_data)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def update_post(
post_id: str,
title: Optional[str] = None,
content: Optional[str] = None,
content_format: str = "lexical",
status: Optional[str] = None,
slug: Optional[str] = None,
excerpt: Optional[str] = None,
featured: Optional[bool] = None,
published_at: Optional[str] = None,
meta_title: Optional[str] = None,
meta_description: Optional[str] = None,
) -> str:
"""
Update an existing post via Ghost Admin API.
Args:
post_id: Post ID to update (required)
title: New post title
content: New post content (HTML or Lexical JSON)
content_format: Content format ('html', 'lexical', default: 'lexical')
status: New post status ('draft', 'published', 'scheduled')
slug: New post slug
excerpt: New custom excerpt
featured: Whether post is featured
published_at: New publish date (ISO format)
meta_title: New SEO meta title
meta_description: New SEO meta description
Returns:
JSON string containing updated post data
"""
try:
post_id = validate_id_parameter(post_id, "post_id")
# Build update data with only provided fields
post_data: Dict[str, Any] = {"posts": [{}]}
post = post_data["posts"][0]
if title is not None:
post["title"] = title.strip()
if status is not None:
post["status"] = status
if slug is not None:
post["slug"] = slug
if excerpt is not None:
post["custom_excerpt"] = excerpt
if featured is not None:
post["featured"] = featured
if published_at is not None:
post["published_at"] = published_at
if meta_title is not None:
post["meta_title"] = meta_title
if meta_description is not None:
post["meta_description"] = meta_description
# Add content in appropriate format
if content is not None:
if content_format == "html":
post["html"] = content
elif content_format == "lexical":
post["lexical"] = content
else:
return json.dumps({"error": "Content format must be 'html' or 'lexical'"})
# Must have at least one field to update
if not post:
return json.dumps({"error": "At least one field must be provided for update"})
async with GhostClient() as client:
result = await client.update_post(post_id, post_data)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def delete_post(post_id: str) -> str:
"""
Delete a post via Ghost Admin API.
Args:
post_id: Post ID to delete (required)
Returns:
JSON string containing deletion confirmation
"""
try:
post_id = validate_id_parameter(post_id, "post_id")
async with GhostClient() as client:
result = await client.delete_post(post_id)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_admin_posts(
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
) -> str:
"""
Get posts from Ghost Admin API (includes drafts and all statuses).
Args:
limit: Number of posts to return (1-50, default: 15)
page: Page number for pagination (default: 1)
filter: Ghost filter syntax for filtering posts
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
order: Order of posts (published_at desc, etc.)
Returns:
JSON string containing posts data with metadata
"""
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="posts/",
api_type="admin",
params={
k: v for k, v in {
"limit": limit,
"page": page,
"filter": filter,
"include": include,
"fields": fields,
"order": order,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,9 @@
"""Admin API tools for settings management."""
from fastmcp import FastMCP
def register_admin_settings_tools(mcp: FastMCP) -> None:
"""Register settings management Admin API tools."""
# Settings management tools would go here
pass

View file

@ -0,0 +1,38 @@
"""Admin API tools for tags management."""
import json
from typing import Any, Dict
from fastmcp import FastMCP
from ...client import GhostClient
def register_admin_tag_tools(mcp: FastMCP) -> None:
"""Register tag management Admin API tools."""
@mcp.tool()
async def create_tag(name: str, description: str = "") -> str:
"""Create a new tag via Ghost Admin API."""
if not name or not name.strip():
return json.dumps({"error": "Tag name is required"})
try:
tag_data: Dict[str, Any] = {
"tags": [{
"name": name.strip(),
"description": description,
}]
}
async with GhostClient() as client:
result = await client._make_request(
method="POST",
endpoint="tags/",
api_type="admin",
json_data=tag_data,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,19 @@
"""Content API tools for read-only access to Ghost content."""
from fastmcp import FastMCP
from .posts import register_post_tools
from .pages import register_page_tools
from .tags import register_tag_tools
from .authors import register_author_tools
from .settings import register_settings_tools
def register_content_tools(mcp: FastMCP) -> None:
"""Register all Content API tools."""
register_post_tools(mcp)
register_page_tools(mcp)
register_tag_tools(mcp)
register_author_tools(mcp)
register_settings_tools(mcp)
__all__ = ["register_content_tools"]

View file

@ -0,0 +1,142 @@
"""Content API tools for authors."""
import json
from typing import Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_filter_syntax, validate_id_parameter, validate_slug_parameter
def register_author_tools(mcp: FastMCP) -> None:
"""Register author-related Content API tools."""
@mcp.tool()
async def get_authors(
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
) -> str:
"""
Get authors from Ghost Content API.
Args:
limit: Number of authors to return (1-50, default: 15)
page: Page number for pagination (default: 1)
filter: Ghost filter syntax for filtering authors
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
order: Order of authors (name asc, count.posts desc, etc.)
Returns:
JSON string containing authors data with metadata
"""
# Validate parameters
if limit is not None and (limit < 1 or limit > 50):
return json.dumps({"error": "Limit must be between 1 and 50"})
if page is not None and page < 1:
return json.dumps({"error": "Page must be 1 or greater"})
if filter and not validate_filter_syntax(filter):
return json.dumps({"error": "Invalid filter syntax"})
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="authors/",
api_type="content",
params={
k: v for k, v in {
"limit": limit,
"page": page,
"filter": filter,
"include": include,
"fields": fields,
"order": order,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_author_by_id(
author_id: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single author by ID from Ghost Content API.
Args:
author_id: The author ID
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing author data
"""
try:
author_id = validate_id_parameter(author_id, "author_id")
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"authors/{author_id}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_author_by_slug(
slug: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single author by slug from Ghost Content API.
Args:
slug: The author slug
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing author data
"""
try:
slug = validate_slug_parameter(slug)
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"authors/slug/{slug}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,142 @@
"""Content API tools for pages."""
import json
from typing import Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_filter_syntax, validate_id_parameter, validate_slug_parameter
def register_page_tools(mcp: FastMCP) -> None:
"""Register page-related Content API tools."""
@mcp.tool()
async def get_pages(
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
) -> str:
"""
Get published pages from Ghost Content API.
Args:
limit: Number of pages to return (1-50, default: 15)
page: Page number for pagination (default: 1)
filter: Ghost filter syntax for filtering pages
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
order: Order of pages (published_at desc, etc.)
Returns:
JSON string containing pages data with metadata
"""
# Validate parameters
if limit is not None and (limit < 1 or limit > 50):
return json.dumps({"error": "Limit must be between 1 and 50"})
if page is not None and page < 1:
return json.dumps({"error": "Page must be 1 or greater"})
if filter and not validate_filter_syntax(filter):
return json.dumps({"error": "Invalid filter syntax"})
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="pages/",
api_type="content",
params={
k: v for k, v in {
"limit": limit,
"page": page,
"filter": filter,
"include": include,
"fields": fields,
"order": order,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_page_by_id(
page_id: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single page by ID from Ghost Content API.
Args:
page_id: The page ID
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing page data
"""
try:
page_id = validate_id_parameter(page_id, "page_id")
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"pages/{page_id}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_page_by_slug(
slug: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single page by slug from Ghost Content API.
Args:
slug: The page slug
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing page data
"""
try:
slug = validate_slug_parameter(slug)
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"pages/slug/{slug}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,161 @@
"""Content API tools for posts."""
import json
from typing import Any, Dict, Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_filter_syntax, validate_id_parameter, validate_slug_parameter
def register_post_tools(mcp: FastMCP) -> None:
"""Register post-related Content API tools."""
@mcp.tool()
async def get_posts(
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
) -> str:
"""
Get published posts from Ghost Content API.
Args:
limit: Number of posts to return (1-50, default: 15)
page: Page number for pagination (default: 1)
filter: Ghost filter syntax for filtering posts
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
order: Order of posts (published_at desc, etc.)
Returns:
JSON string containing posts data with metadata
"""
# Validate parameters
if limit is not None and (limit < 1 or limit > 50):
return json.dumps({"error": "Limit must be between 1 and 50"})
if page is not None and page < 1:
return json.dumps({"error": "Page must be 1 or greater"})
if filter and not validate_filter_syntax(filter):
return json.dumps({"error": "Invalid filter syntax"})
try:
async with GhostClient() as client:
result = await client.get_posts(
limit=limit,
page=page,
filter=filter,
include=include,
fields=fields,
order=order,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_post_by_id(
post_id: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single post by ID from Ghost Content API.
Args:
post_id: The post ID
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing post data
"""
try:
post_id = validate_id_parameter(post_id, "post_id")
async with GhostClient() as client:
result = await client.get_post_by_id(
post_id=post_id,
include=include,
fields=fields,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_post_by_slug(
slug: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single post by slug from Ghost Content API.
Args:
slug: The post slug
include: Comma-separated list of fields to include (tags, authors, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing post data
"""
try:
slug = validate_slug_parameter(slug)
async with GhostClient() as client:
result = await client.get_post_by_slug(
slug=slug,
include=include,
fields=fields,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def search_posts(
query: str,
limit: Optional[int] = None,
include: Optional[str] = None,
) -> str:
"""
Search posts by title and content.
Args:
query: Search query string
limit: Number of results to return (1-50, default: 15)
include: Comma-separated list of fields to include
Returns:
JSON string containing matching posts
"""
if not query or not query.strip():
return json.dumps({"error": "Query parameter is required"})
if limit is not None and (limit < 1 or limit > 50):
return json.dumps({"error": "Limit must be between 1 and 50"})
try:
# Use Ghost's filter syntax for searching
search_filter = f"title:~'{query}',plaintext:~'{query}'"
async with GhostClient() as client:
result = await client.get_posts(
limit=limit,
filter=search_filter,
include=include,
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,70 @@
"""Content API tools for settings."""
import json
from typing import Optional
from fastmcp import FastMCP
from ...client import GhostClient
def register_settings_tools(mcp: FastMCP) -> None:
"""Register settings-related Content API tools."""
@mcp.tool()
async def get_settings() -> str:
"""
Get public settings from Ghost Content API.
Returns:
JSON string containing public settings data
"""
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="settings/",
api_type="content",
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_site_info() -> str:
"""
Get basic site information from Ghost.
Returns:
JSON string containing site title, description, URL, and other public info
"""
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="settings/",
api_type="content",
)
# Extract key site information
if "settings" in result:
settings = result["settings"]
site_info = {
"title": settings.get("title"),
"description": settings.get("description"),
"url": settings.get("url"),
"logo": settings.get("logo"),
"icon": settings.get("icon"),
"cover_image": settings.get("cover_image"),
"accent_color": settings.get("accent_color"),
"timezone": settings.get("timezone"),
"lang": settings.get("lang"),
"version": settings.get("version"),
}
return json.dumps({"site_info": site_info}, indent=2, default=str)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,142 @@
"""Content API tools for tags."""
import json
from typing import Optional
from fastmcp import FastMCP
from ...client import GhostClient
from ...utils.validation import validate_filter_syntax, validate_id_parameter, validate_slug_parameter
def register_tag_tools(mcp: FastMCP) -> None:
"""Register tag-related Content API tools."""
@mcp.tool()
async def get_tags(
limit: Optional[int] = None,
page: Optional[int] = None,
filter: Optional[str] = None,
include: Optional[str] = None,
fields: Optional[str] = None,
order: Optional[str] = None,
) -> str:
"""
Get tags from Ghost Content API.
Args:
limit: Number of tags to return (1-50, default: 15)
page: Page number for pagination (default: 1)
filter: Ghost filter syntax for filtering tags
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
order: Order of tags (name asc, count.posts desc, etc.)
Returns:
JSON string containing tags data with metadata
"""
# Validate parameters
if limit is not None and (limit < 1 or limit > 50):
return json.dumps({"error": "Limit must be between 1 and 50"})
if page is not None and page < 1:
return json.dumps({"error": "Page must be 1 or greater"})
if filter and not validate_filter_syntax(filter):
return json.dumps({"error": "Invalid filter syntax"})
try:
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint="tags/",
api_type="content",
params={
k: v for k, v in {
"limit": limit,
"page": page,
"filter": filter,
"include": include,
"fields": fields,
"order": order,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_tag_by_id(
tag_id: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single tag by ID from Ghost Content API.
Args:
tag_id: The tag ID
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing tag data
"""
try:
tag_id = validate_id_parameter(tag_id, "tag_id")
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"tags/{tag_id}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_tag_by_slug(
slug: str,
include: Optional[str] = None,
fields: Optional[str] = None,
) -> str:
"""
Get a single tag by slug from Ghost Content API.
Args:
slug: The tag slug
include: Comma-separated list of fields to include (count.posts, etc.)
fields: Comma-separated list of fields to return
Returns:
JSON string containing tag data
"""
try:
slug = validate_slug_parameter(slug)
async with GhostClient() as client:
result = await client._make_request(
method="GET",
endpoint=f"tags/slug/{slug}/",
api_type="content",
params={
k: v for k, v in {
"include": include,
"fields": fields,
}.items() if v is not None
}
)
return json.dumps(result, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})

View file

@ -0,0 +1,30 @@
"""Type definitions for Ghost MCP server."""
from .errors import *
from .ghost import *
from .mcp import *
__all__ = [
# Error types
"GhostMCPError",
"NetworkError",
"AuthenticationError",
"GhostApiError",
"ValidationError",
"ErrorCategory",
# Ghost types
"GhostPost",
"GhostPage",
"GhostTag",
"GhostAuthor",
"GhostMember",
"GhostSettings",
"GhostApiResponse",
"GhostErrorResponse",
"ContentFormat",
# MCP types
"MCPToolDefinition",
"MCPToolResponse",
"MCPToolRequest",
"MCPError",
]

View file

@ -0,0 +1,128 @@
"""Error types and categories for Ghost MCP server."""
import uuid
from enum import Enum
from typing import Any, Dict, Optional
class ErrorCategory(str, Enum):
"""Error categories for comprehensive error handling."""
NETWORK = "NETWORK"
AUTHENTICATION = "AUTHENTICATION"
GHOST_API = "GHOST_API"
MCP_PROTOCOL = "MCP_PROTOCOL"
FILE_UPLOAD = "FILE_UPLOAD"
VALIDATION = "VALIDATION"
class GhostMCPError(Exception):
"""Base error class for Ghost MCP server."""
def __init__(
self,
message: str,
category: ErrorCategory,
code: Optional[str] = None,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(message)
self.id = str(uuid.uuid4())
self.category = category
self.code = code
self.context = context
self.request_id = request_id
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary for logging."""
return {
"id": self.id,
"message": str(self),
"category": self.category.value,
"code": self.code,
"context": self.context,
"request_id": self.request_id,
}
class NetworkError(GhostMCPError):
"""Network-related errors."""
def __init__(
self,
message: str,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(
message, ErrorCategory.NETWORK, "NETWORK_ERROR", context, request_id
)
class AuthenticationError(GhostMCPError):
"""Authentication-related errors."""
def __init__(
self,
message: str,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(
message, ErrorCategory.AUTHENTICATION, "AUTH_ERROR", context, request_id
)
class GhostApiError(GhostMCPError):
"""Ghost API-related errors."""
def __init__(
self,
message: str,
code: Optional[str] = None,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(message, ErrorCategory.GHOST_API, code, context, request_id)
class ValidationError(GhostMCPError):
"""Validation-related errors."""
def __init__(
self,
message: str,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(
message, ErrorCategory.VALIDATION, "VALIDATION_ERROR", context, request_id
)
class FileUploadError(GhostMCPError):
"""File upload-related errors."""
def __init__(
self,
message: str,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(
message, ErrorCategory.FILE_UPLOAD, "FILE_UPLOAD_ERROR", context, request_id
)
class MCPProtocolError(GhostMCPError):
"""MCP protocol-related errors."""
def __init__(
self,
message: str,
context: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
super().__init__(
message, ErrorCategory.MCP_PROTOCOL, "MCP_ERROR", context, request_id
)

View file

@ -0,0 +1,251 @@
"""Ghost API type definitions based on Ghost v5.0 API."""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, HttpUrl
class ContentFormat(str, Enum):
"""Content formats supported by Ghost."""
LEXICAL = "lexical"
HTML = "html"
MOBILEDOC = "mobiledoc"
class PostStatus(str, Enum):
"""Post status options."""
DRAFT = "draft"
PUBLISHED = "published"
SCHEDULED = "scheduled"
class VisibilityType(str, Enum):
"""Content visibility options."""
PUBLIC = "public"
MEMBERS = "members"
PAID = "paid"
TIERS = "tiers"
class GhostMeta(BaseModel):
"""Ghost API response metadata."""
pagination: Optional[Dict[str, Any]] = None
class GhostAuthor(BaseModel):
"""Ghost author object."""
id: str
name: str
slug: str
email: Optional[str] = None
profile_image: Optional[HttpUrl] = None
cover_image: Optional[HttpUrl] = None
bio: Optional[str] = None
website: Optional[HttpUrl] = None
location: Optional[str] = None
facebook: Optional[str] = None
twitter: Optional[str] = None
accessibility: Optional[str] = None
status: str
meta_title: Optional[str] = None
meta_description: Optional[str] = None
tour: Optional[List[str]] = None
last_seen: Optional[datetime] = None
created_at: datetime
updated_at: datetime
roles: Optional[List[Dict[str, Any]]] = None
url: str
class GhostTag(BaseModel):
"""Ghost tag object."""
id: str
name: str
slug: str
description: Optional[str] = None
feature_image: Optional[HttpUrl] = None
visibility: VisibilityType = VisibilityType.PUBLIC
og_image: Optional[HttpUrl] = None
og_title: Optional[str] = None
og_description: Optional[str] = None
twitter_image: Optional[HttpUrl] = None
twitter_title: Optional[str] = None
twitter_description: Optional[str] = None
meta_title: Optional[str] = None
meta_description: Optional[str] = None
codeinjection_head: Optional[str] = None
codeinjection_foot: Optional[str] = None
canonical_url: Optional[HttpUrl] = None
accent_color: Optional[str] = None
created_at: datetime
updated_at: datetime
url: str
class GhostPost(BaseModel):
"""Ghost post object."""
id: str
uuid: str
title: str
slug: str
mobiledoc: Optional[str] = None
lexical: Optional[str] = None
html: Optional[str] = None
comment_id: Optional[str] = None
plaintext: Optional[str] = None
feature_image: Optional[HttpUrl] = None
feature_image_alt: Optional[str] = None
feature_image_caption: Optional[str] = None
featured: bool = False
status: PostStatus = PostStatus.DRAFT
visibility: VisibilityType = VisibilityType.PUBLIC
created_at: datetime
updated_at: datetime
published_at: Optional[datetime] = None
custom_excerpt: Optional[str] = None
codeinjection_head: Optional[str] = None
codeinjection_foot: Optional[str] = None
custom_template: Optional[str] = None
canonical_url: Optional[HttpUrl] = None
tags: Optional[List[GhostTag]] = None
authors: Optional[List[GhostAuthor]] = None
primary_author: Optional[GhostAuthor] = None
primary_tag: Optional[GhostTag] = None
url: str
excerpt: Optional[str] = None
reading_time: Optional[int] = None
access: Optional[bool] = None
og_image: Optional[HttpUrl] = None
og_title: Optional[str] = None
og_description: Optional[str] = None
twitter_image: Optional[HttpUrl] = None
twitter_title: Optional[str] = None
twitter_description: Optional[str] = None
meta_title: Optional[str] = None
meta_description: Optional[str] = None
email_segment: Optional[str] = None
newsletter: Optional[Dict[str, Any]] = None
class GhostPage(BaseModel):
"""Ghost page object."""
id: str
uuid: str
title: str
slug: str
mobiledoc: Optional[str] = None
lexical: Optional[str] = None
html: Optional[str] = None
comment_id: Optional[str] = None
plaintext: Optional[str] = None
feature_image: Optional[HttpUrl] = None
feature_image_alt: Optional[str] = None
feature_image_caption: Optional[str] = None
featured: bool = False
status: PostStatus = PostStatus.DRAFT
visibility: VisibilityType = VisibilityType.PUBLIC
created_at: datetime
updated_at: datetime
published_at: Optional[datetime] = None
custom_excerpt: Optional[str] = None
codeinjection_head: Optional[str] = None
codeinjection_foot: Optional[str] = None
custom_template: Optional[str] = None
canonical_url: Optional[HttpUrl] = None
tags: Optional[List[GhostTag]] = None
authors: Optional[List[GhostAuthor]] = None
primary_author: Optional[GhostAuthor] = None
primary_tag: Optional[GhostTag] = None
url: str
excerpt: Optional[str] = None
reading_time: Optional[int] = None
access: Optional[bool] = None
og_image: Optional[HttpUrl] = None
og_title: Optional[str] = None
og_description: Optional[str] = None
twitter_image: Optional[HttpUrl] = None
twitter_title: Optional[str] = None
twitter_description: Optional[str] = None
meta_title: Optional[str] = None
meta_description: Optional[str] = None
class GhostMember(BaseModel):
"""Ghost member object."""
id: str
uuid: str
email: str
name: Optional[str] = None
note: Optional[str] = None
subscribed: bool = True
created_at: datetime
updated_at: datetime
labels: Optional[List[Dict[str, Any]]] = None
avatar_image: Optional[HttpUrl] = None
comped: bool = False
email_count: int = 0
email_opened_count: int = 0
email_open_rate: Optional[float] = None
status: str = "free"
last_seen: Optional[datetime] = None
newsletters: Optional[List[Dict[str, Any]]] = None
subscriptions: Optional[List[Dict[str, Any]]] = None
products: Optional[List[Dict[str, Any]]] = None
class GhostSettings(BaseModel):
"""Ghost settings object."""
title: str
description: str
logo: Optional[HttpUrl] = None
icon: Optional[HttpUrl] = None
accent_color: Optional[str] = None
cover_image: Optional[HttpUrl] = None
facebook: Optional[str] = None
twitter: Optional[str] = None
lang: str = "en"
timezone: str = "Etc/UTC"
codeinjection_head: Optional[str] = None
codeinjection_foot: Optional[str] = None
navigation: Optional[List[Dict[str, Any]]] = None
secondary_navigation: Optional[List[Dict[str, Any]]] = None
meta_title: Optional[str] = None
meta_description: Optional[str] = None
og_image: Optional[HttpUrl] = None
og_title: Optional[str] = None
og_description: Optional[str] = None
twitter_image: Optional[HttpUrl] = None
twitter_title: Optional[str] = None
twitter_description: Optional[str] = None
url: str
class GhostApiResponse(BaseModel):
"""Ghost API response wrapper."""
posts: Optional[List[GhostPost]] = None
pages: Optional[List[GhostPage]] = None
tags: Optional[List[GhostTag]] = None
authors: Optional[List[GhostAuthor]] = None
members: Optional[List[GhostMember]] = None
settings: Optional[GhostSettings] = None
meta: Optional[GhostMeta] = None
class GhostError(BaseModel):
"""Ghost API error object."""
id: str
message: str
context: Optional[str] = None
type: str
details: Optional[str] = None
property: Optional[str] = None
help: Optional[str] = None
code: Optional[str] = None
ghostErrorCode: Optional[str] = None
class GhostErrorResponse(BaseModel):
"""Ghost API error response."""
errors: List[GhostError]

View file

@ -0,0 +1,49 @@
"""MCP-specific type definitions."""
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
class MCPToolParameter(BaseModel):
"""MCP tool parameter definition."""
type: str
description: Optional[str] = None
required: bool = False
enum: Optional[List[str]] = None
properties: Optional[Dict[str, "MCPToolParameter"]] = None
class MCPToolDefinition(BaseModel):
"""MCP tool definition."""
name: str
description: str
inputSchema: Dict[str, Any]
class MCPContent(BaseModel):
"""MCP response content."""
type: str
text: Optional[str] = None
data: Optional[str] = None
url: Optional[str] = None
mimeType: Optional[str] = None
class MCPToolResponse(BaseModel):
"""MCP tool response."""
content: List[MCPContent]
isError: bool = False
class MCPToolRequest(BaseModel):
"""MCP tool request."""
name: str
arguments: Dict[str, Any]
class MCPError(BaseModel):
"""MCP error response."""
code: int
message: str
data: Optional[Any] = None

View file

@ -0,0 +1,14 @@
"""Utility modules for Ghost MCP server."""
from .logging import setup_logging, get_logger
from .retry import with_retry, RetryConfig
from .validation import validate_filter_syntax, validate_pagination_params
__all__ = [
"setup_logging",
"get_logger",
"with_retry",
"RetryConfig",
"validate_filter_syntax",
"validate_pagination_params",
]

View file

@ -0,0 +1,60 @@
"""Logging utilities for Ghost MCP server."""
import logging
import sys
import uuid
from typing import Any, Dict, Optional
import structlog
from structlog.typing import Processor
from ..config import config
def add_request_id(logger: Any, method_name: str, event_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Add request ID to log events."""
if "request_id" not in event_dict:
event_dict["request_id"] = str(uuid.uuid4())
return event_dict
def setup_logging() -> None:
"""Set up structured logging for the application."""
processors: list[Processor] = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
]
if config.logging.request_id:
processors.append(add_request_id)
if config.logging.structured:
processors.extend([
structlog.processors.JSONRenderer()
])
else:
processors.extend([
structlog.dev.ConsoleRenderer()
])
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, config.logging.level.upper())
),
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Configure standard library logging
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=getattr(logging, config.logging.level.upper()),
)
def get_logger(name: Optional[str] = None) -> structlog.BoundLogger:
"""Get a logger instance."""
return structlog.get_logger(name)

View file

@ -0,0 +1,77 @@
"""Retry utilities with exponential backoff."""
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional, TypeVar
from pydantic import BaseModel
from ..types.errors import NetworkError
from .logging import get_logger
T = TypeVar("T")
logger = get_logger(__name__)
class RetryConfig(BaseModel):
"""Configuration for retry behavior."""
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 10.0
exponential_base: float = 2.0
jitter: bool = True
async def with_retry(
operation: Callable[[], Awaitable[T]],
config: Optional[RetryConfig] = None,
request_id: Optional[str] = None,
) -> T:
"""Execute operation with exponential backoff retry logic."""
if config is None:
config = RetryConfig()
last_exception: Optional[Exception] = None
for attempt in range(config.max_retries + 1):
try:
return await operation()
except Exception as e:
last_exception = e
if attempt == config.max_retries:
logger.error(
"Operation failed after all retries",
attempt=attempt,
max_retries=config.max_retries,
error=str(e),
request_id=request_id,
)
break
# Calculate delay with exponential backoff
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
# Add jitter to prevent thundering herd
if config.jitter:
import random
delay = delay * (0.5 + random.random() * 0.5)
logger.warning(
"Operation failed, retrying",
attempt=attempt,
delay=delay,
error=str(e),
request_id=request_id,
)
await asyncio.sleep(delay)
# Re-raise the last exception
if last_exception:
raise last_exception
raise NetworkError("Retry logic failed unexpectedly", request_id=request_id)

View file

@ -0,0 +1,93 @@
"""Parameter validation utilities."""
import re
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, validator
from ..types.errors import ValidationError
class PaginationParams(BaseModel):
"""Pagination parameters validation."""
limit: Optional[int] = Field(None, ge=1, le=50)
page: Optional[int] = Field(None, ge=1)
class FilterParams(BaseModel):
"""Filter parameters validation."""
filter: Optional[str] = None
include: Optional[str] = None
fields: Optional[str] = None
order: Optional[str] = None
def validate_pagination_params(params: Dict[str, Any]) -> PaginationParams:
"""Validate pagination parameters."""
try:
return PaginationParams(**params)
except ValueError as e:
raise ValidationError(f"Invalid pagination parameters: {e}")
def validate_filter_syntax(filter_string: str) -> bool:
"""
Validate Ghost filter syntax (NQL - Node Query Language).
Based on contracts/ghost-filtering.md documentation.
"""
if not filter_string:
return True
# Basic validation patterns for common NQL operators
valid_operators = [
r"\+", # AND
r",", # OR
r":", # EQUALS
r":-", # NOT EQUALS
r":~", # CONTAINS
r":-~", # NOT CONTAINS
r":>", # GREATER THAN
r":<", # LESS THAN
r":>=", # GREATER THAN OR EQUAL
r":<=", # LESS THAN OR EQUAL
]
# Check for balanced parentheses
if filter_string.count('(') != filter_string.count(')'):
return False
# Check for balanced square brackets
if filter_string.count('[') != filter_string.count(']'):
return False
# More comprehensive validation would go here
# For now, we accept most strings as valid
return True
def validate_id_parameter(id_value: str, parameter_name: str = "id") -> str:
"""Validate ID parameter format."""
if not id_value or not isinstance(id_value, str):
raise ValidationError(f"Invalid {parameter_name}: must be a non-empty string")
if len(id_value.strip()) == 0:
raise ValidationError(f"Invalid {parameter_name}: cannot be empty or whitespace")
return id_value.strip()
def validate_slug_parameter(slug: str) -> str:
"""Validate slug parameter format."""
if not slug or not isinstance(slug, str):
raise ValidationError("Invalid slug: must be a non-empty string")
slug = slug.strip()
if not slug:
raise ValidationError("Invalid slug: cannot be empty or whitespace")
# Basic slug validation (alphanumeric, hyphens, underscores)
if not re.match(r'^[a-zA-Z0-9-_]+$', slug):
raise ValidationError("Invalid slug: must contain only alphanumeric characters, hyphens, and underscores")
return slug

View file

@ -1,111 +0,0 @@
/**
* Ghost MCP Server Entry Point
*
* This is the main entry point for the Ghost MCP server providing complete
* Ghost CMS functionality through the Model Context Protocol.
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
// TODO: Phase 1 - Import configuration and authentication
// import { loadConfig } from './utils/config.js';
// import { GhostClient } from './ghost-client.js';
// TODO: Phase 2 - Import Content API tools
// import { registerContentTools } from './tools/content/index.js';
// TODO: Phase 3 - Import Admin API tools
// import { registerAdminTools } from './tools/admin/index.js';
/**
* Ghost MCP Server
*
* Implements 44+ MCP tools covering all Ghost REST API endpoints:
* - 13 Content API tools (read-only)
* - 31+ Admin API tools (read/write)
*/
class GhostMCPServer {
private server: Server;
constructor() {
this.server = new Server(
{
name: 'ghost-mcp',
version: '0.1.0',
},
{
capabilities: {
tools: {},
},
}
);
this.setupHandlers();
}
private setupHandlers(): void {
// List available tools
this.server.setRequestHandler(ListToolsRequestSchema, async () => {
// TODO: Phase 1 - Return basic tool list
// TODO: Phase 2 - Add Content API tools
// TODO: Phase 3 - Add Admin API tools
return {
tools: [
{
name: 'ghost_status',
description: 'Check Ghost MCP server status and configuration',
inputSchema: {
type: 'object',
properties: {},
},
},
],
};
});
// Handle tool calls
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case 'ghost_status':
return {
content: [
{
type: 'text',
text: 'Ghost MCP Server v0.1.0 - Phase 0 Complete\nStatus: Development Mode\nAPI Integration: Pending API Keys',
},
],
};
default:
throw new Error(`Unknown tool: ${name}`);
}
});
}
async start(): Promise<void> {
const transport = new StdioServerTransport();
await this.server.connect(transport);
// TODO: Phase 1 - Initialize configuration and logging
// TODO: Phase 1 - Setup Ghost API authentication
// TODO: Phase 2 - Register Content API tools
// TODO: Phase 3 - Register Admin API tools
console.error('Ghost MCP Server started successfully');
}
}
// Start the server
if (import.meta.url === `file://${process.argv[1]}`) {
const server = new GhostMCPServer();
server.start().catch((error) => {
console.error('Failed to start Ghost MCP Server:', error);
process.exit(1);
});
}
export { GhostMCPServer };

View file

@ -1,8 +0,0 @@
/**
* Ghost Admin API - Media Tools
*
* MCP tools for media upload and management via Admin API.
* Implements tools: ghost_admin_upload_image, ghost_admin_upload_media
*/
// TODO: Phase 4 - Implement Admin API media tools

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Members Tools
*
* MCP tools for member management via Admin API.
* Implements tools: ghost_admin_list_members, ghost_admin_get_member,
* ghost_admin_create_member, ghost_admin_update_member
*/
// TODO: Phase 4 - Implement Admin API members tools

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Pages Tools
*
* MCP tools for full CRUD access to Ghost pages via Admin API.
* Implements tools: ghost_admin_list_pages, ghost_admin_get_page, ghost_admin_create_page,
* ghost_admin_update_page, ghost_admin_copy_page, ghost_admin_delete_page
*/
// TODO: Phase 3 - Implement Admin API pages tools (CRUD operations)

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Posts Tools
*
* MCP tools for full CRUD access to Ghost posts via Admin API.
* Implements tools: ghost_admin_list_posts, ghost_admin_get_post, ghost_admin_create_post,
* ghost_admin_update_post, ghost_admin_copy_post, ghost_admin_delete_post
*/
// TODO: Phase 3 - Implement Admin API posts tools (CRUD operations)

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Tags Tools
*
* MCP tools for full CRUD access to Ghost tags via Admin API.
* Implements tools: ghost_admin_list_tags, ghost_admin_get_tag, ghost_admin_create_tag,
* ghost_admin_update_tag, ghost_admin_delete_tag
*/
// TODO: Phase 3 - Implement Admin API tags tools (CRUD operations)

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Tiers Tools
*
* MCP tools for membership tier management via Admin API.
* Implements tools: ghost_admin_list_tiers, ghost_admin_get_tier,
* ghost_admin_create_tier, ghost_admin_update_tier
*/
// TODO: Phase 4 - Implement Admin API tiers tools

View file

@ -1,9 +0,0 @@
/**
* Ghost Admin API - Webhooks Tools
*
* MCP tools for webhook management via Admin API.
* Implements tools: ghost_admin_list_webhooks, ghost_admin_create_webhook,
* ghost_admin_update_webhook, ghost_admin_delete_webhook
*/
// TODO: Phase 4 - Implement Admin API webhooks tools

View file

@ -1,8 +0,0 @@
/**
* Ghost Content API - Authors Tools
*
* MCP tools for read-only access to Ghost authors via Content API.
* Implements tools: ghost_list_authors, ghost_get_author_by_id, ghost_get_author_by_slug
*/
// TODO: Phase 2 - Implement Content API authors tools

View file

@ -1,8 +0,0 @@
/**
* Ghost Content API - Pages Tools
*
* MCP tools for read-only access to Ghost pages via Content API.
* Implements tools: ghost_list_pages, ghost_get_page_by_id, ghost_get_page_by_slug
*/
// TODO: Phase 2 - Implement Content API pages tools

View file

@ -1,31 +0,0 @@
/**
* Ghost Content API - Posts Tools
*
* MCP tools for read-only access to Ghost posts via Content API.
* Implements tools: ghost_list_posts, ghost_get_post_by_id, ghost_get_post_by_slug
*/
// TODO: Phase 2 - Implement Content API posts tools
// - ghost_list_posts
// - ghost_get_post_by_id
// - ghost_get_post_by_slug
export const postsContentTools = [
{
name: 'ghost_list_posts',
description: 'List all published posts from Ghost Content API',
inputSchema: {
type: 'object',
properties: {
limit: { type: 'number', description: 'Number of posts to return (max 50)' },
page: { type: 'number', description: 'Page number for pagination' },
filter: { type: 'string', description: 'Filter posts using Ghost filter syntax' },
include: { type: 'string', description: 'Include related resources (tags,authors)' },
fields: { type: 'string', description: 'Comma-separated list of fields to return' },
},
},
},
// TODO: Phase 2 - Add other posts tools
];
// TODO: Phase 2 - Implement tool handlers

View file

@ -1,8 +0,0 @@
/**
* Ghost Content API - Settings and Other Tools
*
* MCP tools for Ghost settings and tiers via Content API.
* Implements tools: ghost_list_tiers, ghost_get_settings
*/
// TODO: Phase 2 - Implement Content API settings and tiers tools

View file

@ -1,8 +0,0 @@
/**
* Ghost Content API - Tags Tools
*
* MCP tools for read-only access to Ghost tags via Content API.
* Implements tools: ghost_list_tags, ghost_get_tag_by_id, ghost_get_tag_by_slug
*/
// TODO: Phase 2 - Implement Content API tags tools

View file

@ -1,225 +0,0 @@
/**
* Ghost API Response Types
*
* TypeScript interfaces for Ghost API responses and data structures.
* Based on research in contracts/ documentation.
*/
// TODO: Phase 1 - Define Ghost API response types
// Based on contracts/api-endpoints.md and contracts/content-formats.md
export interface GhostPost {
id: string;
title: string;
slug: string;
html?: string;
lexical?: string;
mobiledoc?: string;
feature_image?: string;
featured: boolean;
status: 'draft' | 'published' | 'scheduled';
visibility: 'public' | 'members' | 'paid';
created_at: string;
updated_at: string;
published_at?: string;
custom_excerpt?: string;
codeinjection_head?: string;
codeinjection_foot?: string;
og_image?: string;
og_title?: string;
og_description?: string;
twitter_image?: string;
twitter_title?: string;
twitter_description?: string;
meta_title?: string;
meta_description?: string;
email_subject?: string;
url: string;
excerpt: string;
reading_time: number;
page: boolean;
}
export interface GhostPage extends Omit<GhostPost, 'page'> {
page: true;
}
export interface GhostTag {
id: string;
name: string;
slug: string;
description?: string;
feature_image?: string;
visibility: 'public' | 'internal';
og_image?: string;
og_title?: string;
og_description?: string;
twitter_image?: string;
twitter_title?: string;
twitter_description?: string;
meta_title?: string;
meta_description?: string;
codeinjection_head?: string;
codeinjection_foot?: string;
canonical_url?: string;
accent_color?: string;
url: string;
count: {
posts: number;
};
}
export interface GhostAuthor {
id: string;
name: string;
slug: string;
email?: string;
profile_image?: string;
cover_image?: string;
bio?: string;
website?: string;
location?: string;
facebook?: string;
twitter?: string;
meta_title?: string;
meta_description?: string;
url: string;
count: {
posts: number;
};
}
export interface GhostMember {
id: string;
uuid: string;
email: string;
name?: string;
note?: string;
subscribed: boolean;
created_at: string;
updated_at: string;
labels: GhostLabel[];
subscriptions: GhostSubscription[];
avatar_image?: string;
comped: boolean;
email_count: number;
email_opened_count: number;
email_open_rate?: number;
}
export interface GhostLabel {
id: string;
name: string;
slug: string;
created_at: string;
updated_at: string;
}
export interface GhostSubscription {
id: string;
member_id: string;
tier_id: string;
status: string;
created_at: string;
updated_at: string;
}
export interface GhostTier {
id: string;
name: string;
slug: string;
description?: string;
active: boolean;
type: 'free' | 'paid';
welcome_page_url?: string;
created_at: string;
updated_at: string;
visibility: 'public' | 'none';
trial_days: number;
currency?: string;
monthly_price?: number;
yearly_price?: number;
benefits: string[];
}
export interface GhostSettings {
title: string;
description: string;
logo?: string;
icon?: string;
accent_color?: string;
cover_image?: string;
facebook?: string;
twitter?: string;
lang: string;
timezone: string;
codeinjection_head?: string;
codeinjection_foot?: string;
navigation: GhostNavigation[];
secondary_navigation: GhostNavigation[];
meta_title?: string;
meta_description?: string;
og_image?: string;
og_title?: string;
og_description?: string;
twitter_image?: string;
twitter_title?: string;
twitter_description?: string;
url: string;
}
export interface GhostNavigation {
label: string;
url: string;
}
export interface GhostWebhook {
id: string;
event: string;
target_url: string;
name?: string;
secret?: string;
api_version: string;
integration_id: string;
status: 'available' | 'error';
last_triggered_at?: string;
last_triggered_status?: string;
last_triggered_error?: string;
created_at: string;
updated_at: string;
}
// API Response wrappers
export interface GhostApiResponse<T> {
[key: string]: T[];
meta?: {
pagination?: {
page: number;
limit: number;
pages: number;
total: number;
next?: number;
prev?: number;
};
};
}
export interface GhostApiSingleResponse<T> {
[key: string]: T[];
}
// Error types
export interface GhostApiError {
message: string;
context?: string;
type: string;
details?: string;
property?: string;
help?: string;
code?: string;
id: string;
}
export interface GhostApiErrorResponse {
errors: GhostApiError[];
}

View file

@ -1,63 +0,0 @@
/**
* MCP-Specific Types
*
* TypeScript interfaces for MCP tool definitions and responses.
*/
// TODO: Phase 1 - Define MCP-specific types
export interface MCPToolParameter {
type: string;
description?: string;
required?: boolean;
enum?: string[];
properties?: Record<string, MCPToolParameter>;
}
export interface MCPToolDefinition {
name: string;
description: string;
inputSchema: {
type: 'object';
properties: Record<string, MCPToolParameter>;
required?: string[];
};
}
export interface MCPToolResponse {
content: Array<{
type: 'text' | 'image' | 'resource';
text?: string;
data?: string;
url?: string;
mimeType?: string;
}>;
isError?: boolean;
}
export interface MCPToolRequest {
name: string;
arguments: Record<string, unknown>;
}
// MCP Error types
export interface MCPError {
code: number;
message: string;
data?: unknown;
}
// Configuration types
export interface MCPServerConfig {
ghost: {
url: string;
contentApiKey?: string;
adminApiKey?: string;
version: string;
mode: 'readonly' | 'readwrite' | 'auto';
};
logging: {
level: 'debug' | 'info' | 'warn' | 'error';
structured: boolean;
};
}

View file

@ -1,155 +0,0 @@
/**
* Error Handling Utilities
*
* Error classes, mapping functions, and logging utilities.
* Implements error handling strategy from contracts/error-responses.md
*/
import { v4 as uuidv4 } from 'uuid';
import { GhostApiError, GhostApiErrorResponse } from '../types/ghost.js';
import { MCPError } from '../types/mcp.js';
// TODO: Phase 1 - Implement comprehensive error handling
// Based on contracts/error-responses.md
export enum ErrorCategory {
NETWORK = 'NETWORK',
AUTHENTICATION = 'AUTHENTICATION',
GHOST_API = 'GHOST_API',
MCP_PROTOCOL = 'MCP_PROTOCOL',
FILE_UPLOAD = 'FILE_UPLOAD',
VALIDATION = 'VALIDATION',
}
export class GhostMCPError extends Error {
public readonly id: string;
public readonly category: ErrorCategory;
public readonly code?: string;
public readonly context?: string;
public readonly requestId?: string;
constructor(
message: string,
category: ErrorCategory,
code?: string,
context?: string,
requestId?: string
) {
super(message);
this.name = 'GhostMCPError';
this.id = uuidv4();
this.category = category;
this.code = code;
this.context = context;
this.requestId = requestId;
}
}
export class NetworkError extends GhostMCPError {
constructor(message: string, context?: string, requestId?: string) {
super(message, ErrorCategory.NETWORK, 'NETWORK_ERROR', context, requestId);
this.name = 'NetworkError';
}
}
export class AuthenticationError extends GhostMCPError {
constructor(message: string, context?: string, requestId?: string) {
super(message, ErrorCategory.AUTHENTICATION, 'AUTH_ERROR', context, requestId);
this.name = 'AuthenticationError';
}
}
export class GhostApiError extends GhostMCPError {
constructor(message: string, code?: string, context?: string, requestId?: string) {
super(message, ErrorCategory.GHOST_API, code, context, requestId);
this.name = 'GhostApiError';
}
}
export class ValidationError extends GhostMCPError {
constructor(message: string, context?: string, requestId?: string) {
super(message, ErrorCategory.VALIDATION, 'VALIDATION_ERROR', context, requestId);
this.name = 'ValidationError';
}
}
// Error mapping functions
export function mapGhostErrorToMCP(ghostError: GhostApiErrorResponse, requestId?: string): MCPError {
// TODO: Phase 1 - Implement Ghost API error to MCP error mapping
const firstError = ghostError.errors[0];
return {
code: -32603, // Internal error
message: firstError?.message || 'Unknown Ghost API error',
data: {
type: firstError?.type,
context: firstError?.context,
requestId,
},
};
}
export function createMCPError(error: Error, requestId?: string): MCPError {
// TODO: Phase 1 - Create MCP error from any error type
if (error instanceof GhostMCPError) {
return {
code: -32603,
message: error.message,
data: {
category: error.category,
code: error.code,
context: error.context,
requestId: error.requestId || requestId,
},
};
}
return {
code: -32603,
message: error.message || 'Internal server error',
data: { requestId },
};
}
// Retry logic utilities
export interface RetryConfig {
maxRetries: number;
baseDelay: number;
maxDelay: number;
exponentialBase: number;
}
export const defaultRetryConfig: RetryConfig = {
maxRetries: 3,
baseDelay: 1000,
maxDelay: 10000,
exponentialBase: 2,
};
export async function withRetry<T>(
operation: () => Promise<T>,
config: Partial<RetryConfig> = {},
requestId?: string
): Promise<T> {
// TODO: Phase 1 - Implement retry logic with exponential backoff
const finalConfig = { ...defaultRetryConfig, ...config };
for (let attempt = 0; attempt <= finalConfig.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
if (attempt === finalConfig.maxRetries) {
throw error;
}
// Calculate delay with exponential backoff
const delay = Math.min(
finalConfig.baseDelay * Math.pow(finalConfig.exponentialBase, attempt),
finalConfig.maxDelay
);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw new Error('Retry logic failed unexpectedly');
}

View file

@ -1,45 +0,0 @@
/**
* Parameter Validation Utilities
*
* Zod schemas and validation functions for MCP tool parameters.
*/
import { z } from 'zod';
// TODO: Phase 1 - Implement parameter validation schemas
// Based on contracts/ghost-filtering.md and MCP tool specifications
// Common parameter schemas
export const limitSchema = z.number().min(1).max(50).optional();
export const pageSchema = z.number().min(1).optional();
export const filterSchema = z.string().optional();
export const includeSchema = z.string().optional();
export const fieldsSchema = z.string().optional();
// ID and slug schemas
export const idSchema = z.string().min(1);
export const slugSchema = z.string().min(1);
// Content schemas
export const titleSchema = z.string().min(1);
export const htmlSchema = z.string().optional();
export const lexicalSchema = z.string().optional();
export const statusSchema = z.enum(['draft', 'published', 'scheduled']).optional();
// Validation functions
export function validatePaginationParams(params: unknown): { limit?: number; page?: number } {
// TODO: Phase 1 - Implement validation
return params as { limit?: number; page?: number };
}
export function validateFilterSyntax(filter: string): boolean {
// TODO: Phase 1 - Implement Ghost filter syntax validation
// Based on contracts/ghost-filtering.md
return true;
}
export function validateContentFormat(content: unknown): 'lexical' | 'html' | 'mobiledoc' | 'unknown' {
// TODO: Phase 1 - Implement content format detection
// Based on contracts/content-formats.md
return 'unknown';
}

View file

@ -1,45 +0,0 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "Node",
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"allowImportingTsExtensions": false,
"resolveJsonModule": true,
"declaration": true,
"outDir": "./dist",
"rootDir": "./src",
"removeComments": true,
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"exactOptionalPropertyTypes": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
"allowUnusedLabels": false,
"allowUnreachableCode": false,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"sourceMap": true,
"incremental": true,
"tsBuildInfoFile": "./dist/.tsbuildinfo"
},
"include": [
"src/**/*"
],
"exclude": [
"node_modules",
"dist",
"**/*.test.ts",
"**/*.spec.ts"
],
"ts-node": {
"esm": true,
"experimentalSpecifierResolution": "node"
}
}