SQLite在多线程环境下的应用
- ndv - keakon的涂鸦馆这几天研究了一下SQLite这个嵌入式数据库在多线程环境下的应用,感觉里面的学问还挺多,于是就在此分享一下. 先说下初衷吧,实际上我经常看到有人抱怨SQLite不支持多线程. 而在iOS开发时,为了不阻塞主线程,数据库访问必须移到子线程中. 为了解决这个矛盾,很有必要对此一探究竟. 关于这个问题,最权威的解答当然是SQLite官网上的“Is SQLite threadsafe?”这个问答.
3.4.0 - iPhone OS 2.2.1当然,你也可以自己编译最新版本。只是我发现自己编译出来的3.7.8居然比iOS 4.3.3内置的3.7.2慢了一半,不知道苹果做了什么优化。
3.6.12 - iPhone OS 3.0 / 3.1
3.6.22 - iPhone OS 4.0
3.7.2 - iPhone OS 4.3
3.7.7 - iPhone OS 5.0
# -*- coding: utf-8 -*-
import sqlite3
import threading
def f():
con.rollback()
con = sqlite3.connect('test.db', check_same_thread=False) # 允许在其他线程中使用这个连接
cu = con.cursor()
cu.execute('CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY)')
print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 0
cu.execute('INSERT INTO test VALUES (NULL)')
print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 1
thread = threading.Thread(target=f)
thread.start()
thread.join()
print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 0
cu.close()
con.close()
在这个例子中,虽然是在子线程中执行rollback,但由于和主线程用的是同一个数据库连接,所以主线程所做的更改也被回滚了。连接1:BEGIN (UNLOCKED)现在2个连接都在等待对方释放锁,于是就死锁了。当然,实际情况并没那么糟糕,任何一方选择不继续等待,回滚事务就行了。
连接1:SELECT ... (SHARED)
连接1:INSERT ... (RESERVED)
连接2:BEGIN (UNLOCKED)
连接2:SELECT ... (SHARED)
连接1:COMMIT (PENDING,尝试获取EXCLUSIVE锁,但还有SHARED锁未释放,返回SQLITE_BUSY)
连接2:INSERT ... (尝试获取RESERVED锁,但已有PENDING锁未释放,返回SQLITE_BUSY)
连接1:BEGIN IMMEDIATE (RESERVED)这样死锁就被避免了。
连接1:SELECT ... (RESERVED)
连接1:INSERT ... (RESERVED)
连接2:BEGIN IMMEDIATE (尝试获取RESERVED锁,但已有RESERVED锁未释放,因此事务开始失败,返回SQLITE_BUSY,等待用户重试)
连接1:COMMIT (EXCLUSIVE,写入完成后释放)
连接2:BEGIN IMMEDIATE (RESERVED)
连接2:SELECT ... (RESERVED)
连接2:INSERT ... (RESERVED)
连接2:COMMIT (EXCLUSIVE,写入完成后释放)
连接1:BEGIN EXCLUSIVE (EXCLUSIVE)不过在并发很高的情况下,直接获取EXCLUSIVE锁的难度比较大;而且为了避免EXCLUSIVE状态长期阻塞其他请求,最好的方式还是让所有写事务都以IMMEDIATE方式开始。
连接1:SELECT ... (EXCLUSIVE)
连接1:INSERT ... (EXCLUSIVE)
连接2:BEGIN (UNLOCKED)
连接2:SELECT ... (尝试获取SHARED锁,但已有EXCLUSIVE锁未释放,返回SQLITE_BUSY,等待用户重试)
连接1:COMMIT (EXCLUSIVE,写入完成后释放)
连接2:SELECT ... (SHARED)
连接2:INSERT ... (RESERVED)
连接2:COMMIT (EXCLUSIVE,写入完成后释放)
#import <sqlite3.h>
// NUL-terminated path of the database file; filled in by -viewDidLoad.
static char dbPath[200];
// Connection shared by the functions in this file.
static sqlite3 *database;

/**
 * Opens the database at dbPath and stores the handle in the shared `database` variable.
 *
 * @return The open connection, or NULL if the database could not be opened.
 */
