X-Git-Url: http://git.alex.org.uk diff --git a/nbd-tester-client.c b/nbd-tester-client.c index 662c6f7..fb45a88 100644 --- a/nbd-tester-client.c +++ b/nbd-tester-client.c @@ -35,17 +35,19 @@ #include #include "config.h" #include "lfs.h" -#define MY_NAME "nbd-tester-client" -#include "cliserv.h" - #include #include +#define MY_NAME "nbd-tester-client" +#include "cliserv.h" + static gchar errstr[1024]; const static int errstr_len=1024; static uint64_t size; +static int looseordering = 0; + static gchar * transactionlog = "nbd-tester-client.tr"; typedef enum { @@ -63,6 +65,7 @@ typedef enum { struct reqcontext { uint64_t seq; + char orighandle[8]; struct nbd_request req; struct reqcontext * next; struct reqcontext * prev; @@ -74,6 +77,28 @@ struct rclist { int numitems; }; +struct chunk { + char * buffer; + char * readptr; + char * writeptr; + uint64_t space; + uint64_t length; + struct chunk * next; + struct chunk * prev; +}; + +struct chunklist { + struct chunk * head; + struct chunk * tail; + int numitems; +}; + +struct blkitem { + uint32_t seq; + int32_t inflightr; + int32_t inflightw; +}; + void rclist_unlink(struct rclist * l, struct reqcontext * p) { if (p && l) { struct reqcontext * prev = p->prev; @@ -97,8 +122,7 @@ void rclist_unlink(struct rclist * l, struct reqcontext * p) { } /* Add a new list item to the tail */ -void rclist_addtail(struct rclist * l, struct reqcontext * p) -{ +void rclist_addtail(struct rclist * l, struct reqcontext * p) { if (!p || !l) return; if (l->tail) { @@ -119,6 +143,127 @@ void rclist_addtail(struct rclist * l, struct reqcontext * p) l->numitems++; } +void chunklist_unlink(struct chunklist * l, struct chunk * p) { + if (p && l) { + struct chunk * prev = p->prev; + struct chunk * next = p->next; + + /* Fix link to previous */ + if (prev) + prev->next = next; + else + l->head = next; + + if (next) + next->prev = prev; + else + l->tail = prev; + + p->prev = NULL; + p->next = NULL; + l->numitems--; + } +} + +/* Add a new list item to the tail */ +void chunklist_addtail(struct chunklist * l, struct chunk * p) { + if (!p || !l) + return; + if (l->tail) { + if (l->tail->next) + g_warning("addtail found list tail has a next pointer"); + l->tail->next = p; + p->next = NULL; + p->prev = l->tail; + l->tail = p; + } else { + if (l->head) + g_warning("addtail found no list tail but a list head"); + l->head = p; + l->tail = p; + p->prev = NULL; + p->next = NULL; + } + l->numitems++; +} + +/* Add some new bytes to a chunklist */ +void addbuffer(struct chunklist * l, void * data, uint64_t len) { + void * buf; + uint64_t size = 64*1024; + struct chunk * pchunk; + + while (len>0) + { + /* First see if there is a current chunk, and if it has space */ + if (l->tail && l->tail->space) { + uint64_t towrite = len; + if (towrite > l->tail->space) + towrite = l->tail->space; + memcpy(l->tail->writeptr, data, towrite); + l->tail->length += towrite; + l->tail->space -= towrite; + l->tail->writeptr += towrite; + len -= towrite; + data += towrite; + } + + if (len>0) { + /* We still need to write more, so prepare a new chunk */ + if ((NULL == (buf = malloc(size))) || (NULL == (pchunk = calloc(1, sizeof(struct chunk))))) { + g_critical("Out of memory"); + exit (1); + } + + pchunk->buffer = buf; + pchunk->readptr = buf; + pchunk->writeptr = buf; + pchunk->space = size; + chunklist_addtail(l, pchunk); + } + } + +} + +/* returns 0 on success, -1 on failure */ +int writebuffer(int fd, struct chunklist * l) { + + struct chunk * pchunk = NULL; + int res; + if (!l) + return 0; + + while (!pchunk) + { + pchunk = l->head; + if (!pchunk) + return 0; + if (!(pchunk->length) || !(pchunk->readptr)) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + pchunk = NULL; + } + } + + /* OK we have a chunk with some data in */ + res = write(fd, pchunk->readptr, pchunk->length); + if (res==0) + errno = EAGAIN; + if (res<=0) + return -1; + pchunk->length -= res; + pchunk->readptr += res; + if (!pchunk->length) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + } + return 0; +} + + + #define TEST_WRITE (1<<0) #define TEST_FLUSH (1<<1) @@ -333,10 +478,9 @@ int oversize_test(gchar* hostname, int port, char* name, int sock, int retval=0; struct nbd_request req; struct nbd_reply rep; - int request=0; int i=0; int serverflags = 0; - pid_t mypid = getpid(); + pid_t G_GNUC_UNUSED mypid = getpid(); char buf[((1024*1024)+sizeof(struct nbd_request)/2)<<1]; bool got_err; @@ -401,7 +545,6 @@ int oversize_test(gchar* hostname, int port, char* name, int sock, int throughput_test(gchar* hostname, int port, char* name, int sock, char sock_is_open, char close_sock, int testflags) { long long int i; - char buf[1024]; char writebuf[1024]; struct nbd_request req; int requests=0; @@ -414,7 +557,6 @@ int throughput_test(gchar* hostname, int port, char* name, int sock, char speedchar[2] = { '\0', '\0' }; int retval=0; int serverflags = 0; - size_t tmp; signed int do_write=TRUE; pid_t mypid = getpid(); @@ -607,9 +749,20 @@ static inline void dumpcommand(char * text, uint32_t command) #endif } +/* return an unused handle */ +uint64_t getrandomhandle(GHashTable *phash) { + uint64_t handle = 0; + int i; + do { + /* RAND_MAX may be as low as 2^15 */ + for (i= 1 ; i<=5; i++) + handle ^= random() ^ (handle << 15); + } while (g_hash_table_lookup(phash, &handle)); + return handle; +} + int integrity_test(gchar* hostname, int port, char* name, int sock, char sock_is_open, char close_sock, int testflags) { - struct nbd_request req; struct nbd_reply rep; fd_set rset; fd_set wset; @@ -621,17 +774,22 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, char speedchar[2] = { '\0', '\0' }; int retval=0; int serverflags = 0; - pid_t mypid = getpid(); + pid_t G_GNUC_UNUSED mypid = getpid(); int blkhashfd = -1; char *blkhashname=NULL; - uint32_t *blkhash = NULL; + struct blkitem *blkhash = NULL; int logfd=-1; uint64_t seq=1; uint64_t processed=0; uint64_t printer=0; + uint64_t xfer=0; int readtransactionfile = 1; + int blocked = 0; struct rclist txqueue={NULL, NULL, 0}; struct rclist inflight={NULL, NULL, 0}; + struct chunklist txbuf={NULL, NULL, 0}; + + GHashTable *handlehash = g_hash_table_new(g_int64_hash, g_int64_equal); size=0; if(!sock_is_open) { @@ -670,7 +828,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err; } - if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) { + if (-1 == lseek(blkhashfd, (off_t)((size>>9)*sizeof(struct blkitem)), SEEK_SET)) { g_warning("Could not llseek temp file: %s", strerror(errno)); retval=-1; goto err; @@ -683,7 +841,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, } if (NULL == (blkhash = mmap(NULL, - (size>>9)<<2, + (size>>9)*sizeof(struct blkitem), PROT_READ | PROT_WRITE, MAP_SHARED, blkhashfd, @@ -706,11 +864,10 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } - while (readtransactionfile || txqueue.numitems || inflight.numitems) { + while (readtransactionfile || txqueue.numitems || txbuf.numitems || inflight.numitems) { int ret; uint32_t magic; - uint64_t hand; uint32_t command; uint64_t from; uint32_t len; @@ -722,7 +879,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, FD_ZERO(&rset); if (readtransactionfile) FD_SET(logfd, &rset); - if (txqueue.numitems) + if ((!blocked && txqueue.numitems) || txbuf.numitems) FD_SET(sock, &wset); if (inflight.numitems) FD_SET(sock, &rset); @@ -765,6 +922,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, "Could not read transaction log: %s", strerror(errno)); prc->req.magic = htonl(NBD_REQUEST_MAGIC); + memcpy(prc->orighandle, prc->req.handle, 8); prc->seq=seq++; if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) { /* no more to read; don't enqueue as no reply @@ -805,35 +963,66 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, /* See if we have a write we can do */ if (FD_ISSET(sock, &wset)) { - prc = txqueue.head; - if (!prc) + if ((!(txqueue.head) && !(txbuf.head)) || blocked) g_warning("Socket write FD set but we shouldn't have been interested"); - else + + /* If there is no buffered data, generate some */ + if (!blocked && !(txbuf.head) && (NULL != (prc = txqueue.head))) { - - rclist_unlink(&txqueue, prc); - rclist_addtail(&inflight, prc); - if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) { retval=-1; - g_warning("Asked to write a reply without a magic number"); + g_warning("Asked to write a request without a magic number"); goto err_open; } - dumpcommand("Sending command", prc->req.type); command = ntohl(prc->req.type); from = ntohll(prc->req.from); len = ntohl(prc->req.len); + + /* First check whether we can touch this command at all. If this + * command is a read, and there is an inflight write, OR if this + * command is a write, and there is an inflight read or write, then + * we need to leave the command alone and signal that we are blocked + */ + + if (!looseordering) + { + uint64_t cfrom; + uint32_t clen; + cfrom = from; + clen = len; + while (clen > 0) { + uint64_t blknum = cfrom>>9; + if (cfrom>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) cfrom, (long long int)size); + goto err_open; + } + if (blkhash[blknum].inflightw || + (blkhash[blknum].inflightr && + ((command & NBD_CMD_MASK_COMMAND)==NBD_CMD_WRITE))) { + blocked=1; + break; + } + cfrom += 512; + clen -= 512; + } + } + + if (blocked) + goto skipdequeue; + + rclist_unlink(&txqueue, prc); + rclist_addtail(&inflight, prc); + + dumpcommand("Sending command", prc->req.type); /* we rewrite the handle as they otherwise may not be unique */ - *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc); - WRITE_ALL_ERRCHK(sock, - &(prc->req), - sizeof(struct nbd_request), - err_open, - "Could not write command: %s", - strerror(errno)); + *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash); + g_hash_table_insert(handlehash, prc->req.handle, prc); + addbuffer(&txbuf, &(prc->req), sizeof(struct nbd_request)); switch (command & NBD_CMD_MASK_COMMAND) { case NBD_CMD_WRITE: + xfer+=len; while (len > 0) { uint64_t blknum = from>>9; char dbuf[512]; @@ -842,20 +1031,29 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, (long long int) from, (long long int)size); goto err_open; } + (blkhash[blknum].inflightw)++; /* work out what we should be writing */ makebuf(dbuf, prc->seq, blknum); - WRITE_ALL_ERRCHK(sock, - dbuf, - 512, - err_open, - "Could not write data: %s", - strerror(errno)); + addbuffer(&txbuf, dbuf, 512); from += 512; len -= 512; } - - case NBD_CMD_DISC: + break; case NBD_CMD_READ: + xfer+=len; + while (len > 0) { + uint64_t blknum = from>>9; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long int)size); + goto err_open; + } + (blkhash[blknum].inflightr)++; + from += 512; + len -= 512; + } + break; + case NBD_CMD_DISC: case NBD_CMD_FLUSH: break; default: @@ -867,6 +1065,14 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, prc = NULL; } + skipdequeue: + + /* there should be some now */ + if (writebuffer(sock, &txbuf)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Failed to write to socket buffer: %s", strerror(errno)); + goto err_open; + } } @@ -894,7 +1100,18 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } - prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle)); + prc = g_hash_table_lookup(handlehash, rep.handle); + if (!prc) { + retval=-1; + snprintf(errstr, errstr_len, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + if (!g_hash_table_remove(handlehash, rep.handle)) { + retval=-1; + snprintf(errstr, errstr_len, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) { retval=-1; snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic); @@ -922,11 +1139,20 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, err_open, "Could not read data: %s", strerror(errno)); + if (--(blkhash[blknum].inflightr) <0 ) { + snprintf(errstr, errstr_len, "Received a read reply for offset %llx when not in flight", + (long long int) from); + goto err_open; + } /* work out what we was written */ - if (checkbuf(dbuf, blkhash[blknum], blknum)) - { + if (checkbuf(dbuf, blkhash[blknum].seq, blknum)) { retval=-1; - snprintf(errstr, errstr_len, "Bad reply data: seq %08x", blkhash[blknum]); + snprintf(errstr, errstr_len, "Bad reply data: I wanted blk %08x, seq %08x but I got (at a guess) blk %08x, seq %08x", + (unsigned int) blknum, + blkhash[blknum].seq, + ((uint32_t *)(dbuf))[0], + ((uint32_t *)(dbuf))[1] + ); goto err_open; } @@ -938,7 +1164,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, /* subsequent reads should get data with this seq*/ while (len > 0) { uint64_t blknum = from>>9; - blkhash[blknum]=(uint32_t)(prc->seq); + if (--(blkhash[blknum].inflightw) <0 ) { + snprintf(errstr, errstr_len, "Received a write reply for offset %llx when not in flight", + (long long int) from); + goto err_open; + } + blkhash[blknum].seq=(uint32_t)(prc->seq); from += 512; len -= 512; } @@ -946,7 +1177,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, default: break; } - + blocked = 0; processed++; rclist_unlink(&inflight, prc); prc->req.magic=0; /* so a duplicate reply is detected */ @@ -969,7 +1200,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } timespan=timeval_diff_to_double(&stop, &start); - speed=size/timespan; + speed=xfer/timespan; if(speed>1024) { speed=speed/1024.0; speedchar[0]='K'; @@ -990,7 +1221,7 @@ err_open: } err: if (size && blkhash) - munmap(blkhash, (size>>9)<<2); + munmap(blkhash, (size>>9)*sizeof(struct blkitem)); if (blkhashfd != -1) close (blkhashfd); @@ -1004,6 +1235,8 @@ err: if (*errstr) g_warning("%s",errstr); + g_hash_table_destroy(handlehash); + return retval; } @@ -1020,14 +1253,17 @@ int main(int argc, char**argv) { int testflags=0; testfunc test = throughput_test; + /* Ignore SIGPIPE as we want to pick up the error from write() */ + signal (SIGPIPE, SIG_IGN); + if(argc<3) { g_message("%d: Not enough arguments", (int)getpid()); g_message("%d: Usage: %s ", (int)getpid(), argv[0]); - g_message("%d: Or: %s -N ", (int)getpid(), argv[0]); + g_message("%d: Or: %s -N []", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); - while((c=getopt(argc, argv, "-N:t:owfi"))>=0) { + while((c=getopt(argc, argv, "-N:t:owfil"))>=0) { switch(c) { case 1: switch(nonopt) { @@ -1036,7 +1272,6 @@ int main(int argc, char**argv) { nonopt++; break; case 1: - if(want_port) p=(strtol(argv[2], NULL, 0)); if(p==LONG_MIN||p==LONG_MAX) { g_critical("Could not parse port number: %s", strerror(errno)); @@ -1047,7 +1282,9 @@ int main(int argc, char**argv) { break; case 'N': name=g_strdup(optarg); - p = 10809; + if(!p) { + p = 10809; + } want_port = false; break; case 't': @@ -1056,6 +1293,9 @@ int main(int argc, char**argv) { case 'o': test=oversize_test; break; + case 'l': + looseordering=1; + break; case 'w': testflags|=TEST_WRITE; break;