/* * Test client to test the NBD server. Doesn't do anything useful, except * checking that the server does, actually, work. * * Note that the only 'real' test is to check the client against a kernel. If * it works here but does not work in the kernel, then that's most likely a bug * in this program and/or in nbd-server. * * Copyright(c) 2006 Wouter Verhelst * * This program is Free Software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free * Software Foundation, in version 2. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for * more details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., 51 * Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #include "lfs.h" #include #include #define MY_NAME "nbd-tester-client" #include "cliserv.h" static gchar errstr[1024]; const static int errstr_len=1024; static uint64_t size; static gchar * transactionlog = "nbd-tester-client.tr"; typedef enum { CONNECTION_TYPE_NONE, CONNECTION_TYPE_CONNECT, CONNECTION_TYPE_INIT_PASSWD, CONNECTION_TYPE_CLISERV, CONNECTION_TYPE_FULL, } CONNECTION_TYPE; typedef enum { CONNECTION_CLOSE_PROPERLY, CONNECTION_CLOSE_FAST, } CLOSE_TYPE; struct reqcontext { uint64_t seq; struct nbd_request req; struct reqcontext * next; struct reqcontext * prev; }; struct rclist { struct reqcontext * head; struct reqcontext * tail; int numitems; }; void rclist_unlink(struct rclist * l, struct reqcontext * p) { if (p && l) { struct reqcontext * prev = p->prev; struct reqcontext * next = p->next; /* Fix link to previous */ if (prev) prev->next = 
next; else l->head = next; if (next) next->prev = prev; else l->tail = prev; p->prev = NULL; p->next = NULL; l->numitems--; } } /* Add a new list item to the tail */ void rclist_addtail(struct rclist * l, struct reqcontext * p) { if (!p || !l) return; if (l->tail) { if (l->tail->next) g_warning("addtail found list tail has a next pointer"); l->tail->next = p; p->next = NULL; p->prev = l->tail; l->tail = p; } else { if (l->head) g_warning("addtail found no list tail but a list head"); l->head = p; l->tail = p; p->prev = NULL; p->next = NULL; } l->numitems++; } #define TEST_WRITE (1<<0) #define TEST_FLUSH (1<<1) int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y) { if (x->tv_usec < y->tv_usec) { int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1; y->tv_usec -= 1000000 * nsec; y->tv_sec += nsec; } if (x->tv_usec - y->tv_usec > 1000000) { int nsec = (x->tv_usec - y->tv_usec) / 1000000; y->tv_usec += 1000000 * nsec; y->tv_sec -= nsec; } result->tv_sec = x->tv_sec - y->tv_sec; result->tv_usec = x->tv_usec - y->tv_usec; return x->tv_sec < y->tv_sec; } double timeval_diff_to_double (struct timeval * x, struct timeval * y) { struct timeval r; timeval_subtract(&r, x, y); return r.tv_sec * 1.0 + r.tv_usec/1000000.0; } static inline int read_all(int f, void *buf, size_t len) { ssize_t res; size_t retval=0; while(len>0) { if((res=read(f, buf, len)) <=0) { if (!res) errno=EAGAIN; snprintf(errstr, errstr_len, "Read failed: %s", strerror(errno)); return -1; } len-=res; buf+=res; retval+=res; } return retval; } static inline int write_all(int f, void *buf, size_t len) { ssize_t res; size_t retval=0; while(len>0) { if((res=write(f, buf, len)) <=0) { if (!res) errno=EAGAIN; snprintf(errstr, errstr_len, "Write failed: %s", strerror(errno)); return -1; } len-=res; buf+=res; retval+=res; } return retval; } #define READ_ALL_ERRCHK(f, buf, len, whereto, errmsg...) 
if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; }
/*
 * Like READ_ALL_ERRCHK, but additionally assigns rval to the local variable
 * `retval` of the enclosing function before jumping to label whereto.
 * Note that a read_all() result of 0 (len == 0) is also treated as failure.
 */
#define READ_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; }
/*
 * Call write_all(); on a result <= 0 format errmsg into errstr and jump to
 * label whereto.  `retval` of the enclosing function is NOT touched.
 */
#define WRITE_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; }
/*
 * Like WRITE_ALL_ERRCHK, but additionally assigns rval to the local variable
 * `retval` of the enclosing function before jumping to label whereto.
 */
#define WRITE_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; }

