From 89674f277b6ad0e9554b081c8273b77ab7c53772 Mon Sep 17 00:00:00 2001 From: Luiz Costa Date: Tue, 23 Sep 2025 03:28:35 -0300 Subject: [PATCH] WIP e2e tests --- Makefile | 11 +- pyproject.toml | 7 +- tests/e2e/__init__.py | 1 + tests/e2e/conftest.py | 281 ++++++++++++++++++++++++++ tests/e2e/test_e2e_authors.py | 312 +++++++++++++++++++++++++++++ tests/e2e/test_e2e_connection.py | 85 ++++++++ tests/e2e/test_e2e_pages.py | 318 +++++++++++++++++++++++++++++ tests/e2e/test_e2e_posts.py | 334 +++++++++++++++++++++++++++++++ tests/e2e/test_e2e_search.py | 288 ++++++++++++++++++++++++++ tests/e2e/test_e2e_settings.py | 292 +++++++++++++++++++++++++++ tests/e2e/test_e2e_tags.py | 330 ++++++++++++++++++++++++++++++ 11 files changed, 2257 insertions(+), 2 deletions(-) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/conftest.py create mode 100644 tests/e2e/test_e2e_authors.py create mode 100644 tests/e2e/test_e2e_connection.py create mode 100644 tests/e2e/test_e2e_pages.py create mode 100644 tests/e2e/test_e2e_posts.py create mode 100644 tests/e2e/test_e2e_search.py create mode 100644 tests/e2e/test_e2e_settings.py create mode 100644 tests/e2e/test_e2e_tags.py diff --git a/Makefile b/Makefile index 426a18e..6c5808a 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # Ghost MCP Development Makefile -.PHONY: help install install-local deps-install-python deps-install-dev deps-deps-install-uv install-pip venv start-ghost stop-ghost restart-ghost setup-tokens test test-unit test-integration test-coverage test-fast test-parallel test-connection clean-test run dev format lint clean logs status check-deps setup docs +.PHONY: help install install-local deps-install-python deps-install-dev deps-deps-install-uv install-pip venv start-ghost stop-ghost restart-ghost setup-tokens test test-unit test-integration test-coverage test-fast test-parallel test-e2e test-connection clean-test run dev format lint clean logs status check-deps setup docs .PHONY: help help: ## Show this help message @@ -114,6 +114,15 @@ test-fast: ## Run tests with fail-fast and short traceback test-parallel: ## Run tests in parallel uv run pytest tests/ -n auto +test-e2e: ## Run end-to-end tests against real Ghost instance + @echo "🧪 Running end-to-end tests..." + @if [ ! -f .env ]; then \ + echo "❌ .env file not found. Run 'make setup-tokens' first"; \ + exit 1; \ + fi + @echo "⚠️ Note: These tests require a running Ghost instance (make start-ghost)" + uv run pytest tests/e2e/ -v -m e2e + test-connection: ## Test Ghost API connectivity @if [ ! -f .env ]; then \ echo "❌ .env file not found. Run 'make setup-tokens' first"; \ diff --git a/pyproject.toml b/pyproject.toml index 91950b0..cab293c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,4 +74,9 @@ python_files = ["test_*.py", "*_test.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = "-v --cov=ghost_mcp --cov-report=html --cov-report=term-missing" -asyncio_mode = "auto" \ No newline at end of file +asyncio_mode = "auto" +markers = [ + "e2e: marks tests as end-to-end tests requiring real Ghost instance", + "admin: marks tests as requiring Ghost Admin API access", + "content: marks tests as requiring only Ghost Content API access" +] \ No newline at end of file diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..eb639e8 --- /dev/null +++ b/tests/e2e/__init__.py @@ -0,0 +1 @@ +"""End-to-end tests for Ghost MCP tools.""" \ No newline at end of file diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 0000000..bf86d72 --- /dev/null +++ b/tests/e2e/conftest.py @@ -0,0 +1,281 @@ +"""Fixtures for end-to-end tests.""" + +import asyncio +import json +import os +import uuid +from typing import AsyncGenerator, Dict, List, Any + +import pytest +import httpx + +from ghost_mcp.server import mcp +from ghost_mcp.client import GhostClient +from ghost_mcp.config import config + + +@pytest.fixture(scope="session") +def ensure_ghost_running(): + """Ensure Ghost container is running before tests.""" + import subprocess + + try: + # Check if Ghost is accessible + response = httpx.get("http://localhost:2368", timeout=5) + if response.status_code != 200: + raise Exception("Ghost not accessible") + except Exception: + # Try to start Ghost containers + result = subprocess.run( + ["docker", "compose", "ps", "-q", "ghost"], + capture_output=True, + text=True, + cwd="/var/home/luiz/projects/thenets/ghost-mcp" + ) + + if not result.stdout.strip(): + pytest.skip("Ghost container not running. Run 'make start-ghost' first.") + + # Verify environment configuration + if not os.getenv("GHOST_CONTENT_API_KEY") or not os.getenv("GHOST_ADMIN_API_KEY"): + pytest.skip("Ghost API keys not configured. Run 'make setup-tokens' first.") + + +@pytest.fixture +async def ghost_client() -> AsyncGenerator[GhostClient, None]: + """Provide a Ghost client for tests.""" + async with GhostClient() as client: + yield client + + +@pytest.fixture +def mcp_server(): + """Provide the MCP server instance.""" + return mcp + + +@pytest.fixture +async def test_post_data() -> Dict[str, Any]: + """Provide test data for creating posts.""" + unique_id = str(uuid.uuid4())[:8] + return { + "title": f"Test Post {unique_id}", + "content": json.dumps({ + "root": { + "children": [ + { + "children": [ + { + "detail": 0, + "format": 0, + "mode": "normal", + "style": "", + "text": f"This is a test post content for e2e testing. ID: {unique_id}", + "type": "text", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "paragraph", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "root", + "version": 1 + } + }), + "content_format": "lexical", + "status": "draft" + } + + +@pytest.fixture +async def test_page_data() -> Dict[str, Any]: + """Provide test data for creating pages.""" + unique_id = str(uuid.uuid4())[:8] + return { + "title": f"Test Page {unique_id}", + "content": json.dumps({ + "root": { + "children": [ + { + "children": [ + { + "detail": 0, + "format": 0, + "mode": "normal", + "style": "", + "text": f"This is a test page content for e2e testing. ID: {unique_id}", + "type": "text", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "paragraph", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "root", + "version": 1 + } + }), + "content_format": "lexical", + "status": "draft" + } + + +@pytest.fixture +async def test_tag_data() -> Dict[str, str]: + """Provide test data for creating tags.""" + unique_id = str(uuid.uuid4())[:8] + return { + "name": f"test-tag-{unique_id}", + "description": f"Test tag for e2e testing {unique_id}" + } + + +@pytest.fixture +async def cleanup_test_content(ghost_client: GhostClient): + """Clean up test content after each test.""" + created_posts = [] + created_pages = [] + created_tags = [] + + def track_post(post_id: str): + created_posts.append(post_id) + + def track_page(page_id: str): + created_pages.append(page_id) + + def track_tag(tag_id: str): + created_tags.append(tag_id) + + # Provide tracking functions + yield { + "track_post": track_post, + "track_page": track_page, + "track_tag": track_tag + } + + # Cleanup after test + for post_id in created_posts: + try: + await ghost_client._make_request("DELETE", f"posts/{post_id}/", api_type="admin") + except Exception: + pass # Ignore cleanup errors + + for page_id in created_pages: + try: + await ghost_client._make_request("DELETE", f"pages/{page_id}/", api_type="admin") + except Exception: + pass # Ignore cleanup errors + + for tag_id in created_tags: + try: + await ghost_client._make_request("DELETE", f"tags/{tag_id}/", api_type="admin") + except Exception: + pass # Ignore cleanup errors + + +@pytest.fixture +async def sample_post(ghost_client: GhostClient, test_post_data: Dict[str, Any], cleanup_test_content): + """Create a sample post for testing.""" + # Create a post + response = await ghost_client._make_request( + "POST", + "posts/", + api_type="admin", + json_data={"posts": [test_post_data]} + ) + + post_data = response["posts"][0] + cleanup_test_content["track_post"](post_data["id"]) + + return post_data + + +@pytest.fixture +async def sample_published_post(ghost_client: GhostClient, test_post_data: Dict[str, Any], cleanup_test_content): + """Create a sample published post for testing.""" + # Modify data for published post + test_post_data["status"] = "published" + + # Create a published post + response = await ghost_client._make_request( + "POST", + "posts/", + api_type="admin", + json_data={"posts": [test_post_data]} + ) + + post_data = response["posts"][0] + cleanup_test_content["track_post"](post_data["id"]) + + return post_data + + +@pytest.fixture +async def sample_page(ghost_client: GhostClient, test_page_data: Dict[str, Any], cleanup_test_content): + """Create a sample page for testing.""" + # Create a page + response = await ghost_client._make_request( + "POST", + "pages/", + api_type="admin", + json_data={"pages": [test_page_data]} + ) + + page_data = response["pages"][0] + cleanup_test_content["track_page"](page_data["id"]) + + return page_data + + +@pytest.fixture +async def sample_tag(ghost_client: GhostClient, test_tag_data: Dict[str, str], cleanup_test_content): + """Create a sample tag for testing.""" + # Create a tag + response = await ghost_client._make_request( + "POST", + "tags/", + api_type="admin", + json_data={"tags": [test_tag_data]} + ) + + tag_data = response["tags"][0] + cleanup_test_content["track_tag"](tag_data["id"]) + + return tag_data + + +@pytest.mark.e2e +class BaseE2ETest: + """Base class for e2e tests.""" + + @pytest.fixture(autouse=True) + def setup_test(self, ensure_ghost_running): + """Auto-use the Ghost running check.""" + pass + + def get_mcp_tool(self, mcp_server, tool_name: str): + """Get an MCP tool by name from the server.""" + tools = {tool.name: tool for tool in mcp_server.tools} + if tool_name not in tools: + available_tools = list(tools.keys()) + raise ValueError(f"Tool '{tool_name}' not found. Available tools: {available_tools}") + return tools[tool_name] + + async def call_mcp_tool(self, mcp_server, tool_name: str, **kwargs): + """Call an MCP tool with the given arguments.""" + tool = self.get_mcp_tool(mcp_server, tool_name) + return await tool.func(**kwargs) \ No newline at end of file diff --git a/tests/e2e/test_e2e_authors.py b/tests/e2e/test_e2e_authors.py new file mode 100644 index 0000000..86e4bbf --- /dev/null +++ b/tests/e2e/test_e2e_authors.py @@ -0,0 +1,312 @@ +"""End-to-end tests for Ghost authors functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestAuthorsContentAPIE2E(BaseE2ETest): + """Test authors Content API functionality end-to-end.""" + + async def test_get_authors(self, mcp_server): + """Test getting authors.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors + result = await get_authors() + response = json.loads(result) + + # Verify response structure + assert "authors" in response + assert "meta" in response + assert isinstance(response["authors"], list) + + # Should have at least the default Ghost author + assert len(response["authors"]) >= 1 + + # Verify author structure + if response["authors"]: + author = response["authors"][0] + essential_fields = ["id", "name", "slug", "email"] + for field in essential_fields: + assert field in author + + async def test_get_authors_with_pagination(self, mcp_server): + """Test getting authors with pagination parameters.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors with limit + result = await get_authors(limit=5) + response = json.loads(result) + + assert "authors" in response + assert len(response["authors"]) <= 5 + + # Test pagination metadata + assert "meta" in response + assert "pagination" in response["meta"] + + async def test_get_authors_with_include_count(self, mcp_server): + """Test getting authors with post count included.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors with count.posts included + result = await get_authors(include="count.posts") + response = json.loads(result) + + # Verify authors include count information + if response["authors"]: + author = response["authors"][0] + assert "count" in author + assert "posts" in author["count"] + assert isinstance(author["count"]["posts"], int) + + async def test_get_authors_with_filter(self, mcp_server): + """Test getting authors with filter.""" + from ghost_mcp.tools.content.authors import get_authors + + # Filter authors by status + result = await get_authors(filter="status:active") + response = json.loads(result) + + # Verify filtering works + assert "authors" in response + if response["authors"]: + for author in response["authors"]: + # Status field might be omitted for active authors in some versions + status = author.get("status", "active") + assert status == "active" + + async def test_get_authors_with_order(self, mcp_server): + """Test getting authors with custom ordering.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors ordered by name + result = await get_authors(order="name asc") + response = json.loads(result) + + # Verify ordering (should be alphabetical) + if len(response["authors"]) > 1: + author_names = [author["name"] for author in response["authors"]] + assert author_names == sorted(author_names) + + async def test_get_author_by_id(self, mcp_server): + """Test getting an author by ID.""" + from ghost_mcp.tools.content.authors import get_authors, get_author_by_id + + # First get all authors to find an existing ID + all_authors_result = await get_authors() + all_authors_response = json.loads(all_authors_result) + + if not all_authors_response["authors"]: + pytest.skip("No authors available for testing") + + # Use the first author's ID + test_author = all_authors_response["authors"][0] + author_id = test_author["id"] + + # Get author by ID + result = await get_author_by_id(author_id) + response = json.loads(result) + + # Verify response + assert "authors" in response + assert len(response["authors"]) == 1 + + author = response["authors"][0] + assert author["id"] == author_id + assert author["name"] == test_author["name"] + + async def test_get_author_by_slug(self, mcp_server): + """Test getting an author by slug.""" + from ghost_mcp.tools.content.authors import get_authors, get_author_by_slug + + # First get all authors to find an existing slug + all_authors_result = await get_authors() + all_authors_response = json.loads(all_authors_result) + + if not all_authors_response["authors"]: + pytest.skip("No authors available for testing") + + # Use the first author's slug + test_author = all_authors_response["authors"][0] + author_slug = test_author["slug"] + + # Get author by slug + result = await get_author_by_slug(author_slug) + response = json.loads(result) + + # Verify response + assert "authors" in response + assert len(response["authors"]) == 1 + + author = response["authors"][0] + assert author["slug"] == author_slug + assert author["name"] == test_author["name"] + + async def test_get_author_by_nonexistent_id(self, mcp_server): + """Test getting an author with non-existent ID returns proper error.""" + from ghost_mcp.tools.content.authors import get_author_by_id + + with pytest.raises(Exception) as exc_info: + await get_author_by_id("nonexistent-id") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_get_author_by_nonexistent_slug(self, mcp_server): + """Test getting an author with non-existent slug returns proper error.""" + from ghost_mcp.tools.content.authors import get_author_by_slug + + with pytest.raises(Exception) as exc_info: + await get_author_by_slug("nonexistent-slug") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_author_fields_structure(self, mcp_server): + """Test that authors have expected field structure.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors + result = await get_authors() + response = json.loads(result) + + if not response["authors"]: + pytest.skip("No authors available for testing") + + author = response["authors"][0] + + # Verify essential fields are present + essential_fields = [ + "id", "name", "slug", "email", "created_at", "updated_at", "url" + ] + for field in essential_fields: + assert field in author, f"Field '{field}' missing from author" + + # Verify data types + assert isinstance(author["id"], str) + assert isinstance(author["name"], str) + assert isinstance(author["slug"], str) + assert isinstance(author["email"], str) + + async def test_author_with_posts_count(self, mcp_server, sample_published_post): + """Test author post count when author has posts.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors with post count + result = await get_authors(include="count.posts") + response = json.loads(result) + + if not response["authors"]: + pytest.skip("No authors available for testing") + + # Find an author with posts + authors_with_posts = [ + author for author in response["authors"] + if author.get("count", {}).get("posts", 0) > 0 + ] + + # There should be at least one author with posts (the author of our sample post) + assert len(authors_with_posts) > 0 + + author_with_posts = authors_with_posts[0] + assert author_with_posts["count"]["posts"] > 0 + + async def test_author_profile_fields(self, mcp_server): + """Test that authors include profile-related fields.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors + result = await get_authors() + response = json.loads(result) + + if not response["authors"]: + pytest.skip("No authors available for testing") + + author = response["authors"][0] + + # These fields may be null but should be present + profile_fields = [ + "bio", "website", "location", "facebook", "twitter", + "profile_image", "cover_image" + ] + + for field in profile_fields: + assert field in author + + async def test_default_ghost_author_exists(self, mcp_server): + """Test that the default Ghost author exists.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get all authors + result = await get_authors() + response = json.loads(result) + + # Should have at least one author (the default Ghost author) + assert len(response["authors"]) >= 1 + + # Look for the default Ghost author + ghost_authors = [ + author for author in response["authors"] + if "ghost" in author["name"].lower() or "ghost" in author["slug"].lower() + ] + + # There should be at least one Ghost-related author + assert len(ghost_authors) >= 1 + + async def test_author_url_format(self, mcp_server): + """Test that author URLs follow expected format.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get authors + result = await get_authors() + response = json.loads(result) + + if not response["authors"]: + pytest.skip("No authors available for testing") + + author = response["authors"][0] + + # Verify URL format + assert "url" in author + author_url = author["url"] + assert author_url.startswith("http") + assert "/author/" in author_url + assert author["slug"] in author_url + + async def test_authors_unique_slugs(self, mcp_server): + """Test that all authors have unique slugs.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get all authors + result = await get_authors() + response = json.loads(result) + + if len(response["authors"]) <= 1: + pytest.skip("Not enough authors to test uniqueness") + + # Extract all slugs + slugs = [author["slug"] for author in response["authors"]] + + # Verify uniqueness + assert len(slugs) == len(set(slugs)), "Author slugs are not unique" + + async def test_authors_unique_emails(self, mcp_server): + """Test that all authors have unique email addresses.""" + from ghost_mcp.tools.content.authors import get_authors + + # Get all authors + result = await get_authors() + response = json.loads(result) + + if len(response["authors"]) <= 1: + pytest.skip("Not enough authors to test uniqueness") + + # Extract all emails + emails = [author["email"] for author in response["authors"]] + + # Verify uniqueness + assert len(emails) == len(set(emails)), "Author emails are not unique" \ No newline at end of file diff --git a/tests/e2e/test_e2e_connection.py b/tests/e2e/test_e2e_connection.py new file mode 100644 index 0000000..9f7fdeb --- /dev/null +++ b/tests/e2e/test_e2e_connection.py @@ -0,0 +1,85 @@ +"""End-to-end tests for Ghost connection functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestConnectionE2E(BaseE2ETest): + """Test Ghost connection functionality end-to-end.""" + + async def test_check_ghost_connection_success(self, mcp_server): + """Test successful Ghost connection check.""" + # Call the MCP tool + result = await self.call_mcp_tool(mcp_server, "check_ghost_connection") + + # Parse the JSON result + status = json.loads(result) + + # Verify connection status + assert status["ghost_url"] == "http://localhost:2368" + assert status["content_api_configured"] is True + assert status["admin_api_configured"] is True + assert status["connection_test"] == "completed" + assert status["content_api_status"] == "connected" + assert status["admin_api_status"] == "connected" + + async def test_check_ghost_connection_config_fields(self, mcp_server): + """Test that connection check returns all expected configuration fields.""" + result = await self.call_mcp_tool(mcp_server, "check_ghost_connection") + status = json.loads(result) + + # Verify all expected fields are present + expected_fields = { + "ghost_url", + "content_api_configured", + "admin_api_configured", + "mode", + "connection_test", + "content_api_status", + "admin_api_status" + } + + assert set(status.keys()) >= expected_fields + + async def test_connection_with_ghost_client(self, ghost_client): + """Test direct connection using Ghost client.""" + # Test Content API connection + response = await ghost_client._make_request("GET", "settings/", api_type="content") + assert "settings" in response + + # Test Admin API connection + response = await ghost_client._make_request("GET", "site/", api_type="admin") + assert "site" in response + + async def test_ghost_instance_health(self, ghost_client): + """Test that Ghost instance is healthy and responsive.""" + # Test getting site settings to verify the instance is functional + response = await ghost_client._make_request("GET", "settings/", api_type="content") + + # Verify we get expected settings structure + assert "settings" in response + settings = response["settings"] + + # Check for some expected settings + setting_keys = [setting["key"] for setting in settings] + expected_keys = ["title", "description", "url"] + + for key in expected_keys: + assert key in setting_keys + + async def test_api_version_compatibility(self, ghost_client): + """Test that the API version is compatible.""" + # Make a request to test API version response headers + response = await ghost_client._make_request("GET", "site/", api_type="admin") + + # Verify we get a proper response structure + assert "site" in response + site = response["site"] + + # Check for expected site fields + expected_fields = ["title", "url", "version"] + for field in expected_fields: + assert field in site \ No newline at end of file diff --git a/tests/e2e/test_e2e_pages.py b/tests/e2e/test_e2e_pages.py new file mode 100644 index 0000000..942c355 --- /dev/null +++ b/tests/e2e/test_e2e_pages.py @@ -0,0 +1,318 @@ +"""End-to-end tests for Ghost pages functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestPagesContentAPIE2E(BaseE2ETest): + """Test pages Content API functionality end-to-end.""" + + async def test_get_pages(self, mcp_server, sample_page): + """Test getting pages.""" + from ghost_mcp.tools.content.pages import get_pages + + # Get pages + result = await get_pages() + response = json.loads(result) + + # Verify response structure + assert "pages" in response + assert "meta" in response + assert isinstance(response["pages"], list) + + async def test_get_pages_with_pagination(self, mcp_server): + """Test getting pages with pagination parameters.""" + from ghost_mcp.tools.content.pages import get_pages + + # Get pages with limit + result = await get_pages(limit=5) + response = json.loads(result) + + assert "pages" in response + assert len(response["pages"]) <= 5 + + # Test pagination metadata + assert "meta" in response + assert "pagination" in response["meta"] + + async def test_get_pages_with_include_fields(self, mcp_server): + """Test getting pages with include fields.""" + from ghost_mcp.tools.content.pages import get_pages + + # Get pages with tags and authors included + result = await get_pages(include="tags,authors") + response = json.loads(result) + + # Verify pages structure + assert "pages" in response + if response["pages"]: + page = response["pages"][0] + assert "tags" in page + assert "authors" in page + + async def test_get_page_by_id(self, mcp_server, sample_page): + """Test getting a page by ID.""" + from ghost_mcp.tools.content.pages import get_page_by_id + + # Get page by ID + result = await get_page_by_id(sample_page["id"]) + response = json.loads(result) + + # Verify response + assert "pages" in response + assert len(response["pages"]) == 1 + + page = response["pages"][0] + assert page["id"] == sample_page["id"] + assert page["title"] == sample_page["title"] + + async def test_get_page_by_slug(self, mcp_server, sample_page): + """Test getting a page by slug.""" + from ghost_mcp.tools.content.pages import get_page_by_slug + + # Get page by slug + result = await get_page_by_slug(sample_page["slug"]) + response = json.loads(result) + + # Verify response + assert "pages" in response + assert len(response["pages"]) == 1 + + page = response["pages"][0] + assert page["slug"] == sample_page["slug"] + assert page["title"] == sample_page["title"] + + async def test_get_page_by_nonexistent_id(self, mcp_server): + """Test getting a page with non-existent ID returns proper error.""" + from ghost_mcp.tools.content.pages import get_page_by_id + + with pytest.raises(Exception) as exc_info: + await get_page_by_id("nonexistent-id") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_get_page_by_nonexistent_slug(self, mcp_server): + """Test getting a page with non-existent slug returns proper error.""" + from ghost_mcp.tools.content.pages import get_page_by_slug + + with pytest.raises(Exception) as exc_info: + await get_page_by_slug("nonexistent-slug") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + +@pytest.mark.e2e +@pytest.mark.admin +class TestPagesAdminAPIE2E(BaseE2ETest): + """Test pages Admin API functionality end-to-end.""" + + async def test_create_page_draft(self, mcp_server, test_page_data, cleanup_test_content): + """Test creating a draft page.""" + from ghost_mcp.tools.admin.pages import create_page + + # Create page + result = await create_page( + title=test_page_data["title"], + content=test_page_data["content"], + content_format=test_page_data["content_format"], + status=test_page_data["status"] + ) + response = json.loads(result) + + # Verify response + assert "pages" in response + assert len(response["pages"]) == 1 + + page = response["pages"][0] + assert page["title"] == test_page_data["title"] + assert page["status"] == "draft" + assert "id" in page + assert "slug" in page + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_create_page_published(self, mcp_server, test_page_data, cleanup_test_content): + """Test creating a published page.""" + from ghost_mcp.tools.admin.pages import create_page + + # Create published page + result = await create_page( + title=test_page_data["title"], + content=test_page_data["content"], + content_format=test_page_data["content_format"], + status="published" + ) + response = json.loads(result) + + # Verify response + assert "pages" in response + page = response["pages"][0] + assert page["status"] == "published" + assert "published_at" in page + assert page["published_at"] is not None + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_create_page_with_custom_slug(self, mcp_server, test_page_data, cleanup_test_content): + """Test creating a page with custom slug.""" + from ghost_mcp.tools.admin.pages import create_page + + custom_slug = "custom-test-page-slug" + + # Create page with custom slug + result = await create_page( + title=test_page_data["title"], + content=test_page_data["content"], + content_format=test_page_data["content_format"], + status=test_page_data["status"], + slug=custom_slug + ) + response = json.loads(result) + + # Verify custom slug + page = response["pages"][0] + assert page["slug"] == custom_slug + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_create_page_with_special_characters(self, mcp_server, cleanup_test_content): + """Test creating a page with special characters in title and content.""" + from ghost_mcp.tools.admin.pages import create_page + + # Title and content with special characters + special_title = "Test Page with Special Characters: éñ中文 📄" + special_content = json.dumps({ + "root": { + "children": [ + { + "children": [ + { + "detail": 0, + "format": 0, + "mode": "normal", + "style": "", + "text": "Page content with émojis 📖 and unicode: 中文字符", + "type": "text", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "paragraph", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "root", + "version": 1 + } + }) + + # Create page with special characters + result = await create_page( + title=special_title, + content=special_content, + content_format="lexical", + status="draft" + ) + response = json.loads(result) + + # Verify special characters are preserved + page = response["pages"][0] + assert page["title"] == special_title + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_create_page_minimal_data(self, mcp_server, cleanup_test_content): + """Test creating a page with minimal required data.""" + from ghost_mcp.tools.admin.pages import create_page + + # Create page with only title + result = await create_page(title="Minimal Test Page") + response = json.loads(result) + + # Verify page was created with defaults + page = response["pages"][0] + assert page["title"] == "Minimal Test Page" + assert page["status"] == "draft" # Should default to draft + assert "slug" in page + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_page_slug_generation(self, mcp_server, cleanup_test_content): + """Test that page slugs are generated correctly from titles.""" + from ghost_mcp.tools.admin.pages import create_page + + test_title = "Test Page Title With Spaces And Special-Characters!" + + # Create page and check slug generation + result = await create_page(title=test_title) + response = json.loads(result) + + page = response["pages"][0] + slug = page["slug"] + + # Verify slug is URL-friendly + assert " " not in slug + assert slug.islower() + assert "test-page-title" in slug + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_create_page_empty_content(self, mcp_server, cleanup_test_content): + """Test creating a page with empty content.""" + from ghost_mcp.tools.admin.pages import create_page + + # Create page with empty content + result = await create_page( + title="Empty Content Page", + content="", + status="draft" + ) + response = json.loads(result) + + # Verify page was created + page = response["pages"][0] + assert page["title"] == "Empty Content Page" + assert "id" in page + + # Track for cleanup + cleanup_test_content["track_page"](page["id"]) + + async def test_pages_vs_posts_distinction(self, mcp_server, sample_page, sample_published_post): + """Test that pages and posts are properly distinguished.""" + from ghost_mcp.tools.content.pages import get_pages + from ghost_mcp.tools.content.posts import get_posts + + # Get pages and posts + pages_result = await get_pages() + posts_result = await get_posts() + + pages_response = json.loads(pages_result) + posts_response = json.loads(posts_result) + + # Get IDs from both + page_ids = [page["id"] for page in pages_response["pages"]] + post_ids = [post["id"] for post in posts_response["posts"]] + + # Verify our sample page is in pages, not posts + assert sample_page["id"] in page_ids + assert sample_page["id"] not in post_ids + + # Verify our sample post is in posts, not pages + assert sample_published_post["id"] in post_ids + assert sample_published_post["id"] not in page_ids \ No newline at end of file diff --git a/tests/e2e/test_e2e_posts.py b/tests/e2e/test_e2e_posts.py new file mode 100644 index 0000000..c61d689 --- /dev/null +++ b/tests/e2e/test_e2e_posts.py @@ -0,0 +1,334 @@ +"""End-to-end tests for Ghost posts functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestPostsContentAPIE2E(BaseE2ETest): + """Test posts Content API functionality end-to-end.""" + + async def test_get_posts(self, mcp_server, sample_published_post): + """Test getting published posts.""" + from ghost_mcp.tools.content.posts import get_posts + + # Get posts + result = await get_posts() + response = json.loads(result) + + # Verify response structure + assert "posts" in response + assert "meta" in response + assert isinstance(response["posts"], list) + + # Verify our test post appears in the list + post_titles = [post["title"] for post in response["posts"]] + assert sample_published_post["title"] in post_titles + + async def test_get_posts_with_pagination(self, mcp_server, sample_published_post): + """Test getting posts with pagination parameters.""" + from ghost_mcp.tools.content.posts import get_posts + + # Get posts with limit + result = await get_posts(limit=5) + response = json.loads(result) + + assert "posts" in response + assert len(response["posts"]) <= 5 + + # Test pagination metadata + assert "meta" in response + assert "pagination" in response["meta"] + + async def test_get_posts_with_include_fields(self, mcp_server, sample_published_post): + """Test getting posts with include fields.""" + from ghost_mcp.tools.content.posts import get_posts + + # Get posts with tags and authors included + result = await get_posts(include="tags,authors") + response = json.loads(result) + + # Verify posts include tags and authors + if response["posts"]: + post = response["posts"][0] + assert "tags" in post + assert "authors" in post + + async def test_get_post_by_id(self, mcp_server, sample_published_post): + """Test getting a post by ID.""" + from ghost_mcp.tools.content.posts import get_post_by_id + + # Get post by ID + result = await get_post_by_id(sample_published_post["id"]) + response = json.loads(result) + + # Verify response + assert "posts" in response + assert len(response["posts"]) == 1 + + post = response["posts"][0] + assert post["id"] == sample_published_post["id"] + assert post["title"] == sample_published_post["title"] + + async def test_get_post_by_slug(self, mcp_server, sample_published_post): + """Test getting a post by slug.""" + from ghost_mcp.tools.content.posts import get_post_by_slug + + # Get post by slug + result = await get_post_by_slug(sample_published_post["slug"]) + response = json.loads(result) + + # Verify response + assert "posts" in response + assert len(response["posts"]) == 1 + + post = response["posts"][0] + assert post["slug"] == sample_published_post["slug"] + assert post["title"] == sample_published_post["title"] + + async def test_search_posts(self, mcp_server, sample_published_post): + """Test searching posts.""" + from ghost_mcp.tools.content.posts import search_posts + + # Extract a unique word from the test post title + search_term = sample_published_post["title"].split()[0] + + # Search for posts + result = await search_posts(search_term) + response = json.loads(result) + + # Verify response + assert "posts" in response + assert isinstance(response["posts"], list) + + # Verify our test post appears in search results + if response["posts"]: + post_titles = [post["title"] for post in response["posts"]] + matching_posts = [title for title in post_titles if search_term in title] + assert len(matching_posts) > 0 + + async def test_get_post_by_nonexistent_id(self, mcp_server): + """Test getting a post with non-existent ID returns proper error.""" + with pytest.raises(Exception) as exc_info: + await self.call_mcp_tool(mcp_server, "get_post_by_id", post_id="nonexistent-id") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_get_post_by_nonexistent_slug(self, mcp_server): + """Test getting a post with non-existent slug returns proper error.""" + with pytest.raises(Exception) as exc_info: + await self.call_mcp_tool(mcp_server, "get_post_by_slug", slug="nonexistent-slug") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + +@pytest.mark.e2e +@pytest.mark.admin +class TestPostsAdminAPIE2E(BaseE2ETest): + """Test posts Admin API functionality end-to-end.""" + + async def test_create_post_draft(self, mcp_server, test_post_data, cleanup_test_content): + """Test creating a draft post.""" + # Create post + result = await self.call_mcp_tool( + mcp_server, "create_post", + title=test_post_data["title"], + content=test_post_data["content"], + content_format=test_post_data["content_format"], + status=test_post_data["status"] + ) + response = json.loads(result) + + # Verify response + assert "posts" in response + assert len(response["posts"]) == 1 + + post = response["posts"][0] + assert post["title"] == test_post_data["title"] + assert post["status"] == "draft" + assert "id" in post + + # Track for cleanup + cleanup_test_content["track_post"](post["id"]) + + async def test_create_post_published(self, mcp_server, test_post_data, cleanup_test_content): + """Test creating a published post.""" + from ghost_mcp.tools.admin.posts import create_post + + # Create published post + result = await create_post( + title=test_post_data["title"], + content=test_post_data["content"], + content_format=test_post_data["content_format"], + status="published" + ) + response = json.loads(result) + + # Verify response + assert "posts" in response + post = response["posts"][0] + assert post["status"] == "published" + assert "published_at" in post + assert post["published_at"] is not None + + # Track for cleanup + cleanup_test_content["track_post"](post["id"]) + + async def test_create_post_with_metadata(self, mcp_server, test_post_data, cleanup_test_content): + """Test creating a post with metadata fields.""" + from ghost_mcp.tools.admin.posts import create_post + + # Create post with metadata + result = await create_post( + title=test_post_data["title"], + content=test_post_data["content"], + content_format=test_post_data["content_format"], + status=test_post_data["status"], + excerpt="Test excerpt for e2e testing", + featured=True, + meta_title="Test Meta Title", + meta_description="Test meta description" + ) + response = json.loads(result) + + # Verify metadata + post = response["posts"][0] + assert post["excerpt"] == "Test excerpt for e2e testing" + assert post["featured"] is True + assert post["meta_title"] == "Test Meta Title" + assert post["meta_description"] == "Test meta description" + + # Track for cleanup + cleanup_test_content["track_post"](post["id"]) + + async def test_update_post(self, mcp_server, sample_post, cleanup_test_content): + """Test updating a post.""" + from ghost_mcp.tools.admin.posts import update_post + + # Update the post + new_title = f"Updated {sample_post['title']}" + result = await update_post( + post_id=sample_post["id"], + title=new_title, + status="published" + ) + response = json.loads(result) + + # Verify update + post = response["posts"][0] + assert post["title"] == new_title + assert post["status"] == "published" + assert post["id"] == sample_post["id"] + + async def test_delete_post(self, mcp_server, sample_post, cleanup_test_content): + """Test deleting a post.""" + from ghost_mcp.tools.admin.posts import delete_post + + post_id = sample_post["id"] + + # Delete the post + result = await delete_post(post_id) + + # Verify deletion message + assert "deleted" in result.lower() or "success" in result.lower() + + # Verify post is actually deleted by trying to get it + from ghost_mcp.tools.content.posts import get_post_by_id + with pytest.raises(Exception): + await get_post_by_id(post_id) + + # Remove from cleanup tracking since it's already deleted + if post_id in cleanup_test_content: + cleanup_test_content.remove(post_id) + + async def test_get_admin_posts_includes_drafts(self, mcp_server, sample_post): + """Test that admin posts endpoint includes draft posts.""" + from ghost_mcp.tools.admin.posts import get_admin_posts + + # Get admin posts + result = await get_admin_posts() + response = json.loads(result) + + # Verify response includes posts + assert "posts" in response + assert isinstance(response["posts"], list) + + # Find our draft post + post_ids = [post["id"] for post in response["posts"]] + assert sample_post["id"] in post_ids + + # Verify we can see draft status + draft_posts = [post for post in response["posts"] if post["status"] == "draft"] + assert len(draft_posts) > 0 + + async def test_create_post_with_special_characters(self, mcp_server, cleanup_test_content): + """Test creating a post with special characters in title and content.""" + from ghost_mcp.tools.admin.posts import create_post + + # Title and content with special characters + special_title = "Test Post with Special Characters: éñ中文 🚀" + special_content = json.dumps({ + "root": { + "children": [ + { + "children": [ + { + "detail": 0, + "format": 0, + "mode": "normal", + "style": "", + "text": "Content with émojis 🎉 and unicode: 中文字符", + "type": "text", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "paragraph", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "root", + "version": 1 + } + }) + + # Create post with special characters + result = await create_post( + title=special_title, + content=special_content, + content_format="lexical", + status="draft" + ) + response = json.loads(result) + + # Verify special characters are preserved + post = response["posts"][0] + assert post["title"] == special_title + + # Track for cleanup + cleanup_test_content["track_post"](post["id"]) + + async def test_update_post_nonexistent(self, mcp_server): + """Test updating a non-existent post returns proper error.""" + with pytest.raises(Exception) as exc_info: + await self.call_mcp_tool(mcp_server, "update_post", post_id="nonexistent-id", title="New Title") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_delete_post_nonexistent(self, mcp_server): + """Test deleting a non-existent post returns proper error.""" + with pytest.raises(Exception) as exc_info: + await self.call_mcp_tool(mcp_server, "delete_post", post_id="nonexistent-id") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() \ No newline at end of file diff --git a/tests/e2e/test_e2e_search.py b/tests/e2e/test_e2e_search.py new file mode 100644 index 0000000..c86ca53 --- /dev/null +++ b/tests/e2e/test_e2e_search.py @@ -0,0 +1,288 @@ +"""End-to-end tests for Ghost search functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestSearchE2E(BaseE2ETest): + """Test search functionality end-to-end.""" + + async def test_search_posts_basic(self, mcp_server, sample_published_post): + """Test basic post search functionality.""" + from ghost_mcp.tools.content.posts import search_posts + + # Extract a searchable term from the test post title + search_term = sample_published_post["title"].split()[0] + + # Search for posts + result = await search_posts(search_term) + response = json.loads(result) + + # Verify response structure + assert "posts" in response + assert isinstance(response["posts"], list) + + # Should find our test post + post_titles = [post["title"] for post in response["posts"]] + matching_posts = [title for title in post_titles if search_term in title] + assert len(matching_posts) > 0, f"Should find posts matching '{search_term}'" + + async def test_search_posts_with_limit(self, mcp_server, sample_published_post): + """Test post search with result limit.""" + from ghost_mcp.tools.content.posts import search_posts + + # Use a broad search term + search_term = "test" + + # Search with limit + result = await search_posts(search_term, limit=3) + response = json.loads(result) + + # Verify limit is respected + assert "posts" in response + assert len(response["posts"]) <= 3 + + async def test_search_posts_case_insensitive(self, mcp_server, sample_published_post): + """Test that post search is case insensitive.""" + from ghost_mcp.tools.content.posts import search_posts + + # Get a word from the title in different cases + original_word = sample_published_post["title"].split()[0] + lowercase_search = original_word.lower() + uppercase_search = original_word.upper() + + # Search with lowercase + lowercase_result = await search_posts(lowercase_search) + lowercase_response = json.loads(lowercase_result) + + # Search with uppercase + uppercase_result = await search_posts(uppercase_search) + uppercase_response = json.loads(uppercase_result) + + # Both should return similar results + assert len(lowercase_response["posts"]) > 0 + assert len(uppercase_response["posts"]) > 0 + + # Should find the same post regardless of case + lowercase_titles = [post["title"] for post in lowercase_response["posts"]] + uppercase_titles = [post["title"] for post in uppercase_response["posts"]] + + test_post_title = sample_published_post["title"] + if test_post_title in lowercase_titles: + assert test_post_title in uppercase_titles + + async def test_search_posts_partial_match(self, mcp_server, sample_published_post): + """Test that post search supports partial word matching.""" + from ghost_mcp.tools.content.posts import search_posts + + # Get a partial word from the title + full_word = sample_published_post["title"].split()[0] + if len(full_word) > 3: + partial_word = full_word[:3] # First 3 characters + + # Search with partial word + result = await search_posts(partial_word) + response = json.loads(result) + + # Should find posts containing the partial match + assert "posts" in response + # Note: Not all Ghost installations support partial matching, + # so we just verify the search doesn't error + + async def test_search_posts_special_characters(self, mcp_server): + """Test post search with special characters.""" + from ghost_mcp.tools.content.posts import search_posts + + # Test search with special characters + special_searches = ["test!", "test-post", "test_post"] + + for search_term in special_searches: + try: + result = await search_posts(search_term) + response = json.loads(result) + + # Should return valid response structure + assert "posts" in response + assert isinstance(response["posts"], list) + except Exception as e: + # Some special characters might not be supported, + # but should not cause server errors + assert "500" not in str(e), f"Server error with search term '{search_term}'" + + async def test_search_posts_empty_query(self, mcp_server): + """Test post search with empty query.""" + from ghost_mcp.tools.content.posts import search_posts + + # Search with empty string + try: + result = await search_posts("") + response = json.loads(result) + + # Should return empty results or all posts + assert "posts" in response + assert isinstance(response["posts"], list) + except Exception as e: + # Empty query might not be allowed, but should return proper error + assert "400" in str(e) or "validation" in str(e).lower() + + async def test_search_posts_nonexistent_term(self, mcp_server): + """Test post search with non-existent term.""" + from ghost_mcp.tools.content.posts import search_posts + + # Search for something that shouldn't exist + nonexistent_term = "xyzneverexistingtermabc123" + + result = await search_posts(nonexistent_term) + response = json.loads(result) + + # Should return empty results + assert "posts" in response + assert len(response["posts"]) == 0 + + async def test_search_posts_common_words(self, mcp_server): + """Test post search with common words.""" + from ghost_mcp.tools.content.posts import search_posts + + # Test with common words + common_words = ["the", "and", "is", "test"] + + for word in common_words: + result = await search_posts(word) + response = json.loads(result) + + # Should return valid response + assert "posts" in response + assert isinstance(response["posts"], list) + + async def test_search_posts_unicode_characters(self, mcp_server): + """Test post search with unicode characters.""" + from ghost_mcp.tools.content.posts import search_posts + + # Test with unicode characters + unicode_searches = ["café", "naïve", "résumé", "中文"] + + for search_term in unicode_searches: + try: + result = await search_posts(search_term) + response = json.loads(result) + + # Should return valid response structure + assert "posts" in response + assert isinstance(response["posts"], list) + except Exception as e: + # Unicode might not be fully supported in all Ghost configurations + # but should not cause server errors + assert "500" not in str(e), f"Server error with unicode search '{search_term}'" + + async def test_search_posts_long_query(self, mcp_server): + """Test post search with very long query.""" + from ghost_mcp.tools.content.posts import search_posts + + # Very long search query + long_query = "this is a very long search query that tests the system's ability to handle long search terms without breaking or causing performance issues" * 3 + + try: + result = await search_posts(long_query) + response = json.loads(result) + + # Should handle long queries gracefully + assert "posts" in response + assert isinstance(response["posts"], list) + except Exception as e: + # Long queries might be rejected, but should return proper error + assert "400" in str(e) or "413" in str(e) or "length" in str(e).lower() + + async def test_search_posts_multiple_words(self, mcp_server, sample_published_post): + """Test post search with multiple words.""" + from ghost_mcp.tools.content.posts import search_posts + + # Get multiple words from the title + title_words = sample_published_post["title"].split() + if len(title_words) >= 2: + # Search with first two words + multi_word_search = f"{title_words[0]} {title_words[1]}" + + result = await search_posts(multi_word_search) + response = json.loads(result) + + # Should return valid results + assert "posts" in response + assert isinstance(response["posts"], list) + + async def test_search_posts_pagination_metadata(self, mcp_server, sample_published_post): + """Test that search results include proper pagination metadata.""" + from ghost_mcp.tools.content.posts import search_posts + + # Search for posts + search_term = "test" + result = await search_posts(search_term, limit=5) + response = json.loads(result) + + # Should have posts array + assert "posts" in response + + # May or may not have meta depending on implementation, + # but if present should be properly structured + if "meta" in response: + meta = response["meta"] + assert isinstance(meta, dict) + + if "pagination" in meta: + pagination = meta["pagination"] + assert isinstance(pagination, dict) + + async def test_search_posts_include_fields(self, mcp_server, sample_published_post): + """Test search with include fields parameter.""" + from ghost_mcp.tools.content.posts import search_posts + + # Extract search term + search_term = sample_published_post["title"].split()[0] + + # Search with include fields + result = await search_posts(search_term, include="tags,authors") + response = json.loads(result) + + # Verify include fields work in search + if response["posts"]: + post = response["posts"][0] + assert "tags" in post + assert "authors" in post + + async def test_search_posts_response_structure(self, mcp_server, sample_published_post): + """Test that search results have proper post structure.""" + from ghost_mcp.tools.content.posts import search_posts + + # Search for posts + search_term = sample_published_post["title"].split()[0] + result = await search_posts(search_term) + response = json.loads(result) + + # Verify each post has essential fields + if response["posts"]: + post = response["posts"][0] + + essential_fields = ["id", "title", "slug", "status", "created_at", "updated_at", "url"] + for field in essential_fields: + assert field in post, f"Search result should include '{field}'" + + async def test_search_only_published_posts(self, mcp_server, sample_post, sample_published_post): + """Test that search only returns published posts, not drafts.""" + from ghost_mcp.tools.content.posts import search_posts + + # Search with a term that might match both posts + search_term = "test" + result = await search_posts(search_term) + response = json.loads(result) + + # Verify all returned posts are published + for post in response["posts"]: + assert post["status"] == "published", "Search should only return published posts" + + # Verify our published post might be in results + published_post_ids = [post["id"] for post in response["posts"]] + + # Our draft post should NOT be in search results + assert sample_post["id"] not in published_post_ids, "Draft posts should not appear in search" \ No newline at end of file diff --git a/tests/e2e/test_e2e_settings.py b/tests/e2e/test_e2e_settings.py new file mode 100644 index 0000000..b4f51cd --- /dev/null +++ b/tests/e2e/test_e2e_settings.py @@ -0,0 +1,292 @@ +"""End-to-end tests for Ghost settings functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestSettingsContentAPIE2E(BaseE2ETest): + """Test settings Content API functionality end-to-end.""" + + async def test_get_settings(self, mcp_server): + """Test getting public settings.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Verify response structure + assert "settings" in response + assert isinstance(response["settings"], list) + + # Should have multiple settings + assert len(response["settings"]) > 0 + + # Verify settings structure + setting = response["settings"][0] + essential_fields = ["key", "value"] + for field in essential_fields: + assert field in setting + + async def test_get_settings_essential_keys(self, mcp_server): + """Test that essential settings keys are present.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Extract all setting keys + setting_keys = [setting["key"] for setting in response["settings"]] + + # Essential settings that should be present + essential_keys = [ + "title", + "description", + "url", + "timezone", + "locale" + ] + + # Verify essential keys are present + for key in essential_keys: + assert key in setting_keys, f"Essential setting '{key}' not found" + + async def test_get_settings_data_types(self, mcp_server): + """Test that settings have correct data types.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Check data types for each setting + for setting in response["settings"]: + assert isinstance(setting["key"], str), f"Setting key should be string: {setting}" + # Value can be string, bool, or null + assert setting["value"] is None or isinstance(setting["value"], (str, bool, int)) + + async def test_get_settings_site_title(self, mcp_server): + """Test that site title setting is accessible.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Find title setting + title_settings = [s for s in response["settings"] if s["key"] == "title"] + assert len(title_settings) == 1, "Should have exactly one title setting" + + title_setting = title_settings[0] + assert isinstance(title_setting["value"], str) + assert len(title_setting["value"]) > 0, "Site title should not be empty" + + async def test_get_settings_site_url(self, mcp_server): + """Test that site URL setting is accessible and valid.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Find url setting + url_settings = [s for s in response["settings"] if s["key"] == "url"] + assert len(url_settings) == 1, "Should have exactly one url setting" + + url_setting = url_settings[0] + assert isinstance(url_setting["value"], str) + assert url_setting["value"].startswith("http"), "Site URL should start with http" + assert "localhost:2368" in url_setting["value"], "Should be localhost test instance" + + async def test_get_site_info(self, mcp_server): + """Test getting basic site information.""" + from ghost_mcp.tools.content.settings import get_site_info + + # Get site info + result = await get_site_info() + response = json.loads(result) + + # Verify response structure + assert "site" in response + site = response["site"] + + # Verify essential site info fields + essential_fields = ["title", "url", "version"] + for field in essential_fields: + assert field in site, f"Site info should include '{field}'" + + async def test_get_site_info_title_matches_settings(self, mcp_server): + """Test that site info title matches settings title.""" + from ghost_mcp.tools.content.settings import get_site_info, get_settings + + # Get both site info and settings + site_info_result = await get_site_info() + settings_result = await get_settings() + + site_info_response = json.loads(site_info_result) + settings_response = json.loads(settings_result) + + # Extract titles + site_title = site_info_response["site"]["title"] + + title_settings = [s for s in settings_response["settings"] if s["key"] == "title"] + settings_title = title_settings[0]["value"] + + # Titles should match + assert site_title == settings_title, "Site info title should match settings title" + + async def test_get_site_info_url_matches_settings(self, mcp_server): + """Test that site info URL matches settings URL.""" + from ghost_mcp.tools.content.settings import get_site_info, get_settings + + # Get both site info and settings + site_info_result = await get_site_info() + settings_result = await get_settings() + + site_info_response = json.loads(site_info_result) + settings_response = json.loads(settings_result) + + # Extract URLs + site_url = site_info_response["site"]["url"] + + url_settings = [s for s in settings_response["settings"] if s["key"] == "url"] + settings_url = url_settings[0]["value"] + + # URLs should match + assert site_url == settings_url, "Site info URL should match settings URL" + + async def test_get_site_info_version_format(self, mcp_server): + """Test that site info includes valid Ghost version.""" + from ghost_mcp.tools.content.settings import get_site_info + + # Get site info + result = await get_site_info() + response = json.loads(result) + + site = response["site"] + version = site["version"] + + # Version should be a non-empty string + assert isinstance(version, str) + assert len(version) > 0 + + # Should contain a dot (version format like 5.x.x) + assert "." in version, "Version should be in x.y.z format" + + async def test_settings_no_sensitive_data(self, mcp_server): + """Test that settings don't expose sensitive information.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Extract all setting keys + setting_keys = [setting["key"] for setting in response["settings"]] + + # Keys that should NOT be present in public settings + sensitive_keys = [ + "db_password", + "mailgun_api_key", + "admin_api_key", + "content_api_key", + "smtp_password", + "oauth_client_secret" + ] + + # Verify sensitive keys are not exposed + for sensitive_key in sensitive_keys: + assert sensitive_key not in setting_keys, f"Sensitive key '{sensitive_key}' should not be exposed" + + async def test_settings_readonly_access(self, mcp_server): + """Test that Content API only provides read access to settings.""" + from ghost_mcp.tools.content.settings import get_settings + + # This test verifies that we can read settings but not modify them + # through the Content API (which is read-only) + + # Get settings should work + result = await get_settings() + response = json.loads(result) + + # Should return valid settings + assert "settings" in response + assert len(response["settings"]) > 0 + + # Note: Write operations would be through Admin API, which requires + # separate authentication and is not typically exposed through MCP tools + + async def test_get_settings_pagination_metadata(self, mcp_server): + """Test that settings include proper metadata structure.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Should have settings array + assert "settings" in response + + # May or may not have meta depending on Ghost version, + # but if present should be properly structured + if "meta" in response: + meta = response["meta"] + assert isinstance(meta, dict) + + async def test_settings_timezone_format(self, mcp_server): + """Test that timezone setting is in valid format.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Find timezone setting + timezone_settings = [s for s in response["settings"] if s["key"] == "timezone"] + + if timezone_settings: # timezone might not always be present + timezone_setting = timezone_settings[0] + timezone_value = timezone_setting["value"] + + # Should be a string + assert isinstance(timezone_value, str) + + # Common timezone formats + valid_formats = [ + timezone_value.startswith("Etc/"), + timezone_value.startswith("America/"), + timezone_value.startswith("Europe/"), + timezone_value.startswith("Asia/"), + timezone_value == "UTC", + "/" in timezone_value # General timezone format + ] + + assert any(valid_formats), f"Invalid timezone format: {timezone_value}" + + async def test_settings_locale_format(self, mcp_server): + """Test that locale setting is in valid format.""" + from ghost_mcp.tools.content.settings import get_settings + + # Get settings + result = await get_settings() + response = json.loads(result) + + # Find locale setting + locale_settings = [s for s in response["settings"] if s["key"] == "locale"] + + if locale_settings: # locale might not always be present + locale_setting = locale_settings[0] + locale_value = locale_setting["value"] + + # Should be a string + assert isinstance(locale_value, str) + assert len(locale_value) >= 2, "Locale should be at least 2 characters" + + # Common locale formats (en, en-US, etc.) + # Should contain only letters, hyphens, and underscores + import re + assert re.match(r'^[a-zA-Z_-]+$', locale_value), f"Invalid locale format: {locale_value}" \ No newline at end of file diff --git a/tests/e2e/test_e2e_tags.py b/tests/e2e/test_e2e_tags.py new file mode 100644 index 0000000..8651328 --- /dev/null +++ b/tests/e2e/test_e2e_tags.py @@ -0,0 +1,330 @@ +"""End-to-end tests for Ghost tags functionality.""" + +import json +import pytest + +from .conftest import BaseE2ETest + + +@pytest.mark.e2e +class TestTagsContentAPIE2E(BaseE2ETest): + """Test tags Content API functionality end-to-end.""" + + async def test_get_tags(self, mcp_server, sample_tag): + """Test getting tags.""" + from ghost_mcp.tools.content.tags import get_tags + + # Get tags + result = await get_tags() + response = json.loads(result) + + # Verify response structure + assert "tags" in response + assert "meta" in response + assert isinstance(response["tags"], list) + + # Verify our test tag appears in the list + tag_names = [tag["name"] for tag in response["tags"]] + assert sample_tag["name"] in tag_names + + async def test_get_tags_with_pagination(self, mcp_server): + """Test getting tags with pagination parameters.""" + from ghost_mcp.tools.content.tags import get_tags + + # Get tags with limit + result = await get_tags(limit=5) + response = json.loads(result) + + assert "tags" in response + assert len(response["tags"]) <= 5 + + # Test pagination metadata + assert "meta" in response + assert "pagination" in response["meta"] + + async def test_get_tags_with_include_count(self, mcp_server): + """Test getting tags with post count included.""" + from ghost_mcp.tools.content.tags import get_tags + + # Get tags with count.posts included + result = await get_tags(include="count.posts") + response = json.loads(result) + + # Verify tags include count information + if response["tags"]: + tag = response["tags"][0] + assert "count" in tag + assert "posts" in tag["count"] + assert isinstance(tag["count"]["posts"], int) + + async def test_get_tags_with_filter(self, mcp_server, sample_tag): + """Test getting tags with filter.""" + from ghost_mcp.tools.content.tags import get_tags + + # Filter tags by visibility + result = await get_tags(filter="visibility:public") + response = json.loads(result) + + # Verify filtering works + assert "tags" in response + if response["tags"]: + for tag in response["tags"]: + assert tag.get("visibility", "public") == "public" + + async def test_get_tags_with_order(self, mcp_server): + """Test getting tags with custom ordering.""" + from ghost_mcp.tools.content.tags import get_tags + + # Get tags ordered by name + result = await get_tags(order="name asc") + response = json.loads(result) + + # Verify ordering (should be alphabetical) + if len(response["tags"]) > 1: + tag_names = [tag["name"] for tag in response["tags"]] + assert tag_names == sorted(tag_names) + + async def test_get_tag_by_id(self, mcp_server, sample_tag): + """Test getting a tag by ID.""" + from ghost_mcp.tools.content.tags import get_tag_by_id + + # Get tag by ID + result = await get_tag_by_id(sample_tag["id"]) + response = json.loads(result) + + # Verify response + assert "tags" in response + assert len(response["tags"]) == 1 + + tag = response["tags"][0] + assert tag["id"] == sample_tag["id"] + assert tag["name"] == sample_tag["name"] + + async def test_get_tag_by_slug(self, mcp_server, sample_tag): + """Test getting a tag by slug.""" + from ghost_mcp.tools.content.tags import get_tag_by_slug + + # Get tag by slug + result = await get_tag_by_slug(sample_tag["slug"]) + response = json.loads(result) + + # Verify response + assert "tags" in response + assert len(response["tags"]) == 1 + + tag = response["tags"][0] + assert tag["slug"] == sample_tag["slug"] + assert tag["name"] == sample_tag["name"] + + async def test_get_tag_by_nonexistent_id(self, mcp_server): + """Test getting a tag with non-existent ID returns proper error.""" + from ghost_mcp.tools.content.tags import get_tag_by_id + + with pytest.raises(Exception) as exc_info: + await get_tag_by_id("nonexistent-id") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + async def test_get_tag_by_nonexistent_slug(self, mcp_server): + """Test getting a tag with non-existent slug returns proper error.""" + from ghost_mcp.tools.content.tags import get_tag_by_slug + + with pytest.raises(Exception) as exc_info: + await get_tag_by_slug("nonexistent-slug") + + # Verify we get an appropriate error + assert "404" in str(exc_info.value) or "not found" in str(exc_info.value).lower() + + +@pytest.mark.e2e +@pytest.mark.admin +class TestTagsAdminAPIE2E(BaseE2ETest): + """Test tags Admin API functionality end-to-end.""" + + async def test_create_tag_basic(self, mcp_server, test_tag_data, cleanup_test_content): + """Test creating a basic tag.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Create tag + result = await create_tag( + name=test_tag_data["name"], + description=test_tag_data["description"] + ) + response = json.loads(result) + + # Verify response + assert "tags" in response + assert len(response["tags"]) == 1 + + tag = response["tags"][0] + assert tag["name"] == test_tag_data["name"] + assert tag["description"] == test_tag_data["description"] + assert "id" in tag + assert "slug" in tag + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_create_tag_minimal(self, mcp_server, cleanup_test_content): + """Test creating a tag with minimal data (name only).""" + from ghost_mcp.tools.admin.tags import create_tag + + tag_name = "minimal-test-tag" + + # Create tag with only name + result = await create_tag(name=tag_name) + response = json.loads(result) + + # Verify tag was created + tag = response["tags"][0] + assert tag["name"] == tag_name + assert tag["description"] == "" # Should default to empty + assert "slug" in tag + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_create_tag_with_special_characters(self, mcp_server, cleanup_test_content): + """Test creating a tag with special characters.""" + from ghost_mcp.tools.admin.tags import create_tag + + special_name = "Special Tag éñ中文 🏷️" + special_description = "Description with émojis 🎯 and unicode: 中文字符" + + # Create tag with special characters + result = await create_tag( + name=special_name, + description=special_description + ) + response = json.loads(result) + + # Verify special characters are preserved + tag = response["tags"][0] + assert tag["name"] == special_name + assert tag["description"] == special_description + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_tag_slug_generation(self, mcp_server, cleanup_test_content): + """Test that tag slugs are generated correctly from names.""" + from ghost_mcp.tools.admin.tags import create_tag + + tag_name = "Test Tag With Spaces And Special-Characters!" + + # Create tag and check slug generation + result = await create_tag(name=tag_name) + response = json.loads(result) + + tag = response["tags"][0] + slug = tag["slug"] + + # Verify slug is URL-friendly + assert " " not in slug + assert slug.islower() + assert "test-tag-with-spaces" in slug + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_create_duplicate_tag_name(self, mcp_server, sample_tag): + """Test creating a tag with duplicate name returns proper error.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Try to create a tag with the same name as sample_tag + with pytest.raises(Exception) as exc_info: + await create_tag(name=sample_tag["name"]) + + # Verify we get an appropriate error + error_msg = str(exc_info.value).lower() + assert "duplicate" in error_msg or "already exists" in error_msg or "422" in str(exc_info.value) + + async def test_create_tag_empty_name(self, mcp_server): + """Test creating a tag with empty name returns proper error.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Try to create a tag with empty name + with pytest.raises(Exception) as exc_info: + await create_tag(name="") + + # Verify we get a validation error + error_msg = str(exc_info.value).lower() + assert "validation" in error_msg or "required" in error_msg or "400" in str(exc_info.value) + + async def test_tag_visibility_public_by_default(self, mcp_server, cleanup_test_content): + """Test that tags are public by default.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Create a tag + result = await create_tag(name="public-visibility-test") + response = json.loads(result) + + tag = response["tags"][0] + # Tags should be public by default (visibility field might be omitted or set to "public") + assert tag.get("visibility", "public") == "public" + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_tag_creation_fields(self, mcp_server, cleanup_test_content): + """Test that created tags have all expected fields.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Create a tag + result = await create_tag( + name="fields-test-tag", + description="Test description for field validation" + ) + response = json.loads(result) + + tag = response["tags"][0] + + # Verify essential fields are present + essential_fields = ["id", "name", "slug", "description", "created_at", "updated_at"] + for field in essential_fields: + assert field in tag + + # Verify data types + assert isinstance(tag["id"], str) + assert isinstance(tag["name"], str) + assert isinstance(tag["slug"], str) + assert isinstance(tag["description"], str) + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_long_tag_name(self, mcp_server, cleanup_test_content): + """Test creating a tag with a very long name.""" + from ghost_mcp.tools.admin.tags import create_tag + + # Create a tag with a long name (but within reasonable limits) + long_name = "This is a very long tag name that tests the system's ability to handle longer tag names without breaking" + + result = await create_tag(name=long_name) + response = json.loads(result) + + tag = response["tags"][0] + assert tag["name"] == long_name + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) + + async def test_tag_with_long_description(self, mcp_server, cleanup_test_content): + """Test creating a tag with a very long description.""" + from ghost_mcp.tools.admin.tags import create_tag + + long_description = ("This is a very long description that tests the system's ability to handle " + "longer tag descriptions without breaking. " * 10) + + result = await create_tag( + name="long-description-tag", + description=long_description + ) + response = json.loads(result) + + tag = response["tags"][0] + assert tag["description"] == long_description + + # Track for cleanup + cleanup_test_content["track_tag"](tag["id"]) \ No newline at end of file