Make integrity tests respect request ordering.
author Alex Bligh <alex@alex.org.uk>
Mon, 6 Jun 2011 20:34:52 +0000 (21:34 +0100)
committer Wouter Verhelst <w@uter.be>
Thu, 9 Jun 2011 14:41:36 +0000 (16:41 +0200)
Prior to this patch, the integrity test fired reads and writes at the
server as quickly as it could. This included sending overlapping
reads and writes before the prior read or write had been acknowledged.
Under the protocol, the server is permitted to reorder reads and
writes until they are acknowledged. Whilst nbd-server does not
currently reorder reads or writes, this caused "false failures"
when testing other servers that do reorder reads and writes. Also,
the workload as sent was unrealistic, in that servers do not normally
have overlapping reads and writes in their queue.

This patch maintains a record of the reads and writes inflight, and
ensures that:

1. If any block X is covered by an inflight write request, then
   no other request covering block X will be sent until a reply
   to such inflight write is received.

2. If any block X is covered by an inflight read or write request,
   then no write request covering block X will be sent until a reply
   to such inflight request is received.

In other words, disallow all overlapping inflight requests, except
overlapping inflight reads, which are permitted.

The "-l" option to nbd-tester-client can be used to turn on
loose ordering, i.e. to disable the checks above.

I have tested this on nbd-server (no change as expected) and on
a server which does reorder requests (which now passes the tests).

nbd-tester-client.c

index 134652c..fb45a88 100644 (file)
@@ -46,6 +46,8 @@ const static int errstr_len=1024;
 
 static uint64_t size;
 
