diff -ruN nodeos-20000905/envs/anepdump.c nodeos/envs/anepdump.c --- nodeos-20000905/envs/anepdump.c Wed Dec 31 17:00:00 1969 +++ nodeos/envs/anepdump.c Tue Sep 26 15:25:42 2000 @@ -0,0 +1,179 @@ +/* + * Copyright (c) 1999, 2000 The University of Utah and the Flux Group. + * All rights reserved. + * This file is part of Janos. + * + * Janos is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2, as + * published by the Free Software Foundation. + * + * Janos is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details, in the file + * COPYING. If that file is not present, write to the Free + * Software Foundation, 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA + * + * The University of Utah and the Flux Group requests users to + * return to flux-dist@cs.utah.edu any improvements that they make. + */ + +/* + * EE to test stdin reading of ANEP packets + */ + +#define RUNFOR 5 +#define NBUFS 100 + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +static void ee_init(void *arg); +an_flowinit_func_t InitEE = { ee_init }; + +static void setup_channels(void); +static void ee_aneppacket(an_pbuf_t *pbuf, void *arg); +static void ee_die(void *event, void *arg); +static void anep_dump(void *data, int size); + +static an_chan_t *anepchan, anepchan_data; +static an_pbuf_t *anepbuf[NBUFS], anepbuf_data[NBUFS]; + +static struct { + an_chanaddr_t addr; + an_chanproto_t proto; + an_cpuspec_t cpu; +} attr; +static an_flow_t *flow; + +int +ee_main(int argc, const char **argv) +{ + an_error_t err; + + flow = an_flow_current(); + printf("%p/%p: Dumping ANEP packets from stdin...\n", + flow, an_thread_current()); + + /* + * Setup channels + */ + setup_channels(); + + /* + * Setup an event to time us out in RUNFOR seconds + */ + err = an_event_schedule(ee_die, 0, RUNFOR * 1000000); + assert(err == 0); + printf("Timer started, running for %d seconds\n", RUNFOR); + + /* + * No longer need this thread + */ + return 0; +} + +static void +setup_channels(void) +{ + int i; + + memset(&attr, 0, sizeof attr); + + /* + * Create packet buffers + */ + for (i = 0; i < NBUFS; i++) { + anepbuf[i] = + an_pbuf_create(&anepbuf_data[i], AN_PBUF_STDIO, 0, 0); + assert(anepbuf[i] != 0); + } + + /* + * and channel + */ + strcpy(attr.proto.data, "stdio:anep"); + attr.proto.len = strlen(attr.proto.data); + /* XXX no addr info */ + attr.cpu.ttype = AN_TH_IMPLICIT; + attr.cpu.threads.im.maxthreads = 1; + anepchan = an_inchan_create(&anepchan_data, flow, 0, + &attr.proto, &attr.addr, anepbuf, NBUFS, + &attr.cpu, ee_aneppacket, 0); + assert(anepchan != 0); +} + +static void +ee_aneppacket(an_pbuf_t *pbuf, void *arg) +{ + anep_dump(an_pbuf_data(pbuf), an_pbuf_size(pbuf)); + an_inchan_addpbuf(anepchan, pbuf); +} + +static void +ee_die(void *event, void *arg) +{ + an_chanstats_t stats; + an_error_t err; + + err = an_chan_stats(anepchan, &stats); + assert(err == 0); + printf("All done!\n"); + + printf("Request: %d pkts (%d pkts/sec), %d bytes (%d bytes/sec)\n", + stats.inpackets, stats.inpackets/RUNFOR, + stats.inbytes, stats.inbytes/RUNFOR); + printf(" : %d queued, %d copied, %d dropped\n", + stats.inbufqueued, stats.inbufcopies, stats.droppackets); + printf(" : %d no bufs, %d/%d failed thread/buf\n", + stats.dropnobufs, stats.dropfailedthread, + stats.dropfailedbuf); + + if (anepchan) + an_inchan_destroy(anepchan); + an_flow_destroy(an_flow_current()); +} + +static void +ee_init(void *arg) +{ + int status; + int argc = 1; + const char *argv[] = { "ChantestEE" }; + + status = ee_main(argc, argv); +} + +static void +anep_dump(void *data, int size) +{ + struct anephdr { + char ah_version; + char ah_flags; + short ah_typeid; + short ah_hlen; + short ah_plen; + } *ah = data; + static int pnum; + + if (size < sizeof *ah) { + printf("*** ANEP packet too small (%d bytes), ignored\n", size); + return; + } + + printf("%d: version=%d, flags=0x%x, type=%d, hlen=%d, plen=%d\n", + ++pnum, ah->ah_version, ah->ah_flags, + ntohs(ah->ah_typeid), ntohs(ah->ah_hlen), ntohs(ah->ah_plen)); + if (ah->ah_version != 1) + printf(" *** bad version number (%d), should be 1\n", + ah->ah_version); +} diff -ruN nodeos-20000905/envs/envs.mk nodeos/envs/envs.mk --- nodeos-20000905/envs/envs.mk Fri Aug 25 21:53:20 2000 +++ nodeos/envs/envs.mk Tue Sep 26 15:28:49 2000 @@ -20,6 +20,9 @@ # EES = null flowtest chantest eventtest socktest threadtest packsched rootflow +ifneq ($(HOST_OS),oskit) +EES += anepdump +endif ifneq ($(HOST_OS),solaris) EES += pingreply forward #cutpingreply #ipforward endif @@ -109,6 +112,11 @@ EE=packsched EESRCS=packsched.c +EELDFLAGS= +include $(SRCDIR)/define_env.mk + +EE=anepdump +EESRCS=anepdump.c EELDFLAGS= include $(SRCDIR)/define_env.mk diff -ruN nodeos-20000905/nodeos/an_pbuf.h nodeos/nodeos/an_pbuf.h --- nodeos-20000905/nodeos/an_pbuf.h Mon Aug 21 13:52:17 2000 +++ nodeos/nodeos/an_pbuf.h Tue Sep 26 15:09:53 2000 @@ -66,6 +66,7 @@ #define AN_PBUF_ETH 1 /* for ethernet packets */ #define AN_PBUF_SOCK 2 /* for UDP packets */ #define AN_PBUF_CLOCK 3 /* for "clock" packets */ +#define AN_PBUF_STDIO 4 /* for "stdio" packets */ /* * Initializes the provided memory as a pbuffer object. diff -ruN nodeos-20000905/src/an_info.c nodeos/src/an_info.c --- nodeos-20000905/src/an_info.c Fri Aug 25 21:53:22 2000 +++ nodeos/src/an_info.c Tue Sep 26 14:50:06 2000 @@ -104,6 +104,9 @@ #ifdef CHAN_IF strcat(options_str, "CHAN_IF "); #endif +#ifdef CHAN_STDIO + strcat(options_str, "CHAN_STDIO "); +#endif } LW_NODEOS_LEAVE; diff -ruN nodeos-20000905/src/an_pbuf.c nodeos/src/an_pbuf.c --- nodeos-20000905/src/an_pbuf.c Fri Aug 25 21:53:23 2000 +++ nodeos/src/an_pbuf.c Tue Sep 26 15:08:55 2000 @@ -52,6 +52,7 @@ case AN_PBUF_ETH: case AN_PBUF_SOCK: case AN_PBUF_CLOCK: + case AN_PBUF_STDIO: bsize = NODEOS_CHAN_BUFSIZE; break; default: diff -ruN nodeos-20000905/src/chan_stdio.c nodeos/src/chan_stdio.c --- nodeos-20000905/src/chan_stdio.c Wed Dec 31 17:00:00 1969 +++ nodeos/src/chan_stdio.c Tue Sep 26 15:23:58 2000 @@ -0,0 +1,405 @@ +/* + * Copyright (c) 2000 The University of Utah and the Flux Group. + * All rights reserved. + * This file is part of Janos. + * + * Janos is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2, as + * published by the Free Software Foundation. + * + * Janos is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details, in the file + * COPYING. If that file is not present, write to the Free + * Software Foundation, 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA + * + * The University of Utah and the Flux Group requests users to + * return to flux-dist@cs.utah.edu any improvements that they make. + */ + +/* + * Stdio channel driver. + * Supports reading raw packets from stdin (for anetd). + * Could also transmit on stdout. + * + * Note that we don't actually use stdio routines. + */ +#include "options.h" + +#ifdef CHAN_STDIO + +#include +#include +#include +#include +#include +#include +#ifdef OSKIT_UNIX +#include +#endif + +#include +#include +#include +#include +#include +#include "demux.h" +#include "ani_debug.h" +#include "os.h" + +struct stdiodata { + demux_t *demux;/* list of demuxers for this socket */ + int refs; /* refcount */ + int flags; /* flags */ + ani_thread_t thread;/* service thread */ + ani_icond_t cond; /* synch with service thread */ + ani_ilock_t lock; /* protects the demux list */ +}; +#define ANI_SD_THREAD_DYING 0x01 +#define ANI_SD_THREAD_DEAD 0x02 + +static struct stdiodata stdindata; +static void chan_stdio_intr(void *arg, void *arg2); + +void +chan_stdio_init(void) +{ + ani_ilock_init(&stdindata.lock); + ani_icond_init(&stdindata.cond); +#ifdef OSKIT_UNIX + /* + * XXX OSKit assumes stdin is coming from a console tty device + * so we have to turn off tty processing. + */ + { + struct termios t; + + if (tcgetattr(STDIN_FILENO, &t) == 0) { + t.c_oflag &= ~OPOST; + t.c_lflag &= ~(ICANON|ECHO); + tcsetattr(STDIN_FILENO, TCSANOW, &t); + } + } +#endif +} + +ani_error_t +chan_stdio_create(ani_chan_t *chan) +{ + ani_error_t err; + char *cp; + int len; + + if (ani_chan_type(chan) != CHAN_IN) + return ani_error_set(EINVAL, "only support stdio in channels"); + + cp = ani_chan_name(chan); + len = ani_chan_namelen(chan); + if (strncmp(cp, "stdio:anep", len) != 0) + return ani_error_set(EINVAL, "only support ANEP stdio channels"); + + if (++stdindata.refs == 1) { + err = ani_thread_init(&stdindata.thread, 0, 0); + if (err) + return err; + err = ani_threadpool_osthread_start( + ani_flow_threadpool(ani_flow_root_flow()), + &stdindata.thread, 1, + chan_stdio_intr, 0, 0); + if (ani_error_occured(err)) + return err; + } + + return ani_error_clear(); +} + +void +chan_stdio_destroy(ani_chan_t *chan) +{ + assert_ani_chan_valid(chan); + assert(ani_chan_type(chan) == CHAN_IN); + + ani_ilock_lock(&stdindata.lock); + if (--stdindata.refs == 0) { + stdindata.flags |= ANI_SD_THREAD_DYING; + while ((stdindata.flags & ANI_SD_THREAD_DEAD) == 0) + ani_icond_wait(&stdindata.cond, &stdindata.lock); + stdindata.flags = 0; + ani_thread_deinit(&stdindata.thread); + } + ani_ilock_unlock(&stdindata.lock); +} + +ani_error_t +chan_stdio_enable(ani_chan_t *chan) +{ + ani_error_t err; + + assert_ani_chan_valid(chan); + assert(ani_chan_type(chan) == CHAN_IN); + + ani_ilock_lock(&stdindata.lock); + err = ani_chan_demux_add(chan, &stdindata.demux); + ani_ilock_unlock(&stdindata.lock); + if (ani_error_occured(err) && chan->tdata.in.demux != 0) { +#if 0 + ani_demuxkey_destroy(chan->tdata.in.demux); +#endif + chan->tdata.in.demux = 0; + } + + return err; +} + +void +chan_stdio_disable(ani_chan_t *chan) +{ + assert_ani_chan_valid(chan); + assert(ani_chan_type(chan) == CHAN_IN); + + ani_ilock_lock(&stdindata.lock); + ani_chan_demux_remove(chan, &stdindata.demux); + ani_ilock_unlock(&stdindata.lock); + + if (chan->tdata.in.demux != 0) { +#if 0 + ani_demuxkey_destroy(chan->tdata.in.demux); +#endif + chan->tdata.in.demux = 0; + } +} + +void +chan_stdio_send(ani_chan_t *chan, ani_pbuf_t *pbuf) +{ + panic("chan_stdio_send called!"); +} + +/* + * We wake up the receiver thread peroidically to see if the receiving thread + * needs to be shutdown (i.e., all using channels have been destroyed). + */ +static struct timeval timeo; +static fd_set fdset; + +/* + * Loop reading til we get the specified number of bytes or til all channels + * have died. Returns zero if the indicated data has been read, -1 on EOF or + * an error ow. + */ +static int +chan_stdio_readbytes(void *buf, size_t nbytes) +{ + int fd = STDIN_FILENO; + size_t size; + int err = 0; + + /* + * Do all the same crap we do for a socket channel. + * Set a select timeout so we check periodically for channel death. + * + * XXX this should get broken out into fd_chan_* for both stdin + * and sockets. + */ + while (nbytes > 0) { + /* + * Must reinitialize fdset for each call + */ + FD_ZERO(&fdset); + FD_SET(fd, &fdset); + + /* + * and the timeout (for Linux) + */ + timeo.tv_sec = 0; + timeo.tv_usec = 500000; /* 500ms */ + + /* + * Only one fd in the set, so set the nfds parameter + * to +1 that fd. + */ + switch (select(fd+1, &fdset, 0, 0, &timeo)) { + case 1: + /* + * Data is available, read as much as was requested + */ + size = read(fd, buf, nbytes); + err = size < 0 ? errno : 0; + break; + case 0: + /* + * We got a timeout, go check for channel death + */ + size = 0; + err = EAGAIN; + break; + default: + /* + * We got an error, we will exit for sure + */ + size = -1; + err = errno; + break; + } + + /* + * Check to see if any channels still exist. + * We do this without locking since the DYING flag won't + * be cleared while we are still alive. + */ + if (stdindata.flags & ANI_SD_THREAD_DYING) { + err = ECHILD; /* XXX */ + break; + } + + /* + * Got an error or EOF. + * If error was a periodic timeout, ignore it and keep reading. + * Otherwise, return an error so we can exit. + */ + if (size <= 0) { + if (err == EAGAIN) + continue; + err = -1; + break; + } + + nbytes -= size; + buf += size; + } + + return err; +} + +/* + * XXX + */ +struct anephdr { + char ah_version; + char ah_flags; + short ah_typeid; + short ah_hlen; + short ah_plen; +}; + +/* + * Receiver thread for stdio inChan. + * Applies the demux filters to determine a channel to + * deliver to and fires up a channel thread to deal with it. + */ +static void +chan_stdio_intr(void *arg, void *arg2) +{ + void *bdata; + ssize_t bsize, size; + ani_error_t err; + int ierr; + ani_pbuf_t *rbuf, *pbuf; + ani_chan_t *chan; + + /* + * Locking forces synchronization with our creator + */ + ani_ilock_lock(&stdindata.lock); + ani_ilock_unlock(&stdindata.lock); + + /* + * Create an initial read buffer. We swap this buffer with + * empty ones from stdin's channels when we hand off packets. + */ + rbuf = ani_mempool_alloc(ani_mempool_rootpool(), sizeof *rbuf); + assert(rbuf != 0); + err = ani_pbuf_init(rbuf, ani_flow_current(), 0, + NODEOS_CHAN_BUFSIZE, 0, NODEOS_CHAN_BUFSIZE); + assert(!ani_error_occured(err)); + bdata = ani_pbuf_data(rbuf); + bsize = ani_pbuf_size(rbuf); + + while (1) { + chan = 0; + + /* + * Read enough of the packet to find the ANEP total length. + */ + ierr = chan_stdio_readbytes(bdata, sizeof(struct anephdr)); + if (ierr != 0) + break; + + size = ntohs(((struct anephdr *)bdata)->ah_plen); + assert(size >= sizeof(struct anephdr)); + assert(size <= bsize); + + /* + * Now read the rest of the packet + */ + ierr = chan_stdio_readbytes(bdata + sizeof(struct anephdr), + size - sizeof(struct anephdr)); + if (ierr != 0) + break; + + /* + * Valid packet. Locate the target channel for this data. + * If none, just drop it and continue. + */ + ani_ilock_lock(&stdindata.lock); + chan = demux_apply(stdindata.demux, bdata, size); + ani_ilock_unlock(&stdindata.lock); + if (chan == 0) + continue; + + /* + * Allocate a packet buffer from the channel. + * If none are available, just drop the packet. + */ + ani_chan_lock(chan); + err = ani_chan_pbuf_alloc(chan, NODEOS_CHAN_BUFSIZE, &pbuf); + if (ani_error_occured(err)) { + ani_chan_unlock(chan); + continue; + } + + /* + * Swap the buffer obtained from the channel with our buffer. + * The channel gets the full buffer in exchange for one of its + * empty ones that we will use for the next read. + */ + ani_pbuf_setsize(rbuf, size, 0); + ani_pbuf_swap(rbuf, pbuf); + + /* + * Finally, fire up a handler thread. + */ + err = ani_chan_thread_start(chan, pbuf); + if (ani_error_occured(err)) + ani_chan_pbuf_free(chan, pbuf); + + ani_chan_unlock(chan); + + /* + * Reset the buffer pointer/size since we have a new + * physical buffer. + */ + bdata = ani_pbuf_data(rbuf); + bsize = ani_pbuf_size(rbuf); + } + + /* + * We have broken out of the read. Most likely because stdin + * was closed. Free the read buffer, mark that we are dead, + * inform any interested party, and exit. + */ + ani_pbuf_deinit(rbuf); + ani_ilock_lock(&stdindata.lock); + stdindata.flags |= ANI_SD_THREAD_DEAD; + if (stdindata.flags & ANI_SD_THREAD_DYING) + ani_icond_broadcast(&stdindata.cond); + else + /* unexpected exit */ + printf("stdin: receive thread died: %s\n", + ierr == -1 ? "EOF" : strerror(ierr)); + ani_ilock_unlock(&stdindata.lock); +} + +#endif diff -ruN nodeos-20000905/src/channel.c nodeos/src/channel.c --- nodeos-20000905/src/channel.c Fri Aug 25 21:53:24 2000 +++ nodeos/src/channel.c Tue Sep 26 14:49:13 2000 @@ -58,6 +58,14 @@ extern ani_error_t chan_socket_enable(ani_chan_t *chan); extern void chan_socket_disable(ani_chan_t *chan); #endif +#ifdef CHAN_STDIO +extern void chan_stdio_init(void); +extern ani_error_t chan_stdio_create(ani_chan_t *chan); +extern void chan_stdio_destroy(ani_chan_t *chan); +extern void chan_stdio_send(ani_chan_t *chan, ani_pbuf_t *pbuf); +extern ani_error_t chan_stdio_enable(ani_chan_t *chan); +extern void chan_stdio_disable(ani_chan_t *chan); +#endif /* * Protocol-specific processing modules. @@ -90,6 +98,13 @@ "socket", chan_socket_init, chan_socket_create, chan_socket_destroy, chan_socket_send, chan_socket_enable, chan_socket_disable +}, +#endif +#ifdef CHAN_STDIO +{ + "stdio", chan_stdio_init, + chan_stdio_create, chan_stdio_destroy, chan_stdio_send, + chan_stdio_enable, chan_stdio_disable }, #endif { 0, 0, 0, 0, 0, 0, 0 } diff -ruN nodeos-20000905/src/nodeos.mk nodeos/src/nodeos.mk --- nodeos-20000905/src/nodeos.mk Fri Aug 25 21:53:25 2000 +++ nodeos/src/nodeos.mk Tue Sep 26 15:11:22 2000 @@ -25,7 +25,7 @@ SRCS = main.c flow.c threadpool.c thread.c credential.c resource.c \ mempool.c channel.c pbuffer.c synch.c demux.c event.c \ demuxkey.c hnode.c if.c ani_time.c \ - chan_clock.c chan_eth.c chan_socket.c \ + chan_clock.c chan_eth.c chan_socket.c chan_stdio.c \ an_mutex.c an_cond.c an_time.c an_info.c \ an_cred.c an_flow.c an_thread.c an_mempool.c \ an_chan.c an_demuxkey.c an_pbuf.c \ diff -ruN nodeos-20000905/src/options.h nodeos/src/options.h --- nodeos-20000905/src/options.h Fri Aug 4 15:08:01 2000 +++ nodeos/src/options.h Tue Sep 26 14:50:45 2000 @@ -26,14 +26,23 @@ * Channel types, choose all that matter */ #define CHAN_CLOCK -#if defined(OSKIT) + +#if defined(OSKIT) && !defined(OSKIT_UNIX) + /* Can only have one of ETH and socket defined. Choose: */ #undef CHAN_ETH #define CHAN_SOCKET + +/* Doesn't really make sense */ +#undef CHAN_STDIO + #else -/* On unicies, do both. */ + +/* On unicies, do everything. */ #define CHAN_ETH #define CHAN_SOCKET +#define CHAN_STDIO + #endif /*