WIP e2e tests

This commit is contained in:
Luiz Felipe Costa 2025-09-23 03:28:35 -03:00
parent a160d00082
commit 89674f277b
11 changed files with 2257 additions and 2 deletions

View file

@ -1,6 +1,6 @@
# Ghost MCP Development Makefile # Ghost MCP Development Makefile
.PHONY: help install install-local deps-install-python deps-install-dev deps-deps-install-uv install-pip venv start-ghost stop-ghost restart-ghost setup-tokens test test-unit test-integration test-coverage test-fast test-parallel test-connection clean-test run dev format lint clean logs status check-deps setup docs .PHONY: help install install-local deps-install-python deps-install-dev deps-deps-install-uv install-pip venv start-ghost stop-ghost restart-ghost setup-tokens test test-unit test-integration test-coverage test-fast test-parallel test-e2e test-connection clean-test run dev format lint clean logs status check-deps setup docs
.PHONY: help .PHONY: help
help: ## Show this help message help: ## Show this help message
@ -114,6 +114,15 @@ test-fast: ## Run tests with fail-fast and short traceback
test-parallel: ## Run tests in parallel test-parallel: ## Run tests in parallel
uv run pytest tests/ -n auto uv run pytest tests/ -n auto
test-e2e: ## Run end-to-end tests against real Ghost instance
@echo "🧪 Running end-to-end tests..."
@if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \
exit 1; \
fi
@echo "⚠️ Note: These tests require a running Ghost instance (make start-ghost)"
uv run pytest tests/e2e/ -v -m e2e
test-connection: ## Test Ghost API connectivity test-connection: ## Test Ghost API connectivity
@if [ ! -f .env ]; then \ @if [ ! -f .env ]; then \
echo "❌ .env file not found. Run 'make setup-tokens' first"; \ echo "❌ .env file not found. Run 'make setup-tokens' first"; \

View file

@ -74,4 +74,9 @@ python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"] python_classes = ["Test*"]
python_functions = ["test_*"] python_functions = ["test_*"]
addopts = "-v --cov=ghost_mcp --cov-report=html --cov-report=term-missing" addopts = "-v --cov=ghost_mcp --cov-report=html --cov-report=term-missing"
asyncio_mode = "auto" asyncio_mode = "auto"
markers = [
"e2e: marks tests as end-to-end tests requiring real Ghost instance",
"admin: marks tests as requiring Ghost Admin API access",
"content: marks tests as requiring only Ghost Content API access"
]

1
tests/e2e/__init__.py Normal file
View file

@ -0,0 +1 @@
"""End-to-end tests for Ghost MCP tools."""

281
tests/e2e/conftest.py Normal file
View file

