nbd-tester-client: ignore SIGPIPE so we pick up and print the error
[nbd.git] / nbd-tester-client.c
index 662c6f7..fee2d87 100644 (file)
 #include <unistd.h>
 #include "config.h"
 #include "lfs.h"
-#define MY_NAME "nbd-tester-client"
-#include "cliserv.h"
-
 #include <netinet/in.h>
 #include <glib.h>
 
+#define MY_NAME "nbd-tester-client"
+#include "cliserv.h"
+
 static gchar errstr[1024];
 const static int errstr_len=1024;
 
@@ -63,6 +63,7 @@ typedef enum {
 
 struct reqcontext {
        uint64_t seq;
+       char orighandle[8];
        struct nbd_request req;
        struct reqcontext * next;
        struct reqcontext * prev;
@@ -74,6 +75,22 @@ struct rclist {
        int numitems;
 };
 
+struct chunk {
+       char * buffer;
+       char * readptr;
+       char * writeptr;
+       uint64_t space;
+       uint64_t length;
+       struct chunk * next;
+       struct chunk * prev;
+};
+
+struct chunklist {
+       struct chunk * head;
+       struct chunk * tail;
+       int numitems;
+};
+
 void rclist_unlink(struct rclist * l, struct reqcontext * p) {
        if (p && l) {
                struct reqcontext * prev = p->prev;
@@ -97,8 +114,7 @@ void rclist_unlink(struct rclist * l, struct reqcontext * p) {
 }                                                                      
 
 /* Add a new list item to the tail */
-void rclist_addtail(struct rclist * l, struct reqcontext * p)
-{
+void rclist_addtail(struct rclist * l, struct reqcontext * p) {
        if (!p || !l)
                return;
        if (l->tail) {
@@ -119,6 +135,127 @@ void rclist_addtail(struct rclist * l, struct reqcontext * p)
        l->numitems++;
 }
 
+void chunklist_unlink(struct chunklist * l, struct chunk * p) {
+       if (p && l) {
+               struct chunk * prev = p->prev;
+               struct chunk * next = p->next;
+               
+               /* Fix link to previous */
+               if (prev)
+                       prev->next = next;
+               else
+                       l->head = next;
+               
+               if (next)
+                       next->prev = prev;
+               else
+                       l->tail = prev;
+
+               p->prev = NULL;
+               p->next = NULL;
+               l->numitems--;
+       }                                                       
+}                                                                      
+
+/* Add a new list item to the tail */
+void chunklist_addtail(struct chunklist * l, struct chunk * p) {
+       if (!p || !l)
+               return;
+       if (l->tail) {
+               if (l->tail->next)
+                       g_warning("addtail found list tail has a next pointer");
+               l->tail->next = p;
+               p->next = NULL;
+               p->prev = l->tail;
+               l->tail = p;
+       } else {
+               if (l->head)
+                       g_warning("addtail found no list tail but a list head");
+               l->head = p;
+               l->tail = p;
+               p->prev = NULL;
+               p->next = NULL;
+       }
+       l->numitems++;
+}
+
+/* Add some new bytes to a chunklist */
+void addbuffer(struct chunklist * l, void * data, uint64_t len) {
+       void * buf;
+       uint64_t size = 64*1024;
+       struct chunk * pchunk;
+
+       while (len>0)
+       {
+               /* First see if there is a current chunk, and if it has space */
+               if (l->tail && l->tail->space) {
+                       uint64_t towrite = len;
+                       if (towrite > l->tail->space)
+                               towrite = l->tail->space;
+                       memcpy(l->tail->writeptr, data, towrite);
+                       l->tail->length += towrite;
+                       l->tail->space -= towrite;
+                       l->tail->writeptr += towrite;
+                       len -= towrite;
+                       data += towrite;
+               }
+
+               if (len>0) {
+                       /* We still need to write more, so prepare a new chunk */
+                       if ((NULL == (buf = malloc(size))) || (NULL == (pchunk = calloc(1, sizeof(struct chunk))))) {
+                               g_critical("Out of memory");
+                               exit (1);
+                       }
+
+                       pchunk->buffer = buf;
+                       pchunk->readptr = buf;
+                       pchunk->writeptr = buf;
+                       pchunk->space = size;
+                       chunklist_addtail(l, pchunk);
+               }
+       }
+
+}
+
+/* returns 0 on success, -1 on failure */
+int writebuffer(int fd, struct chunklist * l) {
+
+       struct chunk * pchunk = NULL;
+       int res;
+       if (!l)
+               return 0;
+
+       while (!pchunk)
+       {
+               pchunk = l->head;
+               if (!pchunk)
+                       return 0;
+               if (!(pchunk->length) || !(pchunk->readptr)) {
+                       chunklist_unlink(l, pchunk);
+                       free(pchunk->buffer);
+                       free(pchunk);
+                       pchunk = NULL;
+               }
+       }
+       
+       /* OK we have a chunk with some data in */
+       res = write(fd, pchunk->readptr, pchunk->length);
+       if (res==0)
+               errno = EAGAIN;
+       if (res<=0)
+               return -1;
+       pchunk->length -= res;
+       pchunk->readptr += res;
+       if (!pchunk->length) {
+               chunklist_unlink(l, pchunk);
+               free(pchunk->buffer);
+               free(pchunk);
+       }
+       return 0;
+}
+
+
+
 #define TEST_WRITE (1<<0)
 #define TEST_FLUSH (1<<1)
 
@@ -333,10 +470,9 @@ int oversize_test(gchar* hostname, int port, char* name, int sock,
        int retval=0;
        struct nbd_request req;
        struct nbd_reply rep;
-       int request=0;
        int i=0;
        int serverflags = 0;
-       pid_t mypid = getpid();
+       pid_t G_GNUC_UNUSED mypid = getpid();
        char buf[((1024*1024)+sizeof(struct nbd_request)/2)<<1];
        bool got_err;
 
@@ -401,7 +537,6 @@ int oversize_test(gchar* hostname, int port, char* name, int sock,
 int throughput_test(gchar* hostname, int port, char* name, int sock,
                    char sock_is_open, char close_sock, int testflags) {
        long long int i;
-       char buf[1024];
        char writebuf[1024];
        struct nbd_request req;
        int requests=0;
@@ -414,7 +549,6 @@ int throughput_test(gchar* hostname, int port, char* name, int sock,
        char speedchar[2] = { '\0', '\0' };
        int retval=0;
        int serverflags = 0;
-       size_t tmp;
        signed int do_write=TRUE;
        pid_t mypid = getpid();
 
@@ -607,9 +741,20 @@ static inline void dumpcommand(char * text, uint32_t command)
 #endif
 }
 
+/* return an unused handle */
+uint64_t getrandomhandle(GHashTable *phash) {
+       uint64_t handle = 0;
+       int i;
+       do {
+               /* RAND_MAX may be as low as 2^15 */
+               for (i= 1 ; i<=5; i++)
+                       handle ^= random() ^ (handle << 15); 
+       } while (g_hash_table_lookup(phash, &handle));
+       return handle;
+}
+
 int integrity_test(gchar* hostname, int port, char* name, int sock,
                   char sock_is_open, char close_sock, int testflags) {
-       struct nbd_request req;
        struct nbd_reply rep;
        fd_set rset;
        fd_set wset;
@@ -621,7 +766,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
        char speedchar[2] = { '\0', '\0' };
        int retval=0;
        int serverflags = 0;
-       pid_t mypid = getpid();
+       pid_t G_GNUC_UNUSED mypid = getpid();
        int blkhashfd = -1;
        char *blkhashname=NULL;
        uint32_t *blkhash = NULL;
@@ -629,9 +774,13 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
        uint64_t seq=1;
        uint64_t processed=0;
        uint64_t printer=0;
+       uint64_t xfer=0;
        int readtransactionfile = 1;
        struct rclist txqueue={NULL, NULL, 0};
        struct rclist inflight={NULL, NULL, 0};
+       struct chunklist txbuf={NULL, NULL, 0};
+
+       GHashTable *handlehash = g_hash_table_new(g_int64_hash, g_int64_equal);
 
        size=0;
        if(!sock_is_open) {
@@ -706,11 +855,10 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                goto err_open;
        }
 
-       while (readtransactionfile || txqueue.numitems || inflight.numitems) {
+       while (readtransactionfile || txqueue.numitems || txbuf.numitems || inflight.numitems) {
                int ret;
 
                uint32_t magic;
-                uint64_t hand;
                 uint32_t command;
                 uint64_t from;
                 uint32_t len;
@@ -722,7 +870,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                FD_ZERO(&rset);
                if (readtransactionfile)
                        FD_SET(logfd, &rset);
-               if (txqueue.numitems)
+               if (txqueue.numitems || txbuf.numitems)
                        FD_SET(sock, &wset);
                if (inflight.numitems)
                        FD_SET(sock, &rset);
@@ -765,6 +913,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                                "Could not read transaction log: %s",
                                                strerror(errno));
                                prc->req.magic = htonl(NBD_REQUEST_MAGIC);
+                               memcpy(prc->orighandle, prc->req.handle, 8);
                                prc->seq=seq++;
                                if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) {
                                        /* no more to read; don't enqueue as no reply
@@ -805,12 +954,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                /* See if we have a write we can do */
                if (FD_ISSET(sock, &wset))
                {
-                       prc = txqueue.head;
-                       if (!prc)
+                       if (!(txqueue.head) && !(txbuf.head))
                                g_warning("Socket write FD set but we shouldn't have been interested");
-                       else
+
+                       /* If there is no buffered data, generate some */
+                       if (!(txbuf.head) && (NULL != (prc = txqueue.head)))
                        {
-                       
                                rclist_unlink(&txqueue, prc);
                                rclist_addtail(&inflight, prc);
                                
@@ -825,15 +974,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                from = ntohll(prc->req.from);
                                len = ntohl(prc->req.len);
                                /* we rewrite the handle as they otherwise may not be unique */
-                               *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc);
-                               WRITE_ALL_ERRCHK(sock,
-                                                &(prc->req),
-                                                sizeof(struct nbd_request),
-                                                err_open,
-                                                "Could not write command: %s",
-                                                strerror(errno));
+                               *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash);
+                               g_hash_table_insert(handlehash, prc->req.handle, prc);
+                               addbuffer(&txbuf, &(prc->req), sizeof(struct nbd_request));
                                switch (command & NBD_CMD_MASK_COMMAND) {
                                case NBD_CMD_WRITE:
+                                       xfer+=len;
                                        while (len > 0) {
                                                uint64_t blknum = from>>9;
                                                char dbuf[512];
@@ -844,18 +990,15 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                                }
                                                /* work out what we should be writing */
                                                makebuf(dbuf, prc->seq, blknum);
-                                               WRITE_ALL_ERRCHK(sock,
-                                                                dbuf,
-                                                                512,
-                                                                err_open,
-                                                                "Could not write data: %s",
-                                                                strerror(errno));
+                                               addbuffer(&txbuf, dbuf, 512);
                                                from += 512;
                                                len -= 512;
                                        }
-                                       
-                               case NBD_CMD_DISC:
+                                       break;
                                case NBD_CMD_READ:
+                                       xfer+=len;
+                                       break;
+                               case NBD_CMD_DISC:
                                case NBD_CMD_FLUSH:
                                        break;
                                default:
@@ -867,6 +1010,13 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                
                                prc = NULL;
                        }
+
+                       /* there should be some now */
+                       if (writebuffer(sock, &txbuf)<0) {
+                               retval=-1;
+                               snprintf(errstr, errstr_len, "Failed to write to socket buffer: %s", strerror(errno));
+                               goto err_open;
+                       }
                        
                }
 
@@ -894,7 +1044,18 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                                goto err_open;
                        }
                                
-                       prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle));
+                       prc = g_hash_table_lookup(handlehash, rep.handle);
+                       if (!prc) {
+                               retval=-1;
+                               snprintf(errstr, errstr_len, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep.handle));
+                               goto err_open;
+                       }
+                       if (!g_hash_table_remove(handlehash, rep.handle)) {
+                               retval=-1;
+                               snprintf(errstr, errstr_len, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep.handle));
+                               goto err_open;
+                       }
+
                        if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) {
                                retval=-1;
                                snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic);
@@ -969,7 +1130,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock,
                goto err_open;
        }
        timespan=timeval_diff_to_double(&stop, &start);
-       speed=size/timespan;
+       speed=xfer/timespan;
        if(speed>1024) {
                speed=speed/1024.0;
                speedchar[0]='K';
@@ -1004,6 +1165,8 @@ err:
        if (*errstr)
                g_warning("%s",errstr);
 
+       g_hash_table_destroy(handlehash);
+
        return retval;
 }
 
@@ -1020,10 +1183,13 @@ int main(int argc, char**argv) {
        int testflags=0;
        testfunc test = throughput_test;
 
+       /* Ignore SIGPIPE as we want to pick up the error from write() */
+       signal (SIGPIPE, SIG_IGN);
+
        if(argc<3) {
                g_message("%d: Not enough arguments", (int)getpid());
                g_message("%d: Usage: %s <hostname> <port>", (int)getpid(), argv[0]);
-               g_message("%d: Or: %s <hostname> -N <exportname>", (int)getpid(), argv[0]);
+               g_message("%d: Or: %s <hostname> -N <exportname> [<port>]", (int)getpid(), argv[0]);
                exit(EXIT_FAILURE);
        }
        logging();
@@ -1036,7 +1202,6 @@ int main(int argc, char**argv) {
                                                nonopt++;
                                                break;
                                        case 1:
-                                               if(want_port)
                                                p=(strtol(argv[2], NULL, 0));
                                                if(p==LONG_MIN||p==LONG_MAX) {
                                                        g_critical("Could not parse port number: %s", strerror(errno));
@@ -1047,7 +1212,9 @@ int main(int argc, char**argv) {
                                break;
                        case 'N':
                                name=g_strdup(optarg);
-                               p = 10809;
+                               if(!p) {
+                                       p = 10809;
+                               }
                                want_port = false;
                                break;
                        case 't':