+static int looseordering = 0;
+
 static gchar * transactionlog = "nbd-tester-client.tr";
 
 typedef enum {
@@ -91,6 +93,12 @@ struct chunklist {
        int numitems;
 };
 
+struct blkitem {       /* per-512-byte-block state; blkhash[] is indexed by offset>>9 */
+       uint32_t seq;           /* seq of the last acknowledged write to this block */
+       int32_t inflightr;      /* number of sent-but-unacknowledged reads covering this block */
+       int32_t inflightw;      /* number of sent-but-unacknowledged writes covering this block */
+};
+
 void rclist_unlink(struct rclist * l, struct reqcontext * p) {
        if (p && l) {
                struct reqcontext * prev = p->prev;
@@ -769,13 +777,14 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
        pid_t G_GNUC_UNUSED mypid = getpid();
        int blkhashfd = -1;
        char *blkhashname=NULL;
-       uint32_t *blkhash = NULL;
+       struct blkitem *blkhash = NULL;
        int logfd=-1;
        uint64_t seq=1;
        uint64_t processed=0;
        uint64_t printer=0;
        uint64_t xfer=0;
        int readtransactionfile = 1;
+       int blocked = 0;
        struct rclist txqueue={NULL, NULL, 0};
        struct rclist inflight={NULL, NULL, 0};
        struct chunklist txbuf={NULL, NULL, 0};
@@ -819,7 +828,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                goto err;
        }
 
-       if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) {
+       if (-1 == lseek(blkhashfd, (off_t)((size>>9)*sizeof(struct blkitem)), SEEK_SET)) {
                g_warning("Could not llseek temp file: %s", strerror(errno));
                retval=-1;
                goto err;
@@ -832,7 +841,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
        }
 
        if (NULL == (blkhash = mmap(NULL,
-                                   (size>>9)<<2,
+                                   (size>>9)*sizeof(struct blkitem),
                                    PROT_READ | PROT_WRITE,
                                    MAP_SHARED,
                                    blkhashfd,
@@ -870,7 +879,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                FD_ZERO(&rset);
                if (readtransactionfile)
                        FD_SET(logfd, &rset);
-               if (txqueue.numitems || txbuf.numitems)
+               if ((!blocked && txqueue.numitems) || txbuf.numitems)
                        FD_SET(sock, &wset);
                if (inflight.numitems)
                        FD_SET(sock, &rset);
@@ -954,25 +963,59 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                /* See if we have a write we can do */
                if (FD_ISSET(sock, &wset))
                {
-                       if (!(txqueue.head) && !(txbuf.head))
+                       if ((!(txqueue.head) && !(txbuf.head)) || blocked)
                                g_warning("Socket write FD set but we shouldn't have been interested");
 
                        /* If there is no buffered data, generate some */
-                       if (!(txbuf.head) && (NULL != (prc = txqueue.head)))
+                       if (!blocked && !(txbuf.head) && (NULL != (prc = txqueue.head)))
                        {
-                               rclist_unlink(&txqueue, prc);
-                               rclist_addtail(&inflight, prc);
-                               
                                if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) {
                                        retval=-1;
-                                       g_warning("Asked to write a reply without a magic number");
+                                       g_warning("Asked to write a request without a magic number");
                                        goto err_open;
                                }
                                        
-                               dumpcommand("Sending command", prc->req.type);
                                command = ntohl(prc->req.type);
                                from = ntohll(prc->req.from);
                                len = ntohl(prc->req.len);
+
+                               /* First check whether we can touch this command at all. If this
+                                * command is a read, and there is an inflight write, OR if this
+                                * command is a write, and there is an inflight read or write, then
+                                * we need to leave the command alone and signal that we are blocked
+                                */
+                               
+                               if (!looseordering)
+                               {
+                                       uint64_t cfrom;
+                                       uint32_t clen;
+                                       cfrom = from;
+                                       clen = len;
+                                       while (clen > 0) {
+                                               uint64_t blknum = cfrom>>9;
+                                               if (cfrom>=size) {
+                                                       snprintf(errstr, errstr_len, "offset %llx beyond size %llx",
+                                                                (long long int) cfrom, (long long int)size);
+                                                       goto err_open;
+                                               }
+                                               if (blkhash[blknum].inflightw ||
+                                                   (blkhash[blknum].inflightr &&
+                                                    ((command & NBD_CMD_MASK_COMMAND)==NBD_CMD_WRITE))) {
+                                                       blocked=1;
+                                                       break;
+                                               }
+                                               cfrom += 512;
+                                               clen -= 512;
+                                       }
+                               }
+
+                               if (blocked)
+                                       goto skipdequeue;
+
+                               rclist_unlink(&txqueue, prc);
+                               rclist_addtail(&inflight, prc);
+                               
+                               dumpcommand("Sending command", prc->req.type);
                                /* we rewrite the handle as they otherwise may not be unique */
                                *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash);
                                g_hash_table_insert(handlehash, prc->req.handle, prc);
@@ -988,6 +1031,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                                                 (long long int) from, (long long int)size);
                                                        goto err_open;
                                                }
+                                               (blkhash[blknum].inflightw)++;
                                                /* work out what we should be writing */
                                                makebuf(dbuf, prc->seq, blknum);
                                                addbuffer(&txbuf, dbuf, 512);
@@ -997,6 +1041,17 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                        break;
                                case NBD_CMD_READ:
                                        xfer+=len;
+                                       while (len > 0) {
+                                               uint64_t blknum = from>>9;
+                                               if (from>=size) {
+                                                       snprintf(errstr, errstr_len, "offset %llx beyond size %llx",
+                                                                (long long int) from, (long long int)size);
+                                                       goto err_open;
+                                               }
+                                               (blkhash[blknum].inflightr)++;
+                                               from += 512;
+                                               len -= 512;
+                                       }
                                        break;
                                case NBD_CMD_DISC:
                                case NBD_CMD_FLUSH:
@@ -1010,6 +1065,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                
                                prc = NULL;
                        }
+               skipdequeue:
 
                        /* there should be some now */
                        if (writebuffer(sock, &txbuf)<0) {
@@ -1083,13 +1139,17 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                                        err_open,
                                                        "Could not read data: %s",
                                                        strerror(errno));
+                                       if (--(blkhash[blknum].inflightr) <0 ) {
+                                               snprintf(errstr, errstr_len, "Received a read reply for offset %llx when not in flight",
+                                                        (long long int) from);
+                                               goto err_open;
+                                       }
                                        /* work out what we was written */
-                                       if (checkbuf(dbuf, blkhash[blknum], blknum))
-                                       {
+                                       if (checkbuf(dbuf, blkhash[blknum].seq, blknum)) {
                                                retval=-1;
                                                snprintf(errstr, errstr_len, "Bad reply data: I wanted blk %08x, seq %08x but I got (at a guess) blk %08x, seq %08x",
                                                         (unsigned int) blknum,
-                                                        blkhash[blknum],
+                                                        blkhash[blknum].seq,
                                                         ((uint32_t *)(dbuf))[0],
                                                         ((uint32_t *)(dbuf))[1]
                                                         );
@@ -1104,7 +1164,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                /* subsequent reads should get data with this seq*/
                                while (len > 0) {
                                        uint64_t blknum = from>>9;
-                                       blkhash[blknum]=(uint32_t)(prc->seq);
+                                       if (--(blkhash[blknum].inflightw) <0 ) {
+                                               snprintf(errstr, errstr_len, "Received a write reply for offset %llx when not in flight",
+                                                        (long long int) from);
+                                               goto err_open;
+                                       }
+                                       blkhash[blknum].seq=(uint32_t)(prc->seq);
                                        from += 512;
                                        len -= 512;
                                }
@@ -1112,7 +1177,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                        default:
                                break;
                        }
-                       
+                       blocked = 0;
                        processed++;
                        rclist_unlink(&inflight, prc);
                        prc->req.magic=0; /* so a duplicate reply is detected */
@@ -1156,7 +1221,7 @@ err_open:
        }
 err:
        if (size && blkhash)
-               munmap(blkhash, (size>>9)<<2);
+               munmap(blkhash, (size>>9)*sizeof(struct blkitem));
 
        if (blkhashfd != -1)
                close (blkhashfd);
@@ -1198,7 +1263,7 @@ int main(int argc, char**argv) {
                exit(EXIT_FAILURE);
        }
        logging();
-       while((c=getopt(argc, argv, "-N:t:owfi"))>=0) {
+       while((c=getopt(argc, argv, "-N:t:owfil"))>=0) {
                switch(c) {
                        case 1:
                                switch(nonopt) {
@@ -1228,6 +1293,9 @@ int main(int argc, char**argv) {
                        case 'o':
                                test=oversize_test;
                                break;
+                       case 'l':
+                               looseordering=1;
+                               break;
                        case 'w':
                                testflags|=TEST_WRITE;
                                break;