
I'm working with RxDB and have pull and push handlers; for the backend I'm using Supabase.

I have set up the replication code as follows:

replication.ts

import { RxDatabase } from "rxdb";
import { RxReplicationPullStreamItem } from "rxdb/dist/types/types";
import { replicateRxCollection } from "rxdb/plugins/replication";
import { Subject } from "rxjs";
import { supabaseClient, SUPABASE_URL } from "src/config/supabase";
import { DbTables } from "src/constants/db";
import {
  blockPullHandler,
  blockPushHandler,
} from "./repilicationhandlers/block";
import { CheckpointType, RxBlockDocument, RxBlocksCollections } from "./types";

export async function startReplication(
  database: RxDatabase<RxBlocksCollections>
) {
  const pullStream$ = new Subject<
    RxReplicationPullStreamItem<RxBlockDocument, CheckpointType>
  >();

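  // Forward Supabase realtime changes on the Block table (supabase-js v1 `.on()` API)
  // into the pull stream, and emit RESYNC once the channel reports SUBSCRIBED.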
  supabaseClient
    .from(DbTables.Block)
    .on("*", (payload) => {
      console.log("Change received!", payload);
      const doc = payload.new;
      pullStream$.next({
        checkpoint: {
          id: doc.id,
          updated: doc.updated,
        },
        documents: [doc] as any,
      });
    })
    .subscribe((status: string) => {
      console.log("STATUS changed");
      console.dir(status);
      if (status === "SUBSCRIBED") {
        pullStream$.next("RESYNC");
      }
    });

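  // Wire the handlers into RxDB's replication for the blocks collection;
  // the "archived" field is treated as the deleted flag.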
  const replicationState = await replicateRxCollection({
    collection: database.blocks,
    replicationIdentifier: "supabase-replication-to-" + SUPABASE_URL,
    deletedField: "archived",
    pull: {
      handler: blockPullHandler as any,
      stream$: pullStream$.asObservable(),
      batchSize: 10,
    },
    push: {
      batchSize: 1,
      handler: blockPushHandler as any,
    },
  });

  replicationState.error$.subscribe((err) => {
    console.error("## replicationState.error$:");
    console.log(err);
  });

  return replicationState;
}

blockPullHandler:

import { lastOfArray } from "rxdb";
import { supabaseClient } from "src/config/supabase";
import { DbTables } from "src/constants/db";

export const blockPullHandler = async (
  lastCheckpoint: any,
  batchSize: number
) => {
  const minTimestamp = lastCheckpoint ? lastCheckpoint.updated : 0;
  console.log("Pulling data", batchSize, lastCheckpoint);
  const { data, error } = await supabaseClient
    .from(DbTables.Block)
    .select()
    .gt("updated", minTimestamp)
    .order("updated", { ascending: true })
    .limit(batchSize);

  if (error) {
    console.log(error);
    throw error;
  }
  const docs: Array<Block> = data;

  return {
    documents: docs,
    hasMoreDocuments: false,
    checkpoint:
      docs.length === 0
        ? lastCheckpoint
        : {
            id: lastOfArray(docs).id,
            updated: lastOfArray(docs).updated,
          },
  };
};

blockPushHandler:

export const blockPushHandler = async (
  rows: RxReplicationWriteToMasterRow<RxBlockDocumentType>[]
) => {
  if (rows.length !== 1) {
    throw new Error("# pushHandler(): too many push documents");
  }

  const row = rows[0];
  const oldDoc: any = row.assumedMasterState;
  const doc: Block = row.newDocumentState;

  console.log(row, oldDoc, doc);

  // insert
  if (!row.assumedMasterState) {
    const { error } = await supabaseClient.from(DbTables.Block).insert([doc]);
    console.log("Error 1", error);
    if (error) {
      // we have an insert conflict
      const conflictDocRes: any = await supabaseClient
        .from(DbTables.Block)
        .select()
        .eq("id", doc.id)
        .limit(1);
      return [conflictDocRes.data[0]];
    } else {
      return [];
    }
  }
  // update
  console.log("pushHandler(): is update");
  const { data, error } = await supabaseClient
    .from(DbTables.Block)
    .update(doc)
    .match({
      id: doc.id,
      replicationRevision: oldDoc.replicationRevision,
    });
  console.log("Error 2", error);

  if (error) {
    console.log("pushHandler(): error:");
    console.log(error);
    console.log(data);
    throw error;
  }
  console.log("update response:");
  console.log(data);
  if (data.length === 0) {
    // we have an updated conflict
    const conflictDocRes: any = await supabaseClient
      .from(DbTables.Block)
      .select()
      .eq("id", doc.id)
      .limit(1);
    return [conflictDocRes.data[0]];
  }
  return [];
};

But the issue is: when I start the application, the pull handler is called correctly, but it never stops. It keeps sending one request after another, even after it has fetched all the documents and even though I set hasMoreDocuments to false; the replication just keeps running. Is there something wrong with my configuration?

database.ts:

export const createDatabase = async () => {
  const database = await createRxDatabase({
    name: "sundaedb",
    storage: getRxStorageDexie(),
  });

  await database.addCollections({
    blocks: {
      schema: blockSchema as any,
      conflictHandler: conflictHandler as any,
    },
    documents: {
      schema: documentSchema as any,
      conflictHandler: conflictHandler as any,
    },
  });

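  // The hooks below keep replicationRevision up to date on local writes:
  // preInsert creates the initial revision, preSave/preRemove bump its height,
  // so the push handler can match on it to detect conflicts.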
  database.blocks.preInsert((docData) => {
    docData.replicationRevision = createRevision(
      database.hashFunction,
      docData as any
    );
    return docData;
  }, false);

  database.blocks.preRemove((docData) => {
    console.log(" PRE REMOVE !!");
    console.log(JSON.stringify(docData, null, 4));
    const oldRevHeight = parseRevision(docData.replicationRevision).height;
    docData.replicationRevision =
      oldRevHeight + 1 + "-" + database.hashFunction(JSON.stringify(docData));
    console.log(JSON.stringify(docData, null, 4));
    return docData;
  }, false);

  database.blocks.preSave((docData) => {
    const oldRevHeight = parseRevision(docData.replicationRevision).height;
    docData.replicationRevision =
      oldRevHeight + 1 + "-" + database.hashFunction(JSON.stringify(docData));
    return docData;
  }, false);

  return database;
};
Comment: Are you using RxDB 13? If so, the `hasMoreDocuments` property doesn't exist anymore. What you have to do is return 0 documents for it to stop pulling. – pianomansam Nov 16 '22 at 20:24
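Following up on that comment: on RxDB 13 the pull handler is expected to return only documents and a checkpoint, with no hasMoreDocuments flag, and pulling stops once a batch comes back with fewer documents than batchSize (ultimately an empty batch). A minimal sketch of the handler above adjusted to that contract, assuming the same Supabase query and checkpoint shape as in the question (the name blockPullHandlerV13 is only for illustration):

import { lastOfArray } from "rxdb";
import { supabaseClient } from "src/config/supabase";
import { DbTables } from "src/constants/db";

// RxDB 13-style pull handler: no hasMoreDocuments flag. Replication stops
// requesting further batches once a pull returns fewer documents than
// batchSize (e.g. an empty array when nothing has changed on the server).
export const blockPullHandlerV13 = async (
  lastCheckpoint: any,
  batchSize: number
) => {
  const minTimestamp = lastCheckpoint ? lastCheckpoint.updated : 0;

  const { data, error } = await supabaseClient
    .from(DbTables.Block)
    .select()
    .gt("updated", minTimestamp)
    .order("updated", { ascending: true })
    .limit(batchSize);

  if (error) {
    throw error;
  }

  return {
    documents: data,
    // only advance the checkpoint when documents were actually returned
    checkpoint:
      data.length === 0
        ? lastCheckpoint
        : {
            id: lastOfArray(data).id,
            updated: lastOfArray(data).updated,
          },
  };
};

The Supabase query is unchanged from the original handler; only the return value differs.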