@ -0,0 +1,281 @@
"""Fixtures for end-to-end tests."""
import asyncio
import json
import os
import uuid
from typing import AsyncGenerator, Dict, List, Any
import pytest
import httpx
from ghost_mcp.server import mcp
from ghost_mcp.client import GhostClient
from ghost_mcp.config import config
@pytest.fixture(scope="session")
def ensure_ghost_running():
"""Ensure Ghost container is running before tests."""
import subprocess
try:
# Check if Ghost is accessible
response = httpx.get("http://localhost:2368", timeout=5)
if response.status_code != 200:
raise Exception("Ghost not accessible")
except Exception:
# Try to start Ghost containers
result = subprocess.run(
["docker", "compose", "ps", "-q", "ghost"],
capture_output=True,
text=True,
cwd="/var/home/luiz/projects/thenets/ghost-mcp"
)
if not result.stdout.strip():
pytest.skip("Ghost container not running. Run 'make start-ghost' first.")
# Verify environment configuration
if not os.getenv("GHOST_CONTENT_API_KEY") or not os.getenv("GHOST_ADMIN_API_KEY"):
pytest.skip("Ghost API keys not configured. Run 'make setup-tokens' first.")
@pytest.fixture
async def ghost_client() -> AsyncGenerator[GhostClient, None]:
"""Provide a Ghost client for tests."""
async with GhostClient() as client:
yield client
@pytest.fixture
def mcp_server():
"""Provide the MCP server instance."""
return mcp
@pytest.fixture
async def test_post_data() -> Dict[str, Any]:
"""Provide test data for creating posts."""
unique_id = str(uuid.uuid4())[:8]
return {
"title": f"Test Post {unique_id}",
"content": json.dumps({
"root": {
"children": [
{
"children": [
{
"detail": 0,
"format": 0,
"mode": "normal",
"style": "",
"text": f"This is a test post content for e2e testing. ID: {unique_id}",
"type": "text",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "paragraph",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "root",
"version": 1
}
}),
"content_format": "lexical",
"status": "draft"
}
@pytest.fixture
async def test_page_data() -> Dict[str, Any]:
"""Provide test data for creating pages."""
unique_id = str(uuid.uuid4())[:8]
return {
"title": f"Test Page {unique_id}",
"content": json.dumps({
"root": {
"children": [
{
"children": [
{
"detail": 0,
"format": 0,
"mode": "normal",
"style": "",
"text": f"This is a test page content for e2e testing. ID: {unique_id}",
"type": "text",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "paragraph",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "root",
"version": 1
}
}),
"content_format": "lexical",
"status": "draft"
}
@pytest.fixture
async def test_tag_data() -> Dict[str, str]:
"""Provide test data for creating tags."""
unique_id = str(uuid.uuid4())[:8]
return {
"name": f"test-tag-{unique_id}",
"description": f"Test tag for e2e testing {unique_id}"
}
@pytest.fixture
async def cleanup_test_content(ghost_client: GhostClient):
"""Clean up test content after each test."""
created_posts = []
created_pages = []
created_tags = []
def track_post(post_id: str):
created_posts.append(post_id)
def track_page(page_id: str):
created_pages.append(page_id)
def track_tag(tag_id: str):
created_tags.append(tag_id)
# Provide tracking functions
yield {
"track_post": track_post,
"track_page": track_page,
"track_tag": track_tag
}
# Cleanup after test
for post_id in created_posts:
try:
await ghost_client._make_request("DELETE", f"posts/{post_id}/", api_type="admin")
except Exception:
pass # Ignore cleanup errors
for page_id in created_pages:
try:
await ghost_client._make_request("DELETE", f"pages/{page_id}/", api_type="admin")
except Exception:
pass # Ignore cleanup errors
for tag_id in created_tags:
try:
await ghost_client._make_request("DELETE", f"tags/{tag_id}/", api_type="admin")
except Exception:
pass # Ignore cleanup errors
@pytest.fixture
async def sample_post(ghost_client: GhostClient, test_post_data: Dict[str, Any], cleanup_test_content):
"""Create a sample post for testing."""
# Create a post
response = await ghost_client._make_request(
"POST",
"posts/",
api_type="admin",
json_data={"posts": [test_post_data]}
)
post_data = response["posts"][0]
cleanup_test_content["track_post"](post_data["id"])
return post_data
@pytest.fixture
async def sample_published_post(ghost_client: GhostClient, test_post_data: Dict[str, Any], cleanup_test_content):
"""Create a sample published post for testing."""
# Modify data for published post
test_post_data["status"] = "published"
# Create a published post
response = await ghost_client._make_request(
"POST",
"posts/",
api_type="admin",
json_data={"posts": [test_post_data]}
)
post_data = response["posts"][0]
cleanup_test_content["track_post"](post_data["id"])
return post_data
@pytest.fixture
async def sample_page(ghost_client: GhostClient, test_page_data: Dict[str, Any], cleanup_test_content):
"""Create a sample page for testing."""
# Create a page
response = await ghost_client._make_request(
"POST",
"pages/",
api_type="admin",
json_data={"pages": [test_page_data]}
)
page_data = response["pages"][0]
cleanup_test_content["track_page"](page_data["id"])
return page_data
@pytest.fixture
async def sample_tag(ghost_client: GhostClient, test_tag_data: Dict[str, str], cleanup_test_content):
"""Create a sample tag for testing."""
# Create a tag
response = await ghost_client._make_request(
"POST",
"tags/",
api_type="admin",
json_data={"tags": [test_tag_data]}
)
tag_data = response["tags"][0]
cleanup_test_content["track_tag"](tag_data["id"])
return tag_data
@pytest.mark.e2e
class BaseE2ETest:
"""Base class for e2e tests."""
@pytest.fixture(autouse=True)
def setup_test(self, ensure_ghost_running):
"""Auto-use the Ghost running check."""
pass
def get_mcp_tool(self, mcp_server, tool_name: str):
"""Get an MCP tool by name from the server."""
tools = {tool.name: tool for tool in mcp_server.tools}
if tool_name not in tools:
available_tools = list(tools.keys())
raise ValueError(f"Tool '{tool_name}' not found. Available tools: {available_tools}")
return tools[tool_name]
async def call_mcp_tool(self, mcp_server, tool_name: str, **kwargs):
"""Call an MCP tool with the given arguments."""
tool = self.get_mcp_tool(mcp_server, tool_name)
return await tool.func(**kwargs)

View file

@ -0,0 +1,312 @@
"""End-to-end tests for Ghost authors functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestAuthorsContentAPIE2E(BaseE2ETest):
"""Test authors Content API functionality end-to-end."""
async def test_get_authors(self, mcp_server):
"""Test getting authors."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors
result = await get_authors()
response = json.loads(result)
# Verify response structure
assert "authors" in response
assert "meta" in response
assert isinstance(response["authors"], list)
# Should have at least the default Ghost author
assert len(response["authors"]) >= 1
# Verify author structure
if response["authors"]:
author = response["authors"][0]
essential_fields = ["id", "name", "slug", "email"]
for field in essential_fields:
assert field in author
async def test_get_authors_with_pagination(self, mcp_server):
"""Test getting authors with pagination parameters."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors with limit
result = await get_authors(limit=5)
response = json.loads(result)
assert "authors" in response
assert len(response["authors"]) <= 5
# Test pagination metadata
assert "meta" in response
assert "pagination" in response["meta"]
async def test_get_authors_with_include_count(self, mcp_server):
"""Test getting authors with post count included."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors with count.posts included
result = await get_authors(include="count.posts")
response = json.loads(result)
# Verify authors include count information
if response["authors"]:
author = response["authors"][0]
assert "count" in author
assert "posts" in author["count"]
assert isinstance(author["count"]["posts"], int)
async def test_get_authors_with_filter(self, mcp_server):
"""Test getting authors with filter."""
from ghost_mcp.tools.content.authors import get_authors
# Filter authors by status
result = await get_authors(filter="status:active")
response = json.loads(result)
# Verify filtering works
assert "authors" in response
if response["authors"]:
for author in response["authors"]:
# Status field might be omitted for active authors in some versions
status = author.get("status", "active")
assert status == "active"
async def test_get_authors_with_order(self, mcp_server):
"""Test getting authors with custom ordering."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors ordered by name
result = await get_authors(order="name asc")
response = json.loads(result)
# Verify ordering (should be alphabetical)
if len(response["authors"]) > 1:
author_names = [author["name"] for author in response["authors"]]
assert author_names == sorted(author_names)
async def test_get_author_by_id(self, mcp_server):
"""Test getting an author by ID."""
from ghost_mcp.tools.content.authors import get_authors, get_author_by_id
# First get all authors to find an existing ID
all_authors_result = await get_authors()
all_authors_response = json.loads(all_authors_result)
if not all_authors_response["authors"]:
pytest.skip("No authors available for testing")
# Use the first author's ID
test_author = all_authors_response["authors"][0]
author_id = test_author["id"]
# Get author by ID
result = await get_author_by_id(author_id)
response = json.loads(result)
# Verify response
assert "authors" in response
assert len(response["authors"]) == 1
author = response["authors"][0]
assert author["id"] == author_id
assert author["name"] == test_author["name"]
async def test_get_author_by_slug(self, mcp_server):
"""Test getting an author by slug."""
from ghost_mcp.tools.content.authors import get_authors, get_author_by_slug
# First get all authors to find an existing slug
all_authors_result = await get_authors()
all_authors_response = json.loads(all_authors_result)
if not all_authors_response["authors"]:
pytest.skip("No authors available for testing")
# Use the first author's slug
test_author = all_authors_response["authors"][0]
author_slug = test_author["slug"]
# Get author by slug
result = await get_author_by_slug(author_slug)
response = json.loads(result)
# Verify response
assert "authors" in response
assert len(response["authors"]) == 1
author = response["authors"][0]
assert author["slug"] == author_slug
assert author["name"] == test_author["name"]
async def test_get_author_by_nonexistent_id(self, mcp_server):
"""Test getting an author with non-existent ID returns proper error."""
from ghost_mcp.tools.content.authors import get_author_by_id
with pytest.raises(Exception) as exc_info:
await get_author_by_id("nonexistent-id")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_get_author_by_nonexistent_slug(self, mcp_server):
"""Test getting an author with non-existent slug returns proper error."""
from ghost_mcp.tools.content.authors import get_author_by_slug
with pytest.raises(Exception) as exc_info:
await get_author_by_slug("nonexistent-slug")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_author_fields_structure(self, mcp_server):
"""Test that authors have expected field structure."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors
result = await get_authors()
response = json.loads(result)
if not response["authors"]:
pytest.skip("No authors available for testing")
author = response["authors"][0]
# Verify essential fields are present
essential_fields = [
"id", "name", "slug", "email", "created_at", "updated_at", "url"
]
for field in essential_fields:
assert field in author, f"Field '{field}' missing from author"
# Verify data types
assert isinstance(author["id"], str)
assert isinstance(author["name"], str)
assert isinstance(author["slug"], str)
assert isinstance(author["email"], str)
async def test_author_with_posts_count(self, mcp_server, sample_published_post):
"""Test author post count when author has posts."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors with post count
result = await get_authors(include="count.posts")
response = json.loads(result)
if not response["authors"]:
pytest.skip("No authors available for testing")
# Find an author with posts
authors_with_posts = [
author for author in response["authors"]
if author.get("count", {}).get("posts", 0) > 0
]
# There should be at least one author with posts (the author of our sample post)
assert len(authors_with_posts) > 0
author_with_posts = authors_with_posts[0]
assert author_with_posts["count"]["posts"] > 0
async def test_author_profile_fields(self, mcp_server):
"""Test that authors include profile-related fields."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors
result = await get_authors()
response = json.loads(result)
if not response["authors"]:
pytest.skip("No authors available for testing")
author = response["authors"][0]
# These fields may be null but should be present
profile_fields = [
"bio", "website", "location", "facebook", "twitter",
"profile_image", "cover_image"
]
for field in profile_fields:
assert field in author
async def test_default_ghost_author_exists(self, mcp_server):
"""Test that the default Ghost author exists."""
from ghost_mcp.tools.content.authors import get_authors
# Get all authors
result = await get_authors()
response = json.loads(result)
# Should have at least one author (the default Ghost author)
assert len(response["authors"]) >= 1
# Look for the default Ghost author
ghost_authors = [
author for author in response["authors"]
if "ghost" in author["name"].lower() or "ghost" in author["slug"].lower()
]
# There should be at least one Ghost-related author
assert len(ghost_authors) >= 1
async def test_author_url_format(self, mcp_server):
"""Test that author URLs follow expected format."""
from ghost_mcp.tools.content.authors import get_authors
# Get authors
result = await get_authors()
response = json.loads(result)
if not response["authors"]:
pytest.skip("No authors available for testing")
author = response["authors"][0]
# Verify URL format
assert "url" in author
author_url = author["url"]
assert author_url.startswith("http")
assert "/author/" in author_url
assert author["slug"] in author_url
async def test_authors_unique_slugs(self, mcp_server):
"""Test that all authors have unique slugs."""
from ghost_mcp.tools.content.authors import get_authors
# Get all authors
result = await get_authors()
response = json.loads(result)
if len(response["authors"]) <= 1:
pytest.skip("Not enough authors to test uniqueness")
# Extract all slugs
slugs = [author["slug"] for author in response["authors"]]
# Verify uniqueness
assert len(slugs) == len(set(slugs)), "Author slugs are not unique"
async def test_authors_unique_emails(self, mcp_server):
"""Test that all authors have unique email addresses."""
from ghost_mcp.tools.content.authors import get_authors
# Get all authors
result = await get_authors()
response = json.loads(result)
if len(response["authors"]) <= 1:
pytest.skip("Not enough authors to test uniqueness")
# Extract all emails
emails = [author["email"] for author in response["authors"]]
# Verify uniqueness
assert len(emails) == len(set(emails)), "Author emails are not unique"

View file

@ -0,0 +1,85 @@
"""End-to-end tests for Ghost connection functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestConnectionE2E(BaseE2ETest):
"""Test Ghost connection functionality end-to-end."""
async def test_check_ghost_connection_success(self, mcp_server):
"""Test successful Ghost connection check."""
# Call the MCP tool
result = await self.call_mcp_tool(mcp_server, "check_ghost_connection")
# Parse the JSON result
status = json.loads(result)
# Verify connection status
assert status["ghost_url"] == "http://localhost:2368"
assert status["content_api_configured"] is True
assert status["admin_api_configured"] is True
assert status["connection_test"] == "completed"
assert status["content_api_status"] == "connected"
assert status["admin_api_status"] == "connected"
async def test_check_ghost_connection_config_fields(self, mcp_server):
"""Test that connection check returns all expected configuration fields."""
result = await self.call_mcp_tool(mcp_server, "check_ghost_connection")
status = json.loads(result)
# Verify all expected fields are present
expected_fields = {
"ghost_url",
"content_api_configured",
"admin_api_configured",
"mode",
"connection_test",
"content_api_status",
"admin_api_status"
}
assert set(status.keys()) >= expected_fields
async def test_connection_with_ghost_client(self, ghost_client):
"""Test direct connection using Ghost client."""
# Test Content API connection
response = await ghost_client._make_request("GET", "settings/", api_type="content")
assert "settings" in response
# Test Admin API connection
response = await ghost_client._make_request("GET", "site/", api_type="admin")
assert "site" in response
async def test_ghost_instance_health(self, ghost_client):
"""Test that Ghost instance is healthy and responsive."""
# Test getting site settings to verify the instance is functional
response = await ghost_client._make_request("GET", "settings/", api_type="content")
# Verify we get expected settings structure
assert "settings" in response
settings = response["settings"]
# Check for some expected settings
setting_keys = [setting["key"] for setting in settings]
expected_keys = ["title", "description", "url"]
for key in expected_keys:
assert key in setting_keys
async def test_api_version_compatibility(self, ghost_client):
"""Test that the API version is compatible."""
# Make a request to test API version response headers
response = await ghost_client._make_request("GET", "site/", api_type="admin")
# Verify we get a proper response structure
assert "site" in response
site = response["site"]
# Check for expected site fields
expected_fields = ["title", "url", "version"]
for field in expected_fields:
assert field in site

318
tests/e2e/test_e2e_pages.py Normal file
View file

@ -0,0 +1,318 @@
"""End-to-end tests for Ghost pages functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestPagesContentAPIE2E(BaseE2ETest):
"""Test pages Content API functionality end-to-end."""
async def test_get_pages(self, mcp_server, sample_page):
"""Test getting pages."""
from ghost_mcp.tools.content.pages import get_pages
# Get pages
result = await get_pages()
response = json.loads(result)
# Verify response structure
assert "pages" in response
assert "meta" in response
assert isinstance(response["pages"], list)
async def test_get_pages_with_pagination(self, mcp_server):
"""Test getting pages with pagination parameters."""
from ghost_mcp.tools.content.pages import get_pages
# Get pages with limit
result = await get_pages(limit=5)
response = json.loads(result)
assert "pages" in response
assert len(response["pages"]) <= 5
# Test pagination metadata
assert "meta" in response
assert "pagination" in response["meta"]
async def test_get_pages_with_include_fields(self, mcp_server):
"""Test getting pages with include fields."""
from ghost_mcp.tools.content.pages import get_pages
# Get pages with tags and authors included
result = await get_pages(include="tags,authors")
response = json.loads(result)
# Verify pages structure
assert "pages" in response
if response["pages"]:
page = response["pages"][0]
assert "tags" in page
assert "authors" in page
async def test_get_page_by_id(self, mcp_server, sample_page):
"""Test getting a page by ID."""
from ghost_mcp.tools.content.pages import get_page_by_id
# Get page by ID
result = await get_page_by_id(sample_page["id"])
response = json.loads(result)
# Verify response
assert "pages" in response
assert len(response["pages"]) == 1
page = response["pages"][0]
assert page["id"] == sample_page["id"]
assert page["title"] == sample_page["title"]
async def test_get_page_by_slug(self, mcp_server, sample_page):
"""Test getting a page by slug."""
from ghost_mcp.tools.content.pages import get_page_by_slug
# Get page by slug
result = await get_page_by_slug(sample_page["slug"])
response = json.loads(result)
# Verify response
assert "pages" in response
assert len(response["pages"]) == 1
page = response["pages"][0]
assert page["slug"] == sample_page["slug"]
assert page["title"] == sample_page["title"]
async def test_get_page_by_nonexistent_id(self, mcp_server):
"""Test getting a page with non-existent ID returns proper error."""
from ghost_mcp.tools.content.pages import get_page_by_id
with pytest.raises(Exception) as exc_info:
await get_page_by_id("nonexistent-id")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_get_page_by_nonexistent_slug(self, mcp_server):
"""Test getting a page with non-existent slug returns proper error."""
from ghost_mcp.tools.content.pages import get_page_by_slug
with pytest.raises(Exception) as exc_info:
await get_page_by_slug("nonexistent-slug")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
@pytest.mark.e2e
@pytest.mark.admin
class TestPagesAdminAPIE2E(BaseE2ETest):
"""Test pages Admin API functionality end-to-end."""
async def test_create_page_draft(self, mcp_server, test_page_data, cleanup_test_content):
"""Test creating a draft page."""
from ghost_mcp.tools.admin.pages import create_page
# Create page
result = await create_page(
title=test_page_data["title"],
content=test_page_data["content"],
content_format=test_page_data["content_format"],
status=test_page_data["status"]
)
response = json.loads(result)
# Verify response
assert "pages" in response
assert len(response["pages"]) == 1
page = response["pages"][0]
assert page["title"] == test_page_data["title"]
assert page["status"] == "draft"
assert "id" in page
assert "slug" in page
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_create_page_published(self, mcp_server, test_page_data, cleanup_test_content):
"""Test creating a published page."""
from ghost_mcp.tools.admin.pages import create_page
# Create published page
result = await create_page(
title=test_page_data["title"],
content=test_page_data["content"],
content_format=test_page_data["content_format"],
status="published"
)
response = json.loads(result)
# Verify response
assert "pages" in response
page = response["pages"][0]
assert page["status"] == "published"
assert "published_at" in page
assert page["published_at"] is not None
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_create_page_with_custom_slug(self, mcp_server, test_page_data, cleanup_test_content):
"""Test creating a page with custom slug."""
from ghost_mcp.tools.admin.pages import create_page
custom_slug = "custom-test-page-slug"
# Create page with custom slug
result = await create_page(
title=test_page_data["title"],
content=test_page_data["content"],
content_format=test_page_data["content_format"],
status=test_page_data["status"],
slug=custom_slug
)
response = json.loads(result)
# Verify custom slug
page = response["pages"][0]
assert page["slug"] == custom_slug
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_create_page_with_special_characters(self, mcp_server, cleanup_test_content):
"""Test creating a page with special characters in title and content."""
from ghost_mcp.tools.admin.pages import create_page
# Title and content with special characters
special_title = "Test Page with Special Characters: éñ中文 📄"
special_content = json.dumps({
"root": {
"children": [
{
"children": [
{
"detail": 0,
"format": 0,
"mode": "normal",
"style": "",
"text": "Page content with émojis 📖 and unicode: 中文字符",
"type": "text",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "paragraph",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "root",
"version": 1
}
})
# Create page with special characters
result = await create_page(
title=special_title,
content=special_content,
content_format="lexical",
status="draft"
)
response = json.loads(result)
# Verify special characters are preserved
page = response["pages"][0]
assert page["title"] == special_title
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_create_page_minimal_data(self, mcp_server, cleanup_test_content):
"""Test creating a page with minimal required data."""
from ghost_mcp.tools.admin.pages import create_page
# Create page with only title
result = await create_page(title="Minimal Test Page")
response = json.loads(result)
# Verify page was created with defaults
page = response["pages"][0]
assert page["title"] == "Minimal Test Page"
assert page["status"] == "draft" # Should default to draft
assert "slug" in page
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_page_slug_generation(self, mcp_server, cleanup_test_content):
"""Test that page slugs are generated correctly from titles."""
from ghost_mcp.tools.admin.pages import create_page
test_title = "Test Page Title With Spaces And Special-Characters!"
# Create page and check slug generation
result = await create_page(title=test_title)
response = json.loads(result)
page = response["pages"][0]
slug = page["slug"]
# Verify slug is URL-friendly
assert " " not in slug
assert slug.islower()
assert "test-page-title" in slug
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_create_page_empty_content(self, mcp_server, cleanup_test_content):
"""Test creating a page with empty content."""
from ghost_mcp.tools.admin.pages import create_page
# Create page with empty content
result = await create_page(
title="Empty Content Page",
content="",
status="draft"
)
response = json.loads(result)
# Verify page was created
page = response["pages"][0]
assert page["title"] == "Empty Content Page"
assert "id" in page
# Track for cleanup
cleanup_test_content["track_page"](page["id"])
async def test_pages_vs_posts_distinction(self, mcp_server, sample_page, sample_published_post):
"""Test that pages and posts are properly distinguished."""
from ghost_mcp.tools.content.pages import get_pages
from ghost_mcp.tools.content.posts import get_posts
# Get pages and posts
pages_result = await get_pages()
posts_result = await get_posts()
pages_response = json.loads(pages_result)
posts_response = json.loads(posts_result)
# Get IDs from both
page_ids = [page["id"] for page in pages_response["pages"]]
post_ids = [post["id"] for post in posts_response["posts"]]
# Verify our sample page is in pages, not posts
assert sample_page["id"] in page_ids
assert sample_page["id"] not in post_ids
# Verify our sample post is in posts, not pages
assert sample_published_post["id"] in post_ids
assert sample_published_post["id"] not in page_ids

334
tests/e2e/test_e2e_posts.py Normal file
View file

@ -0,0 +1,334 @@
"""End-to-end tests for Ghost posts functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestPostsContentAPIE2E(BaseE2ETest):
"""Test posts Content API functionality end-to-end."""
async def test_get_posts(self, mcp_server, sample_published_post):
"""Test getting published posts."""
from ghost_mcp.tools.content.posts import get_posts
# Get posts
result = await get_posts()
response = json.loads(result)
# Verify response structure
assert "posts" in response
assert "meta" in response
assert isinstance(response["posts"], list)
# Verify our test post appears in the list
post_titles = [post["title"] for post in response["posts"]]
assert sample_published_post["title"] in post_titles
async def test_get_posts_with_pagination(self, mcp_server, sample_published_post):
"""Test getting posts with pagination parameters."""
from ghost_mcp.tools.content.posts import get_posts
# Get posts with limit
result = await get_posts(limit=5)
response = json.loads(result)
assert "posts" in response
assert len(response["posts"]) <= 5
# Test pagination metadata
assert "meta" in response
assert "pagination" in response["meta"]
async def test_get_posts_with_include_fields(self, mcp_server, sample_published_post):
"""Test getting posts with include fields."""
from ghost_mcp.tools.content.posts import get_posts
# Get posts with tags and authors included
result = await get_posts(include="tags,authors")
response = json.loads(result)
# Verify posts include tags and authors
if response["posts"]:
post = response["posts"][0]
assert "tags" in post
assert "authors" in post
async def test_get_post_by_id(self, mcp_server, sample_published_post):
"""Test getting a post by ID."""
from ghost_mcp.tools.content.posts import get_post_by_id
# Get post by ID
result = await get_post_by_id(sample_published_post["id"])
response = json.loads(result)
# Verify response
assert "posts" in response
assert len(response["posts"]) == 1
post = response["posts"][0]
assert post["id"] == sample_published_post["id"]
assert post["title"] == sample_published_post["title"]
async def test_get_post_by_slug(self, mcp_server, sample_published_post):
"""Test getting a post by slug."""
from ghost_mcp.tools.content.posts import get_post_by_slug
# Get post by slug
result = await get_post_by_slug(sample_published_post["slug"])
response = json.loads(result)
# Verify response
assert "posts" in response
assert len(response["posts"]) == 1
post = response["posts"][0]
assert post["slug"] == sample_published_post["slug"]
assert post["title"] == sample_published_post["title"]
async def test_search_posts(self, mcp_server, sample_published_post):
"""Test searching posts."""
from ghost_mcp.tools.content.posts import search_posts
# Extract a unique word from the test post title
search_term = sample_published_post["title"].split()[0]
# Search for posts
result = await search_posts(search_term)
response = json.loads(result)
# Verify response
assert "posts" in response
assert isinstance(response["posts"], list)
# Verify our test post appears in search results
if response["posts"]:
post_titles = [post["title"] for post in response["posts"]]
matching_posts = [title for title in post_titles if search_term in title]
assert len(matching_posts) > 0
async def test_get_post_by_nonexistent_id(self, mcp_server):
"""Test getting a post with non-existent ID returns proper error."""
with pytest.raises(Exception) as exc_info:
await self.call_mcp_tool(mcp_server, "get_post_by_id", post_id="nonexistent-id")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_get_post_by_nonexistent_slug(self, mcp_server):
"""Test getting a post with non-existent slug returns proper error."""
with pytest.raises(Exception) as exc_info:
await self.call_mcp_tool(mcp_server, "get_post_by_slug", slug="nonexistent-slug")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
@pytest.mark.e2e
@pytest.mark.admin
class TestPostsAdminAPIE2E(BaseE2ETest):
"""Test posts Admin API functionality end-to-end."""
async def test_create_post_draft(self, mcp_server, test_post_data, cleanup_test_content):
"""Test creating a draft post."""
# Create post
result = await self.call_mcp_tool(
mcp_server, "create_post",
title=test_post_data["title"],
content=test_post_data["content"],
content_format=test_post_data["content_format"],
status=test_post_data["status"]
)
response = json.loads(result)
# Verify response
assert "posts" in response
assert len(response["posts"]) == 1
post = response["posts"][0]
assert post["title"] == test_post_data["title"]
assert post["status"] == "draft"
assert "id" in post
# Track for cleanup
cleanup_test_content["track_post"](post["id"])
async def test_create_post_published(self, mcp_server, test_post_data, cleanup_test_content):
"""Test creating a published post."""
from ghost_mcp.tools.admin.posts import create_post
# Create published post
result = await create_post(
title=test_post_data["title"],
content=test_post_data["content"],
content_format=test_post_data["content_format"],
status="published"
)
response = json.loads(result)
# Verify response
assert "posts" in response
post = response["posts"][0]
assert post["status"] == "published"
assert "published_at" in post
assert post["published_at"] is not None
# Track for cleanup
cleanup_test_content["track_post"](post["id"])
async def test_create_post_with_metadata(self, mcp_server, test_post_data, cleanup_test_content):
"""Test creating a post with metadata fields."""
from ghost_mcp.tools.admin.posts import create_post
# Create post with metadata
result = await create_post(
title=test_post_data["title"],
content=test_post_data["content"],
content_format=test_post_data["content_format"],
status=test_post_data["status"],
excerpt="Test excerpt for e2e testing",
featured=True,
meta_title="Test Meta Title",
meta_description="Test meta description"
)
response = json.loads(result)
# Verify metadata
post = response["posts"][0]
assert post["excerpt"] == "Test excerpt for e2e testing"
assert post["featured"] is True
assert post["meta_title"] == "Test Meta Title"
assert post["meta_description"] == "Test meta description"
# Track for cleanup
cleanup_test_content["track_post"](post["id"])
async def test_update_post(self, mcp_server, sample_post, cleanup_test_content):
"""Test updating a post."""
from ghost_mcp.tools.admin.posts import update_post
# Update the post
new_title = f"Updated {sample_post['title']}"
result = await update_post(
post_id=sample_post["id"],
title=new_title,
status="published"
)
response = json.loads(result)
# Verify update
post = response["posts"][0]
assert post["title"] == new_title
assert post["status"] == "published"
assert post["id"] == sample_post["id"]
async def test_delete_post(self, mcp_server, sample_post, cleanup_test_content):
"""Test deleting a post."""
from ghost_mcp.tools.admin.posts import delete_post
post_id = sample_post["id"]
# Delete the post
result = await delete_post(post_id)
# Verify deletion message
assert "deleted" in result.lower() or "success" in result.lower()
# Verify post is actually deleted by trying to get it
from ghost_mcp.tools.content.posts import get_post_by_id
with pytest.raises(Exception):
await get_post_by_id(post_id)
# Remove from cleanup tracking since it's already deleted
if post_id in cleanup_test_content:
cleanup_test_content.remove(post_id)
async def test_get_admin_posts_includes_drafts(self, mcp_server, sample_post):
"""Test that admin posts endpoint includes draft posts."""
from ghost_mcp.tools.admin.posts import get_admin_posts
# Get admin posts
result = await get_admin_posts()
response = json.loads(result)
# Verify response includes posts
assert "posts" in response
assert isinstance(response["posts"], list)
# Find our draft post
post_ids = [post["id"] for post in response["posts"]]
assert sample_post["id"] in post_ids
# Verify we can see draft status
draft_posts = [post for post in response["posts"] if post["status"] == "draft"]
assert len(draft_posts) > 0
async def test_create_post_with_special_characters(self, mcp_server, cleanup_test_content):
"""Test creating a post with special characters in title and content."""
from ghost_mcp.tools.admin.posts import create_post
# Title and content with special characters
special_title = "Test Post with Special Characters: éñ中文 🚀"
special_content = json.dumps({
"root": {
"children": [
{
"children": [
{
"detail": 0,
"format": 0,
"mode": "normal",
"style": "",
"text": "Content with émojis 🎉 and unicode: 中文字符",
"type": "text",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "paragraph",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "root",
"version": 1
}
})
# Create post with special characters
result = await create_post(
title=special_title,
content=special_content,
content_format="lexical",
status="draft"
)
response = json.loads(result)
# Verify special characters are preserved
post = response["posts"][0]
assert post["title"] == special_title
# Track for cleanup
cleanup_test_content["track_post"](post["id"])
async def test_update_post_nonexistent(self, mcp_server):
"""Test updating a non-existent post returns proper error."""
with pytest.raises(Exception) as exc_info:
await self.call_mcp_tool(mcp_server, "update_post", post_id="nonexistent-id", title="New Title")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_delete_post_nonexistent(self, mcp_server):
"""Test deleting a non-existent post returns proper error."""
with pytest.raises(Exception) as exc_info:
await self.call_mcp_tool(mcp_server, "delete_post", post_id="nonexistent-id")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()

View file

@ -0,0 +1,288 @@
"""End-to-end tests for Ghost search functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestSearchE2E(BaseE2ETest):
"""Test search functionality end-to-end."""
async def test_search_posts_basic(self, mcp_server, sample_published_post):
"""Test basic post search functionality."""
from ghost_mcp.tools.content.posts import search_posts
# Extract a searchable term from the test post title
search_term = sample_published_post["title"].split()[0]
# Search for posts
result = await search_posts(search_term)
response = json.loads(result)
# Verify response structure
assert "posts" in response
assert isinstance(response["posts"], list)
# Should find our test post
post_titles = [post["title"] for post in response["posts"]]
matching_posts = [title for title in post_titles if search_term in title]
assert len(matching_posts) > 0, f"Should find posts matching '{search_term}'"
async def test_search_posts_with_limit(self, mcp_server, sample_published_post):
"""Test post search with result limit."""
from ghost_mcp.tools.content.posts import search_posts
# Use a broad search term
search_term = "test"
# Search with limit
result = await search_posts(search_term, limit=3)
response = json.loads(result)
# Verify limit is respected
assert "posts" in response
assert len(response["posts"]) <= 3
async def test_search_posts_case_insensitive(self, mcp_server, sample_published_post):
"""Test that post search is case insensitive."""
from ghost_mcp.tools.content.posts import search_posts
# Get a word from the title in different cases
original_word = sample_published_post["title"].split()[0]
lowercase_search = original_word.lower()
uppercase_search = original_word.upper()
# Search with lowercase
lowercase_result = await search_posts(lowercase_search)
lowercase_response = json.loads(lowercase_result)
# Search with uppercase
uppercase_result = await search_posts(uppercase_search)
uppercase_response = json.loads(uppercase_result)
# Both should return similar results
assert len(lowercase_response["posts"]) > 0
assert len(uppercase_response["posts"]) > 0
# Should find the same post regardless of case
lowercase_titles = [post["title"] for post in lowercase_response["posts"]]
uppercase_titles = [post["title"] for post in uppercase_response["posts"]]
test_post_title = sample_published_post["title"]
if test_post_title in lowercase_titles:
assert test_post_title in uppercase_titles
async def test_search_posts_partial_match(self, mcp_server, sample_published_post):
"""Test that post search supports partial word matching."""
from ghost_mcp.tools.content.posts import search_posts
# Get a partial word from the title
full_word = sample_published_post["title"].split()[0]
if len(full_word) > 3:
partial_word = full_word[:3] # First 3 characters
# Search with partial word
result = await search_posts(partial_word)
response = json.loads(result)
# Should find posts containing the partial match
assert "posts" in response
# Note: Not all Ghost installations support partial matching,
# so we just verify the search doesn't error
async def test_search_posts_special_characters(self, mcp_server):
"""Test post search with special characters."""
from ghost_mcp.tools.content.posts import search_posts
# Test search with special characters
special_searches = ["test!", "test-post", "test_post"]
for search_term in special_searches:
try:
result = await search_posts(search_term)
response = json.loads(result)
# Should return valid response structure
assert "posts" in response
assert isinstance(response["posts"], list)
except Exception as e:
# Some special characters might not be supported,
# but should not cause server errors
assert "500" not in str(e), f"Server error with search term '{search_term}'"
async def test_search_posts_empty_query(self, mcp_server):
"""Test post search with empty query."""
from ghost_mcp.tools.content.posts import search_posts
# Search with empty string
try:
result = await search_posts("")
response = json.loads(result)
# Should return empty results or all posts
assert "posts" in response
assert isinstance(response["posts"], list)
except Exception as e:
# Empty query might not be allowed, but should return proper error
assert "400" in str(e) or "validation" in str(e).lower()
async def test_search_posts_nonexistent_term(self, mcp_server):
"""Test post search with non-existent term."""
from ghost_mcp.tools.content.posts import search_posts
# Search for something that shouldn't exist
nonexistent_term = "xyzneverexistingtermabc123"
result = await search_posts(nonexistent_term)
response = json.loads(result)
# Should return empty results
assert "posts" in response
assert len(response["posts"]) == 0
async def test_search_posts_common_words(self, mcp_server):
"""Test post search with common words."""
from ghost_mcp.tools.content.posts import search_posts
# Test with common words
common_words = ["the", "and", "is", "test"]
for word in common_words:
result = await search_posts(word)
response = json.loads(result)
# Should return valid response
assert "posts" in response
assert isinstance(response["posts"], list)
async def test_search_posts_unicode_characters(self, mcp_server):
"""Test post search with unicode characters."""
from ghost_mcp.tools.content.posts import search_posts
# Test with unicode characters
unicode_searches = ["café", "naïve", "résumé", "中文"]
for search_term in unicode_searches:
try:
result = await search_posts(search_term)
response = json.loads(result)
# Should return valid response structure
assert "posts" in response
assert isinstance(response["posts"], list)
except Exception as e:
# Unicode might not be fully supported in all Ghost configurations
# but should not cause server errors
assert "500" not in str(e), f"Server error with unicode search '{search_term}'"
async def test_search_posts_long_query(self, mcp_server):
"""Test post search with very long query."""
from ghost_mcp.tools.content.posts import search_posts
# Very long search query
long_query = "this is a very long search query that tests the system's ability to handle long search terms without breaking or causing performance issues" * 3
try:
result = await search_posts(long_query)
response = json.loads(result)
# Should handle long queries gracefully
assert "posts" in response
assert isinstance(response["posts"], list)
except Exception as e:
# Long queries might be rejected, but should return proper error
assert "400" in str(e) or "413" in str(e) or "length" in str(e).lower()
async def test_search_posts_multiple_words(self, mcp_server, sample_published_post):
"""Test post search with multiple words."""
from ghost_mcp.tools.content.posts import search_posts
# Get multiple words from the title
title_words = sample_published_post["title"].split()
if len(title_words) >= 2:
# Search with first two words
multi_word_search = f"{title_words[0]} {title_words[1]}"
result = await search_posts(multi_word_search)
response = json.loads(result)
# Should return valid results
assert "posts" in response
assert isinstance(response["posts"], list)
async def test_search_posts_pagination_metadata(self, mcp_server, sample_published_post):
"""Test that search results include proper pagination metadata."""
from ghost_mcp.tools.content.posts import search_posts
# Search for posts
search_term = "test"
result = await search_posts(search_term, limit=5)
response = json.loads(result)
# Should have posts array
assert "posts" in response
# May or may not have meta depending on implementation,
# but if present should be properly structured
if "meta" in response:
meta = response["meta"]
assert isinstance(meta, dict)
if "pagination" in meta:
pagination = meta["pagination"]
assert isinstance(pagination, dict)
async def test_search_posts_include_fields(self, mcp_server, sample_published_post):
"""Test search with include fields parameter."""
from ghost_mcp.tools.content.posts import search_posts
# Extract search term
search_term = sample_published_post["title"].split()[0]
# Search with include fields
result = await search_posts(search_term, include="tags,authors")
response = json.loads(result)
# Verify include fields work in search
if response["posts"]:
post = response["posts"][0]
assert "tags" in post
assert "authors" in post
async def test_search_posts_response_structure(self, mcp_server, sample_published_post):
"""Test that search results have proper post structure."""
from ghost_mcp.tools.content.posts import search_posts
# Search for posts
search_term = sample_published_post["title"].split()[0]
result = await search_posts(search_term)
response = json.loads(result)
# Verify each post has essential fields
if response["posts"]:
post = response["posts"][0]
essential_fields = ["id", "title", "slug", "status", "created_at", "updated_at", "url"]
for field in essential_fields:
assert field in post, f"Search result should include '{field}'"
async def test_search_only_published_posts(self, mcp_server, sample_post, sample_published_post):
"""Test that search only returns published posts, not drafts."""
from ghost_mcp.tools.content.posts import search_posts
# Search with a term that might match both posts
search_term = "test"
result = await search_posts(search_term)
response = json.loads(result)
# Verify all returned posts are published
for post in response["posts"]:
assert post["status"] == "published", "Search should only return published posts"
# Verify our published post might be in results
published_post_ids = [post["id"] for post in response["posts"]]
# Our draft post should NOT be in search results
assert sample_post["id"] not in published_post_ids, "Draft posts should not appear in search"

View file

@ -0,0 +1,292 @@
"""End-to-end tests for Ghost settings functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestSettingsContentAPIE2E(BaseE2ETest):
"""Test settings Content API functionality end-to-end."""
async def test_get_settings(self, mcp_server):
"""Test getting public settings."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Verify response structure
assert "settings" in response
assert isinstance(response["settings"], list)
# Should have multiple settings
assert len(response["settings"]) > 0
# Verify settings structure
setting = response["settings"][0]
essential_fields = ["key", "value"]
for field in essential_fields:
assert field in setting
async def test_get_settings_essential_keys(self, mcp_server):
"""Test that essential settings keys are present."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Extract all setting keys
setting_keys = [setting["key"] for setting in response["settings"]]
# Essential settings that should be present
essential_keys = [
"title",
"description",
"url",
"timezone",
"locale"
]
# Verify essential keys are present
for key in essential_keys:
assert key in setting_keys, f"Essential setting '{key}' not found"
async def test_get_settings_data_types(self, mcp_server):
"""Test that settings have correct data types."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Check data types for each setting
for setting in response["settings"]:
assert isinstance(setting["key"], str), f"Setting key should be string: {setting}"
# Value can be string, bool, or null
assert setting["value"] is None or isinstance(setting["value"], (str, bool, int))
async def test_get_settings_site_title(self, mcp_server):
"""Test that site title setting is accessible."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Find title setting
title_settings = [s for s in response["settings"] if s["key"] == "title"]
assert len(title_settings) == 1, "Should have exactly one title setting"
title_setting = title_settings[0]
assert isinstance(title_setting["value"], str)
assert len(title_setting["value"]) > 0, "Site title should not be empty"
async def test_get_settings_site_url(self, mcp_server):
"""Test that site URL setting is accessible and valid."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Find url setting
url_settings = [s for s in response["settings"] if s["key"] == "url"]
assert len(url_settings) == 1, "Should have exactly one url setting"
url_setting = url_settings[0]
assert isinstance(url_setting["value"], str)
assert url_setting["value"].startswith("http"), "Site URL should start with http"
assert "localhost:2368" in url_setting["value"], "Should be localhost test instance"
async def test_get_site_info(self, mcp_server):
"""Test getting basic site information."""
from ghost_mcp.tools.content.settings import get_site_info
# Get site info
result = await get_site_info()
response = json.loads(result)
# Verify response structure
assert "site" in response
site = response["site"]
# Verify essential site info fields
essential_fields = ["title", "url", "version"]
for field in essential_fields:
assert field in site, f"Site info should include '{field}'"
async def test_get_site_info_title_matches_settings(self, mcp_server):
"""Test that site info title matches settings title."""
from ghost_mcp.tools.content.settings import get_site_info, get_settings
# Get both site info and settings
site_info_result = await get_site_info()
settings_result = await get_settings()
site_info_response = json.loads(site_info_result)
settings_response = json.loads(settings_result)
# Extract titles
site_title = site_info_response["site"]["title"]
title_settings = [s for s in settings_response["settings"] if s["key"] == "title"]
settings_title = title_settings[0]["value"]
# Titles should match
assert site_title == settings_title, "Site info title should match settings title"
async def test_get_site_info_url_matches_settings(self, mcp_server):
"""Test that site info URL matches settings URL."""
from ghost_mcp.tools.content.settings import get_site_info, get_settings
# Get both site info and settings
site_info_result = await get_site_info()
settings_result = await get_settings()
site_info_response = json.loads(site_info_result)
settings_response = json.loads(settings_result)
# Extract URLs
site_url = site_info_response["site"]["url"]
url_settings = [s for s in settings_response["settings"] if s["key"] == "url"]
settings_url = url_settings[0]["value"]
# URLs should match
assert site_url == settings_url, "Site info URL should match settings URL"
async def test_get_site_info_version_format(self, mcp_server):
"""Test that site info includes valid Ghost version."""
from ghost_mcp.tools.content.settings import get_site_info
# Get site info
result = await get_site_info()
response = json.loads(result)
site = response["site"]
version = site["version"]
# Version should be a non-empty string
assert isinstance(version, str)
assert len(version) > 0
# Should contain a dot (version format like 5.x.x)
assert "." in version, "Version should be in x.y.z format"
async def test_settings_no_sensitive_data(self, mcp_server):
"""Test that settings don't expose sensitive information."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Extract all setting keys
setting_keys = [setting["key"] for setting in response["settings"]]
# Keys that should NOT be present in public settings
sensitive_keys = [
"db_password",
"mailgun_api_key",
"admin_api_key",
"content_api_key",
"smtp_password",
"oauth_client_secret"
]
# Verify sensitive keys are not exposed
for sensitive_key in sensitive_keys:
assert sensitive_key not in setting_keys, f"Sensitive key '{sensitive_key}' should not be exposed"
async def test_settings_readonly_access(self, mcp_server):
"""Test that Content API only provides read access to settings."""
from ghost_mcp.tools.content.settings import get_settings
# This test verifies that we can read settings but not modify them
# through the Content API (which is read-only)
# Get settings should work
result = await get_settings()
response = json.loads(result)
# Should return valid settings
assert "settings" in response
assert len(response["settings"]) > 0
# Note: Write operations would be through Admin API, which requires
# separate authentication and is not typically exposed through MCP tools
async def test_get_settings_pagination_metadata(self, mcp_server):
"""Test that settings include proper metadata structure."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Should have settings array
assert "settings" in response
# May or may not have meta depending on Ghost version,
# but if present should be properly structured
if "meta" in response:
meta = response["meta"]
assert isinstance(meta, dict)
async def test_settings_timezone_format(self, mcp_server):
"""Test that timezone setting is in valid format."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Find timezone setting
timezone_settings = [s for s in response["settings"] if s["key"] == "timezone"]
if timezone_settings: # timezone might not always be present
timezone_setting = timezone_settings[0]
timezone_value = timezone_setting["value"]
# Should be a string
assert isinstance(timezone_value, str)
# Common timezone formats
valid_formats = [
timezone_value.startswith("Etc/"),
timezone_value.startswith("America/"),
timezone_value.startswith("Europe/"),
timezone_value.startswith("Asia/"),
timezone_value == "UTC",
"/" in timezone_value # General timezone format
]
assert any(valid_formats), f"Invalid timezone format: {timezone_value}"
async def test_settings_locale_format(self, mcp_server):
"""Test that locale setting is in valid format."""
from ghost_mcp.tools.content.settings import get_settings
# Get settings
result = await get_settings()
response = json.loads(result)
# Find locale setting
locale_settings = [s for s in response["settings"] if s["key"] == "locale"]
if locale_settings: # locale might not always be present
locale_setting = locale_settings[0]
locale_value = locale_setting["value"]
# Should be a string
assert isinstance(locale_value, str)
assert len(locale_value) >= 2, "Locale should be at least 2 characters"
# Common locale formats (en, en-US, etc.)
# Should contain only letters, hyphens, and underscores
import re
assert re.match(r'^[a-zA-Z_-]+$', locale_value), f"Invalid locale format: {locale_value}"

330
tests/e2e/test_e2e_tags.py Normal file
View file

@ -0,0 +1,330 @@
"""End-to-end tests for Ghost tags functionality."""
import json
import pytest
from .conftest import BaseE2ETest
@pytest.mark.e2e
class TestTagsContentAPIE2E(BaseE2ETest):
"""Test tags Content API functionality end-to-end."""
async def test_get_tags(self, mcp_server, sample_tag):
"""Test getting tags."""
from ghost_mcp.tools.content.tags import get_tags
# Get tags
result = await get_tags()
response = json.loads(result)
# Verify response structure
assert "tags" in response
assert "meta" in response
assert isinstance(response["tags"], list)
# Verify our test tag appears in the list
tag_names = [tag["name"] for tag in response["tags"]]
assert sample_tag["name"] in tag_names
async def test_get_tags_with_pagination(self, mcp_server):
"""Test getting tags with pagination parameters."""
from ghost_mcp.tools.content.tags import get_tags
# Get tags with limit
result = await get_tags(limit=5)
response = json.loads(result)
assert "tags" in response
assert len(response["tags"]) <= 5
# Test pagination metadata
assert "meta" in response
assert "pagination" in response["meta"]
async def test_get_tags_with_include_count(self, mcp_server):
"""Test getting tags with post count included."""
from ghost_mcp.tools.content.tags import get_tags
# Get tags with count.posts included
result = await get_tags(include="count.posts")
response = json.loads(result)
# Verify tags include count information
if response["tags"]:
tag = response["tags"][0]
assert "count" in tag
assert "posts" in tag["count"]
assert isinstance(tag["count"]["posts"], int)
async def test_get_tags_with_filter(self, mcp_server, sample_tag):
"""Test getting tags with filter."""
from ghost_mcp.tools.content.tags import get_tags
# Filter tags by visibility
result = await get_tags(filter="visibility:public")
response = json.loads(result)
# Verify filtering works
assert "tags" in response
if response["tags"]:
for tag in response["tags"]:
assert tag.get("visibility", "public") == "public"
async def test_get_tags_with_order(self, mcp_server):
"""Test getting tags with custom ordering."""
from ghost_mcp.tools.content.tags import get_tags
# Get tags ordered by name
result = await get_tags(order="name asc")
response = json.loads(result)
# Verify ordering (should be alphabetical)
if len(response["tags"]) > 1:
tag_names = [tag["name"] for tag in response["tags"]]
assert tag_names == sorted(tag_names)
async def test_get_tag_by_id(self, mcp_server, sample_tag):
"""Test getting a tag by ID."""
from ghost_mcp.tools.content.tags import get_tag_by_id
# Get tag by ID
result = await get_tag_by_id(sample_tag["id"])
response = json.loads(result)
# Verify response
assert "tags" in response
assert len(response["tags"]) == 1
tag = response["tags"][0]
assert tag["id"] == sample_tag["id"]
assert tag["name"] == sample_tag["name"]
async def test_get_tag_by_slug(self, mcp_server, sample_tag):
"""Test getting a tag by slug."""
from ghost_mcp.tools.content.tags import get_tag_by_slug
# Get tag by slug
result = await get_tag_by_slug(sample_tag["slug"])
response = json.loads(result)
# Verify response
assert "tags" in response
assert len(response["tags"]) == 1
tag = response["tags"][0]
assert tag["slug"] == sample_tag["slug"]
assert tag["name"] == sample_tag["name"]
async def test_get_tag_by_nonexistent_id(self, mcp_server):
"""Test getting a tag with non-existent ID returns proper error."""
from ghost_mcp.tools.content.tags import get_tag_by_id
with pytest.raises(Exception) as exc_info:
await get_tag_by_id("nonexistent-id")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
async def test_get_tag_by_nonexistent_slug(self, mcp_server):
"""Test getting a tag with non-existent slug returns proper error."""
from ghost_mcp.tools.content.tags import get_tag_by_slug
with pytest.raises(Exception) as exc_info:
await get_tag_by_slug("nonexistent-slug")
# Verify we get an appropriate error
assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower()
@pytest.mark.e2e
@pytest.mark.admin
class TestTagsAdminAPIE2E(BaseE2ETest):
"""Test tags Admin API functionality end-to-end."""
async def test_create_tag_basic(self, mcp_server, test_tag_data, cleanup_test_content):
"""Test creating a basic tag."""
from ghost_mcp.tools.admin.tags import create_tag
# Create tag
result = await create_tag(
name=test_tag_data["name"],
description=test_tag_data["description"]
)
response = json.loads(result)
# Verify response
assert "tags" in response
assert len(response["tags"]) == 1
tag = response["tags"][0]
assert tag["name"] == test_tag_data["name"]
assert tag["description"] == test_tag_data["description"]
assert "id" in tag
assert "slug" in tag
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_create_tag_minimal(self, mcp_server, cleanup_test_content):
"""Test creating a tag with minimal data (name only)."""
from ghost_mcp.tools.admin.tags import create_tag
tag_name = "minimal-test-tag"
# Create tag with only name
result = await create_tag(name=tag_name)
response = json.loads(result)
# Verify tag was created
tag = response["tags"][0]
assert tag["name"] == tag_name
assert tag["description"] == "" # Should default to empty
assert "slug" in tag
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_create_tag_with_special_characters(self, mcp_server, cleanup_test_content):
"""Test creating a tag with special characters."""
from ghost_mcp.tools.admin.tags import create_tag
special_name = "Special Tag éñ中文 🏷️"
special_description = "Description with émojis 🎯 and unicode: 中文字符"
# Create tag with special characters
result = await create_tag(
name=special_name,
description=special_description
)
response = json.loads(result)
# Verify special characters are preserved
tag = response["tags"][0]
assert tag["name"] == special_name
assert tag["description"] == special_description
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_tag_slug_generation(self, mcp_server, cleanup_test_content):
"""Test that tag slugs are generated correctly from names."""
from ghost_mcp.tools.admin.tags import create_tag
tag_name = "Test Tag With Spaces And Special-Characters!"
# Create tag and check slug generation
result = await create_tag(name=tag_name)
response = json.loads(result)
tag = response["tags"][0]
slug = tag["slug"]
# Verify slug is URL-friendly
assert " " not in slug
assert slug.islower()
assert "test-tag-with-spaces" in slug
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_create_duplicate_tag_name(self, mcp_server, sample_tag):
"""Test creating a tag with duplicate name returns proper error."""
from ghost_mcp.tools.admin.tags import create_tag
# Try to create a tag with the same name as sample_tag
with pytest.raises(Exception) as exc_info:
await create_tag(name=sample_tag["name"])
# Verify we get an appropriate error
error_msg = str(exc_info.value).lower()
assert "duplicate" in error_msg or "already exists" in error_msg or "422" in str(exc_info.value)
async def test_create_tag_empty_name(self, mcp_server):
"""Test creating a tag with empty name returns proper error."""
from ghost_mcp.tools.admin.tags import create_tag
# Try to create a tag with empty name
with pytest.raises(Exception) as exc_info:
await create_tag(name="")
# Verify we get a validation error
error_msg = str(exc_info.value).lower()
assert "validation" in error_msg or "required" in error_msg or "400" in str(exc_info.value)
async def test_tag_visibility_public_by_default(self, mcp_server, cleanup_test_content):
"""Test that tags are public by default."""
from ghost_mcp.tools.admin.tags import create_tag
# Create a tag
result = await create_tag(name="public-visibility-test")
response = json.loads(result)
tag = response["tags"][0]
# Tags should be public by default (visibility field might be omitted or set to "public")
assert tag.get("visibility", "public") == "public"
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_tag_creation_fields(self, mcp_server, cleanup_test_content):
"""Test that created tags have all expected fields."""
from ghost_mcp.tools.admin.tags import create_tag
# Create a tag
result = await create_tag(
name="fields-test-tag",
description="Test description for field validation"
)
response = json.loads(result)
tag = response["tags"][0]
# Verify essential fields are present
essential_fields = ["id", "name", "slug", "description", "created_at", "updated_at"]
for field in essential_fields:
assert field in tag
# Verify data types
assert isinstance(tag["id"], str)
assert isinstance(tag["name"], str)
assert isinstance(tag["slug"], str)
assert isinstance(tag["description"], str)
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_long_tag_name(self, mcp_server, cleanup_test_content):
"""Test creating a tag with a very long name."""
from ghost_mcp.tools.admin.tags import create_tag
# Create a tag with a long name (but within reasonable limits)
long_name = "This is a very long tag name that tests the system's ability to handle longer tag names without breaking"
result = await create_tag(name=long_name)
response = json.loads(result)
tag = response["tags"][0]
assert tag["name"] == long_name
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])
async def test_tag_with_long_description(self, mcp_server, cleanup_test_content):
"""Test creating a tag with a very long description."""
from ghost_mcp.tools.admin.tags import create_tag
long_description = ("This is a very long description that tests the system's ability to handle "
"longer tag descriptions without breaking. " * 10)
result = await create_tag(
name="long-description-tag",
description=long_description
)
response = json.loads(result)
tag = response["tags"][0]
assert tag["description"] == long_description
# Track for cleanup
cleanup_test_content["track_tag"](tag["id"])