/* network i/o */

#include "all.h"
#include "io.h"
/*
 * NOTE(review): the names of the two system headers below were lost
 * when this file was extracted (only bare `#include' survived).
 * fcall.h is inferred from the 9p2000 comment and the use of
 * BIT32SZ/GBIT32/IOHDRSZ; thread.h is a best guess -- confirm both.
 */
#include <fcall.h>		/* 9p2000 */
#include <thread.h>

enum {
	Maxfdata	= 8192,
	Nqueue		= 200,		/* queue size (tunable) */

	Netclosed	= 0,		/* Connection state */
	Netopen,
};

/*
 * the kernel file server read packets directly from
 * its ethernet(s) and did all the protocol processing.
 * if the incoming packets were 9p (over il/ip), they
 * were queued for the server processes to operate upon.
 *
 * in user mode, we have one process per incoming connection
 * instead, and those processes get just the data, minus
 * tcp and ip headers, so they just see a stream of 9p messages,
 * which they then queue for the server processes.
 *
 * there used to be more queueing (in the kernel), with separate
 * processes for ethernet input, il input, 9p processing, il output
 * and ethernet output, and queues connecting them.  we now let
 * the kernel's network queues, protocol stacks and processes do
 * much of this work.
 *
 * partly as a result of this, we can now process 9p messages
 * transported via tcp, exploit multiple x86 processors, and
 * were able to shed 70% of the file server's source, by line count.
 *
 * the upshot is that Ether (now Network) is no longer a perfect fit for
 * the way network i/o is done now.  the notion of `connection'
 * is being introduced to complement it.
 */

typedef struct Network Network;
typedef struct Netconn Netconn;
typedef struct Conn9p Conn9p;

/* a network, not necessarily an ethernet */
struct Network {
	int	ctlrno;
	char	iname[NAMELEN];
	char	oname[NAMELEN];

	char	*dialstr;
	char	anndir[40];
	char	lisdir[40];
	int	annfd;			/* fd from announce */
};

/* an open tcp (or other transport) connection */
struct Netconn {
	Queue*	reply;		/* network output */
	char*	raddr;		/* remote caller's addr */
	Chan*	chan;		/* list of tcp channels */

	int	alloc;		/* flag: allocated */

	int	state;
	Conn9p*	conn9p;		/* not reference-counted */

	Lock;
};

/*
 * incoming 9P network connection from a given machine.
 * typically will multiplex 9P sessions for multiple users.
 */
struct Conn9p {
	QLock;
	Ref;
	int	fd;
	char*	dir;
	Netconn*netconn;		/* cross-connection */
	char*	raddr;			/* private copy; freed with the Conn9p */
};

static Network netif[Maxnets];
static struct {
	Lock;
	Chan*	chan;
} netchans;
static Queue *netoq;		/* only one network output queue is needed */

char *annstrs[Maxnets] = {
	"tcp!*!9fs",
};

/*
 * find the Chan already allocated to conn9p's remote address or,
 * failing that, claim a free one (allocating a new Chan if none is free)
 * and fill it in for conn9p.  the netchans list is searched and
 * updated under the netchans lock.
 *
 * never returns nil
 */
