#include #include #include #include "queue.h" long ventisendbytes, ventisendpackets; long ventirecvbytes, ventirecvpackets; static int _vtsend(VtConn *z, Packet *p) { IOchunk ioc; int n, tot; uchar buf[2]; if(z->state != VtStateConnected) { werrstr("session not connected"); return -1; } /* add framing */ n = packetsize(p); if(n >= (1<<16)) { werrstr("packet too large"); packetfree(p); return -1; } buf[0] = n>>8; buf[1] = n; packetprefix(p, buf, 2); ventisendbytes += n+2; ventisendpackets++; tot = 0; for(;;){ n = packetfragments(p, &ioc, 1, 0); if(n == 0) break; if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){ vtlog(VtServerLog, "%T %s: sending packet %p: %r
\n", z->addr, p); packetfree(p); return -1; } packetconsume(p, nil, ioc.len); tot += ioc.len; } vtlog(VtServerLog, "%T %s: sent packet %p (%d bytes)
\n", z->addr, p, tot); packetfree(p); return 1; } static int interrupted(void) { char e[ERRMAX]; rerrstr(e, sizeof e); return strstr(e, "interrupted") != nil; } static Packet* _vtrecv(VtConn *z) { uchar buf[10], *b; int n; Packet *p; int size, len; if(z->state != VtStateConnected) { werrstr("session not connected"); return nil; } p = z->part; /* get enough for head size */ size = packetsize(p); while(size < 2) { b = packettrailer(p, 2); assert(b != nil); if(0) fprint(2, "%d read hdr\n", getpid()); n = read(z->infd, b, 2); if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); if(n==0 || (n<0 && !interrupted())) goto Err; size += n; packettrim(p, 0, size); } if(packetconsume(p, buf, 2) < 0) goto Err; len = (buf[0] << 8) | buf[1]; size -= 2; while(size < len) { n = len - size; if(n > MaxFragSize) n = MaxFragSize; b = packettrailer(p, n); if(0) fprint(2, "%d read body %d\n", getpid(), n); n = read(z->infd, b, n); if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); if(n > 0) size += n; packettrim(p, 0, size); if(n==0 || (n<0 && !interrupted())) goto Err; } ventirecvbytes += len; ventirecvpackets++; p = packetsplit(p, len); vtlog(VtServerLog, "%T %s: read packet %p len %d
\n", z->addr, p, len); return p; Err: vtlog(VtServerLog, "%T %s: error reading packet: %r
\n", z->addr); return nil; } /* * If you fork off two procs running vtrecvproc and vtsendproc, * then vtrecv/vtsend (and thus vtrpc) will never block except on * rendevouses, which is nice when it's running in one thread of many. */ void vtrecvproc(void *v) { Packet *p; VtConn *z; Queue *q; z = v; q = _vtqalloc(); qlock(&z->lk); z->readq = q; qlock(&z->inlk); rwakeup(&z->rpcfork); qunlock(&z->lk); while((p = _vtrecv(z)) != nil) if(_vtqsend(q, p) < 0){ packetfree(p); break; } qunlock(&z->inlk); qlock(&z->lk); _vtqhangup(q); while((p = _vtnbqrecv(q)) != nil) packetfree(p); _vtqdecref(q); z->readq = nil; rwakeup(&z->rpcfork); qunlock(&z->lk); vthangup(z); } void vtsendproc(void *v) { Queue *q; Packet *p; VtConn *z; z = v; q = _vtqalloc(); qlock(&z->lk); z->writeq = q; qlock(&z->outlk); rwakeup(&z->rpcfork); qunlock(&z->lk); while((p = _vtqrecv(q)) != nil) if(_vtsend(z, p) < 0) break; qunlock(&z->outlk); qlock(&z->lk); _vtqhangup(q); while((p = _vtnbqrecv(q)) != nil) packetfree(p); _vtqdecref(q); z->writeq = nil; rwakeup(&z->rpcfork); qunlock(&z->lk); return; } Packet* vtrecv(VtConn *z) { Packet *p; Queue *q; qlock(&z->lk); if(z->state != VtStateConnected){ werrstr("not connected"); qunlock(&z->lk); return nil; } if(z->readq){ q = _vtqincref(z->readq); qunlock(&z->lk); p = _vtqrecv(q); _vtqdecref(q); return p; } qlock(&z->inlk); qunlock(&z->lk); p = _vtrecv(z); qunlock(&z->inlk); if(!p) vthangup(z); return p; } int vtsend(VtConn *z, Packet *p) { Queue *q; qlock(&z->lk); if(z->state != VtStateConnected){ packetfree(p); werrstr("not connected"); qunlock(&z->lk); return -1; } if(z->writeq){ q = _vtqincref(z->writeq); qunlock(&z->lk); if(_vtqsend(q, p) < 0){ _vtqdecref(q); packetfree(p); return -1; } _vtqdecref(q); return 0; } qlock(&z->outlk); qunlock(&z->lk); if(_vtsend(z, p) < 0){ qunlock(&z->outlk); vthangup(z); return -1; } qunlock(&z->outlk); return 0; }