ref: 401b33384215ea3f7e6a89e61f75e8acb35a3cc4
parent: edcf7d821652f17d23641dc193f769eb1e02be77
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sun Mar 31 12:11:31 EDT 2024
qio: sync with 9front
--- a/kern/fns.h
+++ b/kern/fns.h
@@ -179,7 +179,6 @@
void qreopen(Queue*);
void qsetlimit(Queue*, int);
void qunlock(QLock*);
-int qwindow(Queue*);
int qwrite(Queue*, void*, int);
void qnoblock(Queue*, int);
void randominit(void);
--- a/kern/qio.c
+++ b/kern/qio.c
@@ -4,13 +4,6 @@
#include "fns.h"
#include "error.h"
-static ulong padblockcnt;
-static ulong concatblockcnt;
-static ulong pullupblockcnt;
-static ulong copyblockcnt;
-static ulong consumecnt;
-static ulong producecnt;
-
#define QDEBUG if(0)
/*
@@ -17,28 +10,28 @@
* IO queues
*/
typedef struct Queue Queue;
-
struct Queue
{
Lock lk;
+ int state;
+ int dlen; /* data length in bytes */
+ uint rp, wp; /* read/write position (counting BALLOC() bytes) */
+ int limit; /* max BALLOC() bytes in queue */
+ int inilim; /* initial limit */
+ uchar noblock; /* true if writes return immediately when q full */
+ uchar eof; /* number of eofs read by user */
+
Block* bfirst; /* buffer */
Block* blast;
- int len; /* bytes allocated to queue */
- int dlen; /* data bytes in queue */
- int limit; /* max bytes in queue */
- int inilim; /* initial limit */
- int state;
- int noblock; /* true if writes return immediately when q full */
- int eof; /* number of eofs read by user */
-
+ void* arg; /* argument to kick and bypass */
void (*kick)(void*); /* restart output */
void (*bypass)(void*, Block*); /* bypass queue altogether */
- void* arg; /* argument to kick */
QLock rlock; /* mutex for reading processes */
Rendez rr; /* process waiting to read */
+
QLock wlock; /* mutex for writing processes */
Rendez wr; /* process waiting to write */
@@ -68,44 +61,6 @@
}
/*
- * pad a block to the front (or the back if size is negative)
- */
-Block*
-padblock(Block *bp, int size)
-{
- int n;
- Block *nbp;
-
- QDEBUG checkb(bp, "padblock 0");
- if(size >= 0){
- if(bp->rp - bp->base >= size){
- bp->rp -= size;
- return bp;
- }
- n = BLEN(bp);
- nbp = allocb(size+n);
- nbp->rp += size;
- nbp->wp = nbp->rp;
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- nbp->rp -= size;
- } else {
- size = -size;
- if(bp->lim - bp->wp >= size)
- return bp;
- n = BLEN(bp);
- nbp = allocb(n+size);
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- }
- nbp->next = bp->next;
- freeb(bp);
- padblockcnt++;
- QDEBUG checkb(nbp, "padblock 1");
- return nbp;
-}
-
-/*
* return count of bytes in a string of blocks
*/
int
@@ -122,19 +77,33 @@
}
/*
- * return count of space in blocks
+ * copy the contents of a string of blocks into
+ * memory from an offset. blocklist kept unchanged.
+ * return number of copied bytes.
*/
-int
-blockalloclen(Block *bp)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
{
- int len;
+ ulong m, r;
- len = 0;
- while(bp != nil) {
- len += BALLOC(bp);
- bp = bp->next;
+ r = 0;
+ while(n > 0 && b != nil){
+ m = BLEN(b);
+ if(o >= m)
+ o -= m;
+ else {
+ m -= o;
+ if(n < m)
+ m = n;
+ memmove(p, b->rp + o, m);
+ p += m;
+ r += m;
+ n -= m;
+ o = 0;
+ }
+ b = b->next;
}
- return len;
+ return r;
}
/*
@@ -149,7 +118,6 @@
if(bp->next == nil)
return bp;
len = blocklen(bp);
- concatblockcnt += len;
return pullupblock(bp, len);
}
@@ -162,6 +130,8 @@
Block *nbp;
int i;
+ assert(n >= 0);
+
/*
* this should almost always be true, it's
* just to avoid every caller checking.
@@ -184,7 +154,6 @@
*/
n -= BLEN(bp);
while((nbp = bp->next) != nil){
- pullupblockcnt++;
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
@@ -223,6 +192,8 @@
{
Block *b;
+ assert(n >= 0);
+
if(BLEN(q->bfirst) >= n)
return q->bfirst;
q->bfirst = pullupblock(q->bfirst, n);
@@ -241,8 +212,14 @@
ulong l;
Block *nb, *startb;
+ assert(len >= 0);
+ assert(offset >= 0);
+
QDEBUG checkb(bp, "trimblock 1");
- if(blocklen(bp) < offset+len) {
+ l = blocklen(bp);
+ if(offset == 0 && len == l)
+ return bp;
+ if(l < offset+len) {
freeblist(bp);
return nil;
}
@@ -274,6 +251,43 @@
}
/*
+ * pad a block to the front (or the back if size is negative)
+ */
+Block*
+padblock(Block *bp, int size)
+{
+ int n;
+ Block *nbp;
+
+ QDEBUG checkb(bp, "padblock 0");
+ if(size >= 0){
+ if(bp->rp - bp->base >= size){
+ bp->rp -= size;
+ return bp;
+ }
+ n = BLEN(bp);
+ nbp = allocb(size+n);
+ nbp->rp += size;
+ nbp->wp = nbp->rp;
+ memmove(nbp->wp, bp->rp, n);
+ nbp->wp += n;
+ nbp->rp -= size;
+ } else {
+ size = -size;
+ if(bp->lim - bp->wp >= size)
+ return bp;
+ n = BLEN(bp);
+ nbp = allocb(n+size);
+ memmove(nbp->wp, bp->rp, n);
+ nbp->wp += n;
+ }
+ nbp->next = bp->next;
+ freeb(bp);
+ QDEBUG checkb(nbp, "padblock 1");
+ return nbp;
+}
+
+/*
* copy 'count' bytes into a new block
*/
Block*
@@ -282,6 +296,8 @@
int l;
Block *nbp;
+ assert(count >= 0);
+
QDEBUG checkb(bp, "copyblock 0");
nbp = allocb(count);
for(; count > 0 && bp != nil; bp = bp->next){
@@ -296,7 +312,6 @@
memset(nbp->wp, 0, count);
nbp->wp += count;
}
- copyblockcnt++;
QDEBUG checkb(nbp, "copyblock 1");
return nbp;
@@ -330,7 +345,30 @@
return bp;
}
+/*
+ * if the allocated space is way out of line with the used
+ * space, reallocate to a smaller block
+ */
+Block*
+packblock(Block *bp)
+{
+ Block **l, *nbp;
+ int n;
+ for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
+ n = BLEN(nbp);
+ if((n<<2) < BALLOC(nbp)){
+ *l = allocb(n);
+ memmove((*l)->wp, nbp->rp, n);
+ (*l)->wp += n;
+ (*l)->next = nbp->next;
+ freeb(nbp);
+ }
+ }
+
+ return bp;
+}
+
/*
* throw away up to count bytes from a
* list of blocks. Return count of bytes
@@ -346,8 +384,8 @@
if(bph == nil)
return 0;
- while(*bph != nil && count != 0) {
- bp = *bph;
+ while((bp = *bph) != nil && count > 0) {
+ QDEBUG checkb(bp, "pullblock ");
n = BLEN(bp);
if(count < n)
n = count;
@@ -354,7 +392,6 @@
bytes += n;
count -= n;
bp->rp += n;
- QDEBUG checkb(bp, "pullblock ");
if(BLEN(bp) == 0) {
*bph = bp->next;
bp->next = nil;
@@ -365,103 +402,145 @@
}
/*
- * get next block from a queue, return null if nothing there
+ * remove a block from the front of the queue (called with q ilocked)
*/
Block*
-qget(Queue *q)
+qremove(Queue *q)
{
- int dowakeup;
Block *b;
- /* sync with qwrite */
- ilock(&q->lk);
-
b = q->bfirst;
- if(b == nil){
- q->state |= Qstarve;
- iunlock(&q->lk);
+ if(b == nil)
return nil;
- }
- QDEBUG checkb(b, "qget");
+ QDEBUG checkb(b, "qremove");
q->bfirst = b->next;
b->next = nil;
- q->len -= BALLOC(b);
q->dlen -= BLEN(b);
+ q->rp += BALLOC(b);
+ return b;
+}
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+/*
+ * put a block back to the front of the queue (called with q ilocked)
+ */
+void
+qputback(Queue *q, Block *b)
+{
+ QDEBUG checkb(b, "qputback");
+ b->next = q->bfirst;
+ if(q->bfirst == nil)
+ q->blast = b;
+ q->bfirst = b;
+ q->dlen += BLEN(b);
+ q->rp -= BALLOC(b);
+}
+/*
+ * after removing data from the queue,
+ * unlock queue and wakeup blocked writer.
+ * called at interrupt level.
+ */
+static int
+iunlock_consumer(Queue *q)
+{
+ int s = q->state;
+
+ /* stop flow control when back at or below the limit */
+ if((int)(q->wp - q->rp) <= q->limit)
+ q->state = s & ~Qflow;
+
iunlock(&q->lk);
- if(dowakeup)
+ if(s & Qflow){
+ /*
+ * wakeup flow controlled writers.
+ * note that this is done even when q->state
+ * still has Qflow set, as the unblocking
+ * condition depends on the writers local queuing
+ * position, not on the global queue length.
+ */
wakeup(&q->wr);
-
- return b;
+ }
+ return s;
}
/*
- * throw away the next 'len' bytes in the queue
+ * after removing data from the queue,
+ * unlock queue and wakeup blocked writer.
+ * get output going again when it was blocked.
+ * called at process level.
*/
-int
-qdiscard(Queue *q, int len)
+static int
+iunlock_reader(Queue *q)
{
- Block *b, *tofree = nil;
- int dowakeup, n, sofar;
+ int s = iunlock_consumer(q);
- ilock(&q->lk);
- for(sofar = 0; sofar < len; sofar += n){
- b = q->bfirst;
- if(b == nil)
- break;
- QDEBUG checkb(b, "qdiscard");
- n = BLEN(b);
- if(n <= len - sofar){
- q->bfirst = b->next;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
+ if(q->kick != nil && s & Qflow)
+ (*q->kick)(q->arg);
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- } else {
- n = len - sofar;
- b->rp += n;
- q->dlen -= n;
- }
- }
+ return s;
+}
- /*
- * if writer flow controlled, restart
- *
- * This used to be
- * q->len < q->limit/2
- * but it slows down tcp too much for certain write sizes.
- * I really don't understand it completely. It may be
- * due to the queue draining so fast that the transmission
- * stalls waiting for the app to produce more data. - presotto
- */
- if((q->state & Qflow) && q->len < q->limit){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+/*
+ * after inserting into queue,
+ * unlock queue and wakeup starved reader.
+ * called at interrupt level.
+ */
+static int
+iunlock_producer(Queue *q)
+{
+ int s = q->state;
+ /* start flow control when above the limit */
+ if((int)(q->wp - q->rp) > q->limit)
+ s |= Qflow;
+
+ q->state = s & ~Qstarve;
iunlock(&q->lk);
- if(dowakeup)
- wakeup(&q->wr);
+ if(s & Qstarve)
+ wakeup(&q->rr);
- if(tofree != nil)
- freeblist(tofree);
+ return s;
+}
- return sofar;
+/*
+ * unlock queue and wakeup starved reader.
+ * get output going again when it was starved.
+ * called at process level.
+ */
+static int
+iunlock_writer(Queue *q)
+{
+ int s = iunlock_producer(q);
+
+ if(q->kick != nil && s & (Qstarve|Qkick))
+ (*q->kick)(q->arg);
+
+ return s;
}
/*
+ * get next block from a queue, return null if nothing there
+ * called at interrupt level.
+ */
+Block*
+qget(Queue *q)
+{
+ Block *b;
+
+ ilock(&q->lk);
+ if((b = qremove(q)) == nil){
+ q->state |= Qstarve;
+ iunlock(&q->lk);
+ return nil;
+ }
+ iunlock_consumer(q);
+
+ return b;
+}
+
+/*
* Interrupt level copy out of a queue, return # bytes copied.
*/
int
@@ -468,12 +547,11 @@
qconsume(Queue *q, void *vp, int len)
{
Block *b, *tofree = nil;
- int n, dowakeup;
- uchar *p = vp;
+ int n;
- /* sync with qwrite */
- ilock(&q->lk);
+ assert(len >= 0);
+ ilock(&q->lk);
for(;;) {
b = q->bfirst;
if(b == nil){
@@ -486,8 +564,10 @@
n = BLEN(b);
if(n > 0)
break;
+
+ /* get rid of zero-length blocks */
q->bfirst = b->next;
- q->len -= BALLOC(b);
+ q->rp += BALLOC(b);
/* remember to free this */
b->next = tofree;
@@ -494,10 +574,9 @@
tofree = b;
};
- consumecnt += n;
if(n < len)
len = n;
- memmove(p, b->rp, len);
+ memmove(vp, b->rp, len);
b->rp += len;
q->dlen -= len;
@@ -504,7 +583,7 @@
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
- q->len -= BALLOC(b);
+ q->rp += BALLOC(b);
q->dlen -= BLEN(b);
/* remember to free this */
@@ -511,23 +590,40 @@
b->next = tofree;
tofree = b;
}
-
out:
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+ iunlock_consumer(q);
- iunlock(&q->lk);
+ freeblist(tofree);
- if(dowakeup)
- wakeup(&q->wr);
+ return len;
+}
- if(tofree != nil)
- freeblist(tofree);
+/*
+ * add a block list to a queue, return bytes added (called with q ilocked)
+ */
+int
+qaddlist(Queue *q, Block *b)
+{
+ int len;
+ QDEBUG checkb(b, "qaddlist 1");
+
+ /* queue the block */
+ if(q->bfirst != nil)
+ q->blast->next = b;
+ else
+ q->bfirst = b;
+
+ len = BLEN(b);
+ q->wp += BALLOC(b);
+ while(b->next != nil){
+ b = b->next;
+ QDEBUG checkb(b, "qaddlist 2");
+ len += BLEN(b);
+ q->wp += BALLOC(b);
+ }
+ q->dlen += len;
+ q->blast = b;
return len;
}
@@ -534,36 +630,22 @@
int
qpass(Queue *q, Block *b)
{
- int len, dowakeup;
+ int len;
- /* sync with qread */
- dowakeup = 0;
ilock(&q->lk);
- if(q->len >= q->limit){
+ if(q->state & Qclosed){
iunlock(&q->lk);
freeblist(b);
- return -1;
+ return 0;
}
- if(q->state & Qclosed){
+ if(q->state & Qflow){
iunlock(&q->lk);
freeblist(b);
- return 0;
+ return -1;
}
-
len = qaddlist(q, b);
+ iunlock_producer(q);
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
return len;
}
@@ -570,101 +652,36 @@
int
qpassnolim(Queue *q, Block *b)
{
- int len, dowakeup;
+ int len;
- /* sync with qread */
- dowakeup = 0;
ilock(&q->lk);
-
if(q->state & Qclosed){
iunlock(&q->lk);
freeblist(b);
return 0;
}
-
len = qaddlist(q, b);
+ iunlock_producer(q);
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
return len;
}
-/*
- * if the allocated space is way out of line with the used
- * space, reallocate to a smaller block
- */
-Block*
-packblock(Block *bp)
-{
- Block **l, *nbp;
- int n;
-
- for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
- n = BLEN(nbp);
- if((n<<2) < BALLOC(nbp)){
- *l = allocb(n);
- memmove((*l)->wp, nbp->rp, n);
- (*l)->wp += n;
- (*l)->next = nbp->next;
- freeb(nbp);
- }
- }
-
- return bp;
-}
-
int
qproduce(Queue *q, void *vp, int len)
{
Block *b;
- int dowakeup;
- uchar *p = vp;
+ assert(len >= 0);
+
b = iallocb(len);
if(b == nil)
return 0;
- /* sync with qread */
- dowakeup = 0;
- ilock(&q->lk);
-
- /* no waiting receivers, room in buffer? */
- if(q->len >= q->limit){
- q->state |= Qflow;
- iunlock(&q->lk);
- freeb(b);
- return -1;
- }
- producecnt += len;
-
/* save in buffer */
- memmove(b->wp, p, len);
+ memmove(b->wp, vp, len);
b->wp += len;
- qaddlist(q, b);
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
-
- if(q->len >= q->limit)
- q->state |= Qflow;
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
- return len;
+ return qpass(q, b);
}
/*
@@ -675,6 +692,8 @@
{
Block *b;
+ assert(len >= 0);
+
b = allocb(len);
ilock(&q->lk);
b->wp += readblist(q->bfirst, b->wp, len, offset);
@@ -690,16 +709,18 @@
{
Queue *q;
+ assert(limit >= 0);
+
q = malloc(sizeof(Queue));
if(q == nil)
return nil;
+ q->dlen = 0;
+ q->wp = q->rp = 0;
q->limit = q->inilim = limit;
q->kick = kick;
q->arg = arg;
- q->state = msg;
-
- q->state |= Qstarve;
+ q->state = msg | Qstarve;
q->eof = 0;
q->noblock = 0;
@@ -716,10 +737,14 @@
if(q == nil)
return nil;
+ q->dlen = 0;
+ q->wp = q->rp = 0;
q->limit = 0;
q->arg = arg;
q->bypass = bypass;
q->state = 0;
+ q->eof = 0;
+ q->noblock = 0;
return q;
}
@@ -729,7 +754,7 @@
{
Queue *q = a;
- return (q->state & Qclosed) || q->bfirst != nil;
+ return q->bfirst != nil || (q->state & Qclosed);
}
/*
@@ -745,10 +770,9 @@
break;
if(q->state & Qclosed){
- if(++q->eof > 3)
+ if(q->eof >= 3 || (*q->err && strcmp(q->err, Ehungup) != 0))
return -1;
- if(*q->err && strcmp(q->err, Ehungup) != 0)
- return -1;
+ q->eof++;
return 0;
}
@@ -761,101 +785,6 @@
}
/*
- * add a block list to a queue, return bytes added
- */
-int
-qaddlist(Queue *q, Block *b)
-{
- int len, dlen;
-
- QDEBUG checkb(b, "qaddlist 1");
-
- /* queue the block */
- if(q->bfirst != nil)
- q->blast->next = b;
- else
- q->bfirst = b;
-
- len = BALLOC(b);
- dlen = BLEN(b);
- while(b->next != nil){
- b = b->next;
- QDEBUG checkb(b, "qaddlist 2");
-
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
- return dlen;
-}
-
-/*
- * called with q ilocked
- */
-Block*
-qremove(Queue *q)
-{
- Block *b;
-
- b = q->bfirst;
- if(b == nil)
- return nil;
- QDEBUG checkb(b, "qremove");
- q->bfirst = b->next;
- b->next = nil;
- q->dlen -= BLEN(b);
- q->len -= BALLOC(b);
- return b;
-}
-
-/*
- * copy the contents of a string of blocks into
- * memory from an offset. blocklist kept unchanged.
- * return number of copied bytes.
- */
-long
-readblist(Block *b, uchar *p, long n, ulong o)
-{
- ulong m, r;
-
- r = 0;
- while(n > 0 && b != nil){
- m = BLEN(b);
- if(o >= m)
- o -= m;
- else {
- m -= o;
- if(n < m)
- m = n;
- memmove(p, b->rp + o, m);
- p += m;
- r += m;
- n -= m;
- o = 0;
- }
- b = b->next;
- }
- return r;
-}
-
-/*
- * put a block back to the front of the queue
- * called with q ilocked
- */
-void
-qputback(Queue *q, Block *b)
-{
- b->next = q->bfirst;
- if(q->bfirst == nil)
- q->blast = b;
- q->bfirst = b;
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
-}
-
-/*
* cut off n bytes from the end of *h. return a new
* block with the tail and change *h to refer to the
* head.
@@ -885,31 +814,6 @@
}
/*
- * flow control, get producer going again
- * called with q ilocked
- */
-static void
-qwakeup_iunlock(Queue *q)
-{
- int dowakeup = 0;
-
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- }
-
- iunlock(&q->lk);
-
- /* wakeup flow controlled writers */
- if(dowakeup){
- if(q->kick != nil)
- q->kick(q->arg);
- wakeup(&q->wr);
- }
-}
-
-/*
* get next block from a queue (up to a limit)
*/
Block*
@@ -918,6 +822,8 @@
Block *b;
int n;
+ assert(len >= 0);
+
qlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
@@ -950,10 +856,8 @@
else
b->wp -= n;
}
+ iunlock_reader(q);
- /* restart producer */
- qwakeup_iunlock(q);
-
qunlock(&q->rlock);
poperror();
@@ -970,6 +874,8 @@
Block *b, *first, **last;
int m, n;
+ assert(len >= 0);
+
qlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
@@ -1001,8 +907,8 @@
freeb(qremove(q));
goto again;
}
-
- /* grab the first block plus as many
+ /*
+ * grab the first block plus as many
* following blocks as will partially
* fit in the read.
*/
@@ -1025,8 +931,7 @@
if(n > len && (q->state & Qmsg) == 0)
qputback(q, splitblock(last, n - len));
- /* restart producer */
- qwakeup_iunlock(q);
+ iunlock_reader(q);
qunlock(&q->rlock);
poperror();
@@ -1042,34 +947,39 @@
return n;
}
+/*
+ * a Flow represents a flow controlled
+ * writer on queue q with position p.
+ */
+typedef struct {
+ Queue* q;
+ uint p;
+} Flow;
+
static int
-qnotfull(void *a)
+unblocked(void *a)
{
- Queue *q = a;
+ Flow *f = a;
+ Queue *q = f->q;
- return q->len < q->limit || (q->state & Qclosed);
+ return q->noblock || (int)(f->p - q->rp) <= q->limit || (q->state & Qclosed);
}
/*
- * flow control, wait for queue to get below the limit
+ * flow control, wait for queue to drain back to the limit
*/
static void
-qflow(Queue *q)
+qflow(Flow *f)
{
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
+ Queue *q = f->q;
- ilock(&q->lk);
- q->state |= Qflow;
- iunlock(&q->lk);
-
+ while(!unblocked(f)){
qlock(&q->wlock);
if(waserror()){
qunlock(&q->wlock);
nexterror();
}
- sleep(&q->wr, qnotfull, q);
+ sleep(&q->wr, unblocked, f);
qunlock(&q->wlock);
poperror();
}
@@ -1081,7 +991,8 @@
long
qbwrite(Queue *q, Block *b)
{
- int len, dowakeup;
+ Flow flow;
+ int len;
if(q->bypass != nil){
len = blocklen(b);
@@ -1089,7 +1000,6 @@
return len;
}
- dowakeup = 0;
if(waserror()){
freeblist(b);
nexterror();
@@ -1101,9 +1011,11 @@
iunlock(&q->lk);
error(q->err);
}
-
- /* don't queue over the limit */
- if(q->len >= q->limit && q->noblock){
+ /*
+ * if the queue is full,
+ * silently discard when non-blocking
+ */
+ if(q->state & Qflow && q->noblock){
iunlock(&q->lk);
poperror();
len = blocklen(b);
@@ -1110,33 +1022,25 @@
freeblist(b);
return len;
}
-
len = qaddlist(q, b);
-
- /* make sure other end gets awakened */
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
poperror();
- /* get output going again */
- if(q->kick != nil && (dowakeup || (q->state&Qkick)))
- q->kick(q->arg);
-
- /* wakeup anyone consuming at the other end */
- if(dowakeup)
- wakeup(&q->rr);
-
/*
- * flow control, before allowing the process to continue and
- * queue more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This means that
- * things like 9p flushes and ssl messages will not be disrupted
- * by software interrupts.
+ * save our current position in queue
+ * for flow control below.
*/
- qflow(q);
+ flow.q = q;
+ flow.p = q->wp;
+ if(iunlock_writer(q) & Qflow){
+ /*
+ * flow control, before allowing the process to continue and
+ * queue more. We do this here so that postnote can only
+ * interrupt us after the data has been queued. This means that
+ * things like 9p flushes and ssl messages will not be disrupted
+ * by software interrupts.
+ */
+ qflow(&flow);
+ }
return len;
}
@@ -1151,17 +1055,11 @@
Block *b;
uchar *p = vp;
+ assert(len >= 0);
+
QDEBUG if(!islo())
print("qwrite hi %#p\n", getcallerpc(&q));
- /* stop queue bloat before allocating blocks */
- if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
- while(waserror())
- ;
- qflow(q);
- poperror();
- }
-
sofar = 0;
do {
n = len-sofar;
@@ -1190,11 +1088,11 @@
int
qiwrite(Queue *q, void *vp, int len)
{
- int n, sofar, dowakeup;
+ int n, sofar;
Block *b;
uchar *p = vp;
- dowakeup = 0;
+ assert(len >= 0);
sofar = 0;
do {
@@ -1209,46 +1107,72 @@
b->wp += n;
ilock(&q->lk);
-
- /* we use an artificially high limit for kernel prints since anything
- * over the limit gets dropped
- */
- if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
+ if(q->state & (Qflow|Qclosed)){
iunlock(&q->lk);
freeb(b);
break;
}
+ sofar += qaddlist(q, b);
+ iunlock_writer(q);
+ } while(sofar < len && (q->state & Qmsg) == 0);
- qaddlist(q, b);
+ return sofar;
+}
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
+/*
+ * throw away the next 'len' bytes in the queue
+ */
+int
+qdiscard(Queue *q, int len)
+{
+ Block *b, *tofree = nil;
+ int n, sofar;
- iunlock(&q->lk);
+ assert(len >= 0);
- if(dowakeup){
- if(q->kick != nil)
- q->kick(q->arg);
- wakeup(&q->rr);
+ ilock(&q->lk);
+ for(sofar = 0; sofar < len; sofar += n){
+ b = q->bfirst;
+ if(b == nil)
+ break;
+ QDEBUG checkb(b, "qdiscard");
+ n = BLEN(b);
+ if(n <= len - sofar){
+ q->bfirst = b->next;
+ q->rp += BALLOC(b);
+
+ /* remember to free this */
+ b->next = tofree;
+ tofree = b;
+ } else {
+ n = len - sofar;
+ b->rp += n;
}
+ q->dlen -= n;
+ }
+ iunlock_consumer(q);
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
+ freeblist(tofree);
return sofar;
}
/*
- * be extremely careful when calling this,
- * as there is no reference accounting
+ * flush the output queue
*/
void
-qfree(Queue *q)
+qflush(Queue *q)
{
- qclose(q);
- free(q);
+ Block *tofree;
+
+ ilock(&q->lk);
+ tofree = q->bfirst;
+ q->bfirst = nil;
+ q->rp = q->wp;
+ q->dlen = 0;
+ iunlock_consumer(q);
+
+ freeblist(tofree);
}
/*
@@ -1258,32 +1182,42 @@
void
qclose(Queue *q)
{
- Block *bfirst;
+ Block *tofree;
if(q == nil)
return;
- /* mark it */
ilock(&q->lk);
q->state |= Qclosed;
q->state &= ~(Qflow|Qstarve);
kstrcpy(q->err, Ehungup, ERRMAX);
- bfirst = q->bfirst;
+ tofree = q->bfirst;
q->bfirst = nil;
- q->len = 0;
+ q->rp = q->wp;
q->dlen = 0;
q->noblock = 0;
iunlock(&q->lk);
- /* free queued blocks */
- freeblist(bfirst);
-
/* wake up readers/writers */
wakeup(&q->rr);
wakeup(&q->wr);
+
+ /* free queued blocks */
+ freeblist(tofree);
}
/*
+ * be extremely careful when calling this,
+ * as there is no reference accounting
+ */
+void
+qfree(Queue *q)
+{
+ qclose(q);
+ free(q);
+}
+
+/*
* Mark a queue as closed. Wakeup any readers. Don't remove queued
* blocks.
*/
@@ -1290,7 +1224,6 @@
void
qhangup(Queue *q, char *msg)
{
- /* mark it */
ilock(&q->lk);
q->state |= Qclosed;
if(msg == nil || *msg == '\0')
@@ -1336,26 +1269,21 @@
}
/*
- * return space remaining before flow control
+ * return true if we can read without blocking
*/
int
-qwindow(Queue *q)
+qcanread(Queue *q)
{
- int l;
-
- l = q->limit - q->len;
- if(l < 0)
- l = 0;
- return l;
+ return q->bfirst != nil;
}
/*
- * return true if we can read without blocking
+ * return non-zero when the queue is full
*/
int
-qcanread(Queue *q)
+qfull(Queue *q)
{
- return q->bfirst != nil;
+ return q->state & Qflow;
}
/*
@@ -1364,7 +1292,11 @@
void
qsetlimit(Queue *q, int limit)
{
+ assert(limit >= 0);
+
+ ilock(&q->lk);
q->limit = limit;
+ iunlock_consumer(q);
}
/*
@@ -1373,34 +1305,7 @@
void
qnoblock(Queue *q, int onoff)
{
- q->noblock = onoff;
-}
-
-/*
- * flush the output queue
- */
-void
-qflush(Queue *q)
-{
- Block *bfirst;
-
- /* mark it */
ilock(&q->lk);
- bfirst = q->bfirst;
- q->bfirst = nil;
- q->len = 0;
- q->dlen = 0;
- iunlock(&q->lk);
-
- /* free queued blocks */
- freeblist(bfirst);
-
- /* wake up writers */
- wakeup(&q->wr);
-}
-
-int
-qfull(Queue *q)
-{
- return q->state & Qflow;
+ q->noblock = onoff;
+ iunlock_consumer(q);
}