PostgreSQL可以做到Redis能做的一切 - PGSQL
我之前用的是一套很典型的Web应用技术栈:
- PostgreSQL负责持久化数据存储
- Redis负责缓存、发布订阅以及后台任务处理
两个数据库,两个体系需要管理,也意味着多了两处故障风险点。
后来我意识到:PostgreSQL可以做到Redis能做的一切。
于是我彻底移除了Redis,迁移过程是这样的。
一、设置:我使用Redis的目的
在替换之前,Redis主要处理三件事:
1、缓存(使用率70%)
// Cache API responsesawait redis.set(`user:${id}`, JSON.stringify(user), 'EX', 3600);
2、发布订阅(使用率20%)
// Real-time notificationsredis.publish('notifications', JSON.stringify({ userId, message }));
3、后台消息队列(使用率10%)
// Using Bull/BullMQqueue.add('send-email', { to, subject, body });
痛点:
- 需要备份两个数据库
- Redis使用内存(规模化时成本很高)
- Redis持久化机制……很复杂
- Postgres和Redis之间还存在一次网络跳转开销
二、我为什么考虑替换Redis
原因一:成本
我的Redis配置:
- AWS ElastiCache(2GB):每月45美元
- 若扩容至 5GB,每月费用将增至110美元
PostgreSQL:
- 已付费使用RDS(20GB存储):每月50美元
- 即便增加5GB数据流量:每月仅需0.5美元
节省成本:每月约100美元
原因二:运行复杂性
使用 Redis:
Postgres backup✅
Redisbackup❓ (RDB? AOF?Both?)
Postgresmonitoring✅
Redismonitoring❓
Postgresfailover✅
Redis Sentinel/Cluster ❓
不使用Redis:
Postgres backup✅
Postgresmonitoring✅
Postgresfailover✅
系统依赖组件更少。
原因三:数据一致性
经典问题:
//Updatedatabaseawait db.query('UPDATE users SET name = $1 WHERE id = $2', [name,id]);
// Invalidatecacheawait redis.del(`user:${id}`);
// ⚠️ What if Redis is down?
// ⚠️ What if this fails?
// NowcacheandDBareoutofsync
在PostgreSQL中,这类问题通过事务即可解决。
三、PostgreSQL特性
1、使用非日志表进行缓存
Redis:
await redis.set('session:abc123', JSON.stringify(sessionData), 'EX', 3600);
PostgreSQL:
CREATEUNLOGGEDTABLE cache (
keyTEXTPRIMARYKEY,
valueJSONBNOT NULL,
expires_at TIMESTAMPTZNOT NULL);CREATEINDEXidx_cache_expiresON cache(expires_at);
插入:
INSERTINTO cache(key, value, expires_at)VALUES ($1, $2,NOW() + INTERVAL '1 hour')ON CONFLICT (key)DO UPDATE SET value = EXCLUDED.value,
expires_at = EXCLUDED.expires_at;
读:
SELECT value FROM cacheWHERE key = $1 AND expires_at > NOW();
清理(定期运行):
DELETE FROM cache WHERE expires_at < NOW();
什么是非日志表?
- 跳过预写式日志(WAL)
- 写入性能大幅提升
- 崩溃后数据不保留(非常适合用作缓存!)
表现:
Redis SET: 0.05msPostgres UNLOGGEDINSERT: 0.08ms
用作缓存已经完全够用。
2、基于LISTEN或NOTIFY实现发布订阅功能
接下来就精彩了。
PostgreSQL具有原生的发布订阅功能,但大多数开发人员并不了解。
1)Redis的发布订阅功能
// Publisherredis.publish('notifications', JSON.stringify({ userId: 123, msg: 'Hello' }));// Subscriberredis.subscribe('notifications');redis.on('message', (channel, message) => {
console.log(message);
});
2)PostgreSQL的发布订阅功能
-- Publisher
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';
// Subscriber (Node.js with pg)const client = new Client({ connectionString: process.env.DATABASE_URL });await client.connect();await client.query('LISTEN notifications');
client.on('notification', (msg) => {
const payload = JSON.parse(msg.payload);
console.log(payload);
});
性能对比:
Redis pub/sublatency: 1-2msPostgres NOTIFYlatency: 2-5ms
性能略低,但优势明显:
- 无需额外部署中间件
- 可在事务中使用
- 可与查询语句结合使用
3)实际应用场景:实时日志追踪
在我的日志管理应用中,需要实现日志实时流式推送。
使用Redis:
// When new log arrives
await db.query('INSERTINTOlogs...');
await redis.publish('logs:new', JSON.stringify(log));
// Frontend listens
redis.subscribe('logs:new');
问题:有两个操作,如果发布失败怎么办?
使用PostgreSQL:
CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$BEGIN PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
RETURN NEW;END;
$$ LANGUAGE plpgsql;CREATE TRIGGER log_insertedAFTER INSERT ON logsFOR EACH ROW EXECUTE FUNCTION notify_new_log();
现在整个操作是原子性的:插入数据与通知推送,要么同时生效,要么都不执行。
// Frontend (via SSE)app.get('/logs/stream', async (req, res) => {
const client = await pool.connect();
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
});
await client.query('LISTEN logs_new');
client.on('notification', (msg) => {
res.write(`data: ${msg.payload}\n\n`);
});
});
结果:无需Redis即可实现实时日志流传输。
3、基于SKIP LOCKED实现任务队列
Redis(使用Bull或者BullMQ):
queue.add('send-email', { to, subject, body });
queue.process('send-email', async (job) => {
await sendEmail(job.data);
});
PostgreSQL:
CREATETABLE jobs (
idBIGSERIAL PRIMARYKEY,
queueTEXT NOTNULL,
payload JSONB NOTNULL,
attempts INT DEFAULT 0,
max_attempts INT DEFAULT 3,
scheduled_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);CREATE INDEXidx_jobs_queue ON jobs(queue, scheduled_at) WHERE attempts < max_attempts;
入队:
INSERT INTOjobs (queue, payload)VALUES ('send-email','{"to": "[email protected]", "subject": "Hi"}');
工作进程(出队):
WITH next_job AS (
SELECT id FROM jobs
WHERE queue = $1 AND attempts < max_attempts
AND scheduled_at <= NOW()
ORDER BY scheduled_at
LIMIT 1 FOR UPDATE SKIP LOCKED)UPDATE jobsSET attempts = attempts + 1FROM next_jobWHERE jobs.id = next_job.idRETURNING *;
神奇之处:FOR UPDATE SKIP LOCKED
这让PostgreSQL成为了无锁队列:
- 多个工作进程可并发拉取任务
- 任务不会被重复处理
- 若工作进程崩溃,任务会自动重新变为可执行状态
表现:
Redis BRPOP: 0.1msPostgres SKIPLOCKED: 0.3ms
对于大多数业务负载而言,性能差异可以忽略不计。
4、限流
Redis(经典限流方案):
const key = `ratelimit:${userId}`;const count = await redis.incr(key);if (count === 1) {
await redis.expire(key, 60); // 60 seconds}if (count > 100) {
throw new Error('Rate limit exceeded');
}
PostgreSQL:
CREATETABLE rate_limits (
user_id INT PRIMARYKEY,
request_count INT DEFAULT 0,
window_start TIMESTAMPTZ DEFAULT NOW()
);-- Check and incrementWITH current AS (
SELECT
request_count,
CASE
WHEN window_start < NOW() - INTERVAL '1 minute' THEN 1 -- Reset counter ELSE request_count + 1 END AS new_count
FROM rate_limits
WHERE user_id = $1 FOR UPDATE)UPDATE rate_limitsSET
request_count = (SELECT new_count FROM current),
window_start = CASE WHEN window_start < NOW() - INTERVAL '1 minute' THEN NOW()
ELSE window_start
ENDWHERE user_id = $1RETURNINGrequest_count;
或者用窗口函数更简单:
CREATETABLE api_requests (
user_id INT NOTNULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);-- Check rate limitSELECT COUNT(*) FROM api_requestsWHERE user_id = $1 AND created_at > NOW() - INTERVAL '1 minute';
-- If under limit, insertINSERTINTO api_requests (user_id) VALUES ($1);-- Cleanup old requests periodicallyDELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';
Postgres的适用场景:
- 需要基于复杂业务逻辑做限流(而非仅简单计数)
- 希望限流数据与业务逻辑在同一事务中处理
Redis的适用场景:
- 需要亚毫秒级限流
- 极高吞吐量(每秒数百万请求)
5、基于JSONB实现会话存储
Redis:
await redis.set(`session:${sessionId}`, JSON.stringify(sessionData), 'EX', 86400);
PostgreSQL:
CREATETABLEsessions (
idTEXTPRIMARYKEY,
data JSONBNOTNULL,
expires_at TIMESTAMPTZNOTNULL);CREATEINDEXidx_sessions_expiresONsessions(expires_at);-- Insert/UpdateINSERTINTOsessions (id, data, expires_at)VALUES($1, $2,NOW() +INTERVAL '24 hours')ONCONFLICT (id) DO UPDATE SET data = EXCLUDED.data,
expires_at = EXCLUDED.expires_at;
-- ReadSELECT data FROMsessionsWHERE id= $1 ANDexpires_at >NOW();
附加内容:JSONB 运算符
你可以在会话内部进行查询:
-- Find all sessions for a specific userSELECT * FROM sessionsWHERE data->>'userId' = '123';-- Find sessions with specific roleSELECT * FROM sessionsWHERE data->'user'->>'role' = 'admin';
你用Redis做不到这一点!
四、实际生产环境基准测试
我用生产数据集完成了基准测试:
1、测试设置
- 硬件: AWS RDS db.t3.medium(2个虚拟CPU,4GB内存)
- 数据集:100万条缓存条目,1万个会话
- 工具:pgbench(自定义脚本)
2、结果
| 操作 | Redis | PostgreSQL | 不同之处 |
| 缓存集 | 0.05毫秒 | 0.08毫秒 | 速度降低 60% |
| 缓存获取 | 0.04毫秒 | 0.06毫秒 | 速度降低 50% |
| 发布订阅 | 1.2毫秒 | 3.1毫秒 | 速度降低 158% |
| 队列推送 | 0.08毫秒 | 0.15毫秒 | 速度降低 87% |
| 队列弹出 | 0.12毫秒 | 0.31毫秒 | 速度降低 158% |
PostgreSQL速度较慢,但是:
- 所有操作耗时均保持在1毫秒以内
- 省去了与Redis交互的网络开销
- 降低基础设施复杂性
3、合并执行(真正的胜利)
场景:插入数据 + 缓存失效 + 通知订阅者
使用Redis:
await db.query('INSERT INTO posts ...'); // 2msawait redis.del('posts:latest'); // 1ms (network hop)await redis.publish('posts:new', data); // 1ms (network hop)// Total: ~4ms
使用PostgreSQL:
BEGIN;INSERTINTO posts ...; -- 2msDELETE FROM cache WHERE key = 'posts:latest'; -- 0.1ms (same connection)NOTIFY posts_new, '...'; -- 0.1ms (same connection)COMMIT;-- Total: ~2.2ms
当多个操作合并执行时,PostgreSQL速度更快。
五、哪些场景仍建议保留Redis
如果符合以下条件,请不要替换Redis:
1、需要极致的性能
Redis: 100,000+ ops/sec (single instance)Postgres: 10,000-50,000 ops/sec
如果你每秒执行数百万次缓存读取操作,那就继续使用 Redis。
2、使用Redis特有的数据结构
Redis具备:
- 有序集合(排行榜)
- HyperLogLog(基数统计)
- 地理空间索引
- Streams(高级发布订阅)
PostgreSQL 虽有对应实现,但使用起来更为繁琐:
-- Leaderboard in Postgres (slower)SELECT user_id, scoreFROM leaderboardORDER BY score DESCLIMIT 10;-- vs RedisZREVRANGE leaderboard 0 9 WITHSCORES
3、架构需要独立缓存层
如果你的架构要求独立的缓存层(例如微服务架构),建议保留Redis。
六、迁移方案
不要一夜之间就彻底放弃Redis,以下是我的做法:
第一阶段:并排共存(第1周)
// Write to bothawait redis.set(key, value);await pg.query('INSERT INTO cache ...');// Read from Redis (still primary)let data = await redis.get(key);
监控:对比命中率、延迟。
第二阶段:从Postgres读取数据(第2周)
// Try Postgres firstlet data = await pg.query('SELECT value FROM cache WHERE key = $1', [key]);// Fallback to Redisif (!data) {
data = await redis.get(key);
}
监控:错误率、性能。
第三阶段:仅写入Postgres(第3周)
// Only write to Postgresawait pg.query('INSERT INTO cache ...');
监控:所有功能是否正常运行?
第四阶段:移除Redis(第4周)
# Turn off Redis# Watch for errors# Nothing breaks? Success!
七、代码示例:完整实现
1、缓存模块(PostgreSQL)
// cache.jsclass PostgresCache {
constructor(pool) {
this.pool = pool;
}
async get(key) {
const result = await this.pool.query(
'SELECT value FROM cache WHERE key = $1 AND expires_at > NOW()',
[key]
);
return result.rows[0]?.value;
}
async set(key, value, ttlSeconds = 3600) {
await this.pool.query(
`INSERT INTO cache (key, value, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '${ttlSeconds} seconds')
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value,
expires_at = EXCLUDED.expires_at`,
[key, value]
);
}
async delete(key) {
await this.pool.query('DELETE FROM cache WHERE key = $1', [key]);
}
async cleanup() {
await this.pool.query('DELETE FROM cache WHERE expires_at < NOW()');
}
}module.exports = PostgresCache;
2、发布订阅模块
// pubsub.jsclass PostgresPubSub {
constructor(pool) {
this.pool = pool;
this.listeners = new Map();
}
async publish(channel, message) {
const payload = JSON.stringify(message);
await this.pool.query('SELECT pg_notify($1, $2)', [channel, payload]);
}
async subscribe(channel, callback) {
const client = await this.pool.connect();
await client.query(`LISTEN ${channel}`);
client.on('notification', (msg) => {
if (msg.channel === channel) {
callback(JSON.parse(msg.payload));
}
});
this.listeners.set(channel, client);
}
async unsubscribe(channel) {
const client = this.listeners.get(channel);
if (client) {
await client.query(`UNLISTEN ${channel}`);
client.release();
this.listeners.delete(channel);
}
}
}module.exports = PostgresPubSub;
3、任务队列模块
// queue.jsclass PostgresQueue {
constructor(pool) {
this.pool = pool;
}
async enqueue(queue, payload, scheduledAt = new Date()){
await this.pool.query(
'INSERT INTO jobs (queue, payload, scheduled_at) VALUES ($1, $2, $3)',
[queue, payload, scheduledAt]
);
}
async dequeue(queue){
const result = await this.pool.query(
`WITH next_jobAS( SELECT id FROM jobs
WHERE queue = $1 AND attempts < max_attempts
AND scheduled_at <= NOW()
ORDER BY scheduled_at
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts= attempts +1 FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*`,
[queue]
);
return result.rows[0];
}
async complete(jobId){
await this.pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
}
async fail(jobId, error){
await this.pool.query(
`UPDATE jobs
SET attempts = max_attempts,
payload = payload || jsonb_build_object('error', $2)
WHERE id = $1`,
[jobId, error.message]
);
}
}
module.exports = PostgresQueue;
八、性能优化技巧
1、使用连接池
const { Pool } = require('pg');const pool = new Pool({
max: 20, // Max connections idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
2、添加合适的索引
CREATE INDEXCONCURRENTLY idx_cache_key ON cache(key) WHERE expires_at > NOW();CREATE INDEXCONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at)
WHERE attempts < max_attempts;
3、调整PostgreSQL配置
# postgresql.confshared_buffers = 2GB # 25% of RAMeffective_cache_size = 6GB # 75% of RAMwork_mem = 50MB # For complex queriesmaintenance_work_mem = 512MB # For VACUUM
4、定期维护
-- Run dailyVACUUMANALYZEcache;
VACUUMANALYZEjobs;-- Or enable autovacuum (recommended)ALTERTABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);
九、三个月后的结果
我省下了:
- 每月100美元(不再使用 ElastiCache)
- 备份复杂性降低50%
- 少监控一项服务
- 更简单的部署(减少一项依赖)
我失去了:
- 缓存操作延迟则增加约0.5毫秒
- Redis特有的数据结构(其实并不需要)
我会再次这样做吗?就这个业务场景而言:会。
是否推荐所有人都这么做?不推荐。
十、决策矩阵
如果满足以下条件,可用Postgres替换Redis:
- 仅用Redis做简单缓存或者会话管理
- 缓存命中率低于95%(写入次数过多)
- 需要事务一致性
- 可以接受操作速度慢0.1-1毫秒
- 团队规模小,运维资源有限
以下场景建议保留Redis:
- 每秒10万次以上的操作量
- 使用Redis特有的数据结构(有序集合等)
- 配备专业的运维团队
- 亚毫秒级延迟为核心要求
- 需要跨区域地理复制
十一、参考资料
1、PostgreSQL 特性
- LISTEN/NOTIFY 官方文档
- SKIP LOCKED 语法
- UNLOGGED 表
2、工具
- pgBouncer - 连接池
- pg_stat_statements - 查询性能
3、其他解决方案
- Graphile Worker - 基于Postgres的任务队列
- pg-boss - 另一款Postgres队列实现
十二、最后
我用PostgreSQL替换了Redis的这些场景:
- 缓存 → UNLOGGED 表
- 发布订阅 → LISTEN/NOTIFY
- 任务队列 → SKIP LOCKED
- 会话存储 → JSONB 表
结果:
- 每月节省100美元
- 降低了运维复杂度
- 性能略有下降(延迟增加 0.1–1ms),但可接受
- 保证了事务一致性
适合这样做的场景:
- 中小型应用
- 简单的缓存需求
- 希望减少系统组件、简化架构
不适合这样做的场景:
- 性能要求高(每秒10万次以上操作)
- 用Redis特有的功能
- 配备专职运维团队
你是否用过Postgres替换Redis(或反过来用Redis替换Postgres)?实际体验如何?欢迎在评论区分享你的基准测试数据!
作者丨Polliog 编译丨dbaplus社群
来源丨网址:
https://dev.to/polliog/i-replaced-redis-with-postgresql-and-its-faster-4942