#include "stdinc.h" #include "9.h" #include "dat.h" #include "fns.h" enum { NConInit = 128, NMsgInit = 384, NMsgProcInit = 64, NMsizeInit = 8192+IOHDRSZ, }; static struct { VtLock* alock; /* alloc */ Msg* ahead; VtRendez* arendez; int maxmsg; int nmsg; int nmsgstarve; VtLock* rlock; /* read */ Msg* rhead; Msg* rtail; VtRendez* rrendez; int maxproc; int nproc; int nprocstarve; u32int msize; /* immutable */ } mbox; static struct { VtLock* alock; /* alloc */ Con* ahead; VtRendez* arendez; VtLock* clock; Con* chead; Con* ctail; int maxcon; int ncon; int nconstarve; u32int msize; } cbox; static void conFree(Con* con) { assert(con->version == nil); assert(con->mhead == nil); assert(con->whead == nil); assert(con->nfid == 0); assert(con->state == ConMoribund); if(con->fd >= 0){ close(con->fd); con->fd = -1; } con->state = ConDead; con->aok = 0; con->flags = 0; con->isconsole = 0; vtLock(cbox.alock); if(con->cprev != nil) con->cprev->cnext = con->cnext; else cbox.chead = con->cnext; if(con->cnext != nil) con->cnext->cprev = con->cprev; else cbox.ctail = con->cprev; con->cprev = con->cnext = nil; if(cbox.ncon > cbox.maxcon){ if(con->name != nil) vtMemFree(con->name); vtLockFree(con->fidlock); vtMemFree(con->data); vtRendezFree(con->wrendez); vtLockFree(con->wlock); vtRendezFree(con->mrendez); vtLockFree(con->mlock); vtRendezFree(con->rendez); vtLockFree(con->lock); vtMemFree(con); cbox.ncon--; vtUnlock(cbox.alock); return; } con->anext = cbox.ahead; cbox.ahead = con; if(con->anext == nil) vtWakeup(cbox.arendez); vtUnlock(cbox.alock); } static void msgFree(Msg* m) { assert(m->rwnext == nil); assert(m->flush == nil); vtLock(mbox.alock); if(mbox.nmsg > mbox.maxmsg){ vtMemFree(m->data); vtMemFree(m); mbox.nmsg--; vtUnlock(mbox.alock); return; } m->anext = mbox.ahead; mbox.ahead = m; if(m->anext == nil) vtWakeup(mbox.arendez); vtUnlock(mbox.alock); } static Msg* msgAlloc(Con* con) { Msg *m; vtLock(mbox.alock); while(mbox.ahead == nil){ if(mbox.nmsg >= mbox.maxmsg){ mbox.nmsgstarve++; vtSleep(mbox.arendez); continue; } m = vtMemAllocZ(sizeof(Msg)); m->data = vtMemAlloc(mbox.msize); m->msize = mbox.msize; mbox.nmsg++; mbox.ahead = m; break; } m = mbox.ahead; mbox.ahead = m->anext; m->anext = nil; vtUnlock(mbox.alock); m->con = con; m->state = MsgR; m->nowq = 0; return m; } static void msgMunlink(Msg* m) { Con *con; con = m->con; if(m->mprev != nil) m->mprev->mnext = m->mnext; else con->mhead = m->mnext; if(m->mnext != nil) m->mnext->mprev = m->mprev; else con->mtail = m->mprev; m->mprev = m->mnext = nil; } void msgFlush(Msg* m) { Con *con; Msg *flush, *old; con = m->con; if(Dflag) fprint(2, "msgFlush %F\n", &m->t); /* * If this Tflush has been flushed, nothing to do. * Look for the message to be flushed in the * queue of all messages still on this connection. * If it's not found must assume Elvis has already * left the building and reply normally. */ vtLock(con->mlock); if(m->state == MsgF){ vtUnlock(con->mlock); return; } for(old = con->mhead; old != nil; old = old->mnext) if(old->t.tag == m->t.oldtag) break; if(old == nil){ if(Dflag) fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); vtUnlock(con->mlock); return; } if(Dflag) fprint(2, "\tmsgFlush found %F\n", &old->t); /* * Found it. * There are two cases where the old message can be * truly flushed and no reply to the original message given. * The first is when the old message is in MsgR state; no * processing has been done yet and it is still on the read * queue. The second is if old is a Tflush, which doesn't * affect the server state. In both cases, put the old * message into MsgF state and let MsgWrite toss it after * pulling it off the queue. */ if(old->state == MsgR || old->t.type == Tflush){ old->state = MsgF; if(Dflag) fprint(2, "msgFlush: change %d from MsgR to MsgF\n", m->t.oldtag); } /* * Link this flush message and the old message * so multiple flushes can be coalesced (if there are * multiple Tflush messages for a particular pending * request, it is only necessary to respond to the last * one, so any previous can be removed) and to be * sure flushes wait for their corresponding old * message to go out first. * Waiting flush messages do not go on the write queue, * they are processed after the old message is dealt * with. There's no real need to protect the setting of * Msg.nowq, the only code to check it runs in this * process after this routine returns. */ if((flush = old->flush) != nil){ if(Dflag) fprint(2, "msgFlush: remove %d from %d list\n", old->flush->t.tag, old->t.tag); m->flush = flush->flush; flush->flush = nil; msgMunlink(flush); msgFree(flush); } old->flush = m; m->nowq = 1; if(Dflag) fprint(2, "msgFlush: add %d to %d queue\n", m->t.tag, old->t.tag); vtUnlock(con->mlock); } static void msgProc(void*) { Msg *m; char *e; Con *con; vtThreadSetName("msgProc"); for(;;){ /* * If surplus to requirements, exit. * If not, wait for and pull a message off * the read queue. */ vtLock(mbox.rlock); if(mbox.nproc > mbox.maxproc){ mbox.nproc--; vtUnlock(mbox.rlock); break; } while(mbox.rhead == nil) vtSleep(mbox.rrendez); m = mbox.rhead; mbox.rhead = m->rwnext; m->rwnext = nil; vtUnlock(mbox.rlock); con = m->con; e = nil; /* * If the message has been flushed before * any 9P processing has started, mark it so * none will be attempted. */ vtLock(con->mlock); if(m->state == MsgF) e = "flushed"; else m->state = Msg9; vtUnlock(con->mlock); if(e == nil){ /* * explain this */ vtLock(con->lock); if(m->t.type == Tversion){ con->version = m; con->state = ConDown; while(con->mhead != m) vtSleep(con->rendez); assert(con->state == ConDown); if(con->version == m){ con->version = nil; con->state = ConInit; } else e = "Tversion aborted"; } else if(con->state != ConUp) e = "connection not ready"; vtUnlock(con->lock); } /* * Dispatch if not error already. */ m->r.tag = m->t.tag; if(e == nil && !(*rFcall[m->t.type])(m)) e = vtGetError(); if(e != nil){ m->r.type = Rerror; m->r.ename = e; } else m->r.type = m->t.type+1; /* * Put the message (with reply) on the * write queue and wakeup the write process. */ if(!m->nowq){ vtLock(con->wlock); if(con->whead == nil) con->whead = m; else con->wtail->rwnext = m; con->wtail = m; vtWakeup(con->wrendez); vtUnlock(con->wlock); } } } static void msgRead(void* v) { Msg *m; Con *con; int eof, fd, n; vtThreadSetName("msgRead"); con = v; fd = con->fd; eof = 0; while(!eof){ m = msgAlloc(con); while((n = read9pmsg(fd, m->data, con->msize)) == 0) ; if(n < 0){ m->t.type = Tversion; m->t.fid = NOFID; m->t.tag = NOTAG; m->t.msize = con->msize; m->t.version = "9PEoF"; eof = 1; } else if(convM2S(m->data, n, &m->t) != n){ if(Dflag) fprint(2, "msgRead: convM2S error: %s\n", con->name); msgFree(m); continue; } if(Dflag) fprint(2, "msgRead %p: t %F\n", con, &m->t); vtLock(con->mlock); if(con->mtail != nil){ m->mprev = con->mtail; con->mtail->mnext = m; } else{ con->mhead = m; m->mprev = nil; } con->mtail = m; vtUnlock(con->mlock); vtLock(mbox.rlock); if(mbox.rhead == nil){ mbox.rhead = m; if(!vtWakeup(mbox.rrendez)){ if(mbox.nproc < mbox.maxproc){ if(vtThread(msgProc, nil) > 0) mbox.nproc++; } else mbox.nprocstarve++; } /* * don't need this surely? vtWakeup(mbox.rrendez); */ } else mbox.rtail->rwnext = m; mbox.rtail = m; vtUnlock(mbox.rlock); } } static void msgWrite(void* v) { Con *con; int eof, n; Msg *flush, *m; vtThreadSetName("msgWrite"); con = v; if(vtThread(msgRead, con) < 0){ conFree(con); return; } for(;;){ /* * Wait for and pull a message off the write queue. */ vtLock(con->wlock); while(con->whead == nil) vtSleep(con->wrendez); m = con->whead; con->whead = m->rwnext; m->rwnext = nil; assert(!m->nowq); vtUnlock(con->wlock); eof = 0; /* * Write each message (if it hasn't been flushed) * followed by any messages waiting for it to complete. */ vtLock(con->mlock); while(m != nil){ msgMunlink(m); if(Dflag) fprint(2, "msgWrite %d: r %F\n", m->state, &m->r); if(m->state != MsgF){ m->state = MsgW; vtUnlock(con->mlock); n = convS2M(&m->r, con->data, con->msize); if(write(con->fd, con->data, n) != n) eof = 1; vtLock(con->mlock); } if((flush = m->flush) != nil){ assert(flush->nowq); m->flush = nil; } msgFree(m); m = flush; } vtUnlock(con->mlock); vtLock(con->lock); if(eof && con->fd >= 0){ close(con->fd); con->fd = -1; } if(con->state == ConDown) vtWakeup(con->rendez); if(con->state == ConMoribund && con->mhead == nil){ vtUnlock(con->lock); conFree(con); break; } vtUnlock(con->lock); } } Con* conAlloc(int fd, char* name, int flags) { Con *con; char buf[128], *p; int rfd, n; vtLock(cbox.alock); while(cbox.ahead == nil){ if(cbox.ncon >= cbox.maxcon){ cbox.nconstarve++; vtSleep(cbox.arendez); continue; } con = vtMemAllocZ(sizeof(Con)); con->lock = vtLockAlloc(); con->rendez = vtRendezAlloc(con->lock); con->data = vtMemAlloc(cbox.msize); con->msize = cbox.msize; con->alock = vtLockAlloc(); con->mlock = vtLockAlloc(); con->mrendez = vtRendezAlloc(con->mlock); con->wlock = vtLockAlloc(); con->wrendez = vtRendezAlloc(con->wlock); con->fidlock = vtLockAlloc(); cbox.ncon++; cbox.ahead = con; break; } con = cbox.ahead; cbox.ahead = con->anext; con->anext = nil; if(cbox.ctail != nil){ con->cprev = cbox.ctail; cbox.ctail->cnext = con; } else{ cbox.chead = con; con->cprev = nil; } cbox.ctail = con; assert(con->mhead == nil); assert(con->whead == nil); assert(con->fhead == nil); assert(con->nfid == 0); con->state = ConNew; con->fd = fd; if(con->name != nil){ vtMemFree(con->name); con->name = nil; } if(name != nil) con->name = vtStrDup(name); else con->name = vtStrDup("unknown"); con->remote[0] = 0; snprint(buf, sizeof buf, "%s/remote", con->name); if((rfd = open(buf, OREAD)) >= 0){ n = read(rfd, buf, sizeof buf-1); close(rfd); if(n > 0){ buf[n] = 0; if((p = strchr(buf, '\n')) != nil) *p = 0; strecpy(con->remote, con->remote+sizeof con->remote, buf); } } con->flags = flags; con->isconsole = 0; vtUnlock(cbox.alock); if(vtThread(msgWrite, con) < 0){ conFree(con); return nil; } return con; } static int cmdMsg(int argc, char* argv[]) { char *p; char *usage = "usage: msg [-m nmsg] [-p nproc]"; int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; maxmsg = maxproc = 0; ARGBEGIN{ default: return cliError(usage); case 'm': p = ARGF(); if(p == nil) return cliError(usage); maxmsg = strtol(argv[0], &p, 0); if(maxmsg <= 0 || p == argv[0] || *p != '\0') return cliError(usage); break; case 'p': p = ARGF(); if(p == nil) return cliError(usage); maxproc = strtol(argv[0], &p, 0); if(maxproc <= 0 || p == argv[0] || *p != '\0') return cliError(usage); break; }ARGEND if(argc) return cliError(usage); vtLock(mbox.alock); if(maxmsg) mbox.maxmsg = maxmsg; maxmsg = mbox.maxmsg; nmsg = mbox.nmsg; nmsgstarve = mbox.nmsgstarve; vtUnlock(mbox.alock); vtLock(mbox.rlock); if(maxproc) mbox.maxproc = maxproc; maxproc = mbox.maxproc; nproc = mbox.nproc; nprocstarve = mbox.nprocstarve; vtUnlock(mbox.rlock); consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", nmsg, nmsgstarve, nproc, nprocstarve); return 1; } static int scmp(Fid *a, Fid *b) { if(a == 0) return 1; if(b == 0) return -1; return strcmp(a->uname, b->uname); } static Fid* fidMerge(Fid *a, Fid *b) { Fid *s, **l; l = &s; while(a || b){ if(scmp(a, b) < 0){ *l = a; l = &a->sort; a = a->sort; }else{ *l = b; l = &b->sort; b = b->sort; } } *l = 0; return s; } static Fid* fidMergeSort(Fid *f) { int delay; Fid *a, *b; if(f == nil) return nil; if(f->sort == nil) return f; a = b = f; delay = 1; while(a && b){ if(delay) /* easy way to handle 2-element list */ delay = 0; else a = a->sort; if(b = b->sort) b = b->sort; } b = a->sort; a->sort = nil; a = fidMergeSort(f); b = fidMergeSort(b); return fidMerge(a, b); } static int cmdWho(int argc, char* argv[]) { char *usage = "usage: who"; int i, l1, l2, l; Con *con; Fid *fid, *last; ARGBEGIN{ default: return cliError(usage); }ARGEND if(argc > 0) return cliError(usage); vtRLock(cbox.clock); l1 = 0; l2 = 0; for(con=cbox.chead; con; con=con->cnext){ if((l = strlen(con->name)) > l1) l1 = l; if((l = strlen(con->remote)) > l2) l2 = l; } for(con=cbox.chead; con; con=con->cnext){ consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote); vtLock(con->fidlock); last = nil; for(i=0; ifidhash[i]; fid; fid=fid->hash) if(fid->fidno != NOFID && fid->uname){ fid->sort = last; last = fid; } fid = fidMergeSort(last); last = nil; for(; fid; last=fid, fid=fid->sort) if(last==nil || strcmp(fid->uname, last->uname) != 0) consPrint(" %q", fid->uname); vtUnlock(con->fidlock); consPrint("\n"); } vtRUnlock(cbox.clock); return 1; } void msgInit(void) { mbox.alock = vtLockAlloc(); mbox.arendez = vtRendezAlloc(mbox.alock); mbox.rlock = vtLockAlloc(); mbox.rrendez = vtRendezAlloc(mbox.rlock); mbox.maxmsg = NMsgInit; mbox.maxproc = NMsgProcInit; mbox.msize = NMsizeInit; cliAddCmd("msg", cmdMsg); } static int cmdCon(int argc, char* argv[]) { char *p; Con *con; char *usage = "usage: con [-m ncon]"; int maxcon, ncon, nconstarve; maxcon = 0; ARGBEGIN{ default: return cliError(usage); case 'm': p = ARGF(); if(p == nil) return cliError(usage); maxcon = strtol(argv[0], &p, 0); if(maxcon <= 0 || p == argv[0] || *p != '\0') return cliError(usage); break; }ARGEND if(argc) return cliError(usage); vtLock(cbox.clock); if(maxcon) cbox.maxcon = maxcon; maxcon = cbox.maxcon; ncon = cbox.ncon; nconstarve = cbox.nconstarve; vtUnlock(cbox.clock); consPrint("\tcon -m %d\n", maxcon); consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); vtRLock(cbox.clock); for(con = cbox.chead; con != nil; con = con->cnext){ consPrint("\t%s\n", con->name); } vtRUnlock(cbox.clock); return 1; } void conInit(void) { cbox.alock = vtLockAlloc(); cbox.arendez = vtRendezAlloc(cbox.alock); cbox.clock = vtLockAlloc(); cbox.maxcon = NConInit; cbox.msize = NMsizeInit; cliAddCmd("con", cmdCon); cliAddCmd("who", cmdWho); }