I'm working with rxdb and I have pull and push handlers for the backend I have used supabase
I have setup the code for replication as follows:
replication.ts
import { RxDatabase } from "rxdb";
import { RxReplicationPullStreamItem } from "rxdb/dist/types/types";
import { replicateRxCollection } from "rxdb/plugins/replication";
import { Subject } from "rxjs";
import { supabaseClient, SUPABASE_URL } from "src/config/supabase";
import { DbTables } from "src/constants/db";
import {
blockPullHandler,
blockPushHandler,
} from "./repilicationhandlers/block";
import { CheckpointType, RxBlockDocument, RxBlocksCollections } from "./types";
export async function startReplication(
database: RxDatabase<RxBlocksCollections>
) {
const pullStream$ = new Subject<
RxReplicationPullStreamItem<RxBlockDocument, CheckpointType>
>();
supabaseClient
.from(DbTables.Block)
.on("*", (payload) => {
console.log("Change received!", payload);
const doc = payload.new;
pullStream$.next({
checkpoint: {
id: doc.id,
updated: doc.updated,
},
documents: [doc] as any,
});
})
.subscribe((status: string) => {
console.log("STATUS changed");
console.dir(status);
if (status === "SUBSCRIBED") {
pullStream$.next("RESYNC");
}
});
const replicationState = await replicateRxCollection({
collection: database.blocks,
replicationIdentifier: "supabase-replication-to-" + SUPABASE_URL,
deletedField: "archived",
pull: {
handler: blockPullHandler as any,
stream$: pullStream$.asObservable(),
batchSize: 10,
},
push: {
batchSize: 1,
handler: blockPushHandler as any,
},
});
replicationState.error$.subscribe((err) => {
console.error("## replicationState.error$:");
console.log(err);
});
return replicationState;
}
blockPullHandler:
export const blockPullHandler = async (
lastCheckpoint: any,
batchSize: number
) => {
const minTimestamp = lastCheckpoint ? lastCheckpoint.updated : 0;
console.log("Pulling data", batchSize, lastCheckpoint);
const { data, error } = await supabaseClient
.from(DbTables.Block)
.select()
.gt("updated", minTimestamp)
.order("updated", { ascending: true })
.limit(batchSize);
if (error) {
console.log(error);
throw error;
}
const docs: Array<Block> = data;
return {
documents: docs,
hasMoreDocuments: false,
checkpoint:
docs.length === 0
? lastCheckpoint
: {
id: lastOfArray(docs).id,
updated: lastOfArray(docs).updated,
},
};
};
blockPushHandler:
export const blockPushHandler = async (
rows: RxReplicationWriteToMasterRow<RxBlockDocumentType>[]
) => {
if (rows.length !== 1) {
throw new Error("# pushHandler(): too many push documents");
}
const row = rows[0];
const oldDoc: any = row.assumedMasterState;
const doc: Block = row.newDocumentState;
console.log(row, oldDoc, doc);
// insert
if (!row.assumedMasterState) {
const { error } = await supabaseClient.from(DbTables.Block).insert([doc]);
console.log("Error 1", error);
if (error) {
// we have an insert conflict
const conflictDocRes: any = await supabaseClient
.from(DbTables.Block)
.select()
.eq("id", doc.id)
.limit(1);
return [conflictDocRes.data[0]];
} else {
return [];
}
}
// update
console.log("pushHandler(): is update");
const { data, error } = await supabaseClient
.from(DbTables.Block)
.update(doc)
.match({
id: doc.id,
replicationRevision: oldDoc.replicationRevision,
});
console.log("Error 2", error);
if (error) {
console.log("pushHandler(): error:");
console.log(error);
console.log(data);
throw error;
}
console.log("update response:");
console.log(data);
if (data.length === 0) {
// we have an updated conflict
const conflictDocRes: any = await supabaseClient
.from(DbTables.Block)
.select()
.eq("id", doc.id)
.limit(1);
return [conflictDocRes.data[0]];
}
return [];
};
But the issue is when I start the application and the pull handler is called correctly but it doesn't stop calling the pull handler and it sends continuous request one after another even after it has fetched the documents even when I set hasMoreDocuments
to false
It keeps sending requests and running the replicator. Is there something wrong with my configuration?
database.ts:
export const createDatabase = async () => {
const database = await createRxDatabase({
name: "sundaedb",
storage: getRxStorageDexie(),
});
await database.addCollections({
blocks: {
schema: blockSchema as any,
conflictHandler: conflictHandler as any,
},
documents: {
schema: documentSchema as any,
conflictHandler: conflictHandler as any,
},
});
database.blocks.preInsert((docData) => {
docData.replicationRevision = createRevision(
database.hashFunction,
docData as any
);
return docData;
}, false);
database.blocks.preRemove((docData) => {
console.log(" PRE REMOVE !!");
console.log(JSON.stringify(docData, null, 4));
const oldRevHeight = parseRevision(docData.replicationRevision).height;
docData.replicationRevision =
oldRevHeight + 1 + "-" + database.hashFunction(JSON.stringify(docData));
console.log(JSON.stringify(docData, null, 4));
return docData;
}, false);
database.blocks.preSave((docData) => {
const oldRevHeight = parseRevision(docData.replicationRevision).height;
docData.replicationRevision =
oldRevHeight + 1 + "-" + database.hashFunction(JSON.stringify(docData));
return docData;
}, false);
return database;
};