#include #include #include #include #include "pool.h" #include "playlist.h" typedef struct Wmsg Wmsg; enum { Busy = 0x01, Open = 0x02, Trunc = 0x04, Eof = 0x08, }; File files[] = { [Qdir] = {.dir = {0,0,{Qdir, 0,QTDIR}, 0555|DMDIR, 0,0,0, "."}}, [Qplayctl] = {.dir = {0,0,{Qplayctl, 0,QTFILE}, 0666, 0,0,0, "playctl"}}, [Qplaylist] = {.dir = {0,0,{Qplaylist, 0,QTFILE}, 0666|DMAPPEND, 0,0,0, "playlist"}}, [Qplayvol] = {.dir = {0,0,{Qplayvol, 0,QTFILE}, 0666, 0,0,0, "playvol"}}, [Qplaystat] = {.dir = {0,0,{Qplaystat, 0,QTFILE}, 0444, 0,0,0, "playstat"}}, }; Channel *reqs; Channel *workers; Channel *volumechan; Channel *playchan; Channel *playlistreq; Playlist playlist; int volume[8]; char *statetxt[] = { [Nostate] = "panic!", [Error] = "error", [Stop] = "stop", [Pause] = "pause", [Play] = "play", [Resume] = "resume", [Skip] = "skip", nil }; // low-order bits: position in play list, high-order: play state: Pmsg playstate = {Stop, 0}; char *rflush(Worker*), *rauth(Worker*), *rattach(Worker*), *rwalk(Worker*), *ropen(Worker*), *rcreate(Worker*), *rread(Worker*), *rwrite(Worker*), *rclunk(Worker*), *rremove(Worker*), *rstat(Worker*), *rwstat(Worker*), *rversion(Worker*); char *(*fcalls[])(Worker*) = { [Tflush] rflush, [Tversion] rversion, [Tauth] rauth, [Tattach] rattach, [Twalk] rwalk, [Topen] ropen, [Tcreate] rcreate, [Tread] rread, [Twrite] rwrite, [Tclunk] rclunk, [Tremove] rremove, [Tstat] rstat, [Twstat] rwstat, }; int messagesize = Messagesize; Fid *fids; char Eperm[] = "permission denied"; char Enotdir[] = "not a directory"; char Enoauth[] = "authentication not required"; char Enotexist[] = "file does not exist"; char Einuse[] = "file in use"; char Eexist[] = "file exists"; char Enotowner[] = "not owner"; char Eisopen[] = "file already open for I/O"; char Excl[] = "exclusive use file already open"; char Ename[] = "illegal name"; char Ebadctl[] = "unknown control message"; char Epast[] = "reading past eof"; Fid *oldfid(int); Fid *newfid(int); void volumeupdater(void*); void playupdater(void*); char *playerror; static int lookup(char *cmd, char *list[]) { int i; for (i = 0; list[i] != nil; i++) if (strcmp(cmd, list[i]) == 0) return i; return -1; } char* rversion(Worker *w) { Req *r; Fid *f; r = w->r; if(r->ifcall.msize < 256) return "max messagesize too small"; if(r->ifcall.msize < messagesize) messagesize = r->ifcall.msize; r->ofcall.msize = messagesize; if(strncmp(r->ifcall.version, "9P2000", 6) != 0) return "unknown 9P version"; else r->ofcall.version = "9P2000"; for(f = fids; f; f = f->next) if(f->flags & Busy) f->flags &= ~(Open|Busy); return nil; } char* rauth(Worker*) { return Enoauth; } char* rflush(Worker *w) { Wmsg m; int i; Req *r; r = w->r; m.cmd = Flush; m.off = r->ifcall.oldtag; m.arg = nil; for(i = 1; i < nelem(files); i++) bcastmsg(files[i].workers, &m); if (debug & DbgWorker) fprint(2, "flush done on tag %d\n", r->ifcall.oldtag); return 0; } char* rattach(Worker *w) { Fid *f; Req *r; r = w->r; r->fid = newfid(r->ifcall.fid); f = r->fid; f->flags |= Busy; f->file = &files[Qdir]; r->ofcall.qid = f->file->dir.qid; if(!aflag && strcmp(r->ifcall.uname, user) != 0) return Eperm; return 0; } static Fid* doclone(Fid *f, int nfid) { Fid *nf; nf = newfid(nfid); if(nf->flags & Busy) return nil; nf->flags |= Busy; nf->flags &= ~(Open); nf->file = f->file; return nf; } char* dowalk(Fid *f, char *name) { int t; if (strcmp(name, ".") == 0) return nil; if (strcmp(name, "..") == 0){ f->file = &files[Qdir]; return nil; } if(f->file != &files[Qdir]) return Enotexist; for (t = 1; t < Nqid; t++){ if(strcmp(name, files[t].dir.name) == 0){ f->file = &files[t]; return nil; } } return Enotexist; } char* rwalk(Worker *w) { Fid *f, *nf; char *rv; int i; File *savefile; Req *r; r = w->r; r->fid = oldfid(r->ifcall.fid); if((f = r->fid) == nil) return Enotexist; if(f->flags & Open) return Eisopen; r->ofcall.nwqid = 0; nf = nil; savefile = f->file; /* clone if requested */ if(r->ifcall.newfid != r->ifcall.fid){ nf = doclone(f, r->ifcall.newfid); if(nf == nil) return "new fid in use"; f = nf; } /* if it's just a clone, return */ if(r->ifcall.nwname == 0 && nf != nil) return nil; /* walk each element */ rv = nil; for(i = 0; i < r->ifcall.nwname; i++){ rv = dowalk(f, r->ifcall.wname[i]); if(rv != nil){ if(nf != nil) nf->flags &= ~(Open|Busy); else f->file = savefile; break; } r->ofcall.wqid[i] = f->file->dir.qid; } r->ofcall.nwqid = i; /* we only error out if no walk */ if(i > 0) rv = nil; return rv; } char * ropen(Worker *w) { Fid *f, *ff; Wmsg m; Req *r; r = w->r; r->fid = oldfid(r->ifcall.fid); if((f = r->fid) == nil) return Enotexist; if(f->flags & Open) return Eisopen; if(r->ifcall.mode != OREAD) if((f->file->dir.mode & 0x2) == 0) return Eperm; if((r->ifcall.mode & OTRUNC) && f->file == &files[Qplaylist]){ playlist.nlines = 0; playlist.ndata = 0; free(playlist.lines); free(playlist.data); playlist.lines = nil; playlist.data = nil; f->file->dir.length = 0; f->file->dir.qid.vers++; /* Mark all fids for this file `Trunc'ed */ for(ff = fids; ff; ff = ff->next) if(ff->file == &files[Qplaylist] && (ff->flags & Open)) ff->flags |= Trunc; m.cmd = Check; m.off = 0; m.arg = nil; bcastmsg(f->file->workers, &m); } r->ofcall.iounit = 0; r->ofcall.qid = f->file->dir.qid; f->flags |= Open; return nil; } char * rcreate(Worker*) { return Eperm; } int readtopdir(Fid*, uchar *buf, long off, int cnt, int blen) { int i, m, n; long pos; n = 0; pos = 0; for (i = 1; i < Nqid; i++){ m = convD2M(&files[i].dir, &buf[n], blen-n); if(off <= pos){ if(m <= BIT16SZ || m > cnt) break; n += m; cnt -= m; } pos += m; } return n; } char* rread(Worker *w) { Fid *f; Req *r; long off, cnt; int n, i; Wmsg m; char *p; r = w->r; r->fid = oldfid(r->ifcall.fid); if((f = r->fid) == nil) return Enotexist; r->ofcall.count = 0; off = r->ifcall.offset; cnt = r->ifcall.count; if(cnt > messagesize - IOHDRSZ) cnt = messagesize - IOHDRSZ; if(f->file == &files[Qdir]){ n = readtopdir(f, r->indata, off, cnt, messagesize - IOHDRSZ); r->ofcall.count = n; return nil; } if(f->file == files + Qplaystat){ p = getplaystat(r->ofcall.data, r->ofcall.data + sizeof r->indata); readbuf(r, r->ofcall.data, p - r->ofcall.data); return nil; } m.cmd = 0; while(f->vers == f->file->dir.qid.vers && (f->flags & Eof)){ /* Wait until file state changes (f->file->dir.qid.vers is incremented) */ m = waitmsg(w, f->file->workers); if(m.cmd == Flush && m.off == r->ifcall.tag) return (char*)~0; /* no answer needed */ assert(m.cmd != Work); yield(); } if(f->file == files + Qplaylist){ f->flags &= ~Eof; if((f->flags & Trunc) && r->ifcall.offset != 0){ f->flags &= ~Trunc; return Epast; } f->flags &= ~Trunc; if(r->ifcall.offset < playlist.ndata) readbuf(r, playlist.data, playlist.ndata); else if(r->ifcall.offset == playlist.ndata){ r->ofcall.count = 0; /* Arrange for this fid to wait next time: */ f->vers = f->file->dir.qid.vers; f->flags |= Eof; }else{ /* Beyond eof, bad seek? */ return Epast; } }else if(f->file == files + Qplayctl){ f->flags &= ~Eof; if(m.cmd == Error){ snprint(r->ofcall.data, sizeof r->indata, "%s %ud %q", statetxt[m.cmd], m.off, m.arg); free(m.arg); }else if(f->vers == f->file->dir.qid.vers){ r->ofcall.count = 0; /* Arrange for this fid to wait next time: */ f->flags |= Eof; return nil; }else{ snprint(r->ofcall.data, sizeof r->indata, "%s %ud", statetxt[playstate.cmd], playstate.off); f->vers = f->file->dir.qid.vers; } r->ofcall.count = strlen(r->ofcall.data); if(r->ofcall.count > r->ifcall.count) r->ofcall.count = r->ifcall.count; }else if(f->file == files + Qplayvol){ f->flags &= ~Eof; if(f->vers == f->file->dir.qid.vers){ r->ofcall.count = 0; /* Arrange for this fid to wait next time: */ f->flags |= Eof; }else{ p = seprint(r->ofcall.data, r->ofcall.data + sizeof r->indata, "volume '"); for(i = 0; i < nelem(volume); i++){ if(volume[i] == Undef) break; p = seprint(p, r->ofcall.data + sizeof r->indata, "%d ", volume[i]); } p = seprint(p, r->ofcall.data + sizeof r->indata, "'"); r->ofcall.count = p - r->ofcall.data; if(r->ofcall.count > r->ifcall.count) r->ofcall.count = r->ifcall.count; f->vers = f->file->dir.qid.vers; } }else abort(); return nil; } char* rwrite(Worker *w) { long cnt, i, nf, cmd; Pmsg newstate; char *fields[3], *p, *q; Wmsg m; Fid *f; Req *r; r = w->r; r->fid = oldfid(r->ifcall.fid); if((f = r->fid) == nil) return Enotexist; r->ofcall.count = 0; cnt = r->ifcall.count; if(cnt > messagesize - IOHDRSZ) cnt = messagesize - IOHDRSZ; if(f->file == &files[Qplayctl]){ r->ifcall.data[cnt] = '\0'; if (debug & DbgPlayer) fprint(2, "rwrite playctl: %s\n", r->ifcall.data); nf = tokenize(r->ifcall.data, fields, 4); if (nf == 0){ r->ofcall.count = r->ifcall.count; return nil; } if (nf == 2) i = strtol(fields[1], nil, 0); else i = playstate.off; newstate = playstate; if ((cmd = lookup(fields[0], statetxt)) < 0) return Ebadctl; switch(cmd){ case Play: newstate.cmd = cmd; newstate.off = i; break; case Pause: if (playstate.cmd != Play) break; // fall through case Stop: newstate.cmd = cmd; newstate.off = playstate.off; break; case Resume: if(playstate.cmd == Stop) break; newstate.cmd = Resume; newstate.off = playstate.off; break; case Skip: if (nf == 2) i += playstate.off; else i = playstate.off +1; if(i < 0) i = 0; else if (i >= playlist.nlines) i = playlist.nlines - 1; newstate.cmd = Play; newstate.off = i; } if (newstate.off >= playlist.nlines){ newstate.cmd = Stop; newstate.off = playlist.nlines; } if (debug & DbgPlayer) fprint(2, "new state %s-%ud\n", statetxt[newstate.cmd], newstate.off); if (newstate.m != playstate.m) sendul(playc, newstate.m); f->file->dir.qid.vers++; } else if(f->file == &files[Qplayvol]){ char *subfields[nelem(volume)]; int v[nelem(volume)]; r->ifcall.data[cnt] = '\0'; if (debug & DbgPlayer) fprint(2, "rwrite playvol: %s\n", r->ifcall.data); nf = tokenize(r->ifcall.data, fields, 4); if (nf == 0){ r->ofcall.count = r->ifcall.count; return nil; } if (nf != 2 || strcmp(fields[0], "volume") != 0) return Ebadctl; if (debug & DbgPlayer) fprint(2, "new volume '"); nf = tokenize(fields[1], subfields, nelem(subfields)); if (nf <= 0 || nf > nelem(volume)) return "volume"; for (i = 0; i < nf; i++){ v[i] = strtol(subfields[i], nil, 0); if (debug & DbgPlayer) fprint(2, " %d", v[i]); } if (debug & DbgPlayer) fprint(2, "'\n"); while (i < nelem(volume)) v[i++] = Undef; volumeset(v); r->ofcall.count = r->ifcall.count; return nil; } else if(f->file == &files[Qplaylist]){ if (debug & DbgPlayer){ fprint(2, "rwrite playlist: `"); write(2, r->ifcall.data, cnt); fprint(2, "'\n"); } playlist.data = realloc(playlist.data, playlist.ndata + cnt + 2); if (playlist.data == 0) sysfatal("realloc: %r"); memmove(playlist.data + playlist.ndata, r->ifcall.data, cnt); if (playlist.data[playlist.ndata + cnt-1] != '\n') playlist.data[playlist.ndata + cnt++] = '\n'; playlist.data[playlist.ndata + cnt] = '\0'; p = playlist.data + playlist.ndata; while (*p){ playlist.lines = realloc(playlist.lines, (playlist.nlines+1)*sizeof(playlist.lines[0])); if(playlist.lines == nil) sysfatal("realloc: %r"); playlist.lines[playlist.nlines] = playlist.ndata; q = strchr(p, '\n'); if (q == nil) break; if(debug & DbgPlayer) fprint(2, "[%lud]: ", playlist.nlines); playlist.nlines++; q++; if(debug & DbgPlayer) write(2, p, q-p); playlist.ndata += q - p; p = q; } f->file->dir.length = playlist.ndata; f->file->dir.qid.vers++; }else return Eperm; r->ofcall.count = r->ifcall.count; m.cmd = Check; m.off = 0; m.arg = nil; bcastmsg(f->file->workers, &m); return nil; } char * rclunk(Worker *w) { Fid *f; f = oldfid(w->r->ifcall.fid); if(f == nil) return Enotexist; f->flags &= ~(Open|Busy); return 0; } char * rremove(Worker*) { return Eperm; } char * rstat(Worker *w) { Req *r; r = w->r; r->fid = oldfid(r->ifcall.fid); if(r->fid == nil) return Enotexist; r->ofcall.nstat = convD2M(&r->fid->file->dir, r->indata, messagesize - IOHDRSZ); r->ofcall.stat = r->indata; return 0; } char * rwstat(Worker*) { return Eperm; } Fid * oldfid(int fid) { Fid *f; for(f = fids; f; f = f->next) if(f->fid == fid) return f; return nil; } Fid * newfid(int fid) { Fid *f, *ff; ff = nil; for(f = fids; f; f = f->next) if(f->fid == fid){ return f; }else if(ff == nil && (f->flags & Busy) == 0) ff = f; if(ff == nil){ ff = mallocz(sizeof *ff, 1); if (ff == nil) sysfatal("malloc: %r"); ff->next = fids; ff->readers = 0; fids = ff; } ff->fid = fid; ff->file = nil; ff->vers = ~0; return ff; } void work(Worker *w) { Req *r; char *err; int n; r = w->r; r->ofcall.data = (char*)r->indata; if(!fcalls[r->ifcall.type]) err = "bad fcall type"; else err = (*fcalls[r->ifcall.type])(w); if(err != (char*)~0){ /* ~0 indicates Flush received */ if(err){ r->ofcall.type = Rerror; r->ofcall.ename = err; }else{ r->ofcall.type = r->ifcall.type + 1; r->ofcall.fid = r->ifcall.fid; } r->ofcall.tag = r->ifcall.tag; if(debug & DbgFs) fprint(2, "io:->%F\n", &r->ofcall);/**/ n = convS2M(&r->ofcall, r->outdata, messagesize); if(write(srvfd[0], r->outdata, n) != n) sysfatal("mount write"); } reqfree(r); w->r = nil; } void worker(void *arg) { Worker *w; Wmsg m; w = arg; recv(w->eventc, &m); for(;;){ assert(m.cmd == Work); w->r = m.arg; if(debug & DbgWorker) fprint(2, "worker 0x%p:<-%F\n", w, &w->r->ifcall); work(w); if(debug & DbgWorker) fprint(2, "worker 0x%p wait for next\n", w); m = waitmsg(w, workers); } } void allocwork(Req *r) { Worker *w; Wmsg m; m.cmd = Work; m.off = 0; m.arg = r; if(sendmsg(workers, &m)) return; /* No worker ready to accept request, allocate one */ w = malloc(sizeof(Worker)); w->eventc = chancreate(sizeof(Wmsg), 1); if(debug & DbgWorker) fprint(2, "new worker 0x%p\n", w);/**/ threadcreate(worker, w, STACKSIZE); send(w->eventc, &m); } void srvio(void *arg) { char e[32]; int n; Req *r; Channel *dispatchc; threadsetname("file server IO"); dispatchc = arg; r = reqalloc(); for(;;){ /* * reading from a pipe or a network device * will give an error after a few eof reads * however, we cannot tell the difference * between a zero-length read and an interrupt * on the processes writing to us, * so we wait for the error */ n = read9pmsg(srvfd[0], r->indata, messagesize); if(n == 0) continue; if(n < 0){ rerrstr(e, sizeof e); if (strcmp(e, "interrupted") == 0){ if (debug & DbgFs) fprint(2, "read9pmsg interrupted\n"); continue; } sysfatal("srvio: %s", e); } if(convM2S(r->indata, n, &r->ifcall) == 0) continue; if(debug & DbgFs) fprint(2, "io:<-%F\n", &r->ifcall); sendp(dispatchc, r); r = reqalloc(); } } char * getplaylist(int n) { Wmsg m; m.cmd = Preq; m.off = n; m.arg = nil; send(playlistreq, &m); recv(playlistreq, &m); if(m.cmd == Error) return nil; assert(m.cmd == Prep); assert(m.arg); return m.arg; } void playlistsrv(void*) { Wmsg m; char *p, *q, *r; char *fields[2]; int n; /* Runs in the srv proc */ threadsetname("playlistsrv"); while(recv(playlistreq, &m)){ assert(m.cmd == Preq); m.cmd = Error; if(m.off < playlist.nlines){ p = playlist.data + playlist.lines[m.off]; q = strchr(p, '\n'); if (q == nil) sysfatal("playlistsrv: no newline character found"); n = q-p; r = malloc(n+1); memmove(r, p, n); r[n] = 0; tokenize(r, fields, nelem(fields)); assert(fields[0] == r); m.cmd = Prep; m.arg = r; } send(playlistreq, &m); } } void srv(void*) { Req *r; Channel *dispatchc; /* * This is the proc with all the action. * When a file request comes in, it is dispatched to this proc * for processing. Two extra threads field changes in play state * and volume state. * By keeping all the action in this proc, we won't need locks */ threadsetname("srv"); close(srvfd[1]); dispatchc = chancreate(sizeof(Req*), 1); procrfork(srvio, dispatchc, STACKSIZE, RFFDG); threadcreate(volumeupdater, nil, STACKSIZE); threadcreate(playupdater, nil, STACKSIZE); threadcreate(playlistsrv, nil, STACKSIZE); while(r = recvp(dispatchc)) allocwork(r); } void playupdater(void*) { Wmsg m; /* This is a thread in the srv proc */ while(recv(playchan, &m)){ if(debug & DbgPlayer) fprint(2, "playupdate: %s %d %s\n", statetxt[m.cmd], m.off, m.arg?m.arg:""); if(playstate.m == m.m) continue; if(m.cmd == Stop && m.off == 0xffff) m.off = playlist.nlines; if(m.cmd != Error){ playstate.m = m.m; m.cmd = Check; assert(m.arg == nil); } files[Qplayctl].dir.qid.vers++; bcastmsg(files[Qplayctl].workers, &m); } } void volumeupdater(void*) { Wmsg m; int v[nelem(volume)]; /* This is a thread in the srv proc */ while(recv(volumechan, v)){ if(debug & DbgPlayer) fprint(2, "volumeupdate: volume now %d %d %d %d\n", volume[0], volume[1], volume[2], volume[3]); memmove(volume, v, sizeof(volume)); files[Qplayvol].dir.qid.vers++; m.cmd = Check; m.arg = nil; bcastmsg(files[Qplayvol].workers, &m); } } void playupdate(Pmsg p, char *s) { Wmsg m; m.m = p.m; m.arg = s ? strdup(s) : nil; send(playchan, &m); }