/*
 * Open a connection to hostname:port; ctype selects how far the connection
 * setup is taken.
 *
 * NOTE(review): this copy of the file is damaged from here on.  Text appears
 * to have been dropped in two places below (after "if(ctype"), which removes
 * the rest of this function, the definitions that followed it, and the
 * beginning of the throughput test whose tail is what remains.  The tokens
 * are kept exactly as found; restore the missing text from upstream before
 * relying on anything in this span.
 */
int setup_connection(gchar *hostname, int port, gchar* name, CONNECTION_TYPE ctype, int* serverflags) {
	int sock;
	struct hostent *host;
	struct sockaddr_in addr;
	char buf[256];
	/* Named exports use opts_magic, unnamed ones cliserv_magic. */
	uint64_t mymagic = (name ? opts_magic : cliserv_magic);
	u64 tmp64;
	uint32_t tmp32 = 0;

	sock=0;
	/* NOTE(review): text missing between "if(ctype" and "h_addr);". */
	if(ctypeh_addr);
	if((connect(sock, (struct sockaddr *)&addr, sizeof(addr))<0)) {
		/* NOTE(review): strncpy does not guarantee NUL termination. */
		strncpy(errstr, strerror(errno), errstr_len);
		goto err_open;
	}
	/*
	 * NOTE(review): text missing between "if(ctype" and ">10) & 15) == 3);".
	 * What follows is the inside of a request-sending loop of the
	 * throughput test.
	 */
	if(ctype>10) & 15) == 3);
	/* Send a flush when flush testing is on and bits 10..13 of i equal 11. */
	int sendflush = (testflags & TEST_FLUSH) && (((i>>10) & 15) == 11);
	req.type=htonl((testflags & TEST_WRITE)?NBD_CMD_WRITE:NBD_CMD_READ);
	if (sendfua)
		req.type = htonl(NBD_CMD_WRITE | NBD_CMD_FLAG_FUA);
	/* The request handle carries the offset i. */
	memcpy(&(req.handle),&i,sizeof(i));
	req.from=htonll(i);
	if (write_all(sock, &req, sizeof(req)) <0) {
		retval=-1;
		goto err_open;
	}
	if (testflags & TEST_WRITE) {
		/* Write requests are followed by 1024 bytes of payload. */
		if (write_all(sock, writebuf, 1024) <0) {
			retval=-1;
			goto err_open;
		}
	}
	printf("%d: Requests(+): %d\n", (int)mypid, ++requests);
	if (sendflush) {
		/* Flip the top bit so the flush handle differs from the data request's. */
		long long int j = i ^ (1LL<<63);
		req.type = htonl(NBD_CMD_FLUSH);
		memcpy(&(req.handle),&j,sizeof(j));
		req.from=0;
		if (write_all(sock, &req, sizeof(req)) <0) {
			retval=-1;
			goto err_open;
		}
		printf("%d: Requests(+): %d\n", (int)mypid, ++requests);
	}
	}
	/* Consume every reply that is already waiting (zero timeout poll). */
	do {
		FD_ZERO(&set);
		FD_SET(sock, &set);
		tv.tv_sec=0;
		tv.tv_usec=0;
		select(sock+1, &set, NULL, NULL, &tv);
		if(FD_ISSET(sock, &set)) {
			/* Okay, there's something ready for
			 * reading here */
			if(read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i)<0) {
				retval=-1;
				goto err_open;
			}
			printf("%d: Requests(-): %d\n", (int)mypid, --requests);
		}
	} while FD_ISSET(sock, &set);
	/* Now wait until we can write again or until a second has
	 * passed, whichever comes first */
	FD_ZERO(&set);
	FD_SET(sock, &set);
	tv.tv_sec=1;
	tv.tv_usec=0;
	do_write=select(sock+1,NULL,&set,NULL,&tv);
	if(!do_write) printf("Select finished\n");
	if(do_write<0) {
		snprintf(errstr, errstr_len, "select: %s", strerror(errno));
		retval=-1;
		goto err_open;
	}
	}
	/* Now empty the read buffer */
	do {
		FD_ZERO(&set);
		FD_SET(sock, &set);
		tv.tv_sec=0;
		tv.tv_usec=0;
		select(sock+1, &set, NULL, NULL, &tv);
		if(FD_ISSET(sock, &set)) {
			/* Okay, there's something ready for
			 * reading here */
			/* NOTE(review): the result of this call is ignored here. */
			read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i);
			printf("%d: Requests(-): %d\n", (int)mypid, --requests);
		}
	} while (requests);
	if(gettimeofday(&stop, NULL)<0) {
		retval=-1;
		snprintf(errstr, errstr_len, "Could not measure end time: %s", strerror(errno));
		goto err_open;
	}
	timespan=timeval_diff_to_double(&stop, &start);
	speed=size/timespan;
	/* Scale the bytes-per-second figure to K, M or G for printing. */
	if(speed>1024) {
		speed=speed/1024.0;
		speedchar[0]='K';
	}
	if(speed>1024) {
		speed=speed/1024.0;
		speedchar[0]='M';
	}
	if(speed>1024) {
		speed=speed/1024.0;
		speedchar[0]='G';
	}
	g_message("%d: Throughput %s test (%s flushes) complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", (testflags & TEST_FLUSH)?"with":"without", timespan, speed, speedchar);

