I'd like to create a user-defined aggregate that implements reservoir sampling. Here is my code. I want to use a bytea value to store the address of the struct that holds the state of the reservoir sampling. However, there are some bugs.
For the SQL:
-- Transition function of the reservoir-sampling aggregate.
-- Arguments: the current state (bytea) and the next input value (bigint);
-- returns the updated state.
-- VOLATILE rather than IMMUTABLE: the C implementation draws random numbers
-- and updates state in place, so equal arguments do not give equal results.
CREATE FUNCTION res_trans_crimes_c(bytea, bigint)
RETURNS bytea
AS 'MODULE_PATHNAME', 'res_trans_crimes_c'
LANGUAGE C
VOLATILE
PARALLEL SAFE;
-- Final function of the reservoir-sampling aggregate: converts the state
-- (bytea) into the sampled values.
-- The result type is bigint[]; "ArrayType" is the name of the C struct used
-- inside the server, not an SQL type, and the C code builds an int8 array.
CREATE FUNCTION finalize_trans_crimes_c(bytea)
RETURNS bigint[]
AS 'MODULE_PATHNAME','finalize_trans_crimes_c'
LANGUAGE C
IMMUTABLE
PARALLEL SAFE;
-- Create the user-defined aggregate.
-- There is deliberately no INITCOND: without one the state starts out as
-- NULL, which is what the transition function's PG_ARGISNULL(0) first-call
-- test looks for. An INITCOND of '{}' would be parsed as a non-NULL,
-- two-byte bytea value, so that test could never succeed.
CREATE AGGREGATE reservoir_sampling_c(bigint)
(
sfunc = res_trans_crimes_c,
stype = bytea,
FINALFUNC = finalize_trans_crimes_c
);
For the C code:
PG_MODULE_MAGIC;

/*
 * Working state of the reservoir-sampling aggregate.
 */
typedef struct state_c
{
    ArrayType  *reservoir;       /* int8 array whose data area holds the samples */
    int32       poscnt;          /* 1-based position of the next input value */
    int32       reservoir_size;  /* number of reservoir slots actually in use */
} state_c;

/* Allocates a one-dimensional int8 array with room for "num" elements. */
static ArrayType *MyNew_intArrayType(int num);
PG_FUNCTION_INFO_V1(res_trans_crimes_c);
Datum
res_trans_crimes_c(PG_FUNCTION_ARGS)
{
bytea *addr = (bytea *) PG_GETARG_BYTEA_P(0);
// get the bytea which contains the address of struct pointer
int64 newsample = PG_GETARG_INT64(1);// get the input data
state_c *s = palloc0 (sizeof(state_c));
elog(INFO, "1");
if(PG_ARGISNULL(0)) { // for the first time call the function, initialize the bytea.
elog(INFO, "2");
state_c *st0 = palloc0 (sizeof(state_c));
ArrayType *a = MyNew_intArrayType(100);
//addr = palloc0 (sizeof(bytea));
//state_c **pp = &st0;
addr = (bytea *) palloc(sizeof(st0) + sizeof(bytea));
//memcpy(addr->vl_dat,pp,sizeof(st0));
//memcpy(VARDATA(addr),pp,sizeof(st0));
SET_VARSIZE(addr,sizeof(st0)+sizeof(bytea));
st0->poscnt = 1;
st0->reservoir_size = 3;
st0->reservoir = a;
sprintf(VARDATA(addr), "%p", (void*) st0);
// copy the address of struct pointer to bytea
}
//sprintf((void*) s, "%p", VARDATA(addr));
//sscanf(addr->vl_dat, "%p", (void**)&s);
sscanf(VARDATA(addr), "%p", (void**) &s);
// I tried to copy the address of pointer stored in the bytea to struct to get the information store in the struct
elog(INFO, "3");
//memcpy(pp2,addr->vl_dat,sizeof(*pp2));
//memcpy(pp2,VARDATA(addr),sizeof(*pp2));
if(s->poscnt <= s->reservoir_size){
int32 p = s->poscnt;
int64 *dr = (int64 *) ARR_DATA_PTR(s->reservoir);
dr[p-1] = newsample;
s->poscnt ++;
}else{
int32 pos = rand() % s->poscnt ; //0 - postcnt -1
if(pos < s->reservoir_size){
int64 *dr = (int64 *) ARR_DATA_PTR(s->reservoir);
dr[pos] = newsample;
}
s->poscnt ++;
} // this is reservoir sampling alogorithm
//pfree(s);
PG_RETURN_BYTEA_P(addr);
}
I tried to use sprintf and sscanf to pass the address of the struct pointer through the bytea. sprintf works, but sscanf seems to fail. I also tried memcpy, but that failed as well.
What's more, my if(PG_ARGISNULL(0)) condition never seems to be true. Is the problem with my INITCOND? Here is the final function:
PG_FUNCTION_INFO_V1(finalize_trans_crimes_c);
Datum
finalize_trans_crimes_c(PG_FUNCTION_ARGS)
{
ArrayType *result;
Datum *elems;
int i;
int num;
int64 *dr;
state_c *st = palloc0 (sizeof(state_c));
bytea *addr = (bytea *) PG_GETARG_BYTEA_P(0);
//st->reservoir = MyNew_intArrayType(100);
state_c **pp2 = &st;
//memcpy(pp2,addr->vl_dat,sizeof(*pp2));
memcpy(pp2,VARDATA(addr),sizeof(*pp2));
//sscanf(VARDATA(addr), "%p", (void**)&st);
num = st->reservoir_size;
dr = (int64 *) ARR_DATA_PTR(st->reservoir);
elems = (Datum *)palloc(num * sizeof(Datum));
for (i = 0; i < num; i++) {
elems[i] = dr[i];
}
result = construct_array(elems, num , INT8OID, 8, true, 'd');
PG_RETURN_ARRAYTYPE_P(result);
//return the arraytype
}
static
ArrayType *
MyNew_intArrayType(int num)
{
ArrayType *r;
int nbytes;
/* if no elements, return a zero-dimensional array */
if (num <= 0)
{
Assert(num == 0);
r = construct_empty_array(INT8OID);
return r;
}
nbytes = ARR_OVERHEAD_NONULLS(1) + sizeof(int) * num;
r = (ArrayType *) palloc0(nbytes);
SET_VARSIZE(r, nbytes);
ARR_NDIM(r) = 1;
r->dataoffset = 0; /* marker for no null bitmap */
ARR_ELEMTYPE(r) = INT8OID;
ARR_DIMS(r)[0] = num;
ARR_LBOUND(r)[0] = 1;
return r;
}
Another question: I cannot stop the code at my breakpoint using gdb. It shows: Error in re-setting breakpoint 1: Cannot access memory at address 0xb16.
Thank you!