X-Git-Url: http://git.alex.org.uk diff --git a/nbd-tester-client.c b/nbd-tester-client.c index ed9214e..fb45a88 100644 --- a/nbd-tester-client.c +++ b/nbd-tester-client.c @@ -46,6 +46,8 @@ const static int errstr_len=1024; static uint64_t size; +static int looseordering = 0; + static gchar * transactionlog = "nbd-tester-client.tr"; typedef enum { @@ -91,6 +93,12 @@ struct chunklist { int numitems; }; +struct blkitem { + uint32_t seq; + int32_t inflightr; + int32_t inflightw; +}; + void rclist_unlink(struct rclist * l, struct reqcontext * p) { if (p && l) { struct reqcontext * prev = p->prev; @@ -769,13 +777,14 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, pid_t G_GNUC_UNUSED mypid = getpid(); int blkhashfd = -1; char *blkhashname=NULL; - uint32_t *blkhash = NULL; + struct blkitem *blkhash = NULL; int logfd=-1; uint64_t seq=1; uint64_t processed=0; uint64_t printer=0; uint64_t xfer=0; int readtransactionfile = 1; + int blocked = 0; struct rclist txqueue={NULL, NULL, 0}; struct rclist inflight={NULL, NULL, 0}; struct chunklist txbuf={NULL, NULL, 0}; @@ -819,7 +828,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err; } - if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) { + if (-1 == lseek(blkhashfd, (off_t)((size>>9)*sizeof(struct blkitem)), SEEK_SET)) { g_warning("Could not llseek temp file: %s", strerror(errno)); retval=-1; goto err; @@ -832,7 +841,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, } if (NULL == (blkhash = mmap(NULL, - (size>>9)<<2, + (size>>9)*sizeof(struct blkitem), PROT_READ | PROT_WRITE, MAP_SHARED, blkhashfd, @@ -870,7 +879,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, FD_ZERO(&rset); if (readtransactionfile) FD_SET(logfd, &rset); - if (txqueue.numitems || txbuf.numitems) + if ((!blocked && txqueue.numitems) || txbuf.numitems) FD_SET(sock, &wset); if (inflight.numitems) FD_SET(sock, &rset); @@ -954,25 +963,59 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, /* See if we have a write we can do */ if (FD_ISSET(sock, &wset)) { - if (!(txqueue.head) && !(txbuf.head)) + if ((!(txqueue.head) && !(txbuf.head)) || blocked) g_warning("Socket write FD set but we shouldn't have been interested"); /* If there is no buffered data, generate some */ - if (!(txbuf.head) && (NULL != (prc = txqueue.head))) + if (!blocked && !(txbuf.head) && (NULL != (prc = txqueue.head))) { - rclist_unlink(&txqueue, prc); - rclist_addtail(&inflight, prc); - if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) { retval=-1; - g_warning("Asked to write a reply without a magic number"); + g_warning("Asked to write a request without a magic number"); goto err_open; } - dumpcommand("Sending command", prc->req.type); command = ntohl(prc->req.type); from = ntohll(prc->req.from); len = ntohl(prc->req.len); + + /* First check whether we can touch this command at all. If this + * command is a read, and there is an inflight write, OR if this + * command is a write, and there is an inflight read or write, then + * we need to leave the command alone and signal that we are blocked + */ + + if (!looseordering) + { + uint64_t cfrom; + uint32_t clen; + cfrom = from; + clen = len; + while (clen > 0) { + uint64_t blknum = cfrom>>9; + if (cfrom>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) cfrom, (long long int)size); + goto err_open; + } + if (blkhash[blknum].inflightw || + (blkhash[blknum].inflightr && + ((command & NBD_CMD_MASK_COMMAND)==NBD_CMD_WRITE))) { + blocked=1; + break; + } + cfrom += 512; + clen -= 512; + } + } + + if (blocked) + goto skipdequeue; + + rclist_unlink(&txqueue, prc); + rclist_addtail(&inflight, prc); + + dumpcommand("Sending command", prc->req.type); /* we rewrite the handle as they otherwise may not be unique */ *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash); g_hash_table_insert(handlehash, prc->req.handle, prc); @@ -988,6 +1031,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, (long long int) from, (long long int)size); goto err_open; } + (blkhash[blknum].inflightw)++; /* work out what we should be writing */ makebuf(dbuf, prc->seq, blknum); addbuffer(&txbuf, dbuf, 512); @@ -997,6 +1041,17 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, break; case NBD_CMD_READ: xfer+=len; + while (len > 0) { + uint64_t blknum = from>>9; + if (from>=size) { + snprintf(errstr, errstr_len, "offset %llx beyond size %llx", + (long long int) from, (long long int)size); + goto err_open; + } + (blkhash[blknum].inflightr)++; + from += 512; + len -= 512; + } break; case NBD_CMD_DISC: case NBD_CMD_FLUSH: @@ -1010,6 +1065,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, prc = NULL; } + skipdequeue: /* there should be some now */ if (writebuffer(sock, &txbuf)<0) { @@ -1083,11 +1139,20 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, err_open, "Could not read data: %s", strerror(errno)); + if (--(blkhash[blknum].inflightr) <0 ) { + snprintf(errstr, errstr_len, "Received a read reply for offset %llx when not in flight", + (long long int) from); + goto err_open; + } /* work out what we was written */ - if (checkbuf(dbuf, blkhash[blknum], blknum)) - { + if (checkbuf(dbuf, blkhash[blknum].seq, blknum)) { retval=-1; - snprintf(errstr, errstr_len, "Bad reply data: seq %08x", blkhash[blknum]); + snprintf(errstr, errstr_len, "Bad reply data: I wanted blk %08x, seq %08x but I got (at a guess) blk %08x, seq %08x", + (unsigned int) blknum, + blkhash[blknum].seq, + ((uint32_t *)(dbuf))[0], + ((uint32_t *)(dbuf))[1] + ); goto err_open; } @@ -1099,7 +1164,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, /* subsequent reads should get data with this seq*/ while (len > 0) { uint64_t blknum = from>>9; - blkhash[blknum]=(uint32_t)(prc->seq); + if (--(blkhash[blknum].inflightw) <0 ) { + snprintf(errstr, errstr_len, "Received a write reply for offset %llx when not in flight", + (long long int) from); + goto err_open; + } + blkhash[blknum].seq=(uint32_t)(prc->seq); from += 512; len -= 512; } @@ -1107,7 +1177,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, default: break; } - + blocked = 0; processed++; rclist_unlink(&inflight, prc); prc->req.magic=0; /* so a duplicate reply is detected */ @@ -1151,7 +1221,7 @@ err_open: } err: if (size && blkhash) - munmap(blkhash, (size>>9)<<2); + munmap(blkhash, (size>>9)*sizeof(struct blkitem)); if (blkhashfd != -1) close (blkhashfd); @@ -1183,14 +1253,17 @@ int main(int argc, char**argv) { int testflags=0; testfunc test = throughput_test; + /* Ignore SIGPIPE as we want to pick up the error from write() */ + signal (SIGPIPE, SIG_IGN); + if(argc<3) { g_message("%d: Not enough arguments", (int)getpid()); g_message("%d: Usage: %s ", (int)getpid(), argv[0]); - g_message("%d: Or: %s -N ", (int)getpid(), argv[0]); + g_message("%d: Or: %s -N []", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); - while((c=getopt(argc, argv, "-N:t:owfi"))>=0) { + while((c=getopt(argc, argv, "-N:t:owfil"))>=0) { switch(c) { case 1: switch(nonopt) { @@ -1199,7 +1272,6 @@ int main(int argc, char**argv) { nonopt++; break; case 1: - if(want_port) p=(strtol(argv[2], NULL, 0)); if(p==LONG_MIN||p==LONG_MAX) { g_critical("Could not parse port number: %s", strerror(errno)); @@ -1210,7 +1282,9 @@ int main(int argc, char**argv) { break; case 'N': name=g_strdup(optarg); - p = 10809; + if(!p) { + p = 10809; + } want_port = false; break; case 't': @@ -1219,6 +1293,9 @@ int main(int argc, char**argv) { case 'o': test=oversize_test; break; + case 'l': + looseordering=1; + break; case 'w': testflags|=TEST_WRITE; break;