static sqlite3 *openDb() {
    if (sqlite3_open(dbPath, &database) != SQLITE_OK) {
        // Read the error text BEFORE closing: the handle must not be used after sqlite3_close().
        NSLog(@"Failed to open database: %s", sqlite3_errmsg(database));
        sqlite3_close(database);
        // Never hand a closed handle back to the caller.
        database = NULL;
    }
    return database;
}
/**
 * Configures SQLite, builds the database path inside the Documents directory,
 * opens the shared connection and makes sure the `test` table exists.
 */
- (void)viewDidLoad {
    [super viewDidLoad];

    // sqlite3_config() reports failure through its return code (e.g. when the library is
    // already initialized), so do not assume the requested mode took effect.
    if (sqlite3_config(SQLITE_CONFIG_SINGLETHREAD) != SQLITE_OK) {
        NSLog(@"Failed to configure SQLite threading mode");
    }
    NSLog(@"%d", sqlite3_threadsafe());
    NSLog(@"%s", sqlite3_libversion());

    NSArray *paths = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, YES);
    NSString *documentsDirectory = [paths objectAtIndex:0];
    const char *path = [[documentsDirectory stringByAppendingPathComponent:@"data.sqlite3"] UTF8String];
    // dbPath is a fixed-size buffer: refuse a path that does not fit instead of overflowing it.
    if (path == NULL || strlen(path) >= sizeof(dbPath)) {
        NSLog(@"Database path is missing or too long");
        return;
    }
    strcpy(dbPath, path);

    database = openDb();
    if (database == NULL) {
        return;
    }

    char *errorMsg = NULL;
    if (sqlite3_exec(database, "CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY AUTOINCREMENT, value INTEGER);", NULL, NULL, &errorMsg) != SQLITE_OK) {
        NSLog(@"Failed to create table: %s", errorMsg);
        // The message is allocated by SQLite and must be released by the caller.
        sqlite3_free(errorMsg);
    }
}
/**
 * Inserts 1000 rows with random values into `test` inside one transaction,
 * then logs the number of rows in the table.
 *
 * A busy database is retried; any other insert error stops the loop and the
 * transaction is rolled back instead of retrying forever.
 */
static void insertData() {
    char *errorMsg = NULL;
    if (sqlite3_exec(database, "BEGIN TRANSACTION", NULL, NULL, &errorMsg) != SQLITE_OK) {
        NSLog(@"Failed to begin transaction: %s", errorMsg);
        // Messages returned by sqlite3_exec() are owned by the caller.
        sqlite3_free(errorMsg);
        errorMsg = NULL;
    }

    static const char *insert = "INSERT INTO test VALUES (NULL, ?);";
    sqlite3_stmt *stmt = NULL;
    BOOL inserted = NO;
    if (sqlite3_prepare_v2(database, insert, -1, &stmt, NULL) == SQLITE_OK) {
        inserted = YES;
        int i = 0;
        while (i < 1000) {
            sqlite3_bind_int(stmt, 1, arc4random());
            int stepCode = sqlite3_step(stmt);
            if (stepCode == SQLITE_DONE) {
                ++i;
            } else {
                NSLog(@"Error inserting table: %s", sqlite3_errmsg(database));
                // Only SQLITE_BUSY can clear up on its own; retrying anything else never ends.
                if (stepCode != SQLITE_BUSY) {
                    inserted = NO;
                    break;
                }
            }
            sqlite3_reset(stmt);
        }
        sqlite3_finalize(stmt);
    } else {
        NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
    }

    if (inserted) {
        if (sqlite3_exec(database, "COMMIT TRANSACTION", NULL, NULL, &errorMsg) != SQLITE_OK) {
            NSLog(@"Failed to commit transaction: %s", errorMsg);
            sqlite3_free(errorMsg);
            errorMsg = NULL;
        }
    } else {
        // Do not leave a half-filled transaction open on the shared connection.
        if (sqlite3_exec(database, "ROLLBACK TRANSACTION", NULL, NULL, &errorMsg) != SQLITE_OK) {
            NSLog(@"Failed to roll back transaction: %s", errorMsg);
            sqlite3_free(errorMsg);
            errorMsg = NULL;
        }
    }

    static const char *query = "SELECT count(*) FROM test;";
    if (sqlite3_prepare_v2(database, query, -1, &stmt, NULL) == SQLITE_OK) {
        if (sqlite3_step(stmt) == SQLITE_ROW) {
            NSLog(@"Table size: %d", sqlite3_column_int(stmt, 0));
        } else {
            NSLog(@"Failed to read table: %s", sqlite3_errmsg(database));
        }
        sqlite3_finalize(stmt);
    } else {
        NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
    }
}
// Dispatch queue that the read/write functions in this file submit their work to.
static dispatch_queue_t queue;