static Chan*
getchan(Conn9p *conn9p)
{
	Netconn *netconn;
	Chan *cp, *xcp;

	lock(&netchans);

	/* look for conn9p's Chan */
	xcp = nil;
	for(cp = netchans.chan; cp; cp = netconn->chan) {
		netconn = cp->pdata;
		if(!netconn->alloc)
			xcp = cp;		/* remember free Chan */
		else if(netconn->raddr != nil &&
		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
			unlock(&netchans);
			return cp;		/* found conn9p's Chan */
		}
	}

	/* conn9p's Chan not found; if no free Chan, allocate & link in one */
	cp = xcp;
	if(cp == nil) {
		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
		netconn = cp->pdata;
		netconn->chan = netchans.chan;
		netchans.chan = cp;
	}

	/* fill in Chan's netconn */
	netconn = cp->pdata;
	netconn->raddr = strdup(conn9p->raddr);
	netconn->state = Netopen;		/* a guess */

	/*
	 * cross-connect netconn and conn9p.  this must happen for
	 * recycled Chans too, otherwise netconn->conn9p keeps pointing
	 * at the previous (possibly freed) Conn9p.
	 */
	netconn->conn9p = conn9p;		/* not reference-counted */
	conn9p->netconn = netconn;

	/* fill in Chan */
	cp->send = serveq;
	if (cp->reply == nil)
		cp->reply = netoq;
	netconn->reply = netoq;
	cp->protocol = nil;
	cp->msize = 0;
	cp->whotime = 0;
	strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
	/* strncpy does not terminate the copy when the source is too long */
	cp->whochan[sizeof cp->whochan - 1] = '\0';
//	cp->whoprint = tcpwhoprint;
	netconn->alloc = 1;

	unlock(&netchans);
	return cp;
}

/*
 * return a malloced copy of the path name of open file fd,
 * or of "/GOK" if fd2path fails.  the caller frees the result,
 * which is whatever strdup returned and so may be nil.
 */
static char *
fd2name(int fd)
{
	char data[128];

	if (fd2path(fd, data, sizeof data) < 0)
		return strdup("/GOK");
	return strdup(data);
}

/*
 * close data fd dfd, then open the ctl file next to it
 * (the path of dfd with "/data" replaced by "/ctl") and hang up
 * the connection through that.  failure to open ctl is ignored.
 */
static void
hangupdfd(int dfd)
{
	int ctlfd;
	char *end, *data;

	data = fd2name(dfd);
	close(dfd);
	if (data == nil)
		return;			/* no name, thus no ctl file to open */

	end = strstr(data, "/data");
	if (end != nil)
		strcpy(end, "/ctl");	/* shorter than "/data", so it fits */
	ctlfd = open(data, OWRITE);
	if (ctlfd >= 0) {
		hangup(ctlfd);
		close(ctlfd);
	}
	free(data);
}

/*
 * call fileinit on every Chan in the global chans list that is
 * in use (whotime != 0) and has channel number n.
 */
void
closechan(int n)
{
	Chan *cp;

	for(cp = chans; cp; cp = cp->next)
		if(cp->whotime != 0 && cp->chan == n)
			fileinit(cp);
}

/*
 * mark cp's network connection closed and return its Netconn
 * to the free pool.  if msg is non-nil, it is printed with the
 * remote address.  if dolock is non-zero, the netchans lock is taken
 * around releasing the Netconn; callers passing zero presumably
 * hold it already.
 */
void
nethangup(Chan *cp, char *msg, int dolock)
{
	Netconn *netconn;

	netconn = cp->pdata;
	netconn->state = Netclosed;

	if(msg != nil)
		print("hangup! %s %s\n", msg, netconn->raddr);

	fileinit(cp);
	cp->whotime = 0;
	strcpy(cp->whoname, "<none>");

	if(dolock)
		lock(&netchans);
	netconn->alloc = 0;
	free(netconn->raddr);
	netconn->raddr = nil;
	if(dolock)
		unlock(&netchans);
}

/*
 * hang up the network connection under cp (if its Conn9p still
 * has a usable fd) and then release cp as nethangup does.
 */
void
chanhangup(Chan *cp, char *msg, int dolock)
{
	Netconn *netconn = cp->pdata;
	Conn9p *conn9p = netconn->conn9p;

	if (conn9p->fd > 0)
		hangupdfd(conn9p->fd);	/* drop it */
	nethangup(cp, msg, dolock);
}

/*
 * returns length of next 9p message (including the length) and
 * leaves it in the first few bytes of abuf.
 * returns -1 if n is too small or the read failed,
 * 0 if fewer than BIT32SZ bytes could be read.
 */
