Skip to content

Server API

The server API provides tools for running Gemini servers programmatically. Use these APIs to create custom server implementations, add middleware, and configure server behavior.

Overview

The server API consists of:

  • ServerConfig - Configuration management and TOML loading
  • start_server() - Server startup and lifecycle management
  • Middleware - Request processing and security (rate limiting, access control, certificate auth)
  • Handlers - Request handling and response generation

Quick Start

Minimal Server

Start a basic Gemini server with default settings:

import asyncio
from pathlib import Path
from nauyaca.server.config import ServerConfig
from nauyaca.server.server import start_server

async def main():
    config = ServerConfig(
        host="localhost",
        port=1965,
        document_root=Path("./capsule"),
        certfile=Path("cert.pem"),
        keyfile=Path("key.pem")
    )

    await start_server(config)

if __name__ == "__main__":
    asyncio.run(main())

Server with Configuration File

Load configuration from a TOML file:

import asyncio
from pathlib import Path
from nauyaca.server.config import ServerConfig
from nauyaca.server.server import start_server

async def main():
    # Load from TOML
    config = ServerConfig.from_toml(Path("config.toml"))

    # Start server with configuration
    await start_server(
        config,
        enable_directory_listing=True,
        log_level="INFO"
    )

if __name__ == "__main__":
    asyncio.run(main())

Server with Custom Middleware

Add custom middleware to the server:

import asyncio
from pathlib import Path
from nauyaca.server.config import ServerConfig
from nauyaca.server.server import start_server
from nauyaca.server.middleware import RateLimitConfig, AccessControlConfig

async def main():
    config = ServerConfig(
        host="0.0.0.0",  # Listen on all interfaces
        port=1965,
        document_root=Path("/var/gemini/capsule"),
        certfile=Path("/etc/gemini/cert.pem"),
        keyfile=Path("/etc/gemini/key.pem")
    )

    # Configure rate limiting
    rate_limit_config = RateLimitConfig(
        capacity=20,        # Allow burst of 20 requests
        refill_rate=2.0,    # Refill 2 tokens per second
        retry_after=60      # Ask clients to wait 60s if limited
    )

    # Configure access control
    access_control_config = AccessControlConfig(
        allow_list=["192.168.1.0/24", "10.0.0.0/8"],
        deny_list=["192.168.1.100"],
        default_allow=False  # Deny by default
    )

    await start_server(
        config,
        enable_rate_limiting=True,
        rate_limit_config=rate_limit_config,
        access_control_config=access_control_config,
        log_level="DEBUG"
    )

if __name__ == "__main__":
    asyncio.run(main())

ServerConfig

ServerConfig dataclass

ServerConfig(
    host: str = "localhost",
    port: int = DEFAULT_PORT,
    document_root: Path | str = ".",
    certfile: Path | str | None = None,
    keyfile: Path | str | None = None,
    enable_rate_limiting: bool = True,
    rate_limit_capacity: int = 10,
    rate_limit_refill_rate: float = 1.0,
    rate_limit_retry_after: int = 30,
    enable_access_control: bool = True,
    access_control_allow_list: list[str] | None = None,
    access_control_deny_list: list[str] | None = None,
    access_control_default_allow: bool = True,
    max_file_size: int = DEFAULT_MAX_FILE_SIZE,
    certificate_auth_paths: list[dict[str, Any]]
    | None = None,
    require_client_cert: bool = False,
    hash_client_ips: bool = True,
    enable_titan: bool = False,
    titan_upload_dir: Path | str | None = None,
    titan_max_upload_size: int = 10 * 1024 * 1024,
    titan_allowed_mime_types: list[str] | None = None,
    titan_auth_tokens: list[str] | None = None,
    titan_enable_delete: bool = False,
    locations: list[LocationConfig] | None = None,
)

Configuration for Gemini server.

Attributes:

Name Type Description
host str

Server host address (default: "localhost").

port int

Server port (default: 1965).

document_root Path | str

Path to directory containing files to serve.

certfile Path | str | None

Path to TLS certificate file.

keyfile Path | str | None

Path to TLS private key file.

Examples:

>>> config = ServerConfig(
...     host="localhost",
...     port=1965,
...     document_root=Path("/var/gemini/capsule"),
...     certfile=Path("/etc/gemini/cert.pem"),
...     keyfile=Path("/etc/gemini/key.pem")
... )

__post_init__

__post_init__() -> None

Validate and normalize configuration after initialization.

Source code in src/nauyaca/server/config.py
def __post_init__(self) -> None:
    """Validate and normalize configuration after initialization."""
    # Convert string paths to Path objects
    if isinstance(self.document_root, str):
        self.document_root = Path(self.document_root)

    if isinstance(self.certfile, str):
        self.certfile = Path(self.certfile)

    if isinstance(self.keyfile, str):
        self.keyfile = Path(self.keyfile)

    # Validate document root
    if not self.document_root.exists():
        raise ValueError(f"Document root does not exist: {self.document_root}")

    if not self.document_root.is_dir():
        raise ValueError(f"Document root is not a directory: {self.document_root}")

    # Validate certificate files if provided
    if self.certfile and not self.certfile.exists():
        raise ValueError(f"Certificate file does not exist: {self.certfile}")

    if self.keyfile and not self.keyfile.exists():
        raise ValueError(f"Key file does not exist: {self.keyfile}")

    # Validate port range
    if not (1 <= self.port <= 65535):
        raise ValueError(f"Invalid port number: {self.port} (must be 1-65535)")

    # Validate Titan configuration
    if isinstance(self.titan_upload_dir, str):
        self.titan_upload_dir = Path(self.titan_upload_dir)

    if self.enable_titan:
        if self.titan_upload_dir is None:
            raise ValueError("titan_upload_dir is required when Titan is enabled")
        # Create upload directory if it doesn't exist
        if not self.titan_upload_dir.exists():
            self.titan_upload_dir.mkdir(parents=True, exist_ok=True)

from_env classmethod

from_env() -> dict[str, Any]

Extract configuration from environment variables.

Reads NAUYACA_* environment variables and returns a dict suitable for overriding ServerConfig fields. Only returns values that are actually set in the environment.

Supported variables

NAUYACA_HOST: Server host address NAUYACA_PORT: Server port (integer) NAUYACA_DOCUMENT_ROOT: Path to document root NAUYACA_CERTFILE: Path to TLS certificate NAUYACA_KEYFILE: Path to TLS private key

Returns:

Type Description
dict[str, Any]

Dict with keys matching ServerConfig fields.

Raises:

Type Description
ValueError

If NAUYACA_PORT is not a valid integer.

Examples:

>>> import os
>>> os.environ['NAUYACA_HOST'] = '0.0.0.0'
>>> env_config = ServerConfig.from_env()
>>> env_config.get('host')
'0.0.0.0'
Source code in src/nauyaca/server/config.py
@classmethod
def from_env(cls) -> dict[str, Any]:
    """Extract configuration from environment variables.

    Reads NAUYACA_* environment variables and returns a dict suitable
    for overriding ServerConfig fields. Only returns values that are
    actually set in the environment.

    Supported variables:
        NAUYACA_HOST: Server host address
        NAUYACA_PORT: Server port (integer)
        NAUYACA_DOCUMENT_ROOT: Path to document root
        NAUYACA_CERTFILE: Path to TLS certificate
        NAUYACA_KEYFILE: Path to TLS private key

    Returns:
        Dict with keys matching ServerConfig fields.

    Raises:
        ValueError: If NAUYACA_PORT is not a valid integer.

    Examples:
        >>> import os
        >>> os.environ['NAUYACA_HOST'] = '0.0.0.0'
        >>> env_config = ServerConfig.from_env()
        >>> env_config.get('host')
        '0.0.0.0'
    """
    import os

    config: dict[str, Any] = {}

    # String fields
    if host := os.getenv("NAUYACA_HOST"):
        config["host"] = host

    # Integer fields
    port_str = os.getenv("NAUYACA_PORT")
    if port_str is not None:
        try:
            config["port"] = int(port_str)
        except ValueError as e:
            raise ValueError(
                f"Invalid NAUYACA_PORT: {port_str!r} (must be an integer 1-65535)"
            ) from e

    # Path fields
    if document_root := os.getenv("NAUYACA_DOCUMENT_ROOT"):
        config["document_root"] = document_root

    if certfile := os.getenv("NAUYACA_CERTFILE"):
        config["certfile"] = certfile

    if keyfile := os.getenv("NAUYACA_KEYFILE"):
        config["keyfile"] = keyfile

    return config

from_toml classmethod

from_toml(path: Path) -> ServerConfig

Load configuration from TOML file.

Parameters:

Name Type Description Default
path Path

Path to TOML configuration file.

required

Returns:

Type Description
ServerConfig

ServerConfig instance loaded from TOML.

Raises:

Type Description
FileNotFoundError

If config file doesn't exist.

ValueError

If config is invalid or cannot be parsed.

Examples:

>>> config = ServerConfig.from_toml(Path("config.toml"))
>>> print(config.host, config.port)
localhost 1965
Source code in src/nauyaca/server/config.py
@classmethod
def from_toml(cls, path: Path) -> "ServerConfig":
    """Load configuration from TOML file.

    Args:
        path: Path to TOML configuration file.

    Returns:
        ServerConfig instance loaded from TOML.

    Raises:
        FileNotFoundError: If config file doesn't exist.
        ValueError: If config is invalid or cannot be parsed.

    Examples:
        >>> config = ServerConfig.from_toml(Path("config.toml"))
        >>> print(config.host, config.port)
        localhost 1965
    """
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")

    try:
        with open(path, "rb") as f:
            data = tomllib.load(f)
    except Exception as e:
        raise ValueError(f"Failed to parse TOML file: {e}") from e

    # Extract sections
    server = data.get("server", {})
    rate_limit = data.get("rate_limit", {})
    access_control = data.get("access_control", {})
    certificate_auth = data.get("certificate_auth", {})
    logging_config = data.get("logging", {})
    titan = data.get("titan", {})

    # Parse locations if present
    locations_data = data.get("locations", [])
    locations: list[LocationConfig] | None = None
    if locations_data:
        locations = [LocationConfig.from_dict(loc) for loc in locations_data]

    # Build config with proper type conversions
    return cls(
        # Server settings
        host=server.get("host", "localhost"),
        port=server.get("port", DEFAULT_PORT),
        document_root=server.get("document_root", "."),
        certfile=server.get("certfile"),
        keyfile=server.get("keyfile"),
        max_file_size=server.get("max_file_size", DEFAULT_MAX_FILE_SIZE),
        # Rate limiting
        enable_rate_limiting=rate_limit.get("enabled", True),
        rate_limit_capacity=rate_limit.get("capacity", 10),
        rate_limit_refill_rate=rate_limit.get("refill_rate", 1.0),
        rate_limit_retry_after=rate_limit.get("retry_after", 30),
        # Access control
        enable_access_control=access_control.get("enabled", True),
        access_control_allow_list=access_control.get("allow_list"),
        access_control_deny_list=access_control.get("deny_list"),
        access_control_default_allow=access_control.get("default_allow", True),
        # Path-based certificate authentication
        certificate_auth_paths=certificate_auth.get("paths"),
        require_client_cert=server.get("require_client_cert", False),
        # Logging/privacy
        hash_client_ips=logging_config.get("hash_ips", True),
        # Titan upload configuration
        enable_titan=titan.get("enabled", False),
        titan_upload_dir=titan.get("upload_dir"),
        titan_max_upload_size=titan.get("max_upload_size", 10 * 1024 * 1024),
        titan_allowed_mime_types=titan.get("allowed_mime_types"),
        titan_auth_tokens=titan.get("auth_tokens"),
        titan_enable_delete=titan.get("enable_delete", False),
        # Location-based routing
        locations=locations,
    )

get_access_control_config

get_access_control_config() -> AccessControlConfig | None

Get access control configuration.

Returns:

Type Description
AccessControlConfig | None

AccessControlConfig instance if enabled and lists are configured,

AccessControlConfig | None

None otherwise.

Source code in src/nauyaca/server/config.py
def get_access_control_config(self) -> AccessControlConfig | None:
    """Get access control configuration.

    Returns:
        AccessControlConfig instance if enabled and lists are configured,
        None otherwise.
    """
    if not self.enable_access_control:
        return None

    if not (self.access_control_allow_list or self.access_control_deny_list):
        return None

    return AccessControlConfig(
        allow_list=self.access_control_allow_list,
        deny_list=self.access_control_deny_list,
        default_allow=self.access_control_default_allow,
    )

get_certificate_auth_config

get_certificate_auth_config() -> (
    CertificateAuthConfig | None
)

Get certificate authentication configuration.

Returns:

Type Description
CertificateAuthConfig | None

CertificateAuthConfig instance if path rules are configured, None otherwise.

Source code in src/nauyaca/server/config.py
def get_certificate_auth_config(self) -> CertificateAuthConfig | None:
    """Get certificate authentication configuration.

    Returns:
        CertificateAuthConfig instance if path rules are configured, None otherwise.
    """
    if not self.certificate_auth_paths:
        return None

    path_rules = []
    for path_config in self.certificate_auth_paths:
        # Convert fingerprints list to set if present
        fingerprints_list = path_config.get("allowed_fingerprints")
        fingerprints = set(fingerprints_list) if fingerprints_list else None

        path_rules.append(
            CertificateAuthPathRule(
                prefix=path_config["prefix"],
                require_cert=path_config.get("require_cert", False),
                allowed_fingerprints=fingerprints,
            )
        )

    return CertificateAuthConfig(path_rules=path_rules)

