Skip to content

Security API Reference

This page documents the security-related modules in Nauyaca, including certificate management, TOFU (Trust On First Use) validation, and TLS context creation.

Security Model

Nauyaca uses Trust On First Use (TOFU) for certificate validation instead of traditional Certificate Authority (CA) validation. This is the recommended approach for the Gemini protocol. See the Security Explanation for more details.

Overview

The security modules provide:

  • Certificate Management (nauyaca.security.certificates) - Generate, load, validate, and fingerprint TLS certificates
  • TOFU Database (nauyaca.security.tofu) - Store and verify certificate fingerprints for known hosts
  • TLS Context Creation (nauyaca.security.tls) - Create SSL contexts for client and server connections

Certificate Management

certificates

Certificate generation and management utilities.

This module provides utilities for generating, loading, and validating TLS certificates for use with Gemini protocol servers and clients.

generate_self_signed_cert

generate_self_signed_cert(
    hostname: str,
    key_size: int = 2048,
    valid_days: int = 365,
) -> tuple[bytes, bytes]

Generate a self-signed TLS certificate.

Parameters:

Name Type Description Default
hostname str

Hostname for the certificate (CN and SAN).

required
key_size int

RSA key size in bits (default: 2048).

2048
valid_days int

Certificate validity period in days (default: 365).

365

Returns:

Type Description
tuple[bytes, bytes]

Tuple of (certificate_pem, private_key_pem) as bytes.

Example

cert_pem, key_pem = generate_self_signed_cert("localhost") Path("cert.pem").write_bytes(cert_pem) Path("key.pem").write_bytes(key_pem)

Source code in src/nauyaca/security/certificates.py
def generate_self_signed_cert(
    hostname: str,
    key_size: int = 2048,
    valid_days: int = 365,
) -> tuple[bytes, bytes]:
    """Generate a self-signed TLS certificate.

    Args:
        hostname: Hostname for the certificate (CN and SAN).
        key_size: RSA key size in bits (default: 2048).
        valid_days: Certificate validity period in days (default: 365).

    Returns:
        Tuple of (certificate_pem, private_key_pem) as bytes.

    Example:
        >>> cert_pem, key_pem = generate_self_signed_cert("localhost")
        >>> Path("cert.pem").write_bytes(cert_pem)
        >>> Path("key.pem").write_bytes(key_pem)
    """
    # Generate private key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
    )

    # Create certificate subject
    subject = issuer = x509.Name(
        [
            x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, ""),
            x509.NameAttribute(NameOID.LOCALITY_NAME, ""),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Nauyaca Gemini Server"),
            x509.NameAttribute(NameOID.COMMON_NAME, hostname),
        ]
    )

    # Build certificate
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.datetime.now(datetime.timezone.utc))
        .not_valid_after(
            datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(days=valid_days)
        )
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostname)]),
            critical=False,
        )
        .sign(private_key, hashes.SHA256())
    )

    # Serialize to PEM format
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    return cert_pem, key_pem

get_certificate_fingerprint

get_certificate_fingerprint(
    cert: Certificate, algorithm: str = "sha256"
) -> str

Calculate the fingerprint of a certificate.

Parameters:

Name Type Description Default
cert Certificate

The certificate to fingerprint.

required
algorithm str

Hash algorithm to use (default: sha256).

'sha256'

Returns:

Type Description
str

Fingerprint string in format "algorithm:hexdigest".

Example

cert = load_certificate(Path("cert.pem")) fingerprint = get_certificate_fingerprint(cert) print(fingerprint) 'sha256:a1b2c3d4e5f6...'

Source code in src/nauyaca/security/certificates.py
def get_certificate_fingerprint(cert: x509.Certificate, algorithm: str = "sha256") -> str:
    """Calculate the fingerprint of a certificate.

    Args:
        cert: The certificate to fingerprint.
        algorithm: Hash algorithm to use (default: sha256).

    Returns:
        Fingerprint string in format "algorithm:hexdigest".

    Example:
        >>> cert = load_certificate(Path("cert.pem"))
        >>> fingerprint = get_certificate_fingerprint(cert)
        >>> print(fingerprint)
        'sha256:a1b2c3d4e5f6...'
    """
    cert_der = cert.public_bytes(serialization.Encoding.DER)

    if algorithm == "sha256":
        digest = hashlib.sha256(cert_der).hexdigest()
    elif algorithm == "sha1":
        digest = hashlib.sha1(cert_der).hexdigest()
    else:
        raise ValueError(f"Unsupported algorithm: {algorithm}")

    return f"{algorithm}:{digest}"

get_certificate_fingerprint_from_path

get_certificate_fingerprint_from_path(
    cert_path: Path, algorithm: str = "sha256"
) -> str

Calculate the fingerprint of a certificate file.

Parameters:

Name Type Description Default
cert_path Path

Path to the certificate file.

required
algorithm str

Hash algorithm to use (default: sha256).

'sha256'

Returns:

Type Description
str

Hex-encoded fingerprint string.