static long
size9pmsg(int fd, void *abuf, uint n)
{
	int m;
	uchar *buf = abuf;

	if (n < BIT32SZ)
		return -1;	/* caller screwed up */

	/* read count */
	m = readn(fd, buf, BIT32SZ);
	if(m != BIT32SZ){
		if(m < 0)
			return -1;
		return 0;
	}
	return GBIT32(buf);
}

/*
 * read the next 9p message from fd into a newly allocated Msgbuf.
 * returns the message's length, with *mbp set to the Msgbuf, on success.
 * otherwise *mbp is nil and the result is -1 for a read error or
 * an implausible length, or 0 if the message could not be read in full.
 */
static int
readalloc9pmsg(int fd, Msgbuf **mbp)
{
	int m, len;
	uchar lenbuf[BIT32SZ];
	Msgbuf *mb;

	*mbp = nil;
	len = size9pmsg(fd, lenbuf, BIT32SZ);
	if (len <= 0)
		return len;
	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
		werrstr("bad length in 9P2000 message header");
		return -1;
	}
	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
		panic("readalloc9pmsg: mballoc failed");
	*mbp = mb;
	memmove(mb->data, lenbuf, BIT32SZ);
	len -= BIT32SZ;
	m = readn(fd, mb->data+BIT32SZ, len);
	if(m < len){
		/* caller ignores the message on 0, so don't leak the buffer */
		mbfree(mb);
		*mbp = nil;
		return 0;
	}
	return BIT32SZ+m;
}

/*
 * per-connection input process: read 9p messages from conn9p's fd
 * and queue them on serveq until reading fails or the fd is poisoned,
 * then hang up and release the connection.  does not return.
 */
static void
connection(void *v)
{
	int n;
	char buf[64];
	Chan *chan9p;
	Conn9p *conn9p = v;
	Msgbuf *mb;
	NetConnInfo *nci;

	incref(conn9p);			/* count connections */
	nci = getnetconninfo(conn9p->dir, conn9p->fd);
	if (nci == nil)
		panic("connection: getnetconninfo(%s, %d) failed",
			conn9p->dir, conn9p->fd);
	/*
	 * keep our own copy: conn9p can outlive this process while
	 * replies are still in flight, and neto looks at raddr.
	 */
	conn9p->raddr = strdup(nci->raddr);
	freenetconninfo(nci);
	if (conn9p->raddr == nil)
		panic("connection: strdup failed");

	chan9p = getchan(conn9p);
	print("new connection on %s pid %d from %s\n",
		conn9p->dir, getpid(), conn9p->raddr);

	/*
	 * reading from a pipe or a network device
	 * will give an error after a few eof reads.
	 * however, we cannot tell the difference
	 * between a zero-length read and an interrupt
	 * on the processes writing to us,
	 * so we wait for the error.
	 */
	while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
		if(n == 0)
			continue;
		mb->param = (uintptr)conn9p;	/* has fd for replies */
		mb->chan = chan9p;

		assert(mb->magic == Mbmagic);
		incref(conn9p);			/* & count packets in flight */
		fs_send(serveq, mb);		/* to 9P server processes */
		/* mb will be freed by receiving process */
	}

	rerrstr(buf, sizeof buf);

	qlock(conn9p);
	print("connection hung up from %s\n", conn9p->dir);
	if (conn9p->fd > 0)		/* not poisoned yet? */
		hangupdfd(conn9p->fd);	/* poison the fd */

	nethangup(chan9p, "remote hung up", 1);
	closechan(chan9p->chan);

	conn9p->fd = -1;		/* poison conn9p */
	if (decref(conn9p) == 0) {	/* last conn.? turn the lights off */
		free(conn9p->raddr);
		free(conn9p->dir);
		qunlock(conn9p);
		free(conn9p);
	} else
		qunlock(conn9p);

	if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
		exits("");
	sysfatal("mount read, pid %d", getpid());
}

/*
 * per-network listener process: wait for calls on net's announced
 * address, accept each one and start a connection process for it.
 * v is the Network.  never returns.
 */
