#include #include #include #include #include #include <9p.h> #define BUCKSIZE 777777 #define MAGIC 77777 #define MAXQ 777 #define SMBUF 777 /* provides input/output multiplexing for 'screen' like functionality */ /* usually used in combination with hubshell client and hub wrapper script */ typedef struct Hub Hub; /* A Hub is a file that functions as a multiplexed pipe */ typedef struct Msgq Msgq; /* The Msgq is a per-client fid structure to track location */ struct Hub{ char name[SMBUF]; /* name */ char bucket[BUCKSIZE]; /* data buffer */ char *inbuckp; /* location to store next message */ int buckfull; /* amount of data stored in bucket */ char *buckwrap; /* exact limit of written data before pointer reset */ Req *qreqs[MAXQ]; /* pointers to queued read Reqs */ int rstatus[MAXQ]; /* status of read requests */ int qrnum; /* index of read Reqs waiting to be filled */ int qans; /* number of read Reqs answered */ Req *qwrits[MAXQ]; /* Similar for write Reqs */ int wstatus[MAXQ]; int qwnum; int qwans; int ketchup; /* makes it all taste better */ int tomatoflag; /* fruit vs veggies */ QLock wrlk; /* writer lock during fear */ QLock replk; /* reply lock during fear */ int killme; /* i need to die */ }; struct Msgq{ ulong myfid; /* Msgq is associated with client fids */ char *nxt; /* Location of this client in the buffer */ int bufuse; /* how much of the buffer has been used */ }; enum { UP = 1, DOWN = 2, WAIT = 3, DONE = 4, }; char *srvname; int paranoia; /* In paranoid mode loose reader/writer sync is maintained */ int freeze; /* In frozen mode the hubs operate simply as a ramfs */ static char Ebad[] = "something bad happened"; static char Enomem[] = "no memory"; void wrsend(Hub *h); void msgsend(Hub *h); void hubcmd(char *cmd); void zerohub(Hub *h); void fsread(Req *r); void fswrite(Req *r); void fscreate(Req *r); void fsopen(Req *r); void fsdestroyfile(File *f); void usage(void); Srv fs = { .open= fsopen, .read= fsread, .write= fswrite, .create= fscreate, }; /* msgsend replies to Reqs queued by fsread */ void msgsend(Hub *h) { Req *r; Msgq *mq; u32int count; if(h->qrnum == 0){ return; } for(int i = h->qans; i <= h->qrnum; i++){ if(paranoia == UP){ qlock(&h->replk); } if(h->rstatus[i] != WAIT){ if((i == h->qans) && (i < h->qrnum)){ h->qans++; } if(paranoia == UP){ qunlock(&h->replk); } continue; } r = h->qreqs[i]; mq = r->fid->aux; if(mq->nxt == h->inbuckp){ if(paranoia == UP){ qunlock(&h->replk); } continue; } count = r->ifcall.count; if(mq->nxt >= h->buckwrap){ mq->nxt = h->bucket; mq->bufuse = 0; } if(mq->nxt + count > h->buckwrap){ count = h->buckwrap - mq->nxt; } if((mq->bufuse + count > h->buckfull) && (mq->bufuse < h->buckfull)){ count = h->buckfull - mq->bufuse; } memmove(r->ofcall.data, mq->nxt, count); r->ofcall.count = count; mq->nxt += count; mq->bufuse += count; h->rstatus[i] = DONE; if((i == h->qans) && (i < h->qrnum)){ h->qans++; } respond(r, nil); if(paranoia == UP){ h->ketchup = mq->bufuse; if(mq->bufuse <= h->buckfull){ h->tomatoflag = DOWN; /* DOWN means do not wait for us */ } else { h->tomatoflag = UP; } qunlock(&h->replk); } } } /* wrsend replies to Reqs queued by fswrite */ void wrsend(Hub *h) { Req *r; u32int count; if(h->qwnum == 0){ return; } if(paranoia == UP){ /* If we are paranoid, we fork and slack off while the readers catch up */ qlock(&h->wrlk); if((h->ketchup < h->buckfull - MAGIC) || (h->ketchup > h->buckfull)){ if(rfork(RFPROC|RFMEM) == 0){ sleep(100); h->killme = UP; for(int i = 0; ((i < 77) && (h->tomatoflag == UP)); i++){ sleep(7); /* Give the readers some time to catch up and drop the flag */ } } else { return; /* We want this flow of control to become an incoming read request */ } } } for(int i = h->qwans; i <= h->qwnum; i++){ if(h->wstatus[i] != WAIT){ if((i == h->qwans) && (i < h->qwnum)){ h->qwans++; } continue; } r = h->qwrits[i]; count = r->ifcall.count; if((h->buckfull + count) >= BUCKSIZE - 8192){ h->buckwrap = h->inbuckp; h->inbuckp = h->bucket; h->buckfull = 0; } memmove(h->inbuckp, r->ifcall.data, count); h->inbuckp += count; h->buckfull += count; r->fid->file->length = h->buckfull; r->ofcall.count = count; h->wstatus[i] = DONE; if((i == h->qwans) && (i < h->qwnum)){ h->qwans++; } respond(r, nil); if(paranoia == UP){ if(h->wrlk.locked == 1){ qunlock(&h->wrlk); } if(h->killme == UP){ /* If killme is up we forked another flow of control and need to die */ h->killme = DOWN; exits(nil); } } } } /* queue all reads unless Hubs are set to freeze, in which case we behave like ramfiles */ void fsread(Req *r) { Hub *h; Msgq *mq; u32int count; vlong offset; char tmpstr[SMBUF]; h = r->fid->file->aux; if(strncmp(h->name, "ctl", 3) == 0){ count = r->ifcall.count; offset = r->ifcall.offset; if(offset > 0){ r->ofcall.count = 0; respond(r, nil); return; } sprint(tmpstr, "\tHubfs %s status:\nParanoia == %d\tFreeze == %d\n", srvname, paranoia, freeze); if(strlen(tmpstr) <= count){ count = strlen(tmpstr); } else { respond(r, "read count too small to answer\b"); } memmove(r->ofcall.data, tmpstr, count); r->ofcall.count = count; respond(r, nil); return; } if(freeze == UP){ mq = r->fid->aux; if(mq->bufuse > 0){ if(h->qrnum >= MAXQ -2){ h->qrnum = 1; h->qans = 1; } h->qrnum++; h->rstatus[h->qrnum] = WAIT; h->qreqs[h->qrnum] = r; return; } count = r->ifcall.count; offset = r->ifcall.offset; while(offset >= BUCKSIZE){ offset -= BUCKSIZE; } if(offset >= h->buckfull){ r->ofcall.count = 0; respond(r, nil); return; } if((offset + count >= h->buckfull) && (offset < h->buckfull)){ count = h->buckfull - offset; } memmove(r->ofcall.data, h->bucket + offset, count); r->ofcall.count = count; respond(r, nil); return; } h->qrnum++; if(h->qrnum >= MAXQ - 2){ msgsend(h); h->qrnum = 1; h->qans = 1; } h->rstatus[h->qrnum] = WAIT; h->qreqs[h->qrnum] = r; msgsend(h); } /* queue writes unless hubs are set to frozen mode */ void fswrite(Req *r) { Hub *h; u32int count; vlong offset; h = r->fid->file->aux; if(strncmp(h->name, "ctl", 3) == 0){ hubcmd(r->ifcall.data); } if(freeze == UP){ count = r->ifcall.count; offset = r->ifcall.offset; while(offset >= BUCKSIZE){ offset -= BUCKSIZE; } h->inbuckp = h->bucket +offset; h->buckfull = h->inbuckp - h->bucket; if(h->buckfull + count >= BUCKSIZE){ h->inbuckp = h->bucket; h->buckfull = 0; } memmove(h->inbuckp, r->ifcall.data, count); h->inbuckp += count; h->buckfull += count; r->fid->file->length = h->buckfull; r->ofcall.count = count; respond(r, nil); return; } h->qwnum++; if(h->qwnum >= MAXQ - 2){ msgsend(h); h->qwnum = 1; h->qwans = 1; } h->wstatus[h->qwnum] = WAIT; h->qwrits[h->qwnum] = r; wrsend(h); msgsend(h); } void fscreate(Req *r) { Hub *h; File *f; if(f = createfile(r->fid->file, r->ifcall.name, r->fid->uid, r->ifcall.perm, nil)){ h = emalloc9p(sizeof(Hub)); zerohub(h); strcat(h->name, r->ifcall.name); f->aux = h; r->fid->file = f; r->ofcall.qid = f->qid; respond(r, nil); return; } respond(r, Ebad); } void fsopen(Req *r) { Hub *h; Msgq *q; h = r->fid->file->aux; q = (Msgq*)emalloc9p(sizeof(Msgq)); memset(q, 0, sizeof(Msgq)); q->myfid = r->fid->fid; q->nxt = h->bucket; q->bufuse = 0; r->fid->aux = q; if(h && (r->ifcall.mode&OTRUNC)){ h->inbuckp = h->bucket; h->buckfull = 0; r->fid->file->length = 0; } respond(r, nil); } void fsdestroyfile(File *f) { Hub *h; h = f->aux; if(h){ free(h); } } void zerohub(Hub *h) { memset(h, 0, sizeof(Hub)); h->inbuckp = h->bucket; h->qrnum = 0; h->qans = 1; h->qwnum = 0; h->qwans = 0; h->ketchup = 0; h->buckwrap = h->inbuckp + BUCKSIZE; } void hubcmd(char *cmd) { if(strncmp(cmd, "quit", 4) == 0){ sysfatal("quit command sent to hubfs!"); } if(strncmp(cmd, "fear", 4) == 0){ paranoia = UP; print("you feel a cold creeping feeling coming up your spine...\n"); return; } if(strncmp(cmd, "calm", 4) == 0){ paranoia = DOWN; print("you feel relaxed and confident.\n"); return; } if(strncmp(cmd, "freeze", 6) == 0){ freeze = UP; print("the pipes freeze in place!\n"); return; } if(strncmp(cmd, "melt", 4) == 0){ freeze = DOWN; print("the pipes thaw and data flows freely again\n"); return; } fprint(2, "no matching command found\n"); } void usage(void) { fprint(2, "usage: hubfs [-D] [-s srvname] [-m mtpt]\n"); exits("usage"); } void main(int argc, char **argv) { char *addr = nil; char *mtpt = nil; Qid q; srvname = nil; fs.tree = alloctree(nil, nil, DMDIR|0777, fsdestroyfile); q = fs.tree->root->qid; ARGBEGIN{ case 'D': chatty9p++; break; case 'a': addr = EARGF(usage()); break; case 's': srvname = EARGF(usage()); break; case 'm': mtpt = EARGF(usage()); break; default: usage(); }ARGEND; if(argc) usage(); if(chatty9p) fprint(2, "hubsrv.nopipe %d srvname %s mtpt %s\n", fs.nopipe, srvname, mtpt); if(addr == nil && srvname == nil && mtpt == nil) sysfatal("must specify -a, -s, or -m option"); if(addr) listensrv(&fs, addr); if(srvname || mtpt) postmountsrv(&fs, srvname, mtpt, MREPL|MCREATE); exits(0); } /* basic 9pfile implementation taken from /sys/src/lib9p/ramfs.c */