
API Reference

Python API for programmatic usage.

Core modules

Configuration

slurmq.core.config.SlurmqConfig

Bases: BaseSettings

Main configuration for slurmq.

Supports loading from:

1. TOML config file
2. Environment variables (SLURMQ_* prefix)
3. Programmatic overrides

Priority (highest first): env vars > TOML file > defaults
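For example, an environment variable with the SLURMQ_ prefix overrides the same key from the TOML file. A minimal sketch; the cluster name "alpha" is illustrative:

import os
from slurmq.core.config import SlurmqConfig

# Nested fields are addressed with "__" (env_nested_delimiter), e.g.
# SLURMQ_MONITORING__SOME_FIELD -> monitoring.some_field.
os.environ["SLURMQ_DEFAULT_CLUSTER"] = "alpha"

config = SlurmqConfig()
print(config.default_cluster)  # "alpha", even if the TOML file says otherwise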

Source code in src/slurmq/core/config.py
class SlurmqConfig(BaseSettings):
    """Main configuration for slurmq.

    Supports loading from:
    1. TOML config file
    2. Environment variables (SLURMQ_* prefix)
    3. Programmatic overrides

    Priority (highest first): env vars > TOML file > defaults
    """

    default_cluster: str = ""
    clusters: dict[str, ClusterConfig] = Field(default_factory=dict)
    monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
    enforcement: EnforcementConfig = Field(default_factory=EnforcementConfig)
    email: EmailConfig = Field(default_factory=EmailConfig)
    display: DisplayConfig = Field(default_factory=DisplayConfig)
    cache: CacheConfig = Field(default_factory=CacheConfig)

    model_config = SettingsConfigDict(env_prefix="SLURMQ_", env_nested_delimiter="__")

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Customize settings sources priority.

        Priority (highest first):
        1. init_settings (programmatic overrides)
        2. env_settings (environment variables)
        3. TomlFileSettingsSource (config file)
        """
        return (init_settings, env_settings, TomlFileSettingsSource(settings_cls))

    @property
    def cluster_names(self) -> list[str]:
        """List of all configured cluster names."""
        return list(self.clusters.keys())

    def get_cluster(self, name: str | None = None) -> ClusterConfig:
        """Get cluster config by name, or default cluster if not specified.

        Args:
            name: Cluster name. If None, uses default_cluster.

        Returns:
            ClusterConfig for the specified cluster.

        Raises:
            ValueError: If no cluster specified and no default set,
                       or if the cluster name is not found.
        """
        key = name or self.default_cluster
        if not key:
            msg = "No cluster specified and no default_cluster set"
            raise ValueError(msg)
        if key not in self.clusters:
            msg = f"Unknown cluster: {key}"
            raise ValueError(msg)
        return self.clusters[key]

    def save(self, path: Path) -> None:
        """Save configuration to a TOML file.

        Args:
            path: Path to save the config file.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        data = self.model_dump(mode="json", exclude_defaults=False)
        with path.open("wb") as f:
            tomli_w.dump(data, f)

cluster_names property

List of all configured cluster names.

get_cluster(name=None)

Get cluster config by name, or default cluster if not specified.

Parameters:

    name (str | None): Cluster name. If None, uses default_cluster. Default: None.

Returns:

    ClusterConfig: ClusterConfig for the specified cluster.

Raises:

    ValueError: If no cluster specified and no default set, or if the cluster name is not found.
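A short usage sketch; the cluster name "alpha" is illustrative:

config = SlurmqConfig()

# Explicit cluster name; raises ValueError if "alpha" is not configured
alpha = config.get_cluster("alpha")

# No name: falls back to default_cluster, or raises ValueError if none is set
default = config.get_cluster()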

Source code in src/slurmq/core/config.py
def get_cluster(self, name: str | None = None) -> ClusterConfig:
    """Get cluster config by name, or default cluster if not specified.

    Args:
        name: Cluster name. If None, uses default_cluster.

    Returns:
        ClusterConfig for the specified cluster.

    Raises:
        ValueError: If no cluster specified and no default set,
                   or if the cluster name is not found.
    """
    key = name or self.default_cluster
    if not key:
        msg = "No cluster specified and no default_cluster set"
        raise ValueError(msg)
    if key not in self.clusters:
        msg = f"Unknown cluster: {key}"
        raise ValueError(msg)
    return self.clusters[key]

save(path)

Save configuration to a TOML file.

Parameters:

    path (Path): Path to save the config file. Required.
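A short sketch; the target path is illustrative, not a location the library requires:

from pathlib import Path

config = SlurmqConfig(default_cluster="alpha")
# Parent directories are created as needed; the model is dumped as TOML.
config.save(Path.home() / ".config" / "slurmq" / "config.toml")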
Source code in src/slurmq/core/config.py
def save(self, path: Path) -> None:
    """Save configuration to a TOML file.

    Args:
        path: Path to save the config file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    data = self.model_dump(mode="json", exclude_defaults=False)
    with path.open("wb") as f:
        tomli_w.dump(data, f)

settings_customise_sources(settings_cls, init_settings, env_settings, dotenv_settings, file_secret_settings) classmethod

Customize settings sources priority.

Priority (highest first):

1. init_settings (programmatic overrides)
2. env_settings (environment variables)
3. TomlFileSettingsSource (config file)

Source code in src/slurmq/core/config.py
@classmethod
def settings_customise_sources(
    cls,
    settings_cls: type[BaseSettings],
    init_settings: PydanticBaseSettingsSource,
    env_settings: PydanticBaseSettingsSource,
    dotenv_settings: PydanticBaseSettingsSource,
    file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
    """Customize settings sources priority.

    Priority (highest first):
    1. init_settings (programmatic overrides)
    2. env_settings (environment variables)
    3. TomlFileSettingsSource (config file)
    """
    return (init_settings, env_settings, TomlFileSettingsSource(settings_cls))


Quota checking

slurmq.core.quota.QuotaChecker

Checks allocated GPU-hours against cluster quota configuration.

Source code in src/slurmq/core/quota.py
class QuotaChecker:
    """Checks allocated GPU-hours against cluster quota configuration."""

    def __init__(
        self, cluster: ClusterConfig, warning_threshold: float = 0.8, critical_threshold: float = 1.0
    ) -> None:
        """Initialize QuotaChecker.

        Args:
            cluster: Cluster configuration with quota settings
            warning_threshold: Usage fraction for warning status
            critical_threshold: Usage fraction for exceeded status

        """
        self.cluster = cluster
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold

    def calculate_gpu_hours(self, records: list[JobRecord]) -> float:
        """Calculate total allocated GPU-hours from job records.

        Args:
            records: List of job records

        Returns:
            Total allocated GPU-hours

        """
        return sum(record.gpu_hours for record in records)

    def filter_by_window(self, records: list[JobRecord], window_days: int | None = None) -> list[JobRecord]:
        """Filter records to those within the rolling window.

        Args:
            records: List of job records
            window_days: Number of days in window (uses cluster config if None)

        Returns:
            Records with start_time within the window

        """
        days = window_days if window_days is not None else self.cluster.rolling_window_days
        cutoff = datetime.now(tz=UTC) - timedelta(days=days)
        return [record for record in records if record.start_time >= cutoff]

    def filter_by_qos(self, records: list[JobRecord], qos: str | None = None) -> list[JobRecord]:
        """Filter records by QoS.

        Args:
            records: List of job records
            qos: QoS to filter by (uses first from cluster config if None)

        Returns:
            Records matching the QoS

        """
        target_qos = qos if qos is not None else self.cluster.qos[0]
        return [record for record in records if record.qos == target_qos]

    def generate_report(self, user: str, records: list[JobRecord], qos: str | None = None) -> UsageReport:
        """Generate a usage report for a user.

        Args:
            user: Username
            records: Job records (will be filtered)
            qos: QoS to report on (uses first from cluster config if None)

        Returns:
            UsageReport with quota status

        """
        target_qos = qos if qos is not None else self.cluster.qos[0]

        # Filter to user's jobs in the rolling window for the target QoS
        user_records = [record for record in records if record.user == user]
        windowed = self.filter_by_window(user_records)
        qos_filtered = self.filter_by_qos(windowed, target_qos)

        used_hours = self.calculate_gpu_hours(qos_filtered)
        active = [record for record in qos_filtered if record.is_running]

        return UsageReport(
            user=user,
            qos=target_qos,
            used_gpu_hours=used_hours,
            quota_limit=self.cluster.quota_limit,
            rolling_window_days=self.cluster.rolling_window_days,
            active_jobs=active,
            warning_threshold=self.warning_threshold,
            critical_threshold=self.critical_threshold,
        )

    def forecast_quota(
        self, user: str, records: list[JobRecord], hours_ahead: list[int] | None = None, qos: str | None = None
    ) -> dict[int, float]:
        """Forecast quota availability at future times.

        As time passes, old jobs fall outside the rolling window,
        freeing up quota. This method calculates how much quota
        will be available at each future time point.

        Args:
            user: Username
            records: Job records
            hours_ahead: List of hours to forecast (default: [12, 24, 72, 168])
            qos: QoS to forecast for

        Returns:
            Dict mapping hours_ahead to available GPU-hours at that time

        """
        if hours_ahead is None:
            hours_ahead = [12, 24, 72, 168]

        target_qos = qos if qos is not None else self.cluster.qos[0]
        user_records = [record for record in records if record.user == user]
        qos_filtered = self.filter_by_qos(user_records, target_qos)

        forecast: dict[int, float] = {}
        window_days = self.cluster.rolling_window_days

        for hours in hours_ahead:
            # Calculate what the cutoff will be N hours from now
            future_cutoff = datetime.now(tz=UTC) + timedelta(hours=hours) - timedelta(days=window_days)

            # Sum GPU-hours for jobs that will still be in window at that time
            future_records = [record for record in qos_filtered if record.start_time >= future_cutoff]
            future_usage = self.calculate_gpu_hours(future_records)
            forecast[hours] = self.cluster.quota_limit - future_usage

        return forecast

__init__(cluster, warning_threshold=0.8, critical_threshold=1.0)

Initialize QuotaChecker.

Parameters:

    cluster (ClusterConfig): Cluster configuration with quota settings. Required.
    warning_threshold (float): Usage fraction for warning status. Default: 0.8.
    critical_threshold (float): Usage fraction for exceeded status. Default: 1.0.
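For example, to warn earlier and treat 95% as exceeded (thresholds here are illustrative, assuming a SlurmqConfig instance named config):

cluster = config.get_cluster()  # ClusterConfig with quota settings
checker = QuotaChecker(cluster, warning_threshold=0.7, critical_threshold=0.95)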
Source code in src/slurmq/core/quota.py
def __init__(
    self, cluster: ClusterConfig, warning_threshold: float = 0.8, critical_threshold: float = 1.0
) -> None:
    """Initialize QuotaChecker.

    Args:
        cluster: Cluster configuration with quota settings
        warning_threshold: Usage fraction for warning status
        critical_threshold: Usage fraction for exceeded status

    """
    self.cluster = cluster
    self.warning_threshold = warning_threshold
    self.critical_threshold = critical_threshold

calculate_gpu_hours(records)

Calculate total allocated GPU-hours from job records.

Parameters:

    records (list[JobRecord]): List of job records. Required.

Returns:

    float: Total allocated GPU-hours.

Source code in src/slurmq/core/quota.py
def calculate_gpu_hours(self, records: list[JobRecord]) -> float:
    """Calculate total allocated GPU-hours from job records.

    Args:
        records: List of job records

    Returns:
        Total allocated GPU-hours

    """
    return sum(record.gpu_hours for record in records)

filter_by_qos(records, qos=None)

Filter records by QoS.

Parameters:

    records (list[JobRecord]): List of job records. Required.
    qos (str | None): QoS to filter by (uses first from cluster config if None). Default: None.

Returns:

    list[JobRecord]: Records matching the QoS.

Source code in src/slurmq/core/quota.py
def filter_by_qos(self, records: list[JobRecord], qos: str | None = None) -> list[JobRecord]:
    """Filter records by QoS.

    Args:
        records: List of job records
        qos: QoS to filter by (uses first from cluster config if None)

    Returns:
        Records matching the QoS

    """
    target_qos = qos if qos is not None else self.cluster.qos[0]
    return [record for record in records if record.qos == target_qos]

filter_by_window(records, window_days=None)

Filter records to those within the rolling window.

Parameters:

    records (list[JobRecord]): List of job records. Required.
    window_days (int | None): Number of days in window (uses cluster config if None). Default: None.

Returns:

    list[JobRecord]: Records with start_time within the window.
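The two filters compose naturally. A sketch assuming a QuotaChecker named checker, a list of JobRecord objects named records, and a QoS named "gpu" (all illustrative):

recent = checker.filter_by_window(records, window_days=7)  # last 7 days only
gpu_jobs = checker.filter_by_qos(recent, qos="gpu")         # then a single QoS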

Source code in src/slurmq/core/quota.py
def filter_by_window(self, records: list[JobRecord], window_days: int | None = None) -> list[JobRecord]:
    """Filter records to those within the rolling window.

    Args:
        records: List of job records
        window_days: Number of days in window (uses cluster config if None)

    Returns:
        Records with start_time within the window

    """
    days = window_days if window_days is not None else self.cluster.rolling_window_days
    cutoff = datetime.now(tz=UTC) - timedelta(days=days)
    return [record for record in records if record.start_time >= cutoff]

forecast_quota(user, records, hours_ahead=None, qos=None)

Forecast quota availability at future times.

As time passes, old jobs fall outside the rolling window, freeing up quota. This method calculates how much quota will be available at each future time point.

Parameters:

    user (str): Username. Required.
    records (list[JobRecord]): Job records. Required.
    hours_ahead (list[int] | None): List of hours to forecast (default: [12, 24, 72, 168]). Default: None.
    qos (str | None): QoS to forecast for. Default: None.

Returns:

    dict[int, float]: Dict mapping hours_ahead to available GPU-hours at that time.
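A sketch of reading the forecast, assuming a QuotaChecker named checker and job records named records; the printed numbers depend entirely on the records passed in:

forecast = checker.forecast_quota("alice", records)
for hours, available in sorted(forecast.items()):
    print(f"in {hours:>3} h: {available:.1f} GPU-hours of quota free")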

Source code in src/slurmq/core/quota.py
def forecast_quota(
    self, user: str, records: list[JobRecord], hours_ahead: list[int] | None = None, qos: str | None = None
) -> dict[int, float]:
    """Forecast quota availability at future times.

    As time passes, old jobs fall outside the rolling window,
    freeing up quota. This method calculates how much quota
    will be available at each future time point.

    Args:
        user: Username
        records: Job records
        hours_ahead: List of hours to forecast (default: [12, 24, 72, 168])
        qos: QoS to forecast for

    Returns:
        Dict mapping hours_ahead to available GPU-hours at that time

    """
    if hours_ahead is None:
        hours_ahead = [12, 24, 72, 168]

    target_qos = qos if qos is not None else self.cluster.qos[0]
    user_records = [record for record in records if record.user == user]
    qos_filtered = self.filter_by_qos(user_records, target_qos)

    forecast: dict[int, float] = {}
    window_days = self.cluster.rolling_window_days

    for hours in hours_ahead:
        # Calculate what the cutoff will be N hours from now
        future_cutoff = datetime.now(tz=UTC) + timedelta(hours=hours) - timedelta(days=window_days)

        # Sum GPU-hours for jobs that will still be in window at that time
        future_records = [record for record in qos_filtered if record.start_time >= future_cutoff]
        future_usage = self.calculate_gpu_hours(future_records)
        forecast[hours] = self.cluster.quota_limit - future_usage

    return forecast

generate_report(user, records, qos=None)

Generate a usage report for a user.

Parameters:

    user (str): Username. Required.
    records (list[JobRecord]): Job records (will be filtered). Required.
    qos (str | None): QoS to report on (uses first from cluster config if None). Default: None.

Returns:

    UsageReport: UsageReport with quota status.

Source code in src/slurmq/core/quota.py
def generate_report(self, user: str, records: list[JobRecord], qos: str | None = None) -> UsageReport:
    """Generate a usage report for a user.

    Args:
        user: Username
        records: Job records (will be filtered)
        qos: QoS to report on (uses first from cluster config if None)

    Returns:
        UsageReport with quota status

    """
    target_qos = qos if qos is not None else self.cluster.qos[0]

    # Filter to user's jobs in the rolling window for the target QoS
    user_records = [record for record in records if record.user == user]
    windowed = self.filter_by_window(user_records)
    qos_filtered = self.filter_by_qos(windowed, target_qos)

    used_hours = self.calculate_gpu_hours(qos_filtered)
    active = [record for record in qos_filtered if record.is_running]

    return UsageReport(
        user=user,
        qos=target_qos,
        used_gpu_hours=used_hours,
        quota_limit=self.cluster.quota_limit,
        rolling_window_days=self.cluster.rolling_window_days,
        active_jobs=active,
        warning_threshold=self.warning_threshold,
        critical_threshold=self.critical_threshold,
    )


slurmq.core.quota.JobRecord dataclass

A single Slurm job record.

Source code in src/slurmq/core/models.py
@dataclass
class JobRecord:
    """A single Slurm job record."""

    job_id: int
    name: str
    user: str
    qos: str
    n_gpus: int
    elapsed_seconds: int
    start_time: datetime
    submission_time: datetime
    state: JobState
    account: str = ""
    allocation_nodes: int = 1
    n_cpus: int = 0
    req_mem: str = ""  # Requested memory (e.g., "32G")
    max_rss: int = 0  # Max RSS in bytes (for efficiency calc)

    @property
    def is_running(self) -> bool:
        """Check if job is currently running."""
        return self.state.is_running

    @property
    def is_problematic(self) -> bool:
        """Check if job ended with a problem."""
        return self.state.is_problematic

    @property
    def gpu_hours(self) -> float:
        """Allocated GPU-hours (n_gpus x elapsed time, not utilization)."""
        return (self.n_gpus * self.elapsed_seconds) / 3600

    @classmethod
    def from_sacct(cls, job: SacctJob) -> JobRecord:
        """Parse a job record from sacct JSON output.

        Args:
            job: Validated SacctJob from sacct --json output

        Returns:
            Parsed JobRecord

        """
        # Extract GPU count and CPU count from TRES
        n_gpus = 0
        n_cpus = 0
        for tres in job.tres.allocated:
            if tres.type == "gres" and tres.name == "gpu":
                n_gpus = tres.count
            elif tres.type == "cpu":
                n_cpus = tres.count

        # Parse state (using our enum)
        state_str = job.state.current[0] if job.state.current else "UNKNOWN"
        state = JobState.from_slurm(state_str)

        # Get max RSS from steps
        max_rss = max((step.statistics.RSS.max.value for step in job.steps), default=0)

        return cls(
            job_id=job.job_id,
            name=job.name,
            user=job.user,
            qos=job.qos,
            account=job.account,
            n_gpus=n_gpus,
            n_cpus=n_cpus,
            req_mem=job.required.memory,
            max_rss=max_rss,
            elapsed_seconds=job.time.elapsed,
            start_time=datetime.fromtimestamp(job.time.start, tz=UTC)
            if job.time.start
            else datetime.min.replace(tzinfo=UTC),
            submission_time=datetime.fromtimestamp(job.time.submission, tz=UTC)
            if job.time.submission
            else datetime.min.replace(tzinfo=UTC),
            state=state,
            allocation_nodes=job.allocation_nodes,
        )

gpu_hours property

Allocated GPU-hours (n_gpus x elapsed time, not utilization).
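A worked example: a job that held 4 GPUs for 2 hours (7200 s) accrues 4 * 7200 / 3600 = 8.0 GPU-hours, regardless of how busy the GPUs actually were. The sketch below assumes JobState is importable from slurmq.core.models:

from datetime import UTC, datetime
from slurmq.core.models import JobState
from slurmq.core.quota import JobRecord

now = datetime.now(tz=UTC)
job = JobRecord(
    job_id=1, name="train", user="alice", qos="gpu",
    n_gpus=4, elapsed_seconds=7200,
    start_time=now, submission_time=now,
    state=JobState.from_slurm("RUNNING"),
)
assert job.gpu_hours == 8.0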

is_problematic property

Check if job ended with a problem.

is_running property

Check if job is currently running.

from_sacct(job) classmethod

Parse a job record from sacct JSON output.

Parameters:

    job (SacctJob): Validated SacctJob from sacct --json output. Required.

Returns:

    JobRecord: Parsed JobRecord.

Source code in src/slurmq/core/models.py
@classmethod
def from_sacct(cls, job: SacctJob) -> JobRecord:
    """Parse a job record from sacct JSON output.

    Args:
        job: Validated SacctJob from sacct --json output

    Returns:
        Parsed JobRecord

    """
    # Extract GPU count and CPU count from TRES
    n_gpus = 0
    n_cpus = 0
    for tres in job.tres.allocated:
        if tres.type == "gres" and tres.name == "gpu":
            n_gpus = tres.count
        elif tres.type == "cpu":
            n_cpus = tres.count

    # Parse state (using our enum)
    state_str = job.state.current[0] if job.state.current else "UNKNOWN"
    state = JobState.from_slurm(state_str)

    # Get max RSS from steps
    max_rss = max((step.statistics.RSS.max.value for step in job.steps), default=0)

    return cls(
        job_id=job.job_id,
        name=job.name,
        user=job.user,
        qos=job.qos,
        account=job.account,
        n_gpus=n_gpus,
        n_cpus=n_cpus,
        req_mem=job.required.memory,
        max_rss=max_rss,
        elapsed_seconds=job.time.elapsed,
        start_time=datetime.fromtimestamp(job.time.start, tz=UTC)
        if job.time.start
        else datetime.min.replace(tzinfo=UTC),
        submission_time=datetime.fromtimestamp(job.time.submission, tz=UTC)
        if job.time.submission
        else datetime.min.replace(tzinfo=UTC),
        state=state,
        allocation_nodes=job.allocation_nodes,
    )


slurmq.core.quota.UsageReport dataclass

A user's quota usage report.

GPU-hours are allocation-based (reserved time * GPUs), not utilization.

Source code in src/slurmq/core/models.py
@dataclass
class UsageReport:
    """A user's quota usage report.

    GPU-hours are allocation-based (reserved time * GPUs), not utilization.

    """

    user: str
    qos: str
    used_gpu_hours: float
    quota_limit: int
    rolling_window_days: int
    active_jobs: list[JobRecord] = field(default_factory=list)
    warning_threshold: float = 0.8
    critical_threshold: float = 1.0

    @property
    def remaining_gpu_hours(self) -> float:
        """Allocated GPU-hours remaining in quota."""
        return self.quota_limit - self.used_gpu_hours

    @property
    def usage_percentage(self) -> float:
        """Usage as a fraction (0.0 to 1.0+)."""
        if self.quota_limit == 0:
            return 0.0
        return self.used_gpu_hours / self.quota_limit

    @property
    def status(self) -> QuotaStatus:
        """Current quota status."""
        return QuotaStatus.from_usage(self.usage_percentage, self.warning_threshold, self.critical_threshold)

remaining_gpu_hours property

Allocated GPU-hours remaining in quota.

status property

Current quota status.

usage_percentage property

Usage as a fraction (0.0 to 1.0+).
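A sketch of how the thresholds map to a status; the QoS name and numbers are illustrative:

report = UsageReport(
    user="alice", qos="gpu",
    used_gpu_hours=170.0, quota_limit=200, rolling_window_days=30,
)
report.usage_percentage    # 0.85
report.remaining_gpu_hours  # 30.0
report.status  # a warning-level QuotaStatus, since 0.8 <= 0.85 < 1.0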


Usage example

from slurmq.core.config import load_config
from slurmq.core.quota import QuotaChecker

# Load config and pick the default cluster
config = load_config()
cluster = config.get_cluster()

# Job records come from elsewhere, e.g. JobRecord.from_sacct applied to
# parsed sacct --json output
records = [...]

# Check quota
checker = QuotaChecker(cluster)
report = checker.generate_report("alice", records)

print(f"Used: {report.used_gpu_hours:.1f} GPU-hours")
print(f"Remaining: {report.remaining_gpu_hours:.1f} GPU-hours")