static void
neti(void *v)
{
	int lisfd, accfd;
	Network *net;
	Conn9p *conn9p;

	net = v;
	print("net%di\n", net->ctlrno);
	for(;;) {
		lisfd = listen(net->anndir, net->lisdir);
		if (lisfd < 0) {
			print("listen %s failed: %r\n", net->anndir);
			continue;
		}

		/* got new call on lisfd */
		accfd = accept(lisfd, net->lisdir);
		if (accfd < 0) {
			print("accept %d (from %s) failed: %r\n",
				lisfd, net->lisdir);
			close(lisfd);		/* don't leak the call's ctl fd */
			continue;
		}

		/* accepted that call */
		conn9p = malloc(sizeof *conn9p);
		if (conn9p == nil) {
			print("neti: out of memory; dropping call on %s\n",
				net->lisdir);
			close(accfd);
			close(lisfd);
			continue;
		}
		/*
		 * malloc is not guaranteed to return zeroed memory and
		 * the embedded QLock and Ref must start out clear.
		 */
		memset(conn9p, 0, sizeof *conn9p);
		conn9p->dir = strdup(net->lisdir);
		conn9p->fd = accfd;
		newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
		close(lisfd);
	}
}

/*
 * output process: take 9P replies off netoq and write each to the
 * fd of the Conn9p recorded in its Msgbuf, hanging up the connection
 * if that fails.  never returns.
 *
 * only need one of these for all network connections, thus all interfaces
 */
static void
neto(void *)
{
	int len, datafd;
	Msgbuf *mb;
	Conn9p *conn9p;

	print("neto\n");
	for(;;) {
		/* receive 9P answer from 9P server processes */
		while((mb = fs_recv(netoq, 0)) == nil)
			continue;

		if(mb->data == nil) {
			print("neto: pkt nil cat=%d free=%d\n",
				mb->category, mb->flags&FREE);
			if(!(mb->flags & FREE))
				mbfree(mb);
			continue;
		}

		/* send answer back over the network connection in the reply */
		len = mb->count;
		conn9p = (Conn9p *)mb->param;
		assert(conn9p);

		qlock(conn9p);
		datafd = conn9p->fd;
		assert(len >= 0);
		/* datafd < 0 probably indicates poisoning by the read side */
		if (datafd < 0 || write(datafd, mb->data, len) != len) {
			print( "network write error (%r);");
			print(" closing connection for %s\n", conn9p->dir);
			nethangup(getchan(conn9p), "network write error", 1);
			if (datafd > 0)
				hangupdfd(datafd);	/* drop it */
			conn9p->fd = -1;		/* poison conn9p */
		}
		mbfree(mb);
		/* drop the in-flight count taken when the request was queued */
		if (decref(conn9p) == 0)
			panic("neto: zero ref count");
		qunlock(conn9p);
	}
}

/*
 * create the reply queue if need be, then start the single output
 * process and one listener process per configured network.
 */
void
netstart(void)
{
	int netorun = 0;
	Network *net;

	if(netoq == nil)
		netoq = newqueue(Nqueue, "network reply");
	for(net = &netif[0]; net < &netif[Maxnets]; net++){
		if(net->dialstr == nil)
			continue;
		sprint(net->oname, "neto");
		if (netorun++ == 0)
			newproc(neto, nil, net->oname);
		sprint(net->iname, "net%di", net->ctlrno);
		newproc(neti, net, net->iname);
	}
}

/*
 * announce on each address configured in annstrs, recording the
 * announce fd and directory in the corresponding netif entry.
 * failure to announce is fatal.
 */
void
netinit(void)
{
	Network *net;

	for (net = netif; net < netif + Maxnets; net++) {
		net->dialstr = annstrs[net - netif];
		if (net->dialstr == nil)
			continue;
		net->annfd = announce(net->dialstr, net->anndir);
		/* /bin/service/tcp564 may already have grabbed the port */
		if (net->annfd < 0)
			sysfatal("can't announce %s: %r", net->dialstr);
		print("netinit: announced on %s\n", net->dialstr);
	}
}