- (void)viewDidLoad {
    // ...
    // Create the queue under a reverse-DNS label; no queue attributes are passed.
    static const char *const queueLabel = "net.keakon.db";
    queue = dispatch_queue_create(queueLabel, NULL);
}
// Running totals (readCount / writeCount) and the totals seen by the previous -count call.
static int lastReadCount = 0;
static int readCount = 0;
static int lastWriteCount = 0;
static int writeCount = 0;

/// Logs how many reads and writes were counted since the previous call,
/// then remembers the current totals for the next call.
- (void)count {
    // Snapshot each running total once so the logged delta and the stored value agree.
    const int readsNow = readCount;
    const int writesNow = writeCount;

    const int readsSinceLast = readsNow - lastReadCount;
    const int writesSinceLast = writesNow - lastWriteCount;

    lastReadCount = readsNow;
    lastWriteCount = writesNow;

    NSLog(@"%d, %d", readsSinceLast, writesSinceLast);
}
- (void)viewDidLoad {
    // ...
    // Call -count once per second, repeating, so the counters are logged at a fixed interval.
    const NSTimeInterval reportInterval = 1.0;
    SEL reportSelector = @selector(count);
    [NSTimer scheduledTimerWithTimeInterval:reportInterval
                                     target:self
                                   selector:reportSelector
                                   userInfo:nil
                                    repeats:YES];
}
// Starts an endless read loop on `queue`. Each run of the block prepares the query,
// binds a random upper bound, steps the statement once, counts the read if it
// completed, finalizes the statement and then enqueues itself again.
static void readData() {
static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;";
// readBlock is declared __block so the block body can refer to itself when re-enqueueing.
// NOTE(review): the copied block is never released here; presumably intentional because
// the loop is meant to run for the whole test — confirm.
void (^ __block readBlock)() = Block_copy(^{
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(database, query, -1, &stmt, NULL) == SQLITE_OK) {
sqlite3_bind_int(stmt, 1, arc4random());
int returnCode = sqlite3_step(stmt);
// Both "a row was found" and "no row matched" count as a completed read.
if (returnCode == SQLITE_ROW || returnCode == SQLITE_DONE) {
++readCount;
}
sqlite3_finalize(stmt);
} else {
NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
}
// Schedule the next iteration instead of looping in place.
dispatch_async(queue, readBlock);
});
// Kick off the first iteration.
dispatch_async(queue, readBlock);
}
// Starts an endless write loop on `queue`. Each run of the block prepares the UPDATE,
// binds a random value and a random id in 1..1000, steps the statement once, counts
// the write if it completed, finalizes the statement and then enqueues itself again.
static void writeData() {
static const char *update = "UPDATE test SET value = ? WHERE id = ?;";
// writeBlock is declared __block so the block body can refer to itself when re-enqueueing.
// NOTE(review): the copied block is never released here; presumably intentional because
// the loop is meant to run for the whole test — confirm.
void (^ __block writeBlock)() = Block_copy(^{
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(database, update, -1, &stmt, NULL) == SQLITE_OK) {
sqlite3_bind_int(stmt, 1, arc4random());
// Pick an id between 1 and 1000 inclusive.
sqlite3_bind_int(stmt, 2, arc4random() % 1000 + 1);
// Only a step that ran to completion is counted as a write.
if (sqlite3_step(stmt) == SQLITE_DONE) {
++writeCount;
}
sqlite3_finalize(stmt);
} else {
NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
}
// Schedule the next iteration instead of looping in place.
dispatch_async(queue, writeBlock);
});
// Kick off the first iteration.
dispatch_async(queue, writeBlock);
}
这里是用dispatch_async()来异步地递归调用block。
// Switch the journal to write-ahead logging. errorMsg is the char * declared by the enclosing code.
if (sqlite3_exec(database, "PRAGMA journal_mode=WAL;", NULL, NULL, &errorMsg) != SQLITE_OK) {
    NSLog(@"Failed to set WAL mode: %s", errorMsg);
    // The message is allocated by SQLite and must be released by the caller.
    sqlite3_free(errorMsg);
    errorMsg = NULL;
}
// Checkpoint before every test run so that an oversized WAL file does not hurt performance.
if (sqlite3_wal_checkpoint(database, NULL) != SQLITE_OK) {
    NSLog(@"Failed to checkpoint WAL: %s", sqlite3_errmsg(database));
}
测试结果为只读时平均每秒165次,同时读写时每秒各100次。并发性增加了1倍有木有!更夸张的是写入达到了每秒240次,比读取还快。
/**
 * Opens a new connection to the database at dbPath.
 *
 * @return The open connection (the caller closes it), or NULL if opening failed.
 */