get_location_router

get_location_router(
    enable_directory_listing: bool = False,
) -> Router | None

Build a router from location configurations.

Creates handlers for each location and registers them with a Router. If no locations are configured, returns None.

Parameters:

Name Type Description Default
enable_directory_listing bool

Default directory listing setting for static handlers that don't specify it.

False

Returns:

Type Description
Router | None

Router instance configured with location handlers, or None if

Router | None

no locations are configured.

Examples:

>>> config = ServerConfig.from_toml(Path("config.toml"))
>>> router = config.get_location_router()
>>> if router:
...     response = router.route(request)
Source code in src/nauyaca/server/config.py
def get_location_router(
    self, enable_directory_listing: bool = False
) -> "Router | None":
    """Build a router from location configurations.

    Creates handlers for each location and registers them with a Router.
    If no locations are configured, returns None.

    Args:
        enable_directory_listing: Default directory listing setting for
            static handlers that don't specify it.

    Returns:
        Router instance configured with location handlers, or None if
        no locations are configured.

    Examples:
        >>> config = ServerConfig.from_toml(Path("config.toml"))
        >>> router = config.get_location_router()
        >>> if router:
        ...     response = router.route(request)
    """
    if not self.locations:
        return None

    from .handler import RequestHandler, StaticFileHandler
    from .proxy import ProxyHandler
    from .router import Router, RouteType

    def create_handler(loc: LocationConfig) -> RequestHandler:
        """Create handler instance from location config."""
        if loc.handler_type == HandlerType.STATIC:
            # document_root is validated in LocationConfig.__post_init__
            assert loc.document_root is not None
            return StaticFileHandler(
                document_root=loc.document_root,
                enable_directory_listing=loc.enable_directory_listing
                or enable_directory_listing,
                default_indices=loc.default_indices,
                max_file_size=loc.max_file_size or self.max_file_size,
            )
        elif loc.handler_type == HandlerType.PROXY:
            # upstream is validated in LocationConfig.__post_init__
            assert loc.upstream is not None
            return ProxyHandler(
                upstream=loc.upstream,
                prefix=loc.prefix,
                strip_prefix=loc.strip_prefix,
                timeout=loc.timeout,
            )
        else:
            raise ValueError(f"Unknown handler type: {loc.handler_type}")

    router = Router()
    for location in self.locations:
        handler = create_handler(location)
        router.add_route(location.prefix, handler.handle, route_type=RouteType.PREFIX)

    return router

get_rate_limit_config

get_rate_limit_config() -> RateLimitConfig

Get rate limit configuration.

Returns:

Type Description
RateLimitConfig

RateLimitConfig instance with current settings.

Source code in src/nauyaca/server/config.py
def get_rate_limit_config(self) -> RateLimitConfig:
    """Get rate limit configuration.

    Returns:
        RateLimitConfig instance with current settings.
    """
    return RateLimitConfig(
        capacity=self.rate_limit_capacity,
        refill_rate=self.rate_limit_refill_rate,
        retry_after=self.rate_limit_retry_after,
    )

get_upload_handler

get_upload_handler() -> FileUploadHandler | None

Get the Titan upload handler if Titan is enabled.

Returns:

Type Description
FileUploadHandler | None

FileUploadHandler instance if Titan is enabled, None otherwise.

Source code in src/nauyaca/server/config.py
def get_upload_handler(self) -> "FileUploadHandler | None":
    """Get the Titan upload handler if Titan is enabled.

    Returns:
        FileUploadHandler instance if Titan is enabled, None otherwise.
    """
    if not self.enable_titan or self.titan_upload_dir is None:
        return None

    from .handler import FileUploadHandler

    # Convert auth tokens list to set
    auth_tokens = set(self.titan_auth_tokens) if self.titan_auth_tokens else None

    return FileUploadHandler(
        upload_dir=self.titan_upload_dir,
        max_size=self.titan_max_upload_size,
        allowed_types=self.titan_allowed_mime_types,
        auth_tokens=auth_tokens,
        enable_delete=self.titan_enable_delete,
    )

validate

validate() -> None

Validate the server configuration.

Raises:

Type Description
ValueError

If configuration is invalid.

Source code in src/nauyaca/server/config.py
def validate(self) -> None:
    """Validate the server configuration.

    Raises:
        ValueError: If configuration is invalid.
    """
    # Additional runtime validation can be added here
    if (self.certfile is None) != (self.keyfile is None):
        raise ValueError(
            "Both certfile and keyfile must be provided together, "
            "or both must be None"
        )

Loading from TOML

The ServerConfig.from_toml() method loads configuration from a TOML file:

from pathlib import Path
from nauyaca.server.config import ServerConfig

# Load from file
config = ServerConfig.from_toml(Path("config.toml"))

# Access configuration values
print(f"Server will run on {config.host}:{config.port}")
print(f"Serving files from {config.document_root}")

Example TOML configuration:

[server]
host = "0.0.0.0"
port = 1965
document_root = "/var/gemini/capsule"
certfile = "/etc/gemini/cert.pem"
keyfile = "/etc/gemini/key.pem"
max_file_size = 104857600  # 100 MiB

[rate_limit]
enabled = true
capacity = 10
refill_rate = 1.0
retry_after = 30

[access_control]
allow_list = ["192.168.1.0/24"]
default_allow = false

[logging]
hash_ips = true

Programmatic Configuration

Create configuration entirely in code:

from pathlib import Path
from nauyaca.server.config import ServerConfig

config = ServerConfig(
    host="localhost",
    port=1965,
    document_root=Path("/var/gemini/capsule"),
    certfile=Path("/etc/gemini/cert.pem"),
    keyfile=Path("/etc/gemini/key.pem"),

    # Rate limiting
    enable_rate_limiting=True,
    rate_limit_capacity=10,
    rate_limit_refill_rate=1.0,
    rate_limit_retry_after=30,

    # Access control
    access_control_allow_list=["192.168.1.0/24"],
    access_control_default_allow=False,

    # Security
    hash_client_ips=True,
    max_file_size=100 * 1024 * 1024  # 100 MiB
)

# Validate configuration
config.validate()

# Get middleware configurations
rate_limit_config = config.get_rate_limit_config()
access_control_config = config.get_access_control_config()

Server Functions

start_server async