Source code in src/nauyaca/security/certificates.py
def get_certificate_fingerprint_from_path(
    cert_path: Path, algorithm: str = "sha256"
) -> str:
    """Calculate the fingerprint of a certificate file.

    Args:
        cert_path: Path to the certificate file.
        algorithm: Hash algorithm to use (default: sha256).

    Returns:
        Hex-encoded fingerprint string.
    """
    cert = load_certificate(cert_path)
    return get_certificate_fingerprint(cert, algorithm)

get_certificate_info

get_certificate_info(cert: Certificate) -> dict[str, str]

Extract human-readable information from a certificate.

Parameters:

Name Type Description Default
cert Certificate

The certificate to inspect.

required

Returns:

Type Description
dict[str, str]

Dictionary containing certificate information.

Source code in src/nauyaca/security/certificates.py
def get_certificate_info(cert: x509.Certificate) -> dict[str, str]:
    """Extract human-readable information from a certificate.

    Args:
        cert: The certificate to inspect.

    Returns:
        Dictionary containing certificate information.
    """
    # Get full fingerprints with algorithm prefix
    fp_sha256 = get_certificate_fingerprint(cert, "sha256")
    fp_sha1 = get_certificate_fingerprint(cert, "sha1")

    info = {
        "subject": cert.subject.rfc4514_string(),
        "issuer": cert.issuer.rfc4514_string(),
        "serial_number": str(cert.serial_number),
        "not_before": cert.not_valid_before_utc.isoformat(),
        "not_after": cert.not_valid_after_utc.isoformat(),
        "fingerprint_sha256": fp_sha256,
        "fingerprint_sha1": fp_sha1,
        "expired": str(is_certificate_expired(cert)),
    }

    # Extract SAN if present
    try:
        san_extension = cert.extensions.get_extension_for_oid(
            x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
        san = cast(x509.SubjectAlternativeName, san_extension.value)
        san_names = san.get_values_for_type(x509.DNSName)
        info["san"] = ", ".join(san_names)
    except x509.ExtensionNotFound:
        info["san"] = ""

    return info

is_certificate_expired

is_certificate_expired(cert: Certificate) -> bool

Check if a certificate has expired.

Parameters:

Name Type Description Default
cert Certificate

The certificate to check.

required

Returns:

Type Description
bool

True if the certificate has expired, False otherwise.

Source code in src/nauyaca/security/certificates.py
def is_certificate_expired(cert: x509.Certificate) -> bool:
    """Check if a certificate has expired.

    Args:
        cert: The certificate to check.

    Returns:
        True if the certificate has expired, False otherwise.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now > cert.not_valid_after_utc

is_certificate_valid_for_hostname

is_certificate_valid_for_hostname(
    cert: Certificate, hostname: str
) -> bool

Check if a certificate is valid for a given hostname.

Parameters:

Name Type Description Default
cert Certificate

The certificate to check.

required
hostname str

The hostname to validate against.

required

Returns:

Type Description
bool

True if the certificate is valid for the hostname, False otherwise.

Source code in src/nauyaca/security/certificates.py
def is_certificate_valid_for_hostname(cert: x509.Certificate, hostname: str) -> bool:
    """Check if a certificate is valid for a given hostname.

    Args:
        cert: The certificate to check.
        hostname: The hostname to validate against.

    Returns:
        True if the certificate is valid for the hostname, False otherwise.
    """
    # Check CN (Common Name)
    try:
        cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        if cn == hostname:
            return True
    except (IndexError, AttributeError):
        pass

    # Check SAN (Subject Alternative Name)
    try:
        san_extension = cert.extensions.get_extension_for_oid(
            x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
        san = cast(x509.SubjectAlternativeName, san_extension.value)
        san_names = san.get_values_for_type(x509.DNSName)
        if hostname in san_names:
            return True
    except x509.ExtensionNotFound:
        pass

    return False

load_certificate

load_certificate(cert_path: Path) -> x509.Certificate

Load a certificate from a PEM file.

Parameters:

Name Type Description Default
cert_path Path

Path to the certificate file.

required

Returns:

Type Description
Certificate

The loaded certificate object.

Raises:

Type Description
FileNotFoundError

If certificate file doesn't exist.

ValueError

If certificate file is invalid.

Source code in src/nauyaca/security/certificates.py
def load_certificate(cert_path: Path) -> x509.Certificate:
    """Load a certificate from a PEM file.

    Args:
        cert_path: Path to the certificate file.

    Returns:
        The loaded certificate object.

    Raises:
        FileNotFoundError: If certificate file doesn't exist.
        ValueError: If certificate file is invalid.
    """
    if not cert_path.exists():
        raise FileNotFoundError(f"Certificate file not found: {cert_path}")

    try:
        cert_data = cert_path.read_bytes()
        return x509.load_pem_x509_certificate(cert_data)
    except Exception as e:
        raise ValueError(f"Invalid certificate file: {e}") from e

validate_certificate_file

validate_certificate_file(
    cert_path: Path,
) -> tuple[bool, str]

Validate a certificate file.

Parameters:

Name Type Description Default
cert_path Path

Path to the certificate file.

required

Returns:

Type Description
tuple[bool, str]

Tuple of (is_valid, error_message). error_message is empty if valid.

Source code in src/nauyaca/security/certificates.py
def validate_certificate_file(cert_path: Path) -> tuple[bool, str]:
    """Validate a certificate file.

    Args:
        cert_path: Path to the certificate file.

    Returns:
        Tuple of (is_valid, error_message). error_message is empty if valid.
    """
    try:
        cert = load_certificate(cert_path)

        if is_certificate_expired(cert):
            return False, "Certificate has expired"

        return True, ""
    except FileNotFoundError:
        return False, "Certificate file not found"
    except ValueError as e:
        return False, str(e)

Common Certificate Operations

Generating a Self-Signed Certificate

from pathlib import Path
from nauyaca.security.certificates import generate_self_signed_cert

# Generate certificate for localhost
cert_pem, key_pem = generate_self_signed_cert("localhost")

# Save to files
Path("cert.pem").write_bytes(cert_pem)
Path("key.pem").write_bytes(key_pem)

Loading and Fingerprinting Certificates

from pathlib import Path
from nauyaca.security.certificates import (
    load_certificate,
    get_certificate_fingerprint,
    get_certificate_info
)

# Load certificate
cert = load_certificate(Path("cert.pem"))

# Get fingerprint
fingerprint = get_certificate_fingerprint(cert)
print(f"Certificate fingerprint: {fingerprint}")

# Get detailed information
info = get_certificate_info(cert)
for key, value in info.items():
    print(f"{key}: {value}")

Validating Certificates

from pathlib import Path
from nauyaca.security.certificates import (
    load_certificate,
    is_certificate_expired,
    is_certificate_valid_for_hostname,
    validate_certificate_file
)

# Check if certificate is valid
is_valid, error_msg = validate_certificate_file(Path("cert.pem"))
if not is_valid:
    print(f"Certificate invalid: {error_msg}")

# Load and perform detailed checks
cert = load_certificate(Path("cert.pem"))

# Check expiration
if is_certificate_expired(cert):
    print("Certificate has expired!")

# Check hostname validity
if is_certificate_valid_for_hostname(cert, "example.com"):
    print("Certificate is valid for example.com")

TOFU Database

TOFUDatabase

TOFUDatabase(db_path: Path | None = None)

SQLite-backed TOFU certificate database.

This class manages a database of known host certificates and provides methods for trusting, verifying, and revoking certificates.

Initialize the TOFU database.

Parameters:

Name Type Description Default
db_path Path | None

Path to the SQLite database file. If None, uses ~/.nauyaca/tofu.db (creates directory if needed).

None
Source code in src/nauyaca/security/tofu.py
def __init__(self, db_path: Path | None = None):
    """Initialize the TOFU database.

    Args:
        db_path: Path to the SQLite database file. If None, uses
                ~/.nauyaca/tofu.db (creates directory if needed).
    """
    if db_path is None:
        # Use default location in user's home directory
        home = Path.home()
        nauyaca_dir = home / ".nauyaca"
        nauyaca_dir.mkdir(parents=True, exist_ok=True)
        db_path = nauyaca_dir / "tofu.db"

    self.db_path = db_path
    self._initialize_db()

clear

clear() -> int

Clear all entries from the TOFU database.

Returns:

Type Description
int

Number of entries removed.

Source code in src/nauyaca/security/tofu.py
def clear(self) -> int:
    """Clear all entries from the TOFU database.

    Returns:
        Number of entries removed.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute("DELETE FROM known_hosts")
        conn.commit()

        return cursor.rowcount

count_by_hostname

count_by_hostname(hostname: str) -> int

Count all entries for a hostname across all ports.

Parameters:

Name Type Description Default
hostname str

The hostname to count entries for.

required

Returns:

Type Description
int

Number of entries for this hostname.

Source code in src/nauyaca/security/tofu.py
def count_by_hostname(self, hostname: str) -> int:
    """Count all entries for a hostname across all ports.

    Args:
        hostname: The hostname to count entries for.

    Returns:
        Number of entries for this hostname.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            "SELECT COUNT(*) FROM known_hosts WHERE hostname = ?",
            (hostname,),
        )
        row = cursor.fetchone()
        return row[0] if row else 0

export_toml

export_toml(file_path: Path) -> int

Export the TOFU database to a TOML file.

Parameters:

Name Type Description Default
file_path Path

Path to the output TOML file.

required

Returns:

Type Description
int

Number of hosts exported.

Raises:

Type Description
IOError

If the file cannot be written.

Source code in src/nauyaca/security/tofu.py
def export_toml(self, file_path: Path) -> int:
    """Export the TOFU database to a TOML file.

    Args:
        file_path: Path to the output TOML file.

    Returns:
        Number of hosts exported.

    Raises:
        IOError: If the file cannot be written.
    """
    hosts = self.list_hosts()

    # Build TOML structure
    data: dict[str, Any] = {
        "_metadata": {
            "exported_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "version": "1.0",
        },
        "hosts": {},
    }

    for host in hosts:
        key = f"{host['hostname']}:{host['port']}"
        data["hosts"][key] = {
            "hostname": host["hostname"],
            "port": int(host["port"]),
            "fingerprint": host["fingerprint"],
            "first_seen": host["first_seen"],
            "last_seen": host["last_seen"],
        }

    # Write TOML file
    with open(file_path, "wb") as f:
        tomli_w.dump(data, f)

    return len(hosts)

get_host_info

get_host_info(
    hostname: str, port: int
) -> dict[str, str] | None

Get information about a specific host.

Parameters:

Name Type Description Default
hostname str

The hostname to look up.

required
port int

The port number.

required

Returns:

Type Description
dict[str, str] | None

Dictionary containing host information, or None if not found.

Source code in src/nauyaca/security/tofu.py
def get_host_info(self, hostname: str, port: int) -> dict[str, str] | None:
    """Get information about a specific host.

    Args:
        hostname: The hostname to look up.
        port: The port number.

    Returns:
        Dictionary containing host information, or None if not found.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            """
            SELECT hostname, port, fingerprint, first_seen, last_seen
            FROM known_hosts
            WHERE hostname = ? AND port = ?
            """,
            (hostname, port),
        )

        row = cursor.fetchone()
        if row is None:
            return None

        return dict(row)