err_open:
	if(close_sock) {
		close_connection(sock, CONNECTION_CLOSE_PROPERLY);
	}
err:
	return retval;
}

/*
 * fill 512 byte buffer 'buf' with a hashed selection of interesting data based
 * only on handle and blknum. The first word is blknum, and the second handle, for ease
 * of understanding. Things with handle 0 are blank.
*/ static inline void makebuf(char *buf, uint64_t seq, uint64_t blknum) { uint64_t x = ((uint64_t)blknum) ^ (seq << 32) ^ (seq >> 32); uint64_t* p = (uint64_t*)buf; int i; if (!seq) { bzero(buf, 512); return; } for (i = 0; i<512/sizeof(uint64_t); i++) { int s; *(p++) = x; x+=0xFEEDA1ECDEADBEEFULL+i+(((uint64_t)i)<<56); s = x & 63; x = x ^ (x<>(64-s)) ^ 0xAA55AA55AA55AA55ULL ^ seq; } } static inline int checkbuf(char *buf, uint64_t seq, uint64_t blknum) { char cmp[512]; makebuf(cmp, seq, blknum); return memcmp(cmp, buf, 512)?-1:0; } static inline void dumpcommand(char * text, uint32_t command) { #ifdef DEBUG_COMMANDS command=ntohl(command); char * ctext; switch (command & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: ctext="NBD_CMD_READ"; break; case NBD_CMD_WRITE: ctext="NBD_CMD_WRITE"; break; case NBD_CMD_DISC: ctext="NBD_CMD_DISC"; break; case NBD_CMD_FLUSH: ctext="NBD_CMD_FLUSH"; break; default: ctext="UNKNOWN"; break; } printf("%s: %s [%s] (0x%08x)\n", text, ctext, (command & NBD_CMD_FLAG_FUA)?"FUA":"NONE", command); #endif } int integrity_test(gchar* hostname, int port, char* name, int sock, char sock_is_open, char close_sock, int testflags) { struct nbd_reply rep; fd_set rset; fd_set wset; struct timeval tv; struct timeval start; struct timeval stop; double timespan; double speed; char speedchar[2] = { '\0', '\0' }; int retval=0; int serverflags = 0; pid_t G_GNUC_UNUSED mypid = getpid(); int blkhashfd = -1; char *blkhashname=NULL; uint32_t *blkhash = NULL; int logfd=-1; uint64_t seq=1; uint64_t processed=0; uint64_t printer=0; int readtransactionfile = 1; struct rclist txqueue={NULL, NULL, 0}; struct rclist inflight={NULL, NULL, 0}; size=0; if(!sock_is_open) { if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL, &serverflags))<0) { g_warning("Could not open socket: %s", errstr); retval=-1; goto err; } } if ((serverflags & (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) != (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) g_warning("Server flags do not 
support FLUSH and FUA - these may error"); #ifdef HAVE_MKSTEMP blkhashname=strdup("/tmp/blkarray-XXXXXX"); if (!blkhashname || (-1 == (blkhashfd = mkstemp(blkhashname)))) { g_warning("Could not open temp file: %s", strerror(errno)); retval=-1; goto err; } #else /* use tmpnam here to avoid further feature test nightmare */ if (-1 == (blkhashfd = open(blkhashname=strdup(tmpnam(NULL)), O_CREAT | O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH))) { g_warning("Could not open temp file: %s", strerror(errno)); retval=-1; goto err; } #endif /* Ensure space freed if we die */ if (-1 == unlink(blkhashname)) { g_warning("Could not unlink temp file: %s", strerror(errno)); retval=-1; goto err; } if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) { g_warning("Could not llseek temp file: %s", strerror(errno)); retval=-1; goto err; } if (-1 == write(blkhashfd, "\0", 1)) { g_warning("Could not write temp file: %s", strerror(errno)); retval=-1; goto err; } if (NULL == (blkhash = mmap(NULL, (size>>9)<<2, PROT_READ | PROT_WRITE, MAP_SHARED, blkhashfd, 0))) { g_warning("Could not mmap temp file: %s", strerror(errno)); retval=-1; goto err; } if (-1 == (logfd = open(transactionlog, O_RDONLY))) { g_warning("Could open log file: %s", strerror(errno)); retval=-1; goto err; } if(gettimeofday(&start, NULL)<0) { retval=-1; snprintf(errstr, errstr_len, "Could not measure start time: %s", strerror(errno)); goto err_open; } while (readtransactionfile || txqueue.numitems || inflight.numitems) { int ret; uint32_t magic; uint32_t command; uint64_t from; uint32_t len; struct reqcontext * prc; *errstr=0; FD_ZERO(&wset); FD_ZERO(&rset); if (readtransactionfile) FD_SET(logfd, &rset); if (txqueue.numitems) FD_SET(sock, &wset); if (inflight.numitems) FD_SET(sock, &rset); tv.tv_sec=5; tv.tv_usec=0; ret = select(1+((sock>logfd)?sock:logfd), &rset, &wset, NULL, &tv); if (ret == 0) { retval=-1; snprintf(errstr, errstr_len, "Timeout reading from socket"); goto err_open; } else if (ret<0) { g_warning("Could 
not mmap temp file: %s", errstr); retval=-1; goto err; } /* We know we've got at least one thing to do here then */ /* Get a command from the transaction log */ if (FD_ISSET(logfd, &rset)) { /* Read a request or reply from the transaction file */ READ_ALL_ERRCHK(logfd, &magic, sizeof(magic), err_open, "Could not read transaction log: %s", strerror(errno)); magic = ntohl(magic); switch (magic) { case NBD_REQUEST_MAGIC: if (NULL == (prc = calloc(1, sizeof(struct reqcontext)))) { retval=-1; snprintf(errstr, errstr_len, "Could not allocate request"); goto err_open; } READ_ALL_ERRCHK(logfd, sizeof(magic)+(char *)&(prc->req), sizeof(struct nbd_request)-sizeof(magic), err_open, "Could not read transaction log: %s", strerror(errno)); prc->req.magic = htonl(NBD_REQUEST_MAGIC); prc->seq=seq++; if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) { /* no more to read; don't enqueue as no reply * we will disconnect manually at the end */ readtransactionfile = 0; free (prc); } else { dumpcommand("Enqueuing command", prc->req.type); rclist_addtail(&txqueue, prc); } prc = NULL; break; case NBD_REPLY_MAGIC: READ_ALL_ERRCHK(logfd, sizeof(magic)+(char *)(&rep), sizeof(struct nbd_reply)-sizeof(magic), err_open, "Could not read transaction log: %s", strerror(errno)); if (rep.error) { retval=-1; snprintf(errstr, errstr_len, "Transaction log file contained errored transaction"); goto err_open; } /* We do not need to consume data on a read reply as there is * none in the log */ break; default: retval=-1; snprintf(errstr, errstr_len, "Could not measure start time: %08x", magic); goto err_open; } } /* See if we have a write we can do */ if (FD_ISSET(sock, &wset)) { prc = txqueue.head; if (!prc) g_warning("Socket write FD set but we shouldn't have been interested"); else { rclist_unlink(&txqueue, prc); rclist_addtail(&inflight, prc); if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) { retval=-1; g_warning("Asked to write a reply without a magic number"); goto err_open; } 
dumpcommand("Sending command", prc->req.type); command = ntohl(prc->req.type); from = ntohll(prc->req.from); len = ntohl(prc->req.len); /* we rewrite the handle as they otherwise may not be unique */ *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc); WRITE_ALL_ERRCHK(sock, &(prc->req), sizeof(struct nbd_request), err_open, "Could not write command: %s", strerror(errno)); switch (command & NBD_CMD_MASK_COMMAND) { case NBD_CMD_WRITE: while (len > 0) { uint64_t blknum = from>>9; char dbuf[512]; if (from>=size) { snprintf(errstr, errstr_len, "offset %llx beyond size %llx", (long long int) from, (long long int)size); goto err_open; } /* work out what we should be writing */ makebuf(dbuf, prc->seq, blknum); WRITE_ALL_ERRCHK(sock, dbuf, 512, err_open, "Could not write data: %s", strerror(errno)); from += 512; len -= 512; } case NBD_CMD_DISC: case NBD_CMD_READ: case NBD_CMD_FLUSH: break; default: retval=-1; snprintf(errstr, errstr_len, "Incomprehensible command: %08x", command); goto err_open; break; } prc = NULL; } } /* See if there is a reply to be processed from the socket */ if(FD_ISSET(sock, &rset)) { /* Okay, there's something ready for * reading here */ READ_ALL_ERRCHK(sock, &rep, sizeof(struct nbd_reply), err_open, "Could not read from server socket: %s", strerror(errno)); if (rep.magic != htonl(NBD_REPLY_MAGIC)) { retval=-1; snprintf(errstr, errstr_len, "Bad magic from server"); goto err_open; } if (rep.error) { retval=-1; snprintf(errstr, errstr_len, "Server errored a transaction"); goto err_open; } prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle)); if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) { retval=-1; snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic); goto err_open; } dumpcommand("Processing reply to command", prc->req.type); command = ntohl(prc->req.type); from = ntohll(prc->req.from); len = ntohl(prc->req.len); switch (command & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: while (len > 0) { uint64_t blknum = 
from>>9; char dbuf[512]; if (from>=size) { snprintf(errstr, errstr_len, "offset %llx beyond size %llx", (long long int) from, (long long int)size); goto err_open; } READ_ALL_ERRCHK(sock, dbuf, 512, err_open, "Could not read data: %s", strerror(errno)); /* work out what we was written */ if (checkbuf(dbuf, blkhash[blknum], blknum)) { retval=-1; snprintf(errstr, errstr_len, "Bad reply data: seq %08x", blkhash[blknum]); goto err_open; } from += 512; len -= 512; } break; case NBD_CMD_WRITE: /* subsequent reads should get data with this seq*/ while (len > 0) { uint64_t blknum = from>>9; blkhash[blknum]=(uint32_t)(prc->seq); from += 512; len -= 512; } break; default: break; } processed++; rclist_unlink(&inflight, prc); prc->req.magic=0; /* so a duplicate reply is detected */ free(prc); } if (!(printer++ % 10000) || !(readtransactionfile || txqueue.numitems || inflight.numitems) ) printf("%d: Seq %08lld Queued: %08d Inflight: %08d Done: %08lld\n", (int)mypid, (long long int) seq, txqueue.numitems, inflight.numitems, (long long int) processed); } if (gettimeofday(&stop, NULL)<0) { retval=-1; snprintf(errstr, errstr_len, "Could not measure end time: %s", strerror(errno)); goto err_open; } timespan=timeval_diff_to_double(&stop, &start); speed=size/timespan; if(speed>1024) { speed=speed/1024.0; speedchar[0]='K'; } if(speed>1024) { speed=speed/1024.0; speedchar[0]='M'; } if(speed>1024) { speed=speed/1024.0; speedchar[0]='G'; } g_message("%d: Integrity %s test complete. 
Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", timespan, speed, speedchar); err_open: if(close_sock) { close_connection(sock, CONNECTION_CLOSE_PROPERLY); } err: if (size && blkhash) munmap(blkhash, (size>>9)<<2); if (blkhashfd != -1) close (blkhashfd); if (logfd != -1) close (logfd); if (blkhashname) free(blkhashname); if (*errstr) g_warning("%s",errstr); return retval; } typedef int (*testfunc)(gchar*, int, char*, int, char, char, int); int main(int argc, char**argv) { gchar *hostname; long int p = 0; char* name = NULL; int sock=0; int c; bool want_port = TRUE; int nonopt=0; int testflags=0; testfunc test = throughput_test; if(argc<3) { g_message("%d: Not enough arguments", (int)getpid()); g_message("%d: Usage: %s ", (int)getpid(), argv[0]); g_message("%d: Or: %s -N ", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); while((c=getopt(argc, argv, "-N:t:owfi"))>=0) { switch(c) { case 1: switch(nonopt) { case 0: hostname=g_strdup(optarg); nonopt++; break; case 1: if(want_port) p=(strtol(argv[2], NULL, 0)); if(p==LONG_MIN||p==LONG_MAX) { g_critical("Could not parse port number: %s", strerror(errno)); exit(EXIT_FAILURE); } break; } break; case 'N': name=g_strdup(optarg); p = 10809; want_port = false; break; case 't': transactionlog=g_strdup(optarg); break; case 'o': test=oversize_test; break; case 'w': testflags|=TEST_WRITE; break; case 'f': testflags|=TEST_FLUSH; break; case 'i': test=integrity_test; break; } } if(test(hostname, (int)p, name, sock, FALSE, TRUE, testflags)<0) { g_warning("Could not run test: %s", errstr); exit(EXIT_FAILURE); } return 0; }