/* * Copyright (c) 2013, Coraid, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Coraid nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL CORAID BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include <9p.h> #include "dat.h" enum { CDirty = 1, CFlushing = 2, CFree = 4, CBrelease = 1, CBclean, CBread, CBwrite, CCanfree, CReset, Ncht = 4001, Nreaders = 4, Ridle = 0, Rloading = 1, }; typedef struct CBlock CBlock; typedef struct Cachereq Cachereq; typedef struct Cacheresp Cacheresp; typedef struct Reader Reader; struct CBlock { Ref ref; uvlong blkno; uchar buf[BlkSize]; int flags; CBlock *next, *prev; CBlock *htnext, *htprev; CBlock *wnext, *wprev; }; struct Cachereq { int req; uvlong blk; Channel *resp; }; struct Cacheresp { int res; void *p; }; struct Reader { char *dev; Channel *rdchan; int state; uvlong loading; }; static int mypid; static Channel *wbtrigger; static CBlock *ht[Ncht]; static CBlock *chd, *ctl; static CBlock *whd, *wtl; static CBlock *freehd; static int maxcache; static Ref ncache; static Ref ndirty; static Ref nwlist; static int timertid; static uvlong nmiss; static uvlong nread; static uvlong nwrite; static ulong hrate; static int syncing; static Reader rds[Nreaders]; static Lock calock; Channel *cachechan; /* * Because all the allocs and frees are done in threads of the * same process, we shouldn't need any locks */ static CBlock * cballoc(void) { CBlock *p; if(freehd == nil) return θmalloc(sizeof(CBlock)); lock(&calock); p = freehd; freehd = p->next; if(!(p->flags & CFree)) fprint(2, "Internal error: non-free block on free list\n"); p->flags &= ~CFree; p->next = nil; unlock(&calock); return p; } static void cbfree(CBlock *p) { if(p->flags & CFree) fprint(2, "Freeing already free block?!?!?\n"); else if(p->ref.ref != 0) fprint(2, "Freeing in use block\n"); else if(p->next || p->prev || p->htnext || p->htprev || p->wnext || p->wprev) fprint(2, "Freeing block in data structures\n"); else { lock(&calock); p->flags |= CFree; p->next = freehd; freehd = p; unlock(&calock); } } static CBlock * lookup(uvlong blk) { CBlock *p; int idx; idx = blk % Ncht; for(p = ht[idx]; p && p->blkno != blk; p = p->htnext) ; return p; } static void updatestats(int hit) { if(!hit) { ++nmiss; hrate = (999 * hrate + 500) / 1000; } else hrate = (999 * hrate + 500) / 1000 + 1000; } static void insht(CBlock *p) { int idx; idx = p->blkno % Ncht; if(ht[idx]) ht[idx]->htprev = p; p->htnext = ht[idx]; ht[idx] = p; } static void rmht(CBlock *p) { CBlock *nxt, *prv; int idx; idx = p->blkno % Ncht; nxt = p->htnext; prv = p->htprev; if(nxt) nxt->htprev = prv; if(prv) prv->htnext = nxt; if(ht[idx] == p) ht[idx] = nxt; p->htnext = nil; p->htprev = nil; } static void inslru(CBlock *p) { if(chd == nil) chd = p; else ctl->next = p; p->prev = ctl; p->next = nil; ctl = p; incref(&ncache); } static void rmlru(CBlock *p) { if(p->next) p->next->prev = p->prev; else ctl = p->prev; if(p->prev) p->prev->next = p->next; else chd = p->next; p->next = nil; p->prev = nil; decref(&ncache); } static void insw(CBlock *p) { if(whd == nil) whd = p; else wtl->wnext = p; p->wprev = wtl; p->wnext = nil; wtl = p; incref(&nwlist); } static void rmw(CBlock *p) { if(p->wnext) p->wnext->wprev = p->wprev; else wtl = p->wprev; if(p->wprev) p->wprev->wnext = p->wnext; else whd = p->wnext; p->wnext = nil; p->wprev = nil; decref(&nwlist); } static void mvlru(CBlock *p) { if(p != ctl) { rmlru(p); inslru(p); } } static void dolru(void) { CBlock *p; int i; if(ncache.ref < maxcache) return; for(p = chd, i = 0; p && (p->ref.ref > 0 || (p->flags & (CDirty | CFlushing))); p = p->next, ++i) if(i > ncache.ref) { fprint(2, "cycle in LRU list? n:%ld d:%ld\n", ncache.ref, ndirty.ref); chd->prev = nil; ctl->next = nil; return; } if(p) { rmht(p); rmlru(p); if(p->flags & CDirty) { decref(&ndirty); p->flags &= ~CDirty; rmw(p); } cbfree(p); } } static long _iopread(va_list *arg) { void *a; vlong off; long n; int fd; fd = va_arg(*arg, int); a = va_arg(*arg, void*); n = va_arg(*arg, long); off = va_arg(*arg, vlong); return pread(fd, a, n, off); } static long _iopwrite(va_list *arg) { void *a; vlong off; long n; int fd; fd = va_arg(*arg, int); a = va_arg(*arg, void*); n = va_arg(*arg, long); off = va_arg(*arg, vlong); return pwrite(fd, a, n, off); } static void wbtimer(void *) { while(!shutdown) { sleep(15000); if(syncing != 1) sendul(wbtrigger, 1); } sendul(wbtrigger, 1); } static void wbthread(void *d) { Ioproc *wbio; CBlock *p; char *dev; int fd; dev = d; wbio = ioproc(); fd = ioopen(wbio, dev, ORDWR); if(fd < 0) sysfatal("wb open: %r"); while(!shutdown) { recvul(wbtrigger); syncing = 1; do { for(p = whd; p && p->ref.ref > 0 && p->blkno >= super.firstdat; p = p->wnext) ; if(p) { p->flags |= CFlushing; p->flags &= ~CDirty; decref(&ndirty); rmw(p); ++nwrite; iocall(wbio, _iopwrite, fd, p->buf, BlkSize, p->blkno * BlkSize); p->flags &= ~CFlushing; } } while(p); syncing = 0; } ioclose(wbio, fd); closeioproc(wbio); } static int _brelease(uvlong blk) { CBlock *p; int rv; rv = 0; p = lookup(blk); if(p) { if(p->ref.ref == 0) { fprint(2, "trying to decrement below 0: blk %ulld\n", blk); rv = -1; } else decref(&p->ref); } else rv = -1; dolru(); return rv; } static void * _cbclean(uvlong blk) { CBlock *p; p = lookup(blk); if(p) { memset(p->buf, 0, BlkSize); mvlru(p); incref(&p->ref); updatestats(1); return p->buf; } updatestats(0); dolru(); p = cballoc(); memset(p->buf, 0, BlkSize); p->blkno = blk; incref(&p->ref); insht(p); inslru(p); return p->buf; } static void reader(void *a) { Cachereq r; Cacheresp rsp; Ioproc *cio; Reader *rp; CBlock *p; int cfd, i; rp = a; cio = ioproc(); cfd = ioopen(cio, rp->dev, ORDWR); if(cfd < 0) sysfatal("Couldn't open device: %r"); while(1) { if(recv(rp->rdchan, &r) == 0) { if(shutdown) { closeioproc(cio); threadexits(nil); } continue; } /* * See if it got loaded while it was in the channel queue */ p = lookup(r.blk); if(p) { mvlru(p); incref(&p->ref); updatestats(1); rsp.p = p->buf; send(r.resp, &rsp); continue; } /* * If another reader is already loading this block, pass off the request * to that reader. That way, by the time this request gets looked at * again, the block will already be loaded. */ for(i = 0; i < Nreaders && (rds[i].state != Rloading || rds[i].loading != r.blk); ++i) ; if(i < Nreaders) { send(rds[i].rdchan, &r); continue; } rp->state = Rloading; rp->loading = r.blk; dolru(); p = cballoc(); p->blkno = r.blk; incref(&p->ref); ++nread; if(iocall(cio, _iopread, cfd, p->buf, BlkSize, r.blk * BlkSize) != BlkSize) { rp->state = Ridle; cbfree(p); rsp.p = nil; send(r.resp, &rsp); continue; } insht(p); inslru(p); rp->state = Ridle; rsp.p = p->buf; send(r.resp, &rsp); } } static void _cbread(Cachereq *r) { Cacheresp rsp; CBlock *p; static int rr; p = lookup(r->blk); if(p) { mvlru(p); incref(&p->ref); updatestats(1); rsp.p = p->buf; send(r->resp, &rsp); return; } updatestats(0); send(rds[rr].rdchan, r); ++rr; if(rr >= Nreaders) rr = 0; } static void _cbwrite(uvlong blk) { CBlock *p; p = lookup(blk); if(p) { mvlru(p); if(!(p->flags & CDirty)) { p->flags |= CDirty; incref(&ndirty); insw(p); } } if(ndirty.ref > ncache.ref / 10 && !syncing) nbsendul(wbtrigger, 1); } static int _ccanfree(uvlong blk) { CBlock *p; p = lookup(blk); if(p) { if(p->ref.ref > 0 /* || (p->flags & (CDirty | CFlushing)) */ ) { fprint(2, "Wanting to free block %ulld with ref %ld and flags %x\n", blk, p->ref.ref, p->flags); return 0; } if(p->flags & CDirty) { decref(&ndirty); rmw(p); p->flags &= ~CDirty; } rmht(p); rmlru(p); cbfree(p); } return 1; } static void _resetcache(void) { CBlock *p; while(1) { for(p = chd; p && p->ref.ref > 0; p = p->next) ; if(p == nil) break; rmht(p); rmlru(p); cbfree(p); } if(chd) fprint(2, "warning: active blocks during reset\n"); } static void handler(void *) { Cacheresp rsp; Cachereq r; mypid = threadpid(threadid()); while(1) { if(recv(cachechan, &r) == 0) { if(shutdown) threadexits(nil); continue; } switch(r.req) { case CBrelease: rsp.res = _brelease(r.blk); if(r.resp) send(r.resp, &rsp); break; case CBclean: rsp.p = _cbclean(r.blk); send(r.resp, &rsp); break; case CBread: _cbread(&r); break; case CBwrite: _cbwrite(r.blk); send(r.resp, &rsp); break; case CCanfree: rsp.res = _ccanfree(r.blk); send(r.resp, &rsp); break; case CReset: _resetcache(); send(r.resp, &rsp); break; } } } void initcache(char *dev, int m) { int i; maxcache = m; for(i = 0; i < Nreaders; ++i) { rds[i].dev = dev; rds[i].rdchan = chancreate(sizeof(Cachereq), 10); threadcreate(reader, &rds[i], 8192); } cachechan = chancreate(sizeof(Cachereq), 2); threadcreate(handler, nil, 8192); wbtrigger = chancreate(sizeof(ulong), 2); threadcreate(wbthread, dev, 8192); timertid = proccreate(wbtimer, nil, 1024); } void haltcache(void) { int i; for(i = 0; i < Nreaders; ++i) chanclose(rds[i].rdchan); chanclose(cachechan); threadkill(timertid); sendul(wbtrigger, 1); for(i = 0; i < 30 && whd; ++i) { fprint(2, "."); sleep(1000); } } int brelease(uvlong blk) { Cachereq r; if(mypid == threadpid(threadid())) // return _brelease(blk); { int n; n=_brelease(blk); if(n==-1) fprint(2, "brelease error called from %p\n", getcallerpc(&blk)); return n; } r.req = CBrelease; r.blk = blk; r.resp = nil; send(cachechan, &r); return 0; } void * cbclean(uvlong blk) { Cachereq r; Cacheresp rsp; if(mypid == threadpid(threadid())) return _cbclean(blk); r.req = CBclean; r.blk = blk; r.resp = chancreate(sizeof(Cacheresp), 0); send(cachechan, &r); recv(r.resp, &rsp); chanfree(r.resp); return rsp.p; } void * cbread(uvlong blk) { Cachereq r; Cacheresp rsp; CBlock *p; if(mypid == threadpid(threadid())) { p = lookup(blk); if(p) { mvlru(p); incref(&p->ref); updatestats(1); return p->buf; } } r.req = CBread; r.blk = blk; r.resp = chancreate(sizeof(Cacheresp), 0); send(cachechan, &r); recv(r.resp, &rsp); chanfree(r.resp); return rsp.p; } void cbwrite(uvlong blk) { Cachereq r; Cacheresp rsp; if(mypid == threadpid(threadid())) { _cbwrite(blk); return; } r.req = CBwrite; r.blk = blk; r.resp = chancreate(sizeof(Cacheresp), 0); send(cachechan, &r); recv(r.resp, &rsp); chanfree(r.resp); } int ccanfree(uvlong blk) { Cachereq r; Cacheresp rsp; if(mypid == threadpid(threadid())) return _ccanfree(blk); r.req = CCanfree; r.blk = blk; r.resp = chancreate(sizeof(Cacheresp), 0); send(cachechan, &r); recv(r.resp, &rsp); chanfree(r.resp); return rsp.res; } int cread(void *a, int n, uvlong off) { uchar *p; uvlong blk; ulong boff; blk = off / BlkSize; boff = off % BlkSize; if(boff + n > BlkSize) { fprint(2, "invalid block crossing\n"); return -1; } p = cbread(blk); if(p == nil) return -1; memmove(a, p + boff, n); brelease(blk); return n; } int cwrite(void *a, int n, uvlong off) { uchar *p; uvlong blk; ulong boff; blk = off / BlkSize; if(blk == 0) return -1; boff = off % BlkSize; if(boff + n > BlkSize) { fprint(2, "invalid block crossing\n"); return -1; } p = cbread(blk); if(p == nil) return -1; memmove(p + boff, a, n); cbwrite(blk); brelease(blk); return n; } void csync(void) { syncing = 2; threadint(timertid); while(syncing != 0) yield(); } static char cstatbuf[1024]; char * prcstat(void) { CBlock *cb; char *p, *e; int ldirty, i, nhash; int refhist[10]; int saidit = 0; ldirty = 0; p = cstatbuf; e = p + nelem(cstatbuf); memset(refhist, 0, 10 * sizeof(int)); p = seprint(p, e, "Cache stats:\n"); p = seprint(p, e, "ncache: %ld\n", ncache.ref); p = seprint(p, e, "nwlist: %ld\n", nwlist.ref); p = seprint(p, e, "ndirty: %ld\n", ndirty.ref); for(cb = chd; cb; cb = cb->next) { if(cb->flags & CDirty) { if(!saidit) {p = seprint(p, e, "dirty block ref:%ld blk:%ulld\n", cb->ref.ref, cb->blkno); ++saidit;} ++ldirty; } if(cb->ref.ref < 0) { p = seprint(p, e, "bad ref count: %ld on block %ulld; setting to 0\n", cb->ref.ref, cb->blkno); cb->ref.ref = 0; } else if(cb->ref.ref >= 9) ++refhist[9]; else ++refhist[cb->ref.ref]; if(cb->ref.ref > 0) p = seprint(p, e, "In use block: %ulld flags %ux\n", cb->blkno, cb->flags); } nhash = 0; for(i = 0; i < Ncht; ++i) { for(cb = ht[i]; cb; cb = cb->htnext) ++nhash; } p = seprint(p, e, "nhash: %d\n", nhash); p = seprint(p, e, "ldirty: %d\n", ldirty); p = seprint(p, e, "nread: %ulld\n", nread); p = seprint(p, e, "nwrite: %ulld\n", nwrite); p = seprint(p, e, "nmiss: %ulld\n", nmiss); p = seprint(p, e, "hit rate: %uld%%\n", (hrate + 5000) / 10000); p = seprint(p, e, "ref count histogram:\n"); p = seprint(p, e, " 0 1 2 3 4 5 6 7 8 >8\n"); for(i = 0; i < 10; ++i) p = seprint(p, e, "%4d ", refhist[i]); seprint(p, e, "\n"); return cstatbuf; } void resetcache(void) { Cachereq r; Cacheresp rsp; if(mypid == threadpid(threadid())) { _resetcache(); return; } r.req = CReset; r.resp = chancreate(sizeof(Cacheresp), 0); send(cachechan, &r); recv(r.resp, &rsp); chanfree(r.resp); }