import_toml

import_toml(
    file_path: Path,
    merge: bool = True,
    on_conflict: Callable[[str, int, str, str], bool]
    | None = None,
) -> tuple[int, int, int]

Import hosts from a TOML file into the TOFU database.

Parameters:

Name Type Description Default
file_path Path

Path to the input TOML file.

required
merge bool

If True, merge with existing entries. If False, replace all.

True
on_conflict Callable[[str, int, str, str], bool] | None

Callback for resolving fingerprint conflicts. Called with (hostname, port, old_fingerprint, new_fingerprint). Should return True to update, False to skip. If None, conflicts are skipped.

None

Returns:

Type Description
tuple[int, int, int]

Tuple of (added_count, updated_count, skipped_count).

Raises:

Type Description
FileNotFoundError

If the TOML file doesn't exist.

ValueError

If the TOML structure is invalid.

Source code in src/nauyaca/security/tofu.py
def import_toml(
    self,
    file_path: Path,
    merge: bool = True,
    on_conflict: Callable[[str, int, str, str], bool] | None = None,
) -> tuple[int, int, int]:
    """Import hosts from a TOML file into the TOFU database.

    Args:
        file_path: Path to the input TOML file.
        merge: If True, merge with existing entries. If False, replace all.
        on_conflict: Callback for resolving fingerprint conflicts.
            Called with (hostname, port, old_fingerprint, new_fingerprint).
            Should return True to update, False to skip.
            If None, conflicts are skipped.

    Returns:
        Tuple of (added_count, updated_count, skipped_count).

    Raises:
        FileNotFoundError: If the TOML file doesn't exist.
        ValueError: If the TOML structure is invalid.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"TOML file not found: {file_path}")

    # Load TOML file
    with open(file_path, "rb") as f:
        data = tomllib.load(f)

    # Validate structure
    if "hosts" not in data:
        raise ValueError("Invalid TOML: missing 'hosts' section")

    if not isinstance(data["hosts"], dict):
        raise ValueError("Invalid TOML: 'hosts' must be a table")

    # Clear database if not merging
    if not merge:
        self.clear()

    added_count = 0
    updated_count = 0
    skipped_count = 0

    with self._connection() as conn:
        cursor = conn.cursor()

        for key, host_data in data["hosts"].items():
            # Validate required fields
            required_fields = [
                "hostname",
                "port",
                "fingerprint",
                "first_seen",
                "last_seen",
            ]
            for field in required_fields:
                if field not in host_data:
                    raise ValueError(
                        f"Invalid TOML: host '{key}' missing required field '{field}'"
                    )

            hostname = host_data["hostname"]
            port = host_data["port"]
            fingerprint = host_data["fingerprint"]
            first_seen = host_data["first_seen"]

            # Validate port
            if not isinstance(port, int) or not (1 <= port <= 65535):
                raise ValueError(
                    f"Invalid TOML: host '{key}' has invalid port: {port}"
                )

            # Validate fingerprint format
            if not self._validate_fingerprint(fingerprint):
                raise ValueError(
                    f"Invalid TOML: host '{key}' "
                    f"has invalid fingerprint format: {fingerprint}"
                )

            # Check if host already exists
            existing = self.get_host_info(hostname, port)

            if existing is None:
                # New host - add it
                now = datetime.datetime.now(datetime.timezone.utc).isoformat()
                cursor.execute(
                    """
                    INSERT INTO known_hosts
                    (hostname, port, fingerprint, first_seen, last_seen)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (hostname, port, fingerprint, first_seen, now),
                )
                added_count += 1
            elif existing["fingerprint"] == fingerprint:
                # Same fingerprint - skip
                skipped_count += 1
            else:
                # Fingerprint mismatch - check if we should update
                should_update = False
                if on_conflict:
                    should_update = on_conflict(
                        hostname, port, existing["fingerprint"], fingerprint
                    )

                if should_update:
                    # Update with new fingerprint
                    # Preserve first_seen, update last_seen
                    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
                    cursor.execute(
                        """
                        UPDATE known_hosts
                        SET fingerprint = ?, last_seen = ?
                        WHERE hostname = ? AND port = ?
                        """,
                        (fingerprint, now, hostname, port),
                    )
                    updated_count += 1
                else:
                    skipped_count += 1

        conn.commit()
        return (added_count, updated_count, skipped_count)