static sqlite3 *openDb() {
    sqlite3 *database = NULL;
    if (sqlite3_open(dbPath, &database) != SQLITE_OK) {
        // Read the error text BEFORE closing: the handle must not be used after sqlite3_close().
        NSLog(@"Failed to open database: %s", sqlite3_errmsg(database));
        sqlite3_close(database);
        // Never hand a closed handle back to the caller.
        database = NULL;
    }
    return database;
}
// Request multi-thread mode. sqlite3_config() reports failure through its return code,
// so log it instead of silently running the test in a different mode.
if (sqlite3_config(SQLITE_CONFIG_MULTITHREAD) != SQLITE_OK) {
    NSLog(@"Failed to switch SQLite to multi-thread mode");
}
// Submit the database work to the background-priority global queue.
queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
/// Submits a reader to `queue`: it opens its own connection, prepares the query once
/// and then runs it in an endless loop, counting every read that completes.
static void readData() {
    static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;";
    dispatch_async(queue, ^{
        sqlite3 *connection = openDb();
        sqlite3_stmt *statement;
        if (sqlite3_prepare_v2(connection, query, -1, &statement, NULL) != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(connection));
            sqlite3_close(connection);
            return;
        }
        for (;;) {
            sqlite3_bind_int(statement, 1, arc4random());
            switch (sqlite3_step(statement)) {
                case SQLITE_ROW:   // a row was found
                case SQLITE_DONE:  // no row matched
                    ++readCount;
                    break;
                default:
                    break;
            }
            sqlite3_reset(statement);
        }
        // Not reached: the loop above has no exit.
        sqlite3_finalize(statement);
        sqlite3_close(connection);
    });
}
/// Submits a writer to `queue`: it opens its own connection, prepares the UPDATE once
/// and then runs it in an endless loop, counting every write that completes.
static void writeData() {
    static const char *update = "UPDATE test SET value = ? WHERE id = ?;";
    dispatch_async(queue, ^{
        sqlite3 *connection = openDb();
        sqlite3_stmt *statement;
        if (sqlite3_prepare_v2(connection, update, -1, &statement, NULL) != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(connection));
            sqlite3_close(connection);
            return;
        }
        for (;;) {
            // Random new value for a random id between 1 and 1000 inclusive.
            sqlite3_bind_int(statement, 1, arc4random());
            sqlite3_bind_int(statement, 2, arc4random() % 1000 + 1);
            const int stepCode = sqlite3_step(statement);
            if (stepCode == SQLITE_DONE) {
                ++writeCount;
            }
            sqlite3_reset(statement);
        }
        // Not reached: the loop above has no exit.
        sqlite3_finalize(statement);
        sqlite3_close(connection);
    });
}
这里就无需递归调用了,直接在子线程中循环即可。
/// Submits a reader to `queue`: it opens its own connection, retries preparing the query
/// while the database is busy, and then runs the query in an endless loop.
static void readData() {
    static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;";
    dispatch_async(queue, ^{
        sqlite3 *database = openDb();
        sqlite3_stmt *stmt = NULL;
        int prepareCode;
        // Retry only while another connection holds a lock; any other failure will not
        // fix itself, so give up instead of spinning forever.
        do {
            prepareCode = sqlite3_prepare_v2(database, query, -1, &stmt, NULL);
        } while (prepareCode == SQLITE_BUSY || prepareCode == SQLITE_LOCKED);
        if (prepareCode != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
            sqlite3_close(database);
            return;
        }
        while (YES) {
            sqlite3_bind_int(stmt, 1, arc4random());
            int returnCode = sqlite3_step(stmt);
            // Both "a row was found" and "no row matched" count as a completed read.
            if (returnCode == SQLITE_ROW || returnCode == SQLITE_DONE) {
                ++readCount;
            }
            sqlite3_reset(stmt);
        }
        // Not reached: the loop above has no exit.
        sqlite3_finalize(stmt);
        sqlite3_close(database);
    });
}
/// Submits a writer to `queue`: it opens its own connection, retries preparing the UPDATE
/// while the database is busy, and then runs the UPDATE in an endless loop.
static void writeData() {
    static const char *update = "UPDATE test SET value = ? WHERE id = ?;";
    dispatch_async(queue, ^{
        sqlite3 *database = openDb();
        sqlite3_stmt *stmt = NULL;
        int prepareCode;
        // Retry only while another connection holds a lock; any other failure will not
        // fix itself, so give up instead of spinning forever.
        do {
            prepareCode = sqlite3_prepare_v2(database, update, -1, &stmt, NULL);
        } while (prepareCode == SQLITE_BUSY || prepareCode == SQLITE_LOCKED);
        if (prepareCode != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
            sqlite3_close(database);
            return;
        }
        while (YES) {
            sqlite3_bind_int(stmt, 1, arc4random());
            // Pick an id between 1 and 1000 inclusive.
            sqlite3_bind_int(stmt, 2, arc4random() % 1000 + 1);
            // Only a step that ran to completion is counted as a write.
            if (sqlite3_step(stmt) == SQLITE_DONE) {
                ++writeCount;
            }
            sqlite3_reset(stmt);
        }
        // Not reached: the loop above has no exit.
        sqlite3_finalize(stmt);
        sqlite3_close(database);
    });
}
结果为只读时平均每秒170次,同时读写时每秒分别为70和50次(波动较大)。读取效率有了显著提升,但仍不及第二种方式。
// Request serialized mode. sqlite3_config() reports failure through its return code,
// so log it instead of silently running the test in a different mode.
if (sqlite3_config(SQLITE_CONFIG_SERIALIZED) != SQLITE_OK) {
    NSLog(@"Failed to switch SQLite to serialized mode");
}
/// Submits a reader to `queue` that prepares the query on the shared `database`
/// connection once and then runs it in an endless loop, counting completed reads.
static void readData() {
    static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;";
    dispatch_async(queue, ^{
        sqlite3_stmt *statement;
        const int prepareCode = sqlite3_prepare_v2(database, query, -1, &statement, NULL);
        if (prepareCode != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
            return;
        }
        do {
            sqlite3_bind_int(statement, 1, arc4random());
            const int stepCode = sqlite3_step(statement);
            // A found row and "no row matched" are both completed reads.
            const BOOL completed = (stepCode == SQLITE_ROW) || (stepCode == SQLITE_DONE);
            if (completed) {
                ++readCount;
            }
            sqlite3_reset(statement);
        } while (YES);
        // Not reached: the loop above has no exit.
        sqlite3_finalize(statement);
    });
}
/// Submits a writer to `queue` that prepares the UPDATE on the shared `database`
/// connection once and then runs it in an endless loop, counting completed writes.
static void writeData() {
    static const char *update = "UPDATE test SET value = ? WHERE id = ?;";
    dispatch_async(queue, ^{
        sqlite3_stmt *statement;
        const int prepareCode = sqlite3_prepare_v2(database, update, -1, &statement, NULL);
        if (prepareCode != SQLITE_OK) {
            NSLog(@"Failed to prepare statement: %s", sqlite3_errmsg(database));
            return;
        }
        do {
            // Random new value for a random id between 1 and 1000 inclusive.
            sqlite3_bind_int(statement, 1, arc4random());
            sqlite3_bind_int(statement, 2, arc4random() % 1000 + 1);
            const int stepCode = sqlite3_step(statement);
            if (stepCode == SQLITE_DONE) {
                ++writeCount;
            }
            sqlite3_reset(statement);
        } while (YES);
        // Not reached: the loop above has no exit.
        sqlite3_finalize(statement);
    });
}
测试结果为只读时平均每秒170次,同时读写时每秒分别为57和43次。读线程比写线程的速率更高,而且新线程的加入不需要等待。