Skip to content

Commit e4a1a5a

Browse files
committed
libsql-sqlite3: Add conflict detection to libsql_wal_insert_frame()
1 parent 1127914 commit e4a1a5a

6 files changed

Lines changed: 74 additions & 8 deletions

File tree

libsql-sqlite3/doc/libsql_extensions.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,8 @@ static void sync_db(sqlite3 *db_primary, sqlite3 *db_backup){
379379
for(int i=1; i<=max_frame; i++){
380380
char frame[4096+24];
381381
libsql_wal_get_frame(db_primary, i, frame, sizeof(frame));
382-
libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame));
382+
int conflict;
383+
libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame), &conflict);
383384
}
384385
libsql_wal_end_commit(db_backup);
385386
}

libsql-sqlite3/src/main.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2557,7 +2557,8 @@ int libsql_wal_insert_frame(
25572557
sqlite3* db,
25582558
unsigned int iFrame,
25592559
void *pBuf,
2560-
unsigned int nBuf
2560+
unsigned int nBuf,
2561+
int *pConflict
25612562
){
25622563
int rc = SQLITE_OK;
25632564
Pager *pPager;
@@ -2572,7 +2573,7 @@ int libsql_wal_insert_frame(
25722573

25732574
sqlite3_mutex_enter(db->mutex);
25742575
pPager = sqlite3BtreePager(db->aDb[0].pBt);
2575-
rc = sqlite3PagerWalInsert(pPager, iFrame, pBuf, nBuf);
2576+
rc = sqlite3PagerWalInsert(pPager, iFrame, pBuf, nBuf, pConflict);
25762577
if (rc != SQLITE_OK) {
25772578
goto out_unlock;
25782579
}

libsql-sqlite3/src/pager.c

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7826,9 +7826,12 @@ int sqlite3PagerWalEndCommit(Pager *pPager) {
78267826
return rc;
78277827
}
78287828

7829-
int sqlite3PagerWalInsert(Pager *pPager, unsigned int iFrame, void *pBuf, unsigned int nBuf) {
7829+
int sqlite3PagerWalInsert(Pager *pPager, unsigned int iFrame, void *pBuf, unsigned int nBuf, int *pConflict) {
78307830
int rc = SQLITE_OK;
78317831

7832+
if( pConflict ) {
7833+
*pConflict = 0;
7834+
}
78327835
if (!pagerUseWal(pPager)) {
78337836
return SQLITE_ERROR;
78347837
}
@@ -7838,6 +7841,22 @@ int sqlite3PagerWalInsert(Pager *pPager, unsigned int iFrame, void *pBuf, unsign
78387841
return rc;
78397842
}
78407843
if (iFrame <= mxFrame) {
7844+
unsigned long frame_len = nBuf-24;
7845+
unsigned char current[frame_len];
7846+
rc = pPager->wal->methods.xReadFrame(pPager->wal->pData, iFrame, frame_len, current);
7847+
if (rc != SQLITE_OK) {
7848+
return rc;
7849+
}
7850+
int conflict = 0;
7851+
if (memcmp(pBuf+24, current, frame_len) != 0) {
7852+
conflict = 1;
7853+
}
7854+
if (pConflict) {
7855+
*pConflict = conflict;
7856+
}
7857+
if (conflict) {
7858+
return SQLITE_ERROR;
7859+
}
78417860
return SQLITE_OK;
78427861
}
78437862
u8 *aFrame = (u8*)pBuf;

libsql-sqlite3/src/pager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ int sqlite3PagerWalFrameCount(Pager *, unsigned int *);
137137
int sqlite3PagerWalReadFrame(Pager *, unsigned int, void *, unsigned int);
138138
int sqlite3PagerWalBeginCommit(Pager*);
139139
int sqlite3PagerWalEndCommit(Pager*);
140-
int sqlite3PagerWalInsert(Pager*, unsigned int, void *, unsigned int);
140+
int sqlite3PagerWalInsert(Pager*, unsigned int, void *, unsigned int, int *);
141141

142142
void sqlite3PagerSetCachesize(Pager*, int);
143143
int sqlite3PagerSetSpillsize(Pager*, int);

libsql-sqlite3/src/sqlite.h.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10602,7 +10602,7 @@ int libsql_wal_insert_end(sqlite3*);
1060210602
** CAPI3REF: Insert a frame into the WAL
1060310603
** METHOD: sqlite3
1060410604
*/
10605-
int libsql_wal_insert_frame(sqlite3*, unsigned int, void *, unsigned int);
10605+
int libsql_wal_insert_frame(sqlite3*, unsigned int, void *, unsigned int, int *);
1060610606

1060710607
/*
1060810608
** CAPI3REF: Low-level system error code

libsql-sqlite3/src/test_walapi.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ static void sync_db(sqlite3 *db_primary, sqlite3 *db_backup){
5656
for(int i=1; i<=max_frame; i++){
5757
char frame[4096+24];
5858
ensure(libsql_wal_get_frame(db_primary, i, frame, sizeof(frame)) == SQLITE_OK, "can't get frame: %s\n", sqlite3_errmsg(db_primary));
59-
ensure(libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame)) == SQLITE_OK, "can't inject frame: %s\n", sqlite3_errmsg(db_backup));
59+
int conflict;
60+
ensure(libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame), &conflict) == SQLITE_OK, "can't inject frame: %s\n", sqlite3_errmsg(db_backup));
61+
ensure(conflict == 0, "conflict at frame %d\n", i);
6062
}
6163
ensure(libsql_wal_insert_end(db_backup) == SQLITE_OK, "can't end commit: %s\n", sqlite3_errmsg(db_backup));
6264
}
@@ -110,7 +112,9 @@ void test_sync_by_parts() {
110112
in_commit = 1;
111113
ensure(libsql_wal_insert_begin(db_backup) == SQLITE_OK, "can't begin commit: %s\n", sqlite3_errmsg(db_backup));
112114
}
113-
ensure(libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame)) == SQLITE_OK, "can't inject frame: %s\n", sqlite3_errmsg(db_backup));
115+
int conflict;
116+
ensure(libsql_wal_insert_frame(db_backup, i, frame, sizeof(frame), &conflict) == SQLITE_OK, "can't inject frame: %s\n", sqlite3_errmsg(db_backup));
117+
ensure(conflict == 0, "conflict at frame %d\n", i);
114118
if (is_commit) {
115119
ensure(libsql_wal_insert_end(db_backup) == SQLITE_OK, "can't end commit: %s\n", sqlite3_errmsg(db_backup));
116120
in_commit = 0;
@@ -145,6 +149,44 @@ void test_sync_while_reading() {
145149
cmp_data(db_primary, db_backup);
146150
}
147151

152+
// This test case writes to two different databases and then attempts to sync them to a third database.
153+
// Only the first database should be synced, the second database sync should return a conflict error
154+
void test_conflict() {
155+
sqlite3 *db1, *db2, *db_synced;
156+
open_db("test_conflict_1.db", &db1);
157+
open_db("test_conflict_2.db", &db2);
158+
open_db("test_conflict_synced.db", &db_synced);
159+
160+
ensure(sqlite3_exec(db1, "CREATE TABLE t (x)", 0, 0, 0) == SQLITE_OK, "failed to insert data\n");
161+
ensure(sqlite3_exec(db1, "INSERT INTO t VALUES (randomblob(4 * 1024))", 0, 0, 0) == SQLITE_OK, "failed to insert data\n");
162+
163+
sync_db(db1, db_synced);
164+
165+
ensure(sqlite3_exec(db2, "CREATE TABLE t (x)", 0, 0, 0) == SQLITE_OK, "failed to insert data\n");
166+
ensure(sqlite3_exec(db2, "INSERT INTO t VALUES (randomblob(4 * 1024))", 0, 0, 0) == SQLITE_OK, "failed to insert data\n");
167+
168+
unsigned int max_frame;
169+
ensure(libsql_wal_frame_count(db2, &max_frame) == SQLITE_OK, "can't get frame count: %s\n", sqlite3_errmsg(db2));
170+
ensure(libsql_wal_insert_begin(db_synced) == SQLITE_OK, "can't begin commit: %s\n", sqlite3_errmsg(db_synced));
171+
// First 3 frames should not conflict.
172+
for(int i=1; i<=3; i++){
173+
char frame[4096+24];
174+
ensure(libsql_wal_get_frame(db2, i, frame, sizeof(frame)) == SQLITE_OK, "can't get frame: %s\n", sqlite3_errmsg(db2));
175+
int conflict;
176+
ensure(libsql_wal_insert_frame(db_synced, i, frame, sizeof(frame), &conflict) == SQLITE_OK, "conflict detected: %s\n", sqlite3_errmsg(db_synced));
177+
ensure(conflict == 0, "conflict at frame %d\n", i);
178+
}
179+
// The rest should conflict.
180+
for(int i=4; i<=max_frame; i++){
181+
char frame[4096+24];
182+
ensure(libsql_wal_get_frame(db2, i, frame, sizeof(frame)) == SQLITE_OK, "can't get frame: %s\n", sqlite3_errmsg(db2));
183+
int conflict;
184+
ensure(libsql_wal_insert_frame(db_synced, i, frame, sizeof(frame), &conflict) == SQLITE_ERROR, "conflict not detected: %s\n", sqlite3_errmsg(db_synced));
185+
ensure(conflict == 1, "no conflict at frame %d\n", i);
186+
}
187+
ensure(libsql_wal_insert_end(db_synced) == SQLITE_OK, "can't end commit: %s\n", sqlite3_errmsg(db_synced));
188+
}
189+
148190
int main(int argc, char *argv[])
149191
{
150192
test_huge_payload();
@@ -156,5 +198,8 @@ int main(int argc, char *argv[])
156198
test_sync_while_reading();
157199
printf("============= OK test_sync_while_reading\n");
158200

201+
test_conflict();
202+
printf("============= OK test_conflict\n");
203+
159204
return 0;
160205
}

0 commit comments

Comments
 (0)