0

I'd like to create a user-defined aggregate that implements reservoir sampling. Here is my code. I want to use a bytea to store the address of the struct that holds the state of the reservoir sampling. However, there are some bugs.

For the SQL:

-- Transition function of the reservoir-sampling aggregate.
-- Declared VOLATILE (not IMMUTABLE): the C implementation draws random
-- numbers and updates the aggregate state, so two calls with the same
-- arguments do not give the same result.
CREATE FUNCTION res_trans_crimes_c(bytea, bigint)
        RETURNS bytea
        AS 'MODULE_PATHNAME', 'res_trans_crimes_c'
        LANGUAGE C
        VOLATILE
        PARALLEL SAFE;

-- Final function of the reservoir-sampling aggregate.
-- The SQL-level return type is bigint[]; "ArrayType" is only the name of
-- the C struct used inside the server and is not an SQL type.
CREATE FUNCTION finalize_trans_crimes_c(bytea)
        RETURNS bigint[]
        AS 'MODULE_PATHNAME','finalize_trans_crimes_c'
        LANGUAGE C
        IMMUTABLE 
        PARALLEL SAFE;

-- Create the user-defined reservoir-sampling aggregate.
-- No INITCOND is given on purpose: the state then starts out as SQL NULL,
-- so the PG_ARGISNULL(0) test in the transition function is true on the
-- first row. With INITCOND='{}' the initial state is a non-NULL two-byte
-- bytea and that test never succeeds.
CREATE AGGREGATE reservoir_sampling_c(bigint)
(
        sfunc = res_trans_crimes_c,
        stype = bytea,
        FINALFUNC = finalize_trans_crimes_c
);

For the C code:

PG_MODULE_MAGIC;

/*
 * Working state of the reservoir-sampling aggregate. The transition
 * function creates and updates it; the final function reads it.
 */
typedef struct state_c
{
    ArrayType *reservoir;   /* array whose data area holds the sampled int64 values */
        int32 poscnt;       /* 1-based position of the next input value; starts at 1 */
        int32 reservoir_size; /* number of reservoir slots actually used for sampling */
} state_c;

/* Forward declaration: builds an INT8OID array with num elements (defined below). */
static ArrayType *MyNew_intArrayType(int num);

PG_FUNCTION_INFO_V1(res_trans_crimes_c);
Datum
res_trans_crimes_c(PG_FUNCTION_ARGS)
{

    bytea  *addr = (bytea *) PG_GETARG_BYTEA_P(0);
    // get the bytea which contains the address of struct pointer
    int64 newsample = PG_GETARG_INT64(1);// get the input data
    state_c *s = palloc0 (sizeof(state_c)); 
    elog(INFO, "1");
    if(PG_ARGISNULL(0)) { // for the first time call the function, initialize the bytea.
                elog(INFO, "2");
                state_c *st0 = palloc0 (sizeof(state_c));
                ArrayType *a = MyNew_intArrayType(100);
                //addr = palloc0 (sizeof(bytea));
                //state_c **pp = &st0; 
                addr = (bytea *) palloc(sizeof(st0) + sizeof(bytea));
                //memcpy(addr->vl_dat,pp,sizeof(st0));
                //memcpy(VARDATA(addr),pp,sizeof(st0));
                SET_VARSIZE(addr,sizeof(st0)+sizeof(bytea));
            
               st0->poscnt = 1;
               st0->reservoir_size = 3;
               st0->reservoir = a;
               sprintf(VARDATA(addr), "%p", (void*) st0);
                // copy the address of struct pointer to bytea    
    }
       
        //sprintf((void*) s, "%p", VARDATA(addr));
        //sscanf(addr->vl_dat, "%p", (void**)&s); 
        sscanf(VARDATA(addr), "%p", (void**) &s); 
       // I tried to copy the address of pointer stored in the bytea to struct to get the information store in the struct
        elog(INFO, "3");
        //memcpy(pp2,addr->vl_dat,sizeof(*pp2));
        //memcpy(pp2,VARDATA(addr),sizeof(*pp2));
        
        if(s->poscnt <= s->reservoir_size){
            int32 p = s->poscnt;
                int64 *dr = (int64 *) ARR_DATA_PTR(s->reservoir);
                dr[p-1] = newsample;
            s->poscnt ++;

        }else{
            int32 pos = rand() % s->poscnt ; //0 - postcnt -1
            if(pos < s->reservoir_size){
                        int64 *dr = (int64 *) ARR_DATA_PTR(s->reservoir);
                        dr[pos] = newsample;
            }
            s->poscnt ++;
        } // this is reservoir sampling alogorithm
        //pfree(s);
        PG_RETURN_BYTEA_P(addr);
}

I tried to use sprintf and sscanf to pass the address of the struct to the bytea. sprintf works, but sscanf seems to fail. I also tried memcpy, but that failed as well.

enter image description here

What's more, my if(PG_ARGISNULL(0)) condition never seems to be true. Is the problem with my initcond? Here is the final function:

PG_FUNCTION_INFO_V1(finalize_trans_crimes_c);
Datum
finalize_trans_crimes_c(PG_FUNCTION_ARGS)
{               

                ArrayType *result;
                Datum *elems;
                int i;
                int num;
                int64 *dr;

                state_c *st = palloc0 (sizeof(state_c));
                bytea  *addr = (bytea *) PG_GETARG_BYTEA_P(0);
                
                //st->reservoir = MyNew_intArrayType(100);
                state_c **pp2 = &st; 
                //memcpy(pp2,addr->vl_dat,sizeof(*pp2));
                memcpy(pp2,VARDATA(addr),sizeof(*pp2));
                //sscanf(VARDATA(addr), "%p", (void**)&st);
                num = st->reservoir_size;
                dr = (int64 *) ARR_DATA_PTR(st->reservoir); 
                
                elems = (Datum *)palloc(num * sizeof(Datum));
                
                for (i = 0; i < num; i++) {
                        elems[i] = dr[i]; 
                }

                result = construct_array(elems, num , INT8OID, 8, true, 'd');
                PG_RETURN_ARRAYTYPE_P(result);
                //return the arraytype
}

static
ArrayType *
MyNew_intArrayType(int num)
{
        ArrayType  *r;
        int nbytes;

        /* if no elements, return a zero-dimensional array */
        if (num <= 0)
        {
                Assert(num == 0);
                r = construct_empty_array(INT8OID);
                return r;
        }

        nbytes = ARR_OVERHEAD_NONULLS(1) + sizeof(int) * num;

        r = (ArrayType *) palloc0(nbytes);

        SET_VARSIZE(r, nbytes);
        ARR_NDIM(r) = 1;
        r->dataoffset = 0;                      /* marker for no null bitmap */
        ARR_ELEMTYPE(r) = INT8OID;
        ARR_DIMS(r)[0] = num;
        ARR_LBOUND(r)[0] = 1;

        return r;
}

Another question: I cannot get gdb to stop the code at my breakpoint (enter image description here). It shows "Error in re-setting breakpoint 1: Cannot access memory at address 0xb16". Thank you!

Leo
  • 1
  • 1
  • I don't know what you are trying to do, but it looks like you should use `internal` rather than `bytea`. – Laurenz Albe Feb 24 '23 at 15:05
  • 1
    All your commented out code looks like the detritus of failed past attempts. Are they supposed to be useful to us? You should remove them, and add comments which might tell us what is supposed to be going on. – jjanes Feb 24 '23 at 16:13

0 Answers0