list_hosts

list_hosts() -> list[dict[str, str]]

List all known hosts in the database.

Returns:

Type Description
list[dict[str, str]]

List of dictionaries containing host information.

Source code in src/nauyaca/security/tofu.py
def list_hosts(self) -> list[dict[str, str]]:
    """List all known hosts in the database.

    Returns:
        List of dictionaries containing host information.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            """
            SELECT hostname, port, fingerprint, first_seen, last_seen
            FROM known_hosts
            ORDER BY last_seen DESC
            """
        )

        rows = cursor.fetchall()
        return [dict(row) for row in rows]

revoke

revoke(hostname: str, port: int) -> bool

Remove a host from the TOFU database.

Parameters:

Name Type Description Default
hostname str

The hostname to revoke.

required
port int

The port number.

required

Returns:

Type Description
bool

True if the host was removed, False if it wasn't in the database.

Source code in src/nauyaca/security/tofu.py
def revoke(self, hostname: str, port: int) -> bool:
    """Remove a host from the TOFU database.

    Args:
        hostname: The hostname to revoke.
        port: The port number.

    Returns:
        True if the host was removed, False if it wasn't in the database.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            "DELETE FROM known_hosts WHERE hostname = ? AND port = ?",
            (hostname, port),
        )
        conn.commit()

        return cursor.rowcount > 0