start_server(
    config: ServerConfig,
    enable_directory_listing: bool = False,
    log_level: str = "INFO",
    log_file: Path | None = None,
    json_logs: bool = False,
    enable_rate_limiting: bool = True,
    rate_limit_config: RateLimitConfig | None = None,
    access_control_config: AccessControlConfig
    | None = None,
    certificate_auth_config: CertificateAuthConfig
    | None = None,
    hash_ips: bool | None = None,
    max_file_size: int | None = None,
) -> None

Start a Gemini server with the given configuration.

This function sets up a Gemini server with static file serving, routing, TLS configuration, and middleware. It runs until interrupted.

Parameters:

Name Type Description Default
config ServerConfig

Server configuration.

required
enable_directory_listing bool

Enable automatic directory listings.

False
log_level str

Logging level (DEBUG, INFO, WARNING, ERROR).

'INFO'
log_file Path | None

Optional path to log file. If None, logs to stdout.

None
json_logs bool

If True, output logs in JSON format.

False
enable_rate_limiting bool

Enable rate limiting middleware.

True
rate_limit_config RateLimitConfig | None

Rate limiting configuration. Uses defaults if None.

None
access_control_config AccessControlConfig | None

Access control configuration. None to disable.

None
certificate_auth_config CertificateAuthConfig | None

Certificate auth configuration. None to disable.

None
hash_ips bool | None

Hash client IPs in logs. If None, uses config.hash_client_ips.

None
max_file_size int | None

Maximum file size to serve. If None, uses config.max_file_size.

None

Raises:

Type Description
ValueError

If configuration is invalid.

OSError

If unable to bind to the specified host/port.

Examples:

>>> import asyncio
>>> from pathlib import Path
>>> config = ServerConfig(
...     host="localhost",
...     port=1965,
...     document_root=Path("./capsule"),
...     certfile=Path("cert.pem"),
...     keyfile=Path("key.pem")
... )
>>> asyncio.run(start_server(config))
Source code in src/nauyaca/server/server.py
async def start_server(
    config: ServerConfig,
    enable_directory_listing: bool = False,
    log_level: str = "INFO",
    log_file: Path | None = None,
    json_logs: bool = False,
    enable_rate_limiting: bool = True,
    rate_limit_config: RateLimitConfig | None = None,
    access_control_config: AccessControlConfig | None = None,
    certificate_auth_config: CertificateAuthConfig | None = None,
    hash_ips: bool | None = None,
    max_file_size: int | None = None,
) -> None:
    """Start a Gemini server with the given configuration.

    This function sets up a Gemini server with static file serving,
    routing, TLS configuration, and middleware. It runs until interrupted.

    Args:
        config: Server configuration.
        enable_directory_listing: Enable automatic directory listings.
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR).
        log_file: Optional path to log file. If None, logs to stdout.
        json_logs: If True, output logs in JSON format.
        enable_rate_limiting: Enable rate limiting middleware.
        rate_limit_config: Rate limiting configuration. Uses defaults if None.
        access_control_config: Access control configuration. None to disable.
        certificate_auth_config: Certificate auth configuration. None to disable.
        hash_ips: Hash client IPs in logs. If None, uses config.hash_client_ips.
        max_file_size: Maximum file size to serve. If None, uses config.max_file_size.

    Raises:
        ValueError: If configuration is invalid.
        OSError: If unable to bind to the specified host/port.

    Examples:
        >>> import asyncio
        >>> from pathlib import Path
        >>> config = ServerConfig(
        ...     host="localhost",
        ...     port=1965,
        ...     document_root=Path("./capsule"),
        ...     certfile=Path("cert.pem"),
        ...     keyfile=Path("key.pem")
        ... )
        >>> asyncio.run(start_server(config))
    """
    # Resolve hash_ips from config if not explicitly set
    effective_hash_ips = hash_ips if hash_ips is not None else config.hash_client_ips

    # Configure logging first
    configure_logging(
        log_level=log_level,
        log_file=log_file,
        json_logs=json_logs,
        hash_ips=effective_hash_ips,
    )
    logger = get_logger(__name__)

    # Validate configuration
    config.validate()

    # Resolve max_file_size from config if not explicitly set
    effective_max_file_size = (
        max_file_size if max_file_size is not None else config.max_file_size
    )

    # Set up default 404 handler
    def default_404_handler(request: object) -> GeminiResponse:
        from ..protocol.request import GeminiRequest

        if isinstance(request, GeminiRequest):
            path = request.path
        else:
            path = "/"
        return GeminiResponse(
            status=StatusCode.NOT_FOUND.value,
            meta="text/gemini",
            body=error_404(path),
        )

    # Create router - use location-based routing if configured, else simple static
    location_router = config.get_location_router(enable_directory_listing)

    if location_router:
        # Location-based routing configured
        router = location_router
        router.set_default_handler(default_404_handler)
        # locations is guaranteed non-empty when location_router exists
        assert config.locations is not None
        logger.info(
            "location_routing_enabled",
            location_count=len(config.locations),
            prefixes=[loc.prefix for loc in config.locations],
        )
    else:
        # Fallback: simple static file handler for document_root
        from .router import RouteType

        router = Router()
        static_handler = StaticFileHandler(
            config.document_root,
            enable_directory_listing=enable_directory_listing,
            max_file_size=effective_max_file_size,
        )
        router.set_default_handler(default_404_handler)
        router.add_route("/", static_handler.handle, route_type=RouteType.PREFIX)

    # Determine if we need to request client certificates
    # PyOpenSSL is used if:
    # 1. config.require_client_cert is explicitly True, OR
    # 2. ANY certificate auth path rule requires certificates or has fingerprint whitelist
    request_client_cert = config.require_client_cert or (
        certificate_auth_config is not None
        and any(
            rule.require_cert or rule.allowed_fingerprints is not None
            for rule in certificate_auth_config.path_rules
        )
    )

    # Determine if we need PyOpenSSL for client certificate support
    # PyOpenSSL is required because Python's ssl module with OpenSSL 3.x
    # silently rejects self-signed client certificates
    use_pyopenssl = request_client_cert

    # Create SSL context (only used when NOT using PyOpenSSL)
    ssl_context: ssl.SSLContext | None = None
    pyopenssl_ctx = None

    if use_pyopenssl:
        # Use PyOpenSSL for proper self-signed client cert support
        if config.certfile and config.keyfile:
            pyopenssl_ctx = create_pyopenssl_server_context(
                str(config.certfile),
                str(config.keyfile),
                request_client_cert=True,
            )
            logger.info(
                "tls_configured",
                certfile=str(config.certfile),
                keyfile=str(config.keyfile),
                request_client_cert=True,
                tls_backend="pyopenssl",
            )
        else:
            # For testing: create self-signed certificate with PyOpenSSL
            pyopenssl_ctx = _create_self_signed_pyopenssl_context()
            logger.warning(
                "using_self_signed_certificate",
                mode="testing_only",
                tls_backend="pyopenssl",
            )
    else:
        # Standard ssl module is fine when not requesting client certs
        if config.certfile and config.keyfile:
            ssl_context = create_server_context(
                str(config.certfile),
                str(config.keyfile),
                request_client_cert=False,
            )
            logger.info(
                "tls_configured",
                certfile=str(config.certfile),
                keyfile=str(config.keyfile),
                request_client_cert=False,
                tls_backend="stdlib",
            )
        else:
            # For testing: create self-signed certificate
            ssl_context = _create_self_signed_context(request_client_cert=False)
            logger.warning(
                "using_self_signed_certificate",
                mode="testing_only",
                tls_backend="stdlib",
            )

    # Set up middleware chain
    middlewares: list[Any] = []

    # Add certificate auth if configured (check this first - before IP-based checks)
    if certificate_auth_config:
        cert_auth = CertificateAuth(certificate_auth_config)
        middlewares.append(cert_auth)
        logger.info(
            "certificate_auth_enabled",
            path_rules_count=len(certificate_auth_config.path_rules),
            paths_requiring_cert=[
                rule.prefix
                for rule in certificate_auth_config.path_rules
                if rule.require_cert or rule.allowed_fingerprints is not None
            ],
        )

    # Add access control if configured
    if access_control_config:
        access_control = AccessControl(access_control_config)
        middlewares.append(access_control)
        logger.info(
            "access_control_enabled",
            allow_list=access_control_config.allow_list,
            deny_list=access_control_config.deny_list,
            default_allow=access_control_config.default_allow,
        )

    # Add rate limiting if enabled
    if enable_rate_limiting:
        rate_limiter = RateLimiter(rate_limit_config)
        rate_limiter.start()  # Start cleanup task
        middlewares.append(rate_limiter)
        logger.info(
            "rate_limiting_enabled",
            capacity=rate_limiter.config.capacity,
            refill_rate=rate_limiter.config.refill_rate,
            retry_after=rate_limiter.config.retry_after,
        )

    # Create middleware chain if any middlewares configured
    middleware_chain = MiddlewareChain(middlewares) if middlewares else None

    # Get event loop
    loop = asyncio.get_running_loop()

    # Create server using Protocol pattern
    if use_pyopenssl and pyopenssl_ctx is not None:
        # Use PyOpenSSL TLS wrapper - NO ssl= parameter since TLS is handled manually
        server = await loop.create_server(
            lambda: TLSServerProtocol(
                lambda: GeminiServerProtocol(router.route, middleware_chain),
                pyopenssl_ctx,
            ),
            config.host,
            config.port,
            # NO ssl= parameter - TLS handled by TLSServerProtocol
        )
    else:
        # Standard ssl module
        server = await loop.create_server(
            lambda: GeminiServerProtocol(router.route, middleware_chain),
            config.host,
            config.port,
            ssl=ssl_context,
        )

    logger.info(
        "server_started",
        host=config.host,
        port=config.port,
        document_root=str(config.document_root),
        directory_listing_enabled=enable_directory_listing,
    )

    async with server:
        await server.serve_forever()

