X-Git-Url: http://git.alex.org.uk diff --git a/nbd-tester-client.c b/nbd-tester-client.c index 7c1cb75..fee2d87 100644 --- a/nbd-tester-client.c +++ b/nbd-tester-client.c @@ -28,21 +28,26 @@ #include #include #include +#include +#include +#include #include #include #include "config.h" #include "lfs.h" -#define MY_NAME "nbd-tester-client" -#include "cliserv.h" - #include #include +#define MY_NAME "nbd-tester-client" +#include "cliserv.h" + static gchar errstr[1024]; const static int errstr_len=1024; static uint64_t size; +static gchar * transactionlog = "nbd-tester-client.tr"; + typedef enum { CONNECTION_TYPE_NONE, CONNECTION_TYPE_CONNECT, @@ -56,6 +61,204 @@ typedef enum { CONNECTION_CLOSE_FAST, } CLOSE_TYPE; +struct reqcontext { + uint64_t seq; + char orighandle[8]; + struct nbd_request req; + struct reqcontext * next; + struct reqcontext * prev; +}; + +struct rclist { + struct reqcontext * head; + struct reqcontext * tail; + int numitems; +}; + +struct chunk { + char * buffer; + char * readptr; + char * writeptr; + uint64_t space; + uint64_t length; + struct chunk * next; + struct chunk * prev; +}; + +struct chunklist { + struct chunk * head; + struct chunk * tail; + int numitems; +}; + +void rclist_unlink(struct rclist * l, struct reqcontext * p) { + if (p && l) { + struct reqcontext * prev = p->prev; + struct reqcontext * next = p->next; + + /* Fix link to previous */ + if (prev) + prev->next = next; + else + l->head = next; + + if (next) + next->prev = prev; + else + l->tail = prev; + + p->prev = NULL; + p->next = NULL; + l->numitems--; + } +} + +/* Add a new list item to the tail */ +void rclist_addtail(struct rclist * l, struct reqcontext * p) { + if (!p || !l) + return; + if (l->tail) { + if (l->tail->next) + g_warning("addtail found list tail has a next pointer"); + l->tail->next = p; + p->next = NULL; + p->prev = l->tail; + l->tail = p; + } else { + if (l->head) + g_warning("addtail found no list tail but a list head"); + l->head = p; + l->tail = p; + 
p->prev = NULL; + p->next = NULL; + } + l->numitems++; +} + +void chunklist_unlink(struct chunklist * l, struct chunk * p) { + if (p && l) { + struct chunk * prev = p->prev; + struct chunk * next = p->next; + + /* Fix link to previous */ + if (prev) + prev->next = next; + else + l->head = next; + + if (next) + next->prev = prev; + else + l->tail = prev; + + p->prev = NULL; + p->next = NULL; + l->numitems--; + } +} + +/* Add a new list item to the tail */ +void chunklist_addtail(struct chunklist * l, struct chunk * p) { + if (!p || !l) + return; + if (l->tail) { + if (l->tail->next) + g_warning("addtail found list tail has a next pointer"); + l->tail->next = p; + p->next = NULL; + p->prev = l->tail; + l->tail = p; + } else { + if (l->head) + g_warning("addtail found no list tail but a list head"); + l->head = p; + l->tail = p; + p->prev = NULL; + p->next = NULL; + } + l->numitems++; +} + +/* Add some new bytes to a chunklist */ +void addbuffer(struct chunklist * l, void * data, uint64_t len) { + void * buf; + uint64_t size = 64*1024; + struct chunk * pchunk; + + while (len>0) + { + /* First see if there is a current chunk, and if it has space */ + if (l->tail && l->tail->space) { + uint64_t towrite = len; + if (towrite > l->tail->space) + towrite = l->tail->space; + memcpy(l->tail->writeptr, data, towrite); + l->tail->length += towrite; + l->tail->space -= towrite; + l->tail->writeptr += towrite; + len -= towrite; + data += towrite; + } + + if (len>0) { + /* We still need to write more, so prepare a new chunk */ + if ((NULL == (buf = malloc(size))) || (NULL == (pchunk = calloc(1, sizeof(struct chunk))))) { + g_critical("Out of memory"); + exit (1); + } + + pchunk->buffer = buf; + pchunk->readptr = buf; + pchunk->writeptr = buf; + pchunk->space = size; + chunklist_addtail(l, pchunk); + } + } + +} + +/* returns 0 on success, -1 on failure */ +int writebuffer(int fd, struct chunklist * l) { + + struct chunk * pchunk = NULL; + int res; + if (!l) + return 0; + + while 
(!pchunk) + { + pchunk = l->head; + if (!pchunk) + return 0; + if (!(pchunk->length) || !(pchunk->readptr)) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + pchunk = NULL; + } + } + + /* OK we have a chunk with some data in */ + res = write(fd, pchunk->readptr, pchunk->length); + if (res==0) + errno = EAGAIN; + if (res<=0) + return -1; + pchunk->length -= res; + pchunk->readptr += res; + if (!pchunk->length) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + } + return 0; +} + + + +#define TEST_WRITE (1<<0) +#define TEST_FLUSH (1<<1) + int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y) { if (x->tv_usec < y->tv_usec) { @@ -88,6 +291,8 @@ static inline int read_all(int f, void *buf, size_t len) { while(len>0) { if((res=read(f, buf, len)) <=0) { + if (!res) + errno=EAGAIN; snprintf(errstr, errstr_len, "Read failed: %s", strerror(errno)); return -1; } @@ -104,6 +309,8 @@ static inline int write_all(int f, void *buf, size_t len) { while(len>0) { if((res=write(f, buf, len)) <=0) { + if (!res) + errno=EAGAIN; snprintf(errstr, errstr_len, "Write failed: %s", strerror(errno)); return -1; } @@ -120,7 +327,7 @@ static inline int write_all(int f, void *buf, size_t len) { #define WRITE_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; } #define WRITE_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) 
if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; } -int setup_connection(gchar *hostname, int port, gchar* name, CONNECTION_TYPE ctype) { +int setup_connection(gchar *hostname, int port, gchar* name, CONNECTION_TYPE ctype, int* serverflags) { int sock; struct hostent *host; struct sockaddr_in addr; @@ -191,8 +398,9 @@ int setup_connection(gchar *hostname, int port, gchar* name, CONNECTION_TYPE cty READ_ALL_ERRCHK(sock, &size, sizeof(size), err_open, "Could not read size: %s", strerror(errno)); size = ntohll(size); uint16_t flags; - READ_ALL_ERRCHK(sock, buf, sizeof(uint16_t), err_open, "Could not read flags: %s", strerror(errno)); + READ_ALL_ERRCHK(sock, &flags, sizeof(uint16_t), err_open, "Could not read flags: %s", strerror(errno)); flags = ntohs(flags); + *serverflags = flags; READ_ALL_ERRCHK(sock, buf, 124, err_open, "Could not read reserved zeroes: %s", strerror(errno)); goto end; err_open: @@ -258,19 +466,19 @@ end: } int oversize_test(gchar* hostname, int port, char* name, int sock, - char sock_is_open, char close_sock, int write) { + char sock_is_open, char close_sock, int testflags) { int retval=0; struct nbd_request req; struct nbd_reply rep; - int request=0; int i=0; - pid_t mypid = getpid(); + int serverflags = 0; + pid_t G_GNUC_UNUSED mypid = getpid(); char buf[((1024*1024)+sizeof(struct nbd_request)/2)<<1]; bool got_err; /* This should work */ if(!sock_is_open) { - if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL))<0) { + if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL, &serverflags))<0) { g_warning("Could not open socket: %s", errstr); retval=-1; goto err; @@ -327,9 +535,8 @@ int oversize_test(gchar* hostname, int port, char* name, int sock, } int throughput_test(gchar* hostname, int port, char* name, int sock, - char sock_is_open, char close_sock, int write) { + char sock_is_open, char close_sock, int testflags) { long long int i; - char buf[1024]; char 
writebuf[1024]; struct nbd_request req; int requests=0; @@ -341,21 +548,30 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, double speed; char speedchar[2] = { '\0', '\0' }; int retval=0; - size_t tmp; + int serverflags = 0; signed int do_write=TRUE; pid_t mypid = getpid(); - memset (writebuf, 'X', sizeof(1024)); + + if (!(testflags & TEST_WRITE)) + testflags &= ~TEST_FLUSH; + + memset (writebuf, 'X', 1024); size=0; if(!sock_is_open) { - if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL))<0) { + if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL, &serverflags))<0) { g_warning("Could not open socket: %s", errstr); retval=-1; goto err; } } + if ((testflags & TEST_FLUSH) && ((serverflags & (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) + != (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA))) { + snprintf(errstr, errstr_len, "Server did not supply flush capability flags"); + retval = -1; + goto err_open; + } req.magic=htonl(NBD_REQUEST_MAGIC); - req.type=htonl(write?NBD_CMD_WRITE:NBD_CMD_READ); req.len=htonl(1024); if(gettimeofday(&start, NULL)<0) { retval=-1; @@ -364,19 +580,35 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, } for(i=0;i+1024<=size;i+=1024) { if(do_write) { + int sendfua = (testflags & TEST_FLUSH) && (((i>>10) & 15) == 3); + int sendflush = (testflags & TEST_FLUSH) && (((i>>10) & 15) == 11); + req.type=htonl((testflags & TEST_WRITE)?NBD_CMD_WRITE:NBD_CMD_READ); + if (sendfua) + req.type = htonl(NBD_CMD_WRITE | NBD_CMD_FLAG_FUA); memcpy(&(req.handle),&i,sizeof(i)); req.from=htonll(i); if (write_all(sock, &req, sizeof(req)) <0) { retval=-1; goto err_open; } - if (write) { + if (testflags & TEST_WRITE) { if (write_all(sock, writebuf, 1024) <0) { retval=-1; goto err_open; } } printf("%d: Requests(+): %d\n", (int)mypid, ++requests); + if (sendflush) { + long long int j = i ^ (1LL<<63); + req.type = htonl(NBD_CMD_FLUSH); + memcpy(&(req.handle),&j,sizeof(j)); + req.from=0; + if 
(write_all(sock, &req, sizeof(req)) <0) { + retval=-1; + goto err_open; + } + printf("%d: Requests(+): %d\n", (int)mypid, ++requests); + } } do { FD_ZERO(&set); @@ -387,7 +619,7 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, if(FD_ISSET(sock, &set)) { /* Okay, there's something ready for * reading here */ - if(read_packet_check_header(sock, write?0:1024, i)<0) { + if(read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i)<0) { retval=-1; goto err_open; } @@ -418,7 +650,7 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, if(FD_ISSET(sock, &set)) { /* Okay, there's something ready for * reading here */ - read_packet_check_header(sock, write?0:1024, i); + read_packet_check_header(sock, (testflags & TEST_WRITE)?0:1024, i); printf("%d: Requests(-): %d\n", (int)mypid, --requests); } } while (requests); @@ -441,13 +673,500 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, speed=speed/1024.0; speedchar[0]='G'; } - g_message("%d: Throughput %s test complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), write?"write":"read", timespan, speed, speedchar); + g_message("%d: Throughput %s test (%s flushes) complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", (testflags & TEST_FLUSH)?"with":"without", timespan, speed, speedchar); + +err_open: + if(close_sock) { + close_connection(sock, CONNECTION_CLOSE_PROPERLY); + } +err: + return retval; +} + +/* + * fill 512 byte buffer 'buf' with a hashed selection of interesting data based + * only on handle and blknum. The first word is blknum, and the second handle, for ease + * of understanding. Things with handle 0 are blank. 
+ */ +static inline void makebuf(char *buf, uint64_t seq, uint64_t blknum) { + uint64_t x = ((uint64_t)blknum) ^ (seq << 32) ^ (seq >> 32); + uint64_t* p = (uint64_t*)buf; + int i; + if (!seq) { + bzero(buf, 512); + return; + } + for (i = 0; i<512/sizeof(uint64_t); i++) { + int s; + *(p++) = x; + x+=0xFEEDA1ECDEADBEEFULL+i+(((uint64_t)i)<<56); + s = x & 63; + x = x ^ (x<<s) ^ (x>>(64-s)) ^ 0xAA55AA55AA55AA55ULL ^ seq; + } +} + +static inline int checkbuf(char *buf, uint64_t seq, uint64_t blknum) { + char cmp[512]; + makebuf(cmp, seq, blknum); + return memcmp(cmp, buf, 512)?-1:0; +} + +static inline void dumpcommand(char * text, uint32_t command) +{ +#ifdef DEBUG_COMMANDS + command=ntohl(command); + char * ctext; + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_READ: + ctext="NBD_CMD_READ"; + break; + case NBD_CMD_WRITE: + ctext="NBD_CMD_WRITE"; + break; + case NBD_CMD_DISC: + ctext="NBD_CMD_DISC"; + break; + case NBD_CMD_FLUSH: + ctext="NBD_CMD_FLUSH"; + break; + default: + ctext="UNKNOWN"; + break; + } + printf("%s: %s [%s] (0x%08x)\n", + text, + ctext, + (command & NBD_CMD_FLAG_FUA)?"FUA":"NONE", + command); +#endif +} + +/* return an unused handle */ +uint64_t getrandomhandle(GHashTable *phash) { + uint64_t handle = 0; + int i; + do { + /* RAND_MAX may be as low as 2^15 */ + for (i= 1 ; i<=5; i++) + handle ^= random() ^ (handle << 15); + } while (g_hash_table_lookup(phash, &handle)); + return handle; +} + +int integrity_test(gchar* hostname, int port, char* name, int sock, + char sock_is_open, char close_sock, int testflags) { + struct nbd_reply rep; + fd_set rset; + fd_set wset; + struct timeval tv; + struct timeval start; + struct timeval stop; + double timespan; + double speed; + char speedchar[2] = { '\0', '\0' }; + int retval=0; + int serverflags = 0; + pid_t G_GNUC_UNUSED mypid = getpid(); + int blkhashfd = -1; + char *blkhashname=NULL; + uint32_t *blkhash = NULL; + int logfd=-1; + uint64_t seq=1; + uint64_t processed=0; + uint64_t printer=0; + uint64_t
xfer=0; + int readtransactionfile = 1; + struct rclist txqueue={NULL, NULL, 0}; + struct rclist inflight={NULL, NULL, 0}; + struct chunklist txbuf={NULL, NULL, 0}; + + GHashTable *handlehash = g_hash_table_new(g_int64_hash, g_int64_equal); + + size=0; + if(!sock_is_open) { + if((sock=setup_connection(hostname, port, name, CONNECTION_TYPE_FULL, &serverflags))<0) { + g_warning("Could not open socket: %s", errstr); + retval=-1; + goto err; + } + } + + if ((serverflags & (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) + != (NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA)) + g_warning("Server flags do not support FLUSH and FUA - these may error"); + +#ifdef HAVE_MKSTEMP + blkhashname=strdup("/tmp/blkarray-XXXXXX"); + if (!blkhashname || (-1 == (blkhashfd = mkstemp(blkhashname)))) { + g_warning("Could not open temp file: %s", strerror(errno)); + retval=-1; + goto err; + } +#else + /* use tmpnam here to avoid further feature test nightmare */ + if (-1 == (blkhashfd = open(blkhashname=strdup(tmpnam(NULL)), + O_CREAT | O_RDWR, + S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH))) { + g_warning("Could not open temp file: %s", strerror(errno)); + retval=-1; + goto err; + } +#endif + /* Ensure space freed if we die */ + if (-1 == unlink(blkhashname)) { + g_warning("Could not unlink temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) { + g_warning("Could not llseek temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == write(blkhashfd, "\0", 1)) { + g_warning("Could not write temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (NULL == (blkhash = mmap(NULL, + (size>>9)<<2, + PROT_READ | PROT_WRITE, + MAP_SHARED, + blkhashfd, + 0))) { + g_warning("Could not mmap temp file: %s", strerror(errno)); + retval=-1; + goto err; + } + + if (-1 == (logfd = open(transactionlog, O_RDONLY))) + { + g_warning("Could open log file: %s", strerror(errno)); + retval=-1; + goto err; + } + + 
if(gettimeofday(&start, NULL)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Could not measure start time: %s", strerror(errno)); + goto err_open; + } + + while (readtransactionfile || txqueue.numitems || txbuf.numitems || inflight.numitems) { + int ret; + + uint32_t magic; + uint32_t command; + uint64_t from; + uint32_t len; + struct reqcontext * prc; + + *errstr=0; + + FD_ZERO(&wset); + FD_ZERO(&rset); + if (readtransactionfile) + FD_SET(logfd, &rset); + if (txqueue.numitems || txbuf.numitems) + FD_SET(sock, &wset); + if (inflight.numitems) + FD_SET(sock, &rset); + tv.tv_sec=5; + tv.tv_usec=0; + ret = select(1+((sock>logfd)?sock:logfd), &rset, &wset, NULL, &tv); + if (ret == 0) { + retval=-1; + snprintf(errstr, errstr_len, "Timeout reading from socket"); + goto err_open; + } else if (ret<0) { + g_warning("Could not mmap temp file: %s", errstr); + retval=-1; + goto err; + } + /* We know we've got at least one thing to do here then */ + + /* Get a command from the transaction log */ + if (FD_ISSET(logfd, &rset)) { + + /* Read a request or reply from the transaction file */ + READ_ALL_ERRCHK(logfd, + &magic, + sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + magic = ntohl(magic); + switch (magic) { + case NBD_REQUEST_MAGIC: + if (NULL == (prc = calloc(1, sizeof(struct reqcontext)))) { + retval=-1; + snprintf(errstr, errstr_len, "Could not allocate request"); + goto err_open; + } + READ_ALL_ERRCHK(logfd, + sizeof(magic)+(char *)&(prc->req), + sizeof(struct nbd_request)-sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + prc->req.magic = htonl(NBD_REQUEST_MAGIC); + memcpy(prc->orighandle, prc->req.handle, 8); + prc->seq=seq++; + if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) { + /* no more to read; don't enqueue as no reply + * we will disconnect manually at the end + */ + readtransactionfile = 0; + free (prc); + } else { + dumpcommand("Enqueuing command", 
prc->req.type); + rclist_addtail(&txqueue, prc); + } + prc = NULL; + break; + case NBD_REPLY_MAGIC: + READ_ALL_ERRCHK(logfd, + sizeof(magic)+(char *)(&rep), + sizeof(struct nbd_reply)-sizeof(magic), + err_open, + "Could not read transaction log: %s", + strerror(errno)); + + if (rep.error) { + retval=-1; + snprintf(errstr, errstr_len, "Transaction log file contained errored transaction"); + goto err_open; + } + + /* We do not need to consume data on a read reply as there is + * none in the log */ + break; + default: + retval=-1; + snprintf(errstr, errstr_len, "Could not measure start time: %08x", magic); + goto err_open; + } + } + + /* See if we have a write we can do */ + if (FD_ISSET(sock, &wset)) + { + if (!(txqueue.head) && !(txbuf.head)) + g_warning("Socket write FD set but we shouldn't have been interested"); + + /* If there is no buffered data, generate some */ + if (!(txbuf.head) && (NULL != (prc = txqueue.head))) + { + rclist_unlink(&txqueue, prc); + rclist_addtail(&inflight, prc); + + if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) { + retval=-1; + g_warning("Asked to write a reply without a magic number"); + goto err_open; + } + + dumpcommand("Sending command", prc->req.type); + command = ntohl(prc->req.type); + from = ntohll(prc->req.from); + len = ntohl(prc->req.len); + /* we rewrite the handle as they otherwise may not be unique */ + *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash); + g_hash_table_insert(handlehash, prc->req.handle, prc); + addbuffer(&txbuf, &(prc->req), sizeof(struct nbd_request)); + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_WRITE: + xfer+=len; + while (len > 0) { + uint64_t blknum = from>>9; + char dbuf[512]; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long int)size); + goto err_open; + } + /* work out what we should be writing */ + makebuf(dbuf, prc->seq, blknum); + addbuffer(&txbuf, dbuf, 512); + from += 512; + len -= 512; + } + break; + 
case NBD_CMD_READ: + xfer+=len; + break; + case NBD_CMD_DISC: + case NBD_CMD_FLUSH: + break; + default: + retval=-1; + snprintf(errstr, errstr_len, "Incomprehensible command: %08x", command); + goto err_open; + break; + } + + prc = NULL; + } + + /* there should be some now */ + if (writebuffer(sock, &txbuf)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Failed to write to socket buffer: %s", strerror(errno)); + goto err_open; + } + + } + + /* See if there is a reply to be processed from the socket */ + if(FD_ISSET(sock, &rset)) { + /* Okay, there's something ready for + * reading here */ + + READ_ALL_ERRCHK(sock, + &rep, + sizeof(struct nbd_reply), + err_open, + "Could not read from server socket: %s", + strerror(errno)); + + if (rep.magic != htonl(NBD_REPLY_MAGIC)) { + retval=-1; + snprintf(errstr, errstr_len, "Bad magic from server"); + goto err_open; + } + + if (rep.error) { + retval=-1; + snprintf(errstr, errstr_len, "Server errored a transaction"); + goto err_open; + } + + prc = g_hash_table_lookup(handlehash, rep.handle); + if (!prc) { + retval=-1; + snprintf(errstr, errstr_len, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + if (!g_hash_table_remove(handlehash, rep.handle)) { + retval=-1; + snprintf(errstr, errstr_len, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + + if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) { + retval=-1; + snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic); + goto err_open; + } + + dumpcommand("Processing reply to command", prc->req.type); + command = ntohl(prc->req.type); + from = ntohll(prc->req.from); + len = ntohl(prc->req.len); + + switch (command & NBD_CMD_MASK_COMMAND) { + case NBD_CMD_READ: + while (len > 0) { + uint64_t blknum = from>>9; + char dbuf[512]; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long 
int)size); + goto err_open; + } + READ_ALL_ERRCHK(sock, + dbuf, + 512, + err_open, + "Could not read data: %s", + strerror(errno)); + /* work out what we was written */ + if (checkbuf(dbuf, blkhash[blknum], blknum)) + { + retval=-1; + snprintf(errstr, errstr_len, "Bad reply data: seq %08x", blkhash[blknum]); + goto err_open; + + } + from += 512; + len -= 512; + } + break; + case NBD_CMD_WRITE: + /* subsequent reads should get data with this seq*/ + while (len > 0) { + uint64_t blknum = from>>9; + blkhash[blknum]=(uint32_t)(prc->seq); + from += 512; + len -= 512; + } + break; + default: + break; + } + + processed++; + rclist_unlink(&inflight, prc); + prc->req.magic=0; /* so a duplicate reply is detected */ + free(prc); + } + + if (!(printer++ % 10000) || !(readtransactionfile || txqueue.numitems || inflight.numitems) ) + printf("%d: Seq %08lld Queued: %08d Inflight: %08d Done: %08lld\n", + (int)mypid, + (long long int) seq, + txqueue.numitems, + inflight.numitems, + (long long int) processed); + + } + + if (gettimeofday(&stop, NULL)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Could not measure end time: %s", strerror(errno)); + goto err_open; + } + timespan=timeval_diff_to_double(&stop, &start); + speed=xfer/timespan; + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='K'; + } + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='M'; + } + if(speed>1024) { + speed=speed/1024.0; + speedchar[0]='G'; + } + g_message("%d: Integrity %s test complete. 
Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags & TEST_WRITE)?"write":"read", timespan, speed, speedchar); err_open: if(close_sock) { close_connection(sock, CONNECTION_CLOSE_PROPERLY); } err: + if (size && blkhash) + munmap(blkhash, (size>>9)<<2); + + if (blkhashfd != -1) + close (blkhashfd); + + if (logfd != -1) + close (logfd); + + if (blkhashname) + free(blkhashname); + + if (*errstr) + g_warning("%s",errstr); + + g_hash_table_destroy(handlehash); + return retval; } @@ -461,17 +1180,20 @@ int main(int argc, char**argv) { int c; bool want_port = TRUE; int nonopt=0; - int write=0; + int testflags=0; testfunc test = throughput_test; + /* Ignore SIGPIPE as we want to pick up the error from write() */ + signal (SIGPIPE, SIG_IGN); + if(argc<3) { g_message("%d: Not enough arguments", (int)getpid()); g_message("%d: Usage: %s <hostname> <port>", (int)getpid(), argv[0]); - g_message("%d: Or: %s <hostname> -N <exportname>", (int)getpid(), argv[0]); + g_message("%d: Or: %s <hostname> -N <exportname> [<port>]", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); - while((c=getopt(argc, argv, "-N:ow"))>=0) { + while((c=getopt(argc, argv, "-N:t:owfi"))>=0) { switch(c) { case 1: switch(nonopt) { @@ -480,7 +1202,6 @@ int main(int argc, char**argv) { nonopt++; break; case 1: - if(want_port) p=(strtol(argv[2], NULL, 0)); if(p==LONG_MIN||p==LONG_MAX) { g_critical("Could not parse port number: %s", strerror(errno)); @@ -491,19 +1212,30 @@ int main(int argc, char**argv) { break; case 'N': name=g_strdup(optarg); - p = 10809; + if(!p) { + p = 10809; + } want_port = false; break; + case 't': + transactionlog=g_strdup(optarg); + break; case 'o': test=oversize_test; break; case 'w': - write=1; + testflags|=TEST_WRITE; + break; + case 'f': + testflags|=TEST_FLUSH; + break; + case 'i': + test=integrity_test; break; } } - if(test(hostname, (int)p, name, sock, FALSE, TRUE, write)<0) { + if(test(hostname, (int)p, name, sock, FALSE, TRUE, testflags)<0) { g_warning("Could not run test: %s", errstr); exit(EXIT_FAILURE); }