revoke_by_hostname

revoke_by_hostname(hostname: str) -> int

Remove all entries for a hostname from the TOFU database.

This removes all port entries for the given hostname.

Parameters:

Name Type Description Default
hostname str

The hostname to revoke all entries for.

required

Returns:

Type Description
int

Number of entries removed.

Source code in src/nauyaca/security/tofu.py
def revoke_by_hostname(self, hostname: str) -> int:
    """Remove all entries for a hostname from the TOFU database.

    This removes all port entries for the given hostname.

    Args:
        hostname: The hostname to revoke all entries for.

    Returns:
        Number of entries removed.
    """
    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            "DELETE FROM known_hosts WHERE hostname = ?",
            (hostname,),
        )
        conn.commit()

        return cursor.rowcount

trust

trust(hostname: str, port: int, cert: Certificate) -> None

Trust a certificate for a host.

This stores the certificate fingerprint in the database. If a certificate already exists for this host, it will be replaced.

Parameters:

Name Type Description Default
hostname str

The hostname (e.g., "example.com").

required
port int

The port number.

required
cert Certificate

The certificate to trust.

required
Source code in src/nauyaca/security/tofu.py
def trust(self, hostname: str, port: int, cert: x509.Certificate) -> None:
    """Trust a certificate for a host.

    This stores the certificate fingerprint in the database. If a certificate
    already exists for this host, it will be replaced.

    Args:
        hostname: The hostname (e.g., "example.com").
        port: The port number.
        cert: The certificate to trust.
    """
    fingerprint = get_certificate_fingerprint(cert)
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()

    with self._connection() as conn:
        cursor = conn.cursor()

        # Check if host already exists
        cursor.execute(
            "SELECT fingerprint FROM known_hosts WHERE hostname = ? AND port = ?",
            (hostname, port),
        )
        row = cursor.fetchone()

        if row is None:
            # First time seeing this host
            cursor.execute(
                """
                INSERT INTO known_hosts
                (hostname, port, fingerprint, first_seen, last_seen)
                VALUES (?, ?, ?, ?, ?)
                """,
                (hostname, port, fingerprint, now, now),
            )
        else:
            # Update existing entry
            cursor.execute(
                """
                UPDATE known_hosts
                SET fingerprint = ?, last_seen = ?
                WHERE hostname = ? AND port = ?
                """,
                (fingerprint, now, hostname, port),
            )

        conn.commit()

verify

verify(
    hostname: str, port: int, cert: Certificate
) -> tuple[bool, str]

Verify a certificate against the TOFU database.

Parameters:

Name Type Description Default
hostname str

The hostname to verify.

required
port int

The port number.

required
cert Certificate

The certificate to verify.

required

Returns:

Type Description
bool

Tuple of (is_valid, message):

str
  • (True, "") if certificate matches stored fingerprint
tuple[bool, str]
  • (True, "first_use") if this is first connection to host
tuple[bool, str]
  • (False, "changed") if certificate has changed
Source code in src/nauyaca/security/tofu.py
def verify(
    self, hostname: str, port: int, cert: x509.Certificate
) -> tuple[bool, str]:
    """Verify a certificate against the TOFU database.

    Args:
        hostname: The hostname to verify.
        port: The port number.
        cert: The certificate to verify.

    Returns:
        Tuple of (is_valid, message):
        - (True, "") if certificate matches stored fingerprint
        - (True, "first_use") if this is first connection to host
        - (False, "changed") if certificate has changed
    """
    fingerprint = get_certificate_fingerprint(cert)

    with self._connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            "SELECT fingerprint FROM known_hosts WHERE hostname = ? AND port = ?",
            (hostname, port),
        )
        row = cursor.fetchone()

        if row is None:
            # First time seeing this host
            return True, "first_use"

        stored_fingerprint = row["fingerprint"]

        if stored_fingerprint == fingerprint:
            # Certificate matches - update last_seen
            now = datetime.datetime.now(datetime.timezone.utc).isoformat()
            cursor.execute(
                "UPDATE known_hosts SET last_seen = ? "
                "WHERE hostname = ? AND port = ?",
                (now, hostname, port),
            )
            conn.commit()
            return True, ""

        # Certificate has changed
        return False, "changed"

