X-Git-Url: http://git.alex.org.uk diff --git a/nbd-tester-client.c b/nbd-tester-client.c index 40a6363..b5801ee 100644 --- a/nbd-tester-client.c +++ b/nbd-tester-client.c @@ -23,23 +23,31 @@ */ #include #include +#include #include #include #include #include +#include +#include +#include #include #include #include "config.h" #include "lfs.h" -#define MY_NAME "nbd-tester-client" -#include "cliserv.h" - #include #include +#define MY_NAME "nbd-tester-client" +#include "cliserv.h" + static gchar errstr[1024]; const static int errstr_len=1024; +static uint64_t size; + +static gchar * transactionlog = "nbd-tester-client.tr"; + typedef enum { CONNECTION_TYPE_NONE, CONNECTION_TYPE_CONNECT, @@ -53,12 +61,101 @@ typedef enum { CONNECTION_CLOSE_FAST, } CLOSE_TYPE; -inline int read_all(int f, void *buf, size_t len) { +struct reqcontext { + uint64_t seq; + struct nbd_request req; + struct reqcontext * next; + struct reqcontext * prev; +}; + +struct rclist { + struct reqcontext * head; + struct reqcontext * tail; + int numitems; +}; + +void rclist_unlink(struct rclist * l, struct reqcontext * p) { + if (p && l) { + struct reqcontext * prev = p->prev; + struct reqcontext * next = p->next; + + /* Fix link to previous */ + if (prev) + prev->next = next; + else + l->head = next; + + if (next) + next->prev = prev; + else + l->tail = prev; + + p->prev = NULL; + p->next = NULL; + l->numitems--; + } +} + +/* Add a new list item to the tail */ +void rclist_addtail(struct rclist * l, struct reqcontext * p) +{ + if (!p || !l) + return; + if (l->tail) { + if (l->tail->next) + g_warning("addtail found list tail has a next pointer"); + l->tail->next = p; + p->next = NULL; + p->prev = l->tail; + l->tail = p; + } else { + if (l->head) + g_warning("addtail found no list tail but a list head"); + l->head = p; + l->tail = p; + p->prev = NULL; + p->next = NULL; + } + l->numitems++; +} + +#define TEST_WRITE (1<<0) +#define TEST_FLUSH (1<<1) + +int timeval_subtract (struct timeval *result, struct timeval *x, + struct timeval *y) { + if (x->tv_usec < y->tv_usec) { + int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1; + y->tv_usec -= 1000000 * nsec; + y->tv_sec += nsec; + } + + if (x->tv_usec - y->tv_usec > 1000000) { + int nsec = (x->tv_usec - y->tv_usec) / 1000000; + y->tv_usec += 1000000 * nsec; + y->tv_sec -= nsec; + } + + result->tv_sec = x->tv_sec - y->tv_sec; + result->tv_usec = x->tv_usec - y->tv_usec; + + return x->tv_sec < y->tv_sec; +} + +double timeval_diff_to_double (struct timeval * x, struct timeval * y) { + struct timeval r; + timeval_subtract(&r, x, y); + return r.tv_sec * 1.0 + r.tv_usec/1000000.0; +} + +static inline int read_all(int f, void *buf, size_t len) { ssize_t res; size_t retval=0; while(len>0) { if((res=read(f, buf, len)) <=0) { + if (!res) + errno=EAGAIN; snprintf(errstr, errstr_len, "Read failed: %s", strerror(errno)); return -1; } @@ -69,12 +166,38 @@ inline int read_all(int f, void *buf, size_t len) { return retval; } -int setup_connection(gchar *hostname, int port, CONNECTION_TYPE ctype) { +static inline int write_all(int f, void *buf, size_t len) { + ssize_t res; + size_t retval=0; + + while(len>0) { + if((res=write(f, buf, len)) <=0) { + if (!res) + errno=EAGAIN; + snprintf(errstr, errstr_len, "Write failed: %s", strerror(errno)); + return -1; + } + len-=res; + buf+=res; + retval+=res; + } + return retval; +} + +#define READ_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; } +#define READ_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; } + +#define WRITE_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; } +#define WRITE_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; } + +int setup_connection(gchar *hostname, int port, gchar* name, CONNECTION_TYPE ctype, int* serverflags) { int sock; struct hostent *host; struct sockaddr_in addr; char buf[256]; + uint64_t mymagic = (name ? opts_magic : cliserv_magic); u64 tmp64; + uint32_t tmp32 = 0; sock=0; if(ctype>10) & 15) == 3); + int sendflush = (testflags & TEST_FLUSH) && (((i>>10) & 15) == 11); + req.type=htonl((testflags & TEST_WRITE)?NBD_CMD_WRITE:NBD_CMD_READ); + if (sendfua) + req.type = htonl(NBD_CMD_WRITE | NBD_CMD_FLAG_FUA); + memcpy(&(req.handle),&i,sizeof(i)); req.from=htonll(i); - write(sock, &req, sizeof(req)); - printf("Requests(+): %d\n", ++requests); + if (write_all(sock, &req, sizeof(req)) <0) { + retval=-1; + goto err_open; + } + if (testflags & TEST_WRITE) { + if (write_all(sock, writebuf, 1024) <0) { + retval=-1; + goto err_open; + } + } + printf("%d: Requests(+): %d\n", (int)mypid, ++requests); + if (sendflush) { + long long int j = i ^ (1LL<<63); + req.type = htonl(NBD_CMD_FLUSH); + memcpy(&(req.handle),&j,sizeof(j)); + req.from=0; + if (write_all(sock, &req, sizeof(req)) <0) { + retval=-1; + goto err_open; + } + printf("%d: Requests(+): %d\n", (int)mypid, ++requests); + } } do { FD_ZERO(&set); @@ -259,11 +482,11 @@ int throughput_test(gchar* hostname, int port, int sock, char sock_is_open, char if(FD_ISSET(sock, &set)) { /* Okay, there's something ready for * reading here */ - if(read_packet_check_header(sock, 1024, i)<0) { + if(read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i)<0) { retval=-1; goto err_open; } - printf("Requests(-): %d\n", --requests); + printf("%d: Requests(-): %d\n", (int)mypid, --requests); } } while FD_ISSET(sock, &set); /* Now wait until we can write again or until a second have @@ -290,8 +513,8 @@ int throughput_test(gchar* hostname, int port, int sock, char sock_is_open, char if(FD_ISSET(sock, &set)) { /* Okay, there's something ready for * reading here */ - read_packet_check_header(sock, 1024, i); - printf("Requests(-): %d\n", --requests); + read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i); + printf("%d: Requests(-): %d\n", (int)mypid, --requests); } } while (requests); if(gettimeofday(&stop, NULL)<0) { @@ -299,21 +522,21 @@ int throughput_test(gchar* hostname, int port, int sock, char sock_is_open, char snprintf(errstr, errstr_len, "Could not measure end time: %s", strerror(errno)); goto err_open; } - timespan=stop.tv_sec-start.tv_sec+(stop.tv_usec-start.tv_usec)/1000000; - speed=(int)(size/timespan); + timespan=timeval_diff_to_double(&stop, &start); + speed=size/timespan; if(speed>1024) { - speed>>=10; + speed=speed/1024.0; speedchar[0]='K'; } if(speed>1024) { - speed>>=10; + speed=speed/1024.0; speedchar[0]='M'; } if(speed>1024) { - speed>>=10; + speed=speed/1024.0; speedchar[0]='G'; } - g_message("Throughput test complete. Took %.3f seconds to complete, %d%sB/s",timespan,speed,speedchar); + g_message("%d: Throughput %s test (%s flushes) complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", (testflags & TEST_FLUSH)?"with":"without", timespan, speed, speedchar); err_open: if(close_sock) { @@ -323,28 +546,525 @@ err: return retval; } +/* + * fill 512 byte buffer 'buf' with a hashed selection of interesting data based + * only on handle and blknum. The first word is blknum, and the second handle, for ease + * of understanding. Things with handle 0 are blank. + */ +static inline void makebuf(char *buf, uint64_t seq, uint64_t blknum) { + uint64_t x = ((uint64_t)blknum) ^ (seq << 32) ^ (seq >> 32); + uint64_t* p = (uint64_t*)buf; + int i; + if (!seq) { + bzero(buf, 512); + return; + } + for (i = 0; i<512/sizeof(uint64_t); i++) { + int s; + *(p++) = x; + x+=0xFEEDA1ECDEADBEEFULL+i+(((uint64_t)i)<<56); + s = x & 63; + x = x ^ (x<>(64-s)) ^ 0xAA55AA55AA55AA55ULL ^ seq; + } +} + +static inline int checkbuf(char *buf, uint64_t seq, uint64_t blknum) { + char cmp[512]; + makebuf(cmp, seq, blknum); + return memcmp(cmp, buf, 512)?-1:0; +} + +static inline void dumpcommand(char * text, uint32_t command) +{ +#ifdef DEBUG_COMMANDS + command=ntohl(command); + char * ctext; + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_READ: + ctext="NBD_CMD_READ"; + break; + case NBD_CMD_WRITE: + ctext="NBD_CMD_WRITE"; + break; + case NBD_CMD_DISC: + ctext="NBD_CMD_DISC"; + break; + case NBD_CMD_FLUSH: + ctext="NBD_CMD_FLUSH"; + break; + default: + ctext="UNKNOWN"; + break; + } + printf("%s: %s [%s] (0x%08x)\n", + text, + ctext, + (command & NBD_CMD_FLAG_FUA)?"FUA":"NONE", + command); +#endif +} + +int integrity_test(gchar* hostname, int port, char* name, int sock, + char sock_is_open, char close_sock, int testflags) { + struct nbd_reply rep; + fd_set rset; + fd_set wset; + struct timeval tv; + struct timeval start; + struct timeval stop; + double timespan; + double speed; + char speedchar[2] = { '\0', '\0' }; + int retval=0; + int serverflags = 0; + pid_t G_GNUC_UNUSED mypid = getpid(); + int blkhashfd = -1; + char *blkhashname=NULL; + uint32_t *blkhash = NULL; + int logfd=-1; + uint64_t seq=1; + uint64_t processed=0; + uint64_t printer=0; + int readtransactionfile = 1; + struct rclist txqueue={NULL, NULL, 0}; + struct rclist inflight={NULL, NULL, 0}; + + size=0; + if(!sock_is_open) { + if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL, &serverflags))<0) { + g_warning("Could not open socket: %s", errstr); + retval=-1; + goto err; + } + } + + if ((serverflags & (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) + != (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) + g_warning("Server flags do not support FLUSH and FUA - these may error"); + +#ifdef HAVE_MKSTEMP + blkhashname=strdup("/tmp/blkarray-XXXXXX"); + if (!blkhashname || (-1 == (blkhashfd = mkstemp(blkhashname)))) { + g_warning("Could not open temp file: %s", strerror(errno)); + retval=-1; + goto err; + } +#else + /* use tmpnam here to avoid further feature test nightmare */ + if (-1 == (blkhashfd = open(blkhashname=strdup(tmpnam(NULL)), + O_CREAT | O_RDWR, + S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH))) { + g_warning("Could not open temp file: %s", strerror(errno)); + retval=-1; + goto err; + } +#endif + /* Ensure space freed if we die */ + if (-1 == unlink(blkhashname)) { + g_warning("Could not unlink temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) { + g_warning("Could not llseek temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == write(blkhashfd, "\0", 1)) { + g_warning("Could not write temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (NULL == (blkhash = mmap(NULL, + (size>>9)<<2, + PROT_READ | PROT_WRITE, + MAP_SHARED, + blkhashfd, + 0))) { + g_warning("Could not mmap temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == (logfd = open(transactionlog, O_RDONLY))) + { + g_warning("Could open log file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if(gettimeofday(&start, NULL)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Could not measure start time: %s", strerror(errno)); + goto err_open; + } + + while (readtransactionfile || txqueue.numitems || inflight.numitems) { + int ret; + + uint32_t magic; + uint32_t command; + uint64_t from; + uint32_t len; + struct reqcontext * prc; + + *errstr=0; + + FD_ZERO(&wset); + FD_ZERO(&rset); + if (readtransactionfile) + FD_SET(logfd, &rset); + if (txqueue.numitems) + FD_SET(sock, &wset); + if (inflight.numitems) + FD_SET(sock, &rset); + tv.tv_sec=5; + tv.tv_usec=0; + ret = select(1+((sock>logfd)?sock:logfd), &rset, &wset, NULL, &tv); + if (ret == 0) { + retval=-1; + snprintf(errstr, errstr_len, "Timeout reading from socket"); + goto err_open; + } else if (ret<0) { + g_warning("Could not mmap temp file: %s", errstr); + retval=-1; + goto err; + } + /* We know we've got at least one thing to do here then */ + + /* Get a command from the transaction log */ + if (FD_ISSET(logfd, &rset)) { + + /* Read a request or reply from the transaction file */ + READ_ALL_ERRCHK(logfd, + &magic, + sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + magic = ntohl(magic); + switch (magic) { + case NBD_REQUEST_MAGIC: + if (NULL == (prc = calloc(1, sizeof(struct reqcontext)))) { + retval=-1; + snprintf(errstr, errstr_len, "Could not allocate request"); + goto err_open; + } + READ_ALL_ERRCHK(logfd, + sizeof(magic)+(char *)&(prc->req), + sizeof(struct nbd_request)-sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + prc->req.magic = htonl(NBD_REQUEST_MAGIC); + prc->seq=seq++; + if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) { + /* no more to read; don't enqueue as no reply + * we will disconnect manually at the end + */ + readtransactionfile = 0; + free (prc); + } else { + dumpcommand("Enqueuing command", prc->req.type); + rclist_addtail(&txqueue, prc); + } + prc = NULL; + break; + case NBD_REPLY_MAGIC: + READ_ALL_ERRCHK(logfd, + sizeof(magic)+(char *)(&rep), + sizeof(struct nbd_reply)-sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + + if (rep.error) { + retval=-1; + snprintf(errstr, errstr_len, "Transaction log file contained errored transaction"); + goto err_open; + } + + /* We do not need to consume data on a read reply as there is + * none in the log */ + break; + default: + retval=-1; + snprintf(errstr, errstr_len, "Could not measure start time: %08x", magic); + goto err_open; + } + } + + /* See if we have a write we can do */ + if (FD_ISSET(sock, &wset)) + { + prc = txqueue.head; + if (!prc) + g_warning("Socket write FD set but we shouldn't have been interested"); + else + { + + rclist_unlink(&txqueue, prc); + rclist_addtail(&inflight, prc); + + if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) { + retval=-1; + g_warning("Asked to write a reply without a magic number"); + goto err_open; + } + + dumpcommand("Sending command", prc->req.type); + command = ntohl(prc->req.type); + from = ntohll(prc->req.from); + len = ntohl(prc->req.len); + /* we rewrite the handle as they otherwise may not be unique */ + *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc); + WRITE_ALL_ERRCHK(sock, + &(prc->req), + sizeof(struct nbd_request), + err_open, + "Could not write command: %s", + strerror(errno)); + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_WRITE: + while (len > 0) { + uint64_t blknum = from>>9; + char dbuf[512]; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long int)size); + goto err_open; + } + /* work out what we should be writing */ + makebuf(dbuf, prc->seq, blknum); + WRITE_ALL_ERRCHK(sock, + dbuf, + 512, + err_open, + "Could not write data: %s", + strerror(errno)); + from += 512; + len -= 512; + } + + case NBD_CMD_DISC: + case NBD_CMD_READ: + case NBD_CMD_FLUSH: + break; + default: + retval=-1; + snprintf(errstr, errstr_len, "Incomprehensible command: %08x", command); + goto err_open; + break; + } + + prc = NULL; + } + + } + + /* See if there is a reply to be processed from the socket */ + if(FD_ISSET(sock, &rset)) { + /* Okay, there's something ready for + * reading here */ + + READ_ALL_ERRCHK(sock, + &rep, + sizeof(struct nbd_reply), + err_open, + "Could not read from server socket: %s", + strerror(errno)); + + if (rep.magic != htonl(NBD_REPLY_MAGIC)) { + retval=-1; + snprintf(errstr, errstr_len, "Bad magic from server"); + goto err_open; + } + + if (rep.error) { + retval=-1; + snprintf(errstr, errstr_len, "Server errored a transaction"); + goto err_open; + } + + prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle)); + if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) { + retval=-1; + snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic); + goto err_open; + } + + dumpcommand("Processing reply to command", prc->req.type); + command = ntohl(prc->req.type); + from = ntohll(prc->req.from); + len = ntohl(prc->req.len); + + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_READ: + while (len > 0) { + uint64_t blknum = from>>9; + char dbuf[512]; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long int)size); + goto err_open; + } + READ_ALL_ERRCHK(sock, + dbuf, + 512, + err_open, + "Could not read data: %s", + strerror(errno)); + /* work out what we was written */ + if (checkbuf(dbuf, blkhash[blknum], blknum)) + { + retval=-1; + snprintf(errstr, errstr_len, "Bad reply data: seq %08x", blkhash[blknum]); + goto err_open; + + } + from += 512; + len -= 512; + } + break; + case NBD_CMD_WRITE: + /* subsequent reads should get data with this seq*/ + while (len > 0) { + uint64_t blknum = from>>9; + blkhash[blknum]=(uint32_t)(prc->seq); + from += 512; + len -= 512; + } + break; + default: + break; + } + + processed++; + rclist_unlink(&inflight, prc); + prc->req.magic=0; /* so a duplicate reply is detected */ + free(prc); + } + + if (!(printer++ % 10000) || !(readtransactionfile || txqueue.numitems || inflight.numitems) ) + printf("%d: Seq %08lld Queued: %08d Inflight: %08d Done: %08lld\n", + (int)mypid, + (long long int) seq, + txqueue.numitems, + inflight.numitems, + (long long int) processed); + + } + + if (gettimeofday(&stop, NULL)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Could not measure end time: %s", strerror(errno)); + goto err_open; + } + timespan=timeval_diff_to_double(&stop, &start); + speed=size/timespan; + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='K'; + } + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='M'; + } + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='G'; + } + g_message("%d: Integrity %s test complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", timespan, speed, speedchar); + +err_open: + if(close_sock) { + close_connection(sock, CONNECTION_CLOSE_PROPERLY); + } +err: + if (size && blkhash) + munmap(blkhash, (size>>9)<<2); + + if (blkhashfd != -1) + close (blkhashfd); + + if (logfd != -1) + close (logfd); + + if (blkhashname) + free(blkhashname); + + if (*errstr) + g_warning("%s",errstr); + + return retval; +} + +typedef int (*testfunc)(gchar*, int, char*, int, char, char, int); + int main(int argc, char**argv) { gchar *hostname; - long int p; - int port; + long int p = 0; + char* name = NULL; int sock=0; + int c; + bool want_port = TRUE; + int nonopt=0; + int testflags=0; + testfunc test = throughput_test; if(argc<3) { - g_message("Not enough arguments"); - g_message("Usage: %s ", argv[0]); + g_message("%d: Not enough arguments", (int)getpid()); + g_message("%d: Usage: %s ", (int)getpid(), argv[0]); + g_message("%d: Or: %s -N ", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); - hostname=g_strdup(argv[1]); - p=(strtol(argv[2], NULL, 0)); - if(p==LONG_MIN||p==LONG_MAX) { - g_critical("Could not parse port number: %s", strerror(errno)); - exit(EXIT_FAILURE); + while((c=getopt(argc, argv, "-N:t:owfi"))>=0) { + switch(c) { + case 1: + switch(nonopt) { + case 0: + hostname=g_strdup(optarg); + nonopt++; + break; + case 1: + if(want_port) + p=(strtol(argv[2], NULL, 0)); + if(p==LONG_MIN||p==LONG_MAX) { + g_critical("Could not parse port number: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + break; + } + break; + case 'N': + name=g_strdup(optarg); + p = 10809; + want_port = false; + break; + case 't': + transactionlog=g_strdup(optarg); + break; + case 'o': + test=oversize_test; + break; + case 'w': + testflags|=TEST_WRITE; + break; + case 'f': + testflags|=TEST_FLUSH; + break; + case 'i': + test=integrity_test; + break; + } } - port=(int)p; - if(throughput_test(hostname, port, sock, FALSE, TRUE)<0) { - g_warning("Could not run throughput test: %s", errstr); + if(test(hostname, (int)p, name, sock, FALSE, TRUE, testflags)<0) { + g_warning("Could not run test: %s", errstr); exit(EXIT_FAILURE); }