Server Lifecycle

The start_server() function runs indefinitely until interrupted:

import asyncio
import signal
from nauyaca.server.config import ServerConfig
from nauyaca.server.server import start_server

async def main():
    config = ServerConfig.from_toml(Path("config.toml"))

    # This runs until Ctrl+C or SIGTERM
    await start_server(config)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Server stopped by user")

Graceful Shutdown

Handle graceful shutdown with signal handlers:

import asyncio
import signal
from nauyaca.server.config import ServerConfig
from nauyaca.server.server import start_server

shutdown_event = asyncio.Event()

def handle_shutdown(signum, frame):
    shutdown_event.set()

async def main():
    # Set up signal handlers
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    config = ServerConfig.from_toml(Path("config.toml"))

    # Run server with shutdown handling
    server_task = asyncio.create_task(start_server(config))
    shutdown_task = asyncio.create_task(shutdown_event.wait())

    # Wait for either server to finish or shutdown signal
    done, pending = await asyncio.wait(
        [server_task, shutdown_task],
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    print("Server shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())

Middleware

Middleware components process requests before they reach handlers. They can:

  • Block requests (rate limiting, access control)
  • Require authentication (client certificates)
  • Log requests
  • Modify request context

Middleware Protocol

All middleware must implement the Middleware protocol:

Middleware

Bases: Protocol

Protocol for middleware components.

process_request async

process_request(
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]

Process a request.

Parameters:

Name Type Description Default
request_url str

The requested URL.

required
client_ip str

The client's IP address.

required
client_cert_fingerprint str | None

SHA-256 fingerprint of client certificate, or None if client didn't present a certificate.

None

Returns:

Type Description
bool

Tuple of (allow, error_response):

str | None
  • (True, None) if request should proceed
tuple[bool, str | None]
  • (False, gemini_response) if request should be rejected
Source code in src/nauyaca/server/middleware.py
async def process_request(
    self,
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]:
    """Process a request.

    Args:
        request_url: The requested URL.
        client_ip: The client's IP address.
        client_cert_fingerprint: SHA-256 fingerprint of client certificate,
            or None if client didn't present a certificate.

    Returns:
        Tuple of (allow, error_response):
        - (True, None) if request should proceed
        - (False, gemini_response) if request should be rejected
    """
    ...

Rate Limiting

RateLimitConfig dataclass

RateLimitConfig(
    capacity: int = 10,
    refill_rate: float = 1.0,
    retry_after: int = 30,
)

Configuration for rate limiting.

RateLimiter

RateLimiter(config: RateLimitConfig | None = None)

Rate limiting middleware using token bucket algorithm.

Tracks per-IP request rates and returns status 44 (SLOW DOWN) when limits are exceeded.

Initialize rate limiter.

Parameters:

Name Type Description Default
config RateLimitConfig | None

Rate limit configuration. Uses defaults if None.

None
Source code in src/nauyaca/server/middleware.py
def __init__(self, config: RateLimitConfig | None = None):
    """Initialize rate limiter.

    Args:
        config: Rate limit configuration. Uses defaults if None.
    """
    self.config = config or RateLimitConfig()
    self.buckets: dict[str, TokenBucket] = {}
    self._cleanup_task: asyncio.Task | None = None

start

start() -> None

Start background cleanup task.

Source code in src/nauyaca/server/middleware.py
def start(self) -> None:
    """Start background cleanup task."""
    if self._cleanup_task is None:
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

stop async

stop() -> None

Stop background cleanup task.

Source code in src/nauyaca/server/middleware.py
async def stop(self) -> None:
    """Stop background cleanup task."""
    if self._cleanup_task:
        self._cleanup_task.cancel()
        try:
            await self._cleanup_task
        except asyncio.CancelledError:
            pass

process_request async

process_request(
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]

Process request with rate limiting.

Parameters:

Name Type Description Default
request_url str

The requested URL.

required
client_ip str

The client's IP address.

required
client_cert_fingerprint str | None

Client certificate fingerprint (unused).

None

Returns:

Type Description
tuple[bool, str | None]

Tuple of (allow, error_response).

Source code in src/nauyaca/server/middleware.py
async def process_request(
    self,
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]:
    """Process request with rate limiting.

    Args:
        request_url: The requested URL.
        client_ip: The client's IP address.
        client_cert_fingerprint: Client certificate fingerprint (unused).

    Returns:
        Tuple of (allow, error_response).
    """
    # Get or create token bucket for this IP
    if client_ip not in self.buckets:
        self.buckets[client_ip] = TokenBucket(
            self.config.capacity, self.config.refill_rate
        )

    bucket = self.buckets[client_ip]

    # Try to consume token
    if bucket.consume():
        return True, None

    # Rate limit exceeded - return 44 SLOW DOWN
    retry_after = self.config.retry_after
    response = f"44 Rate limit exceeded. Retry after {retry_after} seconds\r\n"
    return False, response

Example usage:

from nauyaca.server.middleware import RateLimiter, RateLimitConfig

# Create rate limiter
config = RateLimitConfig(
    capacity=10,        # Allow 10 requests in burst
    refill_rate=1.0,    # Add 1 token per second
    retry_after=30      # Tell clients to wait 30s
)

rate_limiter = RateLimiter(config)
rate_limiter.start()  # Start background cleanup

# Use with server
await start_server(
    server_config,
    enable_rate_limiting=True,
    rate_limit_config=config
)

Access Control

AccessControlConfig dataclass

AccessControlConfig(
    allow_list: list[str] | None = None,
    deny_list: list[str] | None = None,
    default_allow: bool = True,
)

Configuration for IP-based access control.

AccessControl

AccessControl(config: AccessControlConfig | None = None)

IP-based access control middleware.

Supports allow/deny lists with CIDR notation. Returns status 53 (PROXY REQUEST REFUSED) for blocked IPs.

Initialize access control.

Parameters:

Name Type Description Default
config AccessControlConfig | None

Access control configuration. Uses defaults if None.

None
Source code in src/nauyaca/server/middleware.py
def __init__(self, config: AccessControlConfig | None = None):
    """Initialize access control.

    Args:
        config: Access control configuration. Uses defaults if None.
    """
    self.config = config or AccessControlConfig()

    # Parse allow list
    self.allow_networks: list[IPv4Network | IPv6Network] = []
    if self.config.allow_list:
        for cidr in self.config.allow_list:
            try:
                self.allow_networks.append(ip_network(cidr))
            except ValueError:
                # Try as single IP
                try:
                    self.allow_networks.append(ip_network(f"{cidr}/32"))
                except ValueError:
                    # Try IPv6
                    self.allow_networks.append(ip_network(f"{cidr}/128"))

    # Parse deny list
    self.deny_networks: list[IPv4Network | IPv6Network] = []
    if self.config.deny_list:
        for cidr in self.config.deny_list:
            try:
                self.deny_networks.append(ip_network(cidr))
            except ValueError:
                # Try as single IP
                try:
                    self.deny_networks.append(ip_network(f"{cidr}/32"))
                except ValueError:
                    # Try IPv6
                    self.deny_networks.append(ip_network(f"{cidr}/128"))

process_request async

process_request(
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]

Process request with access control.

Parameters:

Name Type Description Default
request_url str

The requested URL.

required
client_ip str

The client's IP address.

required
client_cert_fingerprint str | None

Client certificate fingerprint (unused).

None

Returns:

Type Description
tuple[bool, str | None]

Tuple of (allow, error_response).

Source code in src/nauyaca/server/middleware.py
async def process_request(
    self,
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]:
    """Process request with access control.

    Args:
        request_url: The requested URL.
        client_ip: The client's IP address.
        client_cert_fingerprint: Client certificate fingerprint (unused).

    Returns:
        Tuple of (allow, error_response).
    """
    if self._is_allowed(client_ip):
        return True, None

    # IP is blocked
    response = "53 Access denied\r\n"
    return False, response

Example usage:

from nauyaca.server.middleware import AccessControl, AccessControlConfig

# Allow specific networks, deny specific IPs
config = AccessControlConfig(
    allow_list=["192.168.1.0/24", "10.0.0.0/8"],
    deny_list=["192.168.1.100", "10.0.0.50"],
    default_allow=False  # Deny everything not in allow list
)

access_control = AccessControl(config)

# Use with server
await start_server(
    server_config,
    access_control_config=config
)

CIDR Notation

Access control lists support CIDR notation for network ranges:

  • 192.168.1.0/24 matches 192.168.1.0-192.168.1.255
  • 10.0.0.0/8 matches 10.0.0.0-10.255.255.255
  • 192.168.1.100 matches a single IP (equivalent to /32)

Certificate Authentication

CertificateAuthPathRule dataclass

CertificateAuthPathRule(
    prefix: str,
    require_cert: bool = False,
    allowed_fingerprints: set[str] | None = None,
)

Certificate auth rule for a specific path prefix.

Per Gemini application best practices, certificate requirements are typically activated for specific URL prefixes (e.g., /app/, /admin/) rather than globally, allowing public and authenticated content to coexist.

CertificateAuthConfig dataclass

CertificateAuthConfig(
    path_rules: list[CertificateAuthPathRule] = list(),
)

Configuration for path-based certificate authentication.

Path rules are checked in order - the first matching rule applies. If no rule matches a request path, the request is allowed without certificate requirements.

CertificateAuth

CertificateAuth(
    config: CertificateAuthConfig | None = None,
)

Certificate-based authentication middleware.

Can require client certificates (status 60) and/or validate against a whitelist of allowed certificate fingerprints (status 61) on a per-path basis.

Per Gemini application best practices, this enables: - Account registration flows using client certificates - Certificate-based access control for specific paths - User identity verification via certificate fingerprints - Mixed public/authenticated content serving

Initialize certificate authentication middleware.

Parameters:

Name Type Description Default
config CertificateAuthConfig | None

Certificate auth configuration. Uses defaults if None.

None
Source code in src/nauyaca/server/middleware.py
def __init__(self, config: CertificateAuthConfig | None = None):
    """Initialize certificate authentication middleware.

    Args:
        config: Certificate auth configuration. Uses defaults if None.
    """
    self.config = config or CertificateAuthConfig()

process_request async

process_request(
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]

Process request with path-based certificate authentication.

Parameters:

Name Type Description Default
request_url str

The requested URL.

required
client_ip str

The client's IP address.

required
client_cert_fingerprint str | None

SHA-256 fingerprint of client certificate.

None

Returns:

Type Description
tuple[bool, str | None]

Tuple of (allow, error_response).

Source code in src/nauyaca/server/middleware.py
async def process_request(
    self,
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]:
    """Process request with path-based certificate authentication.

    Args:
        request_url: The requested URL.
        client_ip: The client's IP address.
        client_cert_fingerprint: SHA-256 fingerprint of client certificate.

    Returns:
        Tuple of (allow, error_response).
    """
    # Extract path from URL
    path = self._extract_path(request_url)

    # Find matching rule (first match wins)
    rule = self._find_matching_rule(path)

    if rule is None:
        # No rule matches - allow without cert
        return True, None

    # Apply rule's requirements
    if rule.require_cert and client_cert_fingerprint is None:
        return False, "60 Client certificate required\r\n"

    if rule.allowed_fingerprints is not None:
        if client_cert_fingerprint is None:
            # Whitelist requires a cert
            return False, "60 Client certificate required\r\n"

        if client_cert_fingerprint not in rule.allowed_fingerprints:
            return False, "61 Certificate not authorized\r\n"

    return True, None

Example usage:

from nauyaca.server.middleware import (
    CertificateAuth,
    CertificateAuthConfig,
    CertificateAuthPathRule
)

# Require certificates for specific paths
config = CertificateAuthConfig(
    path_rules=[
        # Public path - no cert required
        CertificateAuthPathRule(
            prefix="/public/",
            require_cert=False
        ),
        # Admin area - cert required
        CertificateAuthPathRule(
            prefix="/admin/",
            require_cert=True
        ),
        # App area - specific certs only
        CertificateAuthPathRule(
            prefix="/app/",
            require_cert=True,
            allowed_fingerprints={
                "a1b2c3...",  # SHA-256 fingerprints
                "d4e5f6..."
            }
        )
    ]
)

# Use with server
await start_server(
    server_config,
    certificate_auth_config=config
)

Certificate Fingerprints

Certificate fingerprints are SHA-256 hashes of the DER-encoded certificate. Use the nauyaca cert fingerprint command to calculate fingerprints for client certificates.

PyOpenSSL Required

When using certificate authentication, Nauyaca automatically uses PyOpenSSL instead of the standard library's ssl module. This is required because OpenSSL 3.x silently rejects self-signed client certificates, which are common in Geminispace.

Middleware Chain

MiddlewareChain

MiddlewareChain(middlewares: list[Middleware])

Chain multiple middleware components together.

Initialize middleware chain.

Parameters:

Name Type Description Default
middlewares list[Middleware]

List of middleware instances.

required
Source code in src/nauyaca/server/middleware.py
def __init__(self, middlewares: list[Middleware]):
    """Initialize middleware chain.

    Args:
        middlewares: List of middleware instances.
    """
    self.middlewares = middlewares

process_request async

process_request(
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]

Process request through all middleware.

Parameters:

Name Type Description Default
request_url str

The requested URL.

required
client_ip str

The client's IP address.

required
client_cert_fingerprint str | None

SHA-256 fingerprint of client certificate.

None

Returns:

Type Description
tuple[bool, str | None]

Tuple of (allow, error_response). Returns first rejection.

Source code in src/nauyaca/server/middleware.py
async def process_request(
    self,
    request_url: str,
    client_ip: str,
    client_cert_fingerprint: str | None = None,
) -> tuple[bool, str | None]:
    """Process request through all middleware.

    Args:
        request_url: The requested URL.
        client_ip: The client's IP address.
        client_cert_fingerprint: SHA-256 fingerprint of client certificate.

    Returns:
        Tuple of (allow, error_response). Returns first rejection.
    """
    for middleware in self.middlewares:
        allow, response = await middleware.process_request(
            request_url, client_ip, client_cert_fingerprint
        )
        if not allow:
            return False, response

    return True, None

Middleware are executed in order:

  1. CertificateAuth - Check client certificates first
  2. AccessControl - Then check IP-based access
  3. RateLimiter - Finally check rate limits

The first middleware that rejects a request stops the chain.

Handlers

Handlers generate responses for requests. Nauyaca provides built-in handlers for common use cases.

RequestHandler Base Class

RequestHandler

Bases: ABC

Abstract base class for request handlers.

All request handlers should inherit from this class and implement the handle() method.

handle abstractmethod

handle(request: GeminiRequest) -> GeminiResponse

Handle a Gemini request and return a response.

Parameters:

Name Type Description Default
request GeminiRequest

The incoming request to handle.

required

Returns:

Type Description
GeminiResponse

A GeminiResponse object.

Source code in src/nauyaca/server/handler.py
@abstractmethod
def handle(self, request: GeminiRequest) -> GeminiResponse:
    """Handle a Gemini request and return a response.

    Args:
        request: The incoming request to handle.

    Returns:
        A GeminiResponse object.
    """
    pass

StaticFileHandler

StaticFileHandler

StaticFileHandler(
    document_root: Path | str,
    default_indices: list[str] | None = None,
    enable_directory_listing: bool = False,
    max_file_size: int | None = None,
)

Bases: RequestHandler

Handler for serving static files from a document root.

This handler serves files from a specified directory with path traversal protection and automatic MIME type detection.

Attributes:

Name Type Description
document_root

Path to the directory containing files to serve.

default_indices

List of index filenames to try for directory requests.

max_file_size

Maximum file size to serve (in bytes).

Examples:

>>> handler = StaticFileHandler(Path("/var/gemini/capsule"))
>>> request = GeminiRequest.from_line("gemini://example.com/file.gmi")
>>> response = handler.handle(request)

Initialize the static file handler.

Parameters:

Name Type Description Default
document_root Path | str

Path to the directory containing files to serve.

required
default_indices list[str] | None

List of index filenames to try for directory requests (default: ["index.gmi", "index.gemini"]).

None
enable_directory_listing bool

If True, generate directory listings for directories without an index file (default: False).

False
max_file_size int | None

Maximum file size to serve in bytes (default: 100 MiB per Gemini best practices).

None
Source code in src/nauyaca/server/handler.py
def __init__(
    self,
    document_root: Path | str,
    default_indices: list[str] | None = None,
    enable_directory_listing: bool = False,
    max_file_size: int | None = None,
) -> None:
    """Initialize the static file handler.

    Args:
        document_root: Path to the directory containing files to serve.
        default_indices: List of index filenames to try for directory requests
            (default: ["index.gmi", "index.gemini"]).
        enable_directory_listing: If True, generate directory listings for
            directories without an index file (default: False).
        max_file_size: Maximum file size to serve in bytes
            (default: 100 MiB per Gemini best practices).
    """
    self.document_root = Path(document_root).resolve()
    self.default_indices = default_indices or ["index.gmi", "index.gemini"]
    self.enable_directory_listing = enable_directory_listing
    self.max_file_size = max_file_size or DEFAULT_MAX_FILE_SIZE

    if not self.document_root.exists():
        raise ValueError(f"Document root does not exist: {self.document_root}")
    if not self.document_root.is_dir():
        raise ValueError(f"Document root is not a directory: {self.document_root}")

handle

handle(request: GeminiRequest) -> GeminiResponse

Handle a request for a static file.

Parameters:

Name Type Description Default
request GeminiRequest

The incoming request.

required

Returns:

Type Description
GeminiResponse

A GeminiResponse with the file contents or an error.

Source code in src/nauyaca/server/handler.py
def handle(self, request: GeminiRequest) -> GeminiResponse:
    """Handle a request for a static file.

    Args:
        request: The incoming request.

    Returns:
        A GeminiResponse with the file contents or an error.
    """
    # Get the requested path (remove leading slash)
    requested_path = request.path.lstrip("/")

    # Construct the full file path
    file_path = (self.document_root / requested_path).resolve()

    # Path traversal protection: ensure the resolved path is within document root
    if not self._is_safe_path(file_path):
        return GeminiResponse(status=StatusCode.NOT_FOUND.value, meta="Not found")

    # If path is a directory, try to serve an index file or generate listing
    if file_path.is_dir():
        # Try each index filename in order (per Gemini best practices)
        index_found = False
        for index_name in self.default_indices:
            index_path = file_path / index_name
            if index_path.exists() and index_path.is_file():
                file_path = index_path
                index_found = True
                break

        if not index_found:
            if self.enable_directory_listing:
                # Generate directory listing
                try:
                    listing = generate_directory_listing(file_path, request.path)
                    return GeminiResponse(
                        status=StatusCode.SUCCESS.value,
                        meta=MIME_TYPE_GEMTEXT,
                        body=listing,
                    )
                except Exception as e:
                    return GeminiResponse(
                        status=StatusCode.TEMPORARY_FAILURE.value,
                        meta=f"Error generating directory listing: {str(e)}",
                    )
            else:
                # No index and directory listing disabled
                return GeminiResponse(
                    status=StatusCode.NOT_FOUND.value, meta="Not found"
                )

    # Check if file exists
    if not file_path.exists() or not file_path.is_file():
        return GeminiResponse(status=StatusCode.NOT_FOUND.value, meta="Not found")

    # Check file size (per Gemini best practices, avoid files >100 MiB)
    file_size = file_path.stat().st_size
    if file_size > self.max_file_size:
        return GeminiResponse(
            status=StatusCode.PERMANENT_FAILURE.value,
            meta="File too large - use alternative protocol",
        )

    try:
        # Read file contents
        content = file_path.read_text(encoding="utf-8")

        # Determine MIME type
        mime_type = self._get_mime_type(file_path)

        return GeminiResponse(
            status=StatusCode.SUCCESS.value, meta=mime_type, body=content
        )

    except UnicodeDecodeError:
        # File is not valid UTF-8
        return GeminiResponse(
            status=StatusCode.TEMPORARY_FAILURE.value,
            meta="File encoding error (not UTF-8)",
        )
    except PermissionError:
        return GeminiResponse(
            status=StatusCode.TEMPORARY_FAILURE.value,
            meta="Permission denied",
        )
    except Exception as e:
        return GeminiResponse(
            status=StatusCode.TEMPORARY_FAILURE.value,
            meta=f"Server error: {str(e)}",
        )

Example usage:

from pathlib import Path
from nauyaca.server.handler import StaticFileHandler
from nauyaca.protocol.request import GeminiRequest

# Create handler
handler = StaticFileHandler(
    document_root=Path("/var/gemini/capsule"),
    default_indices=["index.gmi", "index.gemini"],
    enable_directory_listing=True,
    max_file_size=100 * 1024 * 1024  # 100 MiB
)

# Handle a request
request = GeminiRequest.from_line("gemini://example.com/page.gmi\r\n")
response = handler.handle(request)

print(f"Status: {response.status}")
print(f"Meta: {response.meta}")
print(f"Body: {response.body[:100]}...")

Default Handler

The start_server() function automatically configures a StaticFileHandler for the document root. You don't need to create one manually unless you need custom behavior.

ErrorHandler

ErrorHandler

ErrorHandler(status: StatusCode, message: str)

Bases: RequestHandler

Handler that returns error responses.

Useful for handling 404 Not Found and other error cases.

Examples:

>>> handler = ErrorHandler(StatusCode.NOT_FOUND, "Page not found")
>>> response = handler.handle(request)
>>> response.status
51

Initialize the error handler.

Parameters:

Name Type Description Default
status StatusCode

The error status code to return.

required
message str

The error message (becomes the meta field).

required
Source code in src/nauyaca/server/handler.py
def __init__(self, status: StatusCode, message: str):
    """Initialize the error handler.

    Args:
        status: The error status code to return.
        message: The error message (becomes the meta field).
    """
    self.status = status
    self.message = message

handle

handle(request: GeminiRequest) -> GeminiResponse

Return an error response.

Parameters:

Name Type Description Default
request GeminiRequest

The incoming request (ignored).

required

Returns:

Type Description
GeminiResponse

A GeminiResponse with the configured error.

Source code in src/nauyaca/server/handler.py
def handle(self, request: GeminiRequest) -> GeminiResponse:
    """Return an error response.

    Args:
        request: The incoming request (ignored).

    Returns:
        A GeminiResponse with the configured error.
    """
    return GeminiResponse(status=self.status.value, meta=self.message)

Example usage:

from nauyaca.server.handler import ErrorHandler
from nauyaca.protocol.status import StatusCode

# Create 404 handler
not_found_handler = ErrorHandler(
    status=StatusCode.NOT_FOUND,
    message="Page not found"
)

# Create maintenance handler
maintenance_handler = ErrorHandler(
    status=StatusCode.TEMPORARY_FAILURE,
    message="Server maintenance - try again later"
)

Custom Handlers

Create custom handlers by subclassing RequestHandler:

from nauyaca.server.handler import RequestHandler
from nauyaca.protocol.request import GeminiRequest
from nauyaca.protocol.response import GeminiResponse
from nauyaca.protocol.status import StatusCode

class EchoHandler(RequestHandler):
    """Handler that echoes the request URL."""

    def handle(self, request: GeminiRequest) -> GeminiResponse:
        body = f"# Echo\n\nYou requested: {request.url}\n"

        return GeminiResponse(
            status=StatusCode.SUCCESS.value,
            meta="text/gemini",
            body=body
        )

# Use with router
from nauyaca.server.router import Router

router = Router()
router.add_route("/echo", EchoHandler().handle)

See Also