Pancakes:IPC EntryBased
This is a lock-free implementation of an entry based ring buffer. It supports message sizes up to 16KB using a 16-bit header, but you can easily change it to support a 24-bit or 32-bit header. Only one reader and one writer are supported.
I have tested this code fairly well, and there appears to be no bugs. To get a major increase in speed you can return a pointer to the inside of the buffer for reads but that is not secure unless you trust the writer.
This is designed to be used for inter-process communication as if you have threads sharing the same address space you might find a faster method would be to have a ring buffer of static pointers and use those to point to memory and use flags to determine if the memory should be freed after the reader uses it because this would eliminate the need for the writer to copy data into the buffer.
Example Usage
RBM rbm;
char buf[128];
uintptr sz;
rbm.sz = 2048 - sizeof(RB);
rbm.rb = (RB*)malloc(2048);
rb_write_nbio(&rbm, &buf[0], 128);
sz = 128;
rb_read_nbio(&rbm, &buf[0], &sz, 0);
The code above will write to the ring buffer, and then read from it. You can have two separate threads with one reading and one writing. For bi-directional communication just use two different buffers.
Code
#ifdef B64
typedef unsigned long long uintptr;
#else
typedef unsigned int uintptr;
#endif
typedef unsigned long long uint64;
typedef unsigned int uint32;
typedef unsigned char uint8;
typedef unsigned short uint16;
typedef int int32;
#endif
struct _RBE {
int32 w;
int32 r;
uint8 d[];
};
typedef struct _RBE RB;
struct _RBME {
RB *rb;
int32 sz;
};
typedef struct _RBME RBM;
int rb_write_nbio(RBM *rbm, void *p, uint32 sz) {
RB volatile *rb;
int32 r;
int32 w;
uint32 *h;
int32 x, y;
int32 asz;
int32 max;
rb = (RB volatile*)rbm->rb;
sz = sz & 0xffff;
r = rb->r;
w = rb->w;
if (r > rbm->sz) {
return 0;
}
if (w > rbm->sz) {
return 0;
}
/* calculate total size including 16-bit length header */
asz = sz + 2 + 2;
/* not enough space */
if ((w < r) && (w + asz) >= r) {
return 0;
}
/* not enough space */
if ((w >= r) && ((rbm->sz - w) + r) < asz) {
return 0;
}
/* write length */
rb->d[w++] = sz >> 8;
if (w >= rbm->sz) {
w = 0;
}
rb->d[w++] = sz & 0xff;
if (w >= rbm->sz){
w = 0;
}
/* split write */
max = rbm->sz - w;
if (w >= r && max < sz) {
/* copy first part */
for (x = 0; x < max; ++x) {
rb->d[w + x] = ((uint8*)p)[x];
}
/* copy last part */
for (y = 0; x < sz; ++x, ++y) {
rb->d[y] = ((uint8*)p)[x];
}
rb->w = (w + sz) - rbm->sz;
return 1;
}
/* straight write */
for (x = 0; x < sz; ++x) {
rb->d[w + x] = ((uint8*)p)[x];
}
/*
split read wont leave 'w' == rbm->sz but this will so we have
to check for it and correct it else it messed up the reader
getting them off-track and essentially making communications
hard to reliably recover if not impossible
*/
if (w + sz == rbm->sz) {
rb->w = 0;
} else {
rb->w = w + sz;
}
return 1;
}
int rb_read_nbio(RBM *rbm, void *p, uint32 *sz, uint32 *advance) {
RB volatile *rb;
int32 r;
int32 w;
int32 h;
int32 x, y;
uint8 *_p;
int32 max;
_p = (uint8*)p;
rb = (RB volatile*)rbm->rb;
r = rb->r;
w = rb->w;
if (advance) {
r = *advance;
} else {
if (r > rbm->sz) {
/* bad header */
return 0;
}
}
if (w > rbm->sz) {
/* bad header */
return 0;
}
if (w == r) {
return 0;
}
/* read size (tricky) */
h = rb->d[r++] << 8;
if (r == w) {
return 0;
}
if (r >= rbm->sz) {
r = 0;
}
h |= rb->d[r++];
if (r == w) {
return 0;
}
if (r >= rbm->sz) {
r = 0;
}
if (h > (rbm->sz - r) + r) {
return -1;
}
if (h > *sz) {
return -1;
}
*sz = h;
/* split read */
if (r + h >= rbm->sz) {
max = rbm->sz - r;
for (x = 0; x < max; ++x) {
*(_p++) = rb->d[r + x];
}
max = h - (rbm->sz - r);
for (x = 0; x < max; ++x) {
*(_p++) = rb->d[x];
}
if (advance) {
*advance = (r + h) - rbm->sz;
} else {
rb->r = (r + h) - rbm->sz;
}
return 1;
}
/* straight read */
for (x = 0; x < h; ++x) {
*(_p++) = rb->d[r + x];
}
if (advance) {
*advance = r + h;
} else {
rb->r = r + h;
}
return 1;
}