CertificateChangedError

CertificateChangedError(
    hostname: str,
    port: int,
    old_fingerprint: str,
    new_fingerprint: str,
)

Bases: Exception

Exception raised when a certificate has changed unexpectedly.

This indicates a potential MITM attack or legitimate certificate renewal.

Initialize the exception.

Parameters:

Name Type Description Default
hostname str

The hostname where certificate changed.

required
port int

The port number.

required
old_fingerprint str

The previously stored fingerprint.

required
new_fingerprint str

The new certificate fingerprint.

required
Source code in src/nauyaca/security/tofu.py
def __init__(
    self,
    hostname: str,
    port: int,
    old_fingerprint: str,
    new_fingerprint: str,
):
    """Initialize the exception.

    Args:
        hostname: The hostname where certificate changed.
        port: The port number.
        old_fingerprint: The previously stored fingerprint.
        new_fingerprint: The new certificate fingerprint.
    """
    self.hostname = hostname
    self.port = port
    self.old_fingerprint = old_fingerprint
    self.new_fingerprint = new_fingerprint

    super().__init__(
        f"Certificate for {hostname}:{port} has changed!\n"
        f"Old fingerprint: {old_fingerprint}\n"
        f"New fingerprint: {new_fingerprint}\n"
        f"This could indicate a man-in-the-middle attack or a legitimate "
        f"certificate renewal. Verify the new certificate before continuing."
    )

Common TOFU Operations

Basic TOFU Validation

from pathlib import Path
from nauyaca.security.tofu import TOFUDatabase
from nauyaca.security.certificates import load_certificate

# Initialize TOFU database (uses ~/.nauyaca/tofu.db by default)
tofu = TOFUDatabase()

# Load certificate to verify
cert = load_certificate(Path("server.pem"))

# Verify certificate
is_valid, reason = tofu.verify("example.com", 1965, cert)

if reason == "first_use":
    print("First connection to this host - trusting certificate")
    tofu.trust("example.com", 1965, cert)
elif is_valid:
    print("Certificate verified successfully")
else:
    print(f"Certificate verification failed: {reason}")

Managing Known Hosts

from nauyaca.security.tofu import TOFUDatabase

tofu = TOFUDatabase()

# List all known hosts
hosts = tofu.list_hosts()
for host in hosts:
    print(f"{host['hostname']}:{host['port']} - {host['fingerprint']}")

# Get info about specific host
info = tofu.get_host_info("example.com", 1965)
if info:
    print(f"First seen: {info['first_seen']}")
    print(f"Last seen: {info['last_seen']}")

# Revoke trust for a host
if tofu.revoke("example.com", 1965):
    print("Host removed from database")

Exporting and Importing TOFU Data

from pathlib import Path
from nauyaca.security.tofu import TOFUDatabase

tofu = TOFUDatabase()

# Export to TOML file
count = tofu.export_toml(Path("tofu-backup.toml"))
print(f"Exported {count} hosts")

# Import from TOML file (merge with existing)
added, updated, skipped = tofu.import_toml(
    Path("tofu-backup.toml"),
    merge=True
)
print(f"Import: {added} added, {updated} updated, {skipped} skipped")

# Import with conflict resolution
def resolve_conflict(hostname, port, old_fp, new_fp):
    """Ask user whether to update the fingerprint."""
    print(f"\nConflict for {hostname}:{port}")
    print(f"Old: {old_fp}")
    print(f"New: {new_fp}")
    response = input("Update? [y/N]: ")
    return response.lower() == 'y'

added, updated, skipped = tofu.import_toml(
    Path("tofu-backup.toml"),
    merge=True,
    on_conflict=resolve_conflict
)

Custom TOFU Database Location

from pathlib import Path
from nauyaca.security.tofu import TOFUDatabase

# Use custom database path
custom_db = Path("/var/lib/myapp/tofu.db")
tofu = TOFUDatabase(db_path=custom_db)

TLS Context Creation

tls

TLS context creation for Gemini protocol.

This module provides functions for creating SSL/TLS contexts for both client and server connections, following Gemini protocol requirements.

create_client_context

create_client_context(
    verify_mode: VerifyMode = ssl.CERT_NONE,
    check_hostname: bool = False,
    certfile: str | None = None,
    keyfile: str | None = None,
) -> ssl.SSLContext

Create an SSL context for Gemini client connections.

The Gemini protocol requires TLS 1.2 or higher. This function creates an SSL context configured for client connections.

Parameters:

Name Type Description Default
verify_mode VerifyMode

SSL certificate verification mode. Default is CERT_NONE for testing/development. Use CERT_REQUIRED with proper TOFU validation for production.

CERT_NONE
check_hostname bool

Whether to check that the certificate hostname matches the server hostname. Default is False (for testing/development).

False
certfile str | None

Optional path to client certificate file (for client cert auth).

None
keyfile str | None

Optional path to client private key file (for client cert auth).

None

Returns:

Type Description
SSLContext

An SSL context configured for Gemini client connections.

Examples:

>>> # Testing mode - accept all certificates
>>> context = create_client_context()
>>> # Production mode with TOFU (implement custom verification)
>>> context = create_client_context(
...     verify_mode=ssl.CERT_REQUIRED,
...     check_hostname=True
... )
>>> # With client certificate authentication
>>> context = create_client_context(
...     certfile='client.pem',
...     keyfile='client-key.pem'
... )
Source code in src/nauyaca/security/tls.py
def create_client_context(
    verify_mode: ssl.VerifyMode = ssl.CERT_NONE,
    check_hostname: bool = False,
    certfile: str | None = None,
    keyfile: str | None = None,
) -> ssl.SSLContext:
    """Create an SSL context for Gemini client connections.

    The Gemini protocol requires TLS 1.2 or higher. This function creates
    an SSL context configured for client connections.

    Args:
        verify_mode: SSL certificate verification mode. Default is CERT_NONE
            for testing/development. Use CERT_REQUIRED with proper TOFU
            validation for production.
        check_hostname: Whether to check that the certificate hostname matches
            the server hostname. Default is False (for testing/development).
        certfile: Optional path to client certificate file (for client cert auth).
        keyfile: Optional path to client private key file (for client cert auth).

    Returns:
        An SSL context configured for Gemini client connections.

    Examples:
        >>> # Testing mode - accept all certificates
        >>> context = create_client_context()

        >>> # Production mode with TOFU (implement custom verification)
        >>> context = create_client_context(
        ...     verify_mode=ssl.CERT_REQUIRED,
        ...     check_hostname=True
        ... )

        >>> # With client certificate authentication
        >>> context = create_client_context(
        ...     certfile='client.pem',
        ...     keyfile='client-key.pem'
        ... )
    """
    # Create default SSL context
    context = ssl.create_default_context()

    # Set minimum TLS version (Gemini requires TLS 1.2+)
    context.minimum_version = ssl.TLSVersion.TLSv1_2

    # Configure certificate verification
    context.check_hostname = check_hostname
    context.verify_mode = verify_mode

    # Load client certificate if provided
    if certfile and keyfile:
        context.load_cert_chain(certfile, keyfile)

    return context

create_server_context

create_server_context(
    certfile: str,
    keyfile: str,
    request_client_cert: bool = False,
    client_ca_certs: list[str] | None = None,
) -> ssl.SSLContext

Create an SSL context for Gemini server connections.

Parameters:

Name Type Description Default
certfile str

Path to server certificate file.

required
keyfile str

Path to server private key file.

required
request_client_cert bool

Whether to request client certificates. When True, the server will ask clients to send a certificate. With OpenSSL 3.x, client certificates must be signed by a CA in client_ca_certs or the TLS handshake will fail silently. Enforcement should be done via CertificateAuth middleware. Default is False.

False
client_ca_certs list[str] | None

List of paths to CA certificates for verifying client certificates. For self-signed client certs, include each client's cert file here. Required when request_client_cert=True with OpenSSL 3.x. Default is None.

None

Returns:

Type Description
SSLContext

An SSL context configured for Gemini server connections.

Examples:

>>> # Basic server context
>>> context = create_server_context('cert.pem', 'key.pem')
>>> # Server requesting client certificates (for middleware auth)
>>> context = create_server_context(
...     'cert.pem',
...     'key.pem',
...     request_client_cert=True,
...     client_ca_certs=['trusted_client1.pem', 'trusted_client2.pem']
... )
Source code in src/nauyaca/security/tls.py
def create_server_context(
    certfile: str,
    keyfile: str,
    request_client_cert: bool = False,
    client_ca_certs: list[str] | None = None,
) -> ssl.SSLContext:
    """Create an SSL context for Gemini server connections.

    Args:
        certfile: Path to server certificate file.
        keyfile: Path to server private key file.
        request_client_cert: Whether to request client certificates.
            When True, the server will ask clients to send a certificate.
            With OpenSSL 3.x, client certificates must be signed by a CA
            in client_ca_certs or the TLS handshake will fail silently.
            Enforcement should be done via CertificateAuth middleware.
            Default is False.
        client_ca_certs: List of paths to CA certificates for verifying client
            certificates. For self-signed client certs, include each client's
            cert file here. Required when request_client_cert=True with
            OpenSSL 3.x. Default is None.

    Returns:
        An SSL context configured for Gemini server connections.

    Examples:
        >>> # Basic server context
        >>> context = create_server_context('cert.pem', 'key.pem')

        >>> # Server requesting client certificates (for middleware auth)
        >>> context = create_server_context(
        ...     'cert.pem',
        ...     'key.pem',
        ...     request_client_cert=True,
        ...     client_ca_certs=['trusted_client1.pem', 'trusted_client2.pem']
        ... )
    """
    # Create SSL context for server
    # NOTE: We use SSLContext directly instead of create_default_context because
    # create_default_context loads system CA certificates which we don't need.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # Set minimum TLS version (Gemini requires TLS 1.2+)
    context.minimum_version = ssl.TLSVersion.TLSv1_2

    # Load server certificate and key
    context.load_cert_chain(certfile, keyfile)

    # Configure client certificate handling
    # Use CERT_OPTIONAL to request certs without requiring them
    # The CertificateAuth middleware handles actual enforcement
    if request_client_cert:
        context.verify_mode = ssl.CERT_OPTIONAL

        # Load client CA certificates if provided
        # NOTE: With OpenSSL 3.x, CERT_OPTIONAL requires CA certs to be loaded,
        # otherwise self-signed client certificates cause silent TLS failures.
        # For self-signed client certs, load each cert as a trusted CA.
        if client_ca_certs:
            for ca_cert in client_ca_certs:
                context.load_verify_locations(ca_cert)
    else:
        context.verify_mode = ssl.CERT_NONE

    return context

Common TLS Operations

Creating Client Contexts

import ssl
from nauyaca.security.tls import create_client_context

# Testing/development - accept all certificates
context = create_client_context()

# Production - with certificate verification
# (Combine with TOFU for full validation)
context = create_client_context(
    verify_mode=ssl.CERT_REQUIRED,
    check_hostname=True
)

# With client certificate authentication
context = create_client_context(
    verify_mode=ssl.CERT_REQUIRED,
    check_hostname=True,
    certfile="client.pem",
    keyfile="client-key.pem"
)

Creating Server Contexts

from nauyaca.security.tls import create_server_context

# Basic server
context = create_server_context(
    certfile="server.pem",
    keyfile="server-key.pem"
)

# Server requesting client certificates
# (Used with CertificateAuth middleware)
context = create_server_context(
    certfile="server.pem",
    keyfile="server-key.pem",
    request_client_cert=True,
    client_ca_certs=["trusted_client1.pem", "trusted_client2.pem"]
)

Client Certificate Authentication

When using request_client_cert=True with OpenSSL 3.x, you must provide client_ca_certs or the TLS handshake will fail silently for self-signed client certificates. For self-signed client certs, include each client's certificate file in the client_ca_certs list.

Integration Examples

Custom Client with TOFU Validation

import asyncio
import ssl
from pathlib import Path
from nauyaca.security.tls import create_client_context
from nauyaca.security.tofu import TOFUDatabase, CertificateChangedError

async def fetch_with_tofu(url: str):
    """Fetch a Gemini URL with TOFU validation."""
    # Parse URL
    from urllib.parse import urlparse
    parsed = urlparse(url)
    hostname = parsed.hostname or "localhost"
    port = parsed.port or 1965

    # Initialize TOFU database
    tofu = TOFUDatabase()

    # Create SSL context
    ssl_context = create_client_context(
        verify_mode=ssl.CERT_NONE,  # We do our own TOFU validation
        check_hostname=False
    )

    # Connect and get certificate
    reader, writer = await asyncio.open_connection(
        hostname, port, ssl=ssl_context
    )

    # Get peer certificate
    ssl_object = writer.get_extra_info('ssl_object')
    cert_der = ssl_object.getpeercert(binary_form=True)

    # Parse certificate
    from cryptography import x509
    cert = x509.load_der_x509_certificate(cert_der)

    # Verify with TOFU
    is_valid, reason = tofu.verify(hostname, port, cert)

    if reason == "first_use":
        # Prompt user to trust
        from nauyaca.security.certificates import get_certificate_info
        info = get_certificate_info(cert)
        print("First connection to this host!")
        print(f"Fingerprint: {info['fingerprint_sha256']}")
        response = input("Trust this certificate? [y/N]: ")

        if response.lower() == 'y':
            tofu.trust(hostname, port, cert)
        else:
            writer.close()
            await writer.wait_closed()
            raise ValueError("Certificate not trusted")

    elif not is_valid:
        # Certificate changed - potential security issue
        writer.close()
        await writer.wait_closed()
        raise CertificateChangedError(
            hostname, port,
            tofu.get_host_info(hostname, port)['fingerprint'],
            cert.fingerprint(ssl.create_default_context().check_hostname)
        )

    # Send request
    writer.write(f"{url}\r\n".encode())
    await writer.drain()

    # Read response
    response = await reader.read()

    # Close connection
    writer.close()
    await writer.wait_closed()

    return response.decode('utf-8')

# Use the function
asyncio.run(fetch_with_tofu("gemini://example.com/"))

Programmatic Certificate Generation

from pathlib import Path
from nauyaca.security.certificates import (
    generate_self_signed_cert,
    get_certificate_fingerprint,
    load_certificate
)

def setup_server_certificate(hostname: str, cert_dir: Path):
    """Generate and save server certificate."""
    cert_dir.mkdir(parents=True, exist_ok=True)

    cert_path = cert_dir / "cert.pem"
    key_path = cert_dir / "key.pem"

    # Generate certificate
    print(f"Generating certificate for {hostname}...")
    cert_pem, key_pem = generate_self_signed_cert(
        hostname=hostname,
        key_size=2048,
        valid_days=365
    )

    # Save to files
    cert_path.write_bytes(cert_pem)
    key_path.write_bytes(key_pem)

    # Display fingerprint
    cert = load_certificate(cert_path)
    fingerprint = get_certificate_fingerprint(cert)

    print(f"Certificate saved to {cert_path}")
    print(f"Private key saved to {key_path}")
    print(f"Fingerprint: {fingerprint}")

    return cert_path, key_path

# Example usage
cert_path, key_path = setup_server_certificate(
    "localhost",
    Path("/etc/nauyaca/certs")
)

See Also