PostgreSQL-based job queue with priority scheduling, batch claiming, and progress tracking. Use when building job queues without external dependencies. Triggers on PostgreSQL job queue, background jobs, task queue, priority queue, SKIP LOCKED.
Source: ClawHub.
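Method 1: Install from the command line (recommended)
Recommended (no need to install clawhub beforehand):
npx clawhub@latest --dir ~/.claude/skills install postgres-job-queue
Or use the clawhub CLI (must be installed beforehand):
clawhub --dir ~/.claude/skills install postgres-job-queue
Note: requires Node.js 18+. Without Node, use Method 2 below and download the ZIP directly.
Method 2: Manual download (no Node required)
Download the ZIP, extract it, place the folder at the path below, and restart the Agent:
Install path
~/.claude/skills/postgres-job-queue/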
---
name: postgres-job-queue
model: standard
description: PostgreSQL-based job queue with priority scheduling, batch claiming, and progress tracking. Use when building job queues without external dependencies. Triggers on PostgreSQL job queue, background jobs, task queue, priority queue, SKIP LOCKED.
---
Production-ready job queue using PostgreSQL with priority scheduling, batch claiming, and progress tracking.
---
-- gen_random_uuid() is built in from PostgreSQL 13; older versions need the pgcrypto extension.
CREATE TABLE jobs (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_type      VARCHAR(50) NOT NULL,
    priority      INT NOT NULL DEFAULT 100,
    status        VARCHAR(20) NOT NULL DEFAULT 'pending',
    data          JSONB NOT NULL DEFAULT '{}',

    -- Progress tracking
    progress      INT DEFAULT 0,
    current_stage VARCHAR(100),
    events_count  INT DEFAULT 0,

    -- Worker tracking
    worker_id     VARCHAR(100),
    claimed_at    TIMESTAMPTZ,

    -- Timing
    created_at    TIMESTAMPTZ DEFAULT NOW(),
    started_at    TIMESTAMPTZ,
    completed_at  TIMESTAMPTZ,

    -- Retry handling
    attempts      INT DEFAULT 0,
    max_attempts  INT DEFAULT 3,
    last_error    TEXT,

    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'cancelled')
    )
);

-- Critical: partial index for fast claiming. It covers only pending rows and
-- matches the ORDER BY used when claiming.
CREATE INDEX idx_jobs_claimable ON jobs (priority DESC, created_at ASC)
    WHERE status = 'pending';

-- Lookup of in-flight jobs by worker
CREATE INDEX idx_jobs_worker ON jobs (worker_id)
    WHERE status IN ('claimed', 'running');
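The CHECK constraint above fixes the set of statuses but says nothing about which moves between them are legal. The following is a minimal sketch of one plausible lifecycle, mirrored in application code so that an invalid move can be rejected before an UPDATE is issued. The transition table, the `Status` type, and `CanTransition` are assumptions made for illustration; the source only names the six status values.

```go
package main

// Status mirrors the values allowed by the valid_status CHECK constraint.
type Status string

const (
	Pending   Status = "pending"
	Claimed   Status = "claimed"
	Running   Status = "running"
	Completed Status = "completed"
	Failed    Status = "failed"
	Cancelled Status = "cancelled"
)

// transitions is an ASSUMED lifecycle, not one defined by the source.
// In-flight jobs may go back to pending (retry or stale recovery);
// completed, failed, and cancelled are treated as terminal.
var transitions = map[Status][]Status{
	Pending: {Claimed, Cancelled},
	Claimed: {Running, Completed, Pending, Failed, Cancelled},
	Running: {Completed, Pending, Failed, Cancelled},
}

// CanTransition reports whether the assumed table allows moving from one
// status to another. Terminal statuses have no outgoing moves.
func CanTransition(from, to Status) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```

With this table, `CanTransition(Pending, Claimed)` is true while `CanTransition(Completed, Pending)` is false. If completed jobs should be re-runnable, the table is the single place to change.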
---
CREATE OR REPLACE FUNCTION claim_job_batch(
    p_worker_id  VARCHAR(100),
    p_job_types  VARCHAR(50)[],
    p_batch_size INT DEFAULT 10
) RETURNS SETOF jobs AS $$
BEGIN
    RETURN QUERY
    WITH claimable AS (
        SELECT id
        FROM jobs
        WHERE status = 'pending'
          AND job_type = ANY(p_job_types)
          AND attempts < max_attempts
        ORDER BY priority DESC, created_at ASC
        LIMIT p_batch_size
        FOR UPDATE SKIP LOCKED  -- Critical: skip locked rows
    ),
    claimed AS (
        UPDATE jobs
        SET status     = 'claimed',
            worker_id  = p_worker_id,
            claimed_at = NOW(),
            attempts   = attempts + 1
        WHERE id IN (SELECT id FROM claimable)
        RETURNING *
    )
    SELECT * FROM claimed;
END;
$$ LANGUAGE plpgsql;
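To make the selection rule concrete, here is a small in-memory sketch of which rows a single call would pick: pending jobs of the requested types with attempts remaining, ordered by priority (highest first) and then by age (oldest first), cut to the batch size. It models only the filtering and ordering; the row locking that `SKIP LOCKED` provides between concurrent workers is not simulated. `Job`, `pickBatch`, and the field names are hypothetical.

```go
package main

import "sort"

// Job holds only the columns that influence claiming (hypothetical type).
type Job struct {
	ID          string
	JobType     string
	Priority    int
	Status      string
	Attempts    int
	MaxAttempts int
	CreatedAt   int64 // Unix seconds, standing in for created_at
}

// pickBatch mimics the WHERE, ORDER BY, and LIMIT of the claimable CTE.
func pickBatch(jobs []Job, types []string, batchSize int) []Job {
	wanted := make(map[string]bool, len(types))
	for _, t := range types {
		wanted[t] = true
	}

	// WHERE status = 'pending' AND job_type = ANY(...) AND attempts < max_attempts
	var out []Job
	for _, j := range jobs {
		if j.Status == "pending" && wanted[j.JobType] && j.Attempts < j.MaxAttempts {
			out = append(out, j)
		}
	}

	// ORDER BY priority DESC, created_at ASC
	sort.SliceStable(out, func(a, b int) bool {
		if out[a].Priority != out[b].Priority {
			return out[a].Priority > out[b].Priority
		}
		return out[a].CreatedAt < out[b].CreatedAt
	})

	// LIMIT p_batch_size
	if len(out) > batchSize {
		out = out[:batchSize]
	}
	return out
}
```

A job with priority 150 is always picked ahead of one with priority 100, regardless of age; age only breaks ties within the same priority.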
---
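// Assumed imports: the original does not name them. pgx v5 and
// github.com/google/uuid are the packages these identifiers most likely refer to.
import (
	"context"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

const (
	PriorityExplicit   = 150 // User-requested
	PriorityDiscovered = 100 // System-discovered
	PriorityBackfill   = 30  // Background backfills
)

type JobQueue struct {
	db       *pgxpool.Pool // the pool type lives in pgxpool, not pgx
	workerID string
}

// Claim atomically claims up to batchSize pending jobs of the given types.
// Job is assumed to be a struct whose fields (or `db` tags) match the columns
// of the jobs table; nullable columns need pointer or pgtype fields.
func (q *JobQueue) Claim(ctx context.Context, types []string, batchSize int) ([]Job, error) {
	rows, err := q.db.Query(ctx,
		"SELECT * FROM claim_job_batch($1, $2, $3)",
		q.workerID, types, batchSize,
	)
	if err != nil {
		return nil, err
	}
	// A single rows.Scan(&job) cannot map a multi-column row onto a struct.
	// CollectRows with RowToStructByName maps columns to fields by name,
	// closes rows, and returns any error hit while iterating.
	return pgx.CollectRows(rows, pgx.RowToStructByName[Job])
}

// Complete marks a job as finished.
func (q *JobQueue) Complete(ctx context.Context, jobID uuid.UUID) error {
	_, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = 'completed',
		    progress = 100,
		    completed_at = NOW()
		WHERE id = $1`,
		jobID,
	)
	return err
}

// Fail records the error and either returns the job to the queue or, once
// attempts are exhausted, marks it failed. attempts was already incremented
// when the job was claimed.
func (q *JobQueue) Fail(ctx context.Context, jobID uuid.UUID, errMsg string) error {
	_, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = CASE
		        WHEN attempts >= max_attempts THEN 'failed'
		        ELSE 'pending'
		    END,
		    last_error = $2,
		    worker_id = NULL,
		    claimed_at = NULL
		WHERE id = $1`,
		jobID, errMsg,
	)
	return err
}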
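The three methods above are meant to be called in sequence by a worker: claim a batch, run each job, then report success or failure. The sketch below shows that loop body against a small interface so it can run without a database. `Queue`, `Handler`, `ProcessBatch`, `memQueue`, and `demo` are hypothetical names, and job IDs are plain strings here rather than `uuid.UUID` to keep the example free of external packages.

```go
package main

import (
	"context"
	"errors"
)

// Job is trimmed to the field this sketch needs.
type Job struct {
	ID string
}

// Queue is a hypothetical interface covering the three calls a worker makes.
type Queue interface {
	Claim(ctx context.Context, types []string, batchSize int) ([]Job, error)
	Complete(ctx context.Context, jobID string) error
	Fail(ctx context.Context, jobID string, errMsg string) error
}

// Handler processes one job; a non-nil error marks the job as failed.
type Handler func(ctx context.Context, job Job) error

// ProcessBatch claims up to batchSize jobs, runs handle on each, and reports
// every outcome back to the queue. It returns the number of jobs claimed.
func ProcessBatch(ctx context.Context, q Queue, types []string, batchSize int, handle Handler) (int, error) {
	jobs, err := q.Claim(ctx, types, batchSize)
	if err != nil {
		return 0, err
	}
	for _, job := range jobs {
		if herr := handle(ctx, job); herr != nil {
			if err := q.Fail(ctx, job.ID, herr.Error()); err != nil {
				return len(jobs), err
			}
			continue
		}
		if err := q.Complete(ctx, job.ID); err != nil {
			return len(jobs), err
		}
	}
	return len(jobs), nil
}

// memQueue is an in-memory stand-in used only to exercise ProcessBatch.
type memQueue struct {
	pending   []Job
	completed []string
	failed    map[string]string
}

func (m *memQueue) Claim(_ context.Context, _ []string, n int) ([]Job, error) {
	if n > len(m.pending) {
		n = len(m.pending)
	}
	batch := m.pending[:n]
	m.pending = m.pending[n:]
	return batch, nil
}

func (m *memQueue) Complete(_ context.Context, id string) error {
	m.completed = append(m.completed, id)
	return nil
}

func (m *memQueue) Fail(_ context.Context, id, msg string) error {
	m.failed[id] = msg
	return nil
}

// demo runs one batch against the stand-in: job "2" fails, the others complete.
func demo() *memQueue {
	q := &memQueue{
		pending: []Job{{ID: "1"}, {ID: "2"}, {ID: "3"}},
		failed:  map[string]string{},
	}
	handle := func(_ context.Context, job Job) error {
		if job.ID == "2" {
			return errors.New("boom")
		}
		return nil
	}
	if _, err := ProcessBatch(context.Background(), q, []string{"sync"}, 10, handle); err != nil {
		panic(err)
	}
	return q
}
```

A failed handler does not abort the batch: the job is handed to `Fail` and the loop moves on, which matches the retry logic in `Fail` deciding between `pending` and `failed`. A real worker would call `ProcessBatch` in a loop and sleep briefly when it returns zero claimed jobs.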
---
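// RecoverStaleJobs returns jobs to the queue when their worker claimed them
// more than timeout ago and never finished. Jobs that have used all their
// attempts are not matched and keep their current status.
func (q *JobQueue) RecoverStaleJobs(ctx context.Context, timeout time.Duration) (int, error) {
	// The timeout is passed as seconds: time.Duration.String() can emit units
	// such as "µs" that PostgreSQL's interval parser may not accept.
	result, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = 'pending',
		    worker_id = NULL,
		    claimed_at = NULL
		WHERE status IN ('claimed', 'running')
		  AND claimed_at < NOW() - make_interval(secs => $1)
		  AND attempts < max_attempts`,
		timeout.Seconds(),
	)
	if err != nil {
		return 0, err
	}
	return int(result.RowsAffected()), nil
}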
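Stale-job recovery only helps if something calls it regularly. The sketch below is one way to run it on a timer until the context is cancelled. `SweepFunc`, `RunRecovery`, and `demo` are hypothetical names; in real use the sweep would wrap a call such as `q.RecoverStaleJobs(ctx, timeout)`.

```go
package main

import (
	"context"
	"time"
)

// SweepFunc is a hypothetical signature for one recovery pass, for example a
// closure around RecoverStaleJobs with the timeout already bound.
type SweepFunc func(ctx context.Context) (int, error)

// RunRecovery calls sweep once per interval until ctx is cancelled and returns
// the total number of jobs recovered. Sweep errors go to onErr (if non-nil)
// and do not stop the loop.
func RunRecovery(ctx context.Context, interval time.Duration, sweep SweepFunc, onErr func(error)) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	total := 0
	for {
		select {
		case <-ctx.Done():
			return total
		case <-ticker.C:
			// select may pick the ticker even when ctx is already done.
			if ctx.Err() != nil {
				return total
			}
			n, err := sweep(ctx)
			if err != nil {
				if onErr != nil {
					onErr(err)
				}
				continue
			}
			total += n
		}
	}
}

// demo runs a fake sweep that recovers two jobs per pass and cancels the
// context during its third pass.
func demo() (calls, total int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sweep := func(context.Context) (int, error) {
		calls++
		if calls == 3 {
			cancel()
		}
		return 2, nil
	}
	total = RunRecovery(ctx, time.Millisecond, sweep, nil)
	return calls, total
}
```

The interval should be shorter than the stale timeout, otherwise a dead worker's jobs can wait close to two timeouts before being retried. Running the loop in more than one process is safe as far as the UPDATE shown above goes, since a row that has already been reset no longer matches its WHERE clause.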
---
| Scenario | Approach |
|----------|----------|
| Need guaranteed delivery | PostgreSQL queue |
| Need sub-ms latency | Use Redis instead |
| < 1000 jobs/sec | PostgreSQL is fine |
| > 10000 jobs/sec | Add Redis layer |
| Need strict ordering | Single worker per type |
---
After installing Postgres Job Queue, you can trigger it by saying things like these to the AI:
Help me get started with Postgres Job Queue
Explains what Postgres Job Queue does, walks through the setup, and runs a quick demo based on your current project
Use Postgres Job Queue to build a PostgreSQL-based job queue with priority scheduling, batch claiming...
Invokes Postgres Job Queue with the right parameters and returns the result directly in the conversation
What can I do with Postgres Job Queue in my developer & devops workflow?
Lists the top use cases for Postgres Job Queue, with example commands for each scenario
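Place the skill folder in ~/.claude/skills/postgres-job-queue/ (personal level, available to all projects) or in .claude/skills/postgres-job-queue/ (project level). After restarting the AI client, invoke it explicitly with /postgres-job-queue, or let the AI discover and use it automatically from context.
Postgres Job Queue supports Claude, Cursor, and OpenClaw, and integrates with these AI platforms to extend their capabilities.
Postgres Job Queue is free to install and use. See the repository for license information.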
Postgres Job Queue belongs to the "Developer & DevOps" category, whose skills help AI agents carry out specialized tasks in this area.
Automate my developer & devops tasks using Postgres Job Queue
Identifies repetitive steps in your workflow and sets up Postgres Job Queue to handle them automatically