X-Git-Url: http://git.alex.org.uk diff --git a/nbd-tester-client.c b/nbd-tester-client.c index b5801ee..fee2d87 100644 --- a/nbd-tester-client.c +++ b/nbd-tester-client.c @@ -63,6 +63,7 @@ typedef enum { struct reqcontext { uint64_t seq; + char orighandle[8]; struct nbd_request req; struct reqcontext * next; struct reqcontext * prev; @@ -74,6 +75,22 @@ struct rclist { int numitems; }; +struct chunk { + char * buffer; + char * readptr; + char * writeptr; + uint64_t space; + uint64_t length; + struct chunk * next; + struct chunk * prev; +}; + +struct chunklist { + struct chunk * head; + struct chunk * tail; + int numitems; +}; + void rclist_unlink(struct rclist * l, struct reqcontext * p) { if (p && l) { struct reqcontext * prev = p->prev; @@ -97,8 +114,51 @@ void rclist_unlink(struct rclist * l, struct reqcontext * p) { } /* Add a new list item to the tail */ -void rclist_addtail(struct rclist * l, struct reqcontext * p) -{ +void rclist_addtail(struct rclist * l, struct reqcontext * p) { + if (!p || !l) + return; + if (l->tail) { + if (l->tail->next) + g_warning("addtail found list tail has a next pointer"); + l->tail->next = p; + p->next = NULL; + p->prev = l->tail; + l->tail = p; + } else { + if (l->head) + g_warning("addtail found no list tail but a list head"); + l->head = p; + l->tail = p; + p->prev = NULL; + p->next = NULL; + } + l->numitems++; +} + +void chunklist_unlink(struct chunklist * l, struct chunk * p) { + if (p && l) { + struct chunk * prev = p->prev; + struct chunk * next = p->next; + + /* Fix link to previous */ + if (prev) + prev->next = next; + else + l->head = next; + + if (next) + next->prev = prev; + else + l->tail = prev; + + p->prev = NULL; + p->next = NULL; + l->numitems--; + } +} + +/* Add a new list item to the tail */ +void chunklist_addtail(struct chunklist * l, struct chunk * p) { if (!p || !l) return; if (l->tail) { @@ -119,6 +179,83 @@ void rclist_addtail(struct rclist * l, struct reqcontext * p) l->numitems++; } +/* Add some new bytes to a chunklist */ +void addbuffer(struct chunklist * l, void * data, uint64_t len) { + void * buf; + uint64_t size = 64*1024; + struct chunk * pchunk; + + while (len>0) + { + /* First see if there is a current chunk, and if it has space */ + if (l->tail && l->tail->space) { + uint64_t towrite = len; + if (towrite > l->tail->space) + towrite = l->tail->space; + memcpy(l->tail->writeptr, data, towrite); + l->tail->length += towrite; + l->tail->space -= towrite; + l->tail->writeptr += towrite; + len -= towrite; + data += towrite; + } + + if (len>0) { + /* We still need to write more, so prepare a new chunk */ + if ((NULL == (buf = malloc(size))) || (NULL == (pchunk = calloc(1, sizeof(struct chunk))))) { + g_critical("Out of memory"); + exit (1); + } + + pchunk->buffer = buf; + pchunk->readptr = buf; + pchunk->writeptr = buf; + pchunk->space = size; + chunklist_addtail(l, pchunk); + } + } + +} + +/* returns 0 on success, -1 on failure */ +int writebuffer(int fd, struct chunklist * l) { + + struct chunk * pchunk = NULL; + int res; + if (!l) + return 0; + + while (!pchunk) + { + pchunk = l->head; + if (!pchunk) + return 0; + if (!(pchunk->length) || !(pchunk->readptr)) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + pchunk = NULL; + } + } + + /* OK we have a chunk with some data in */ + res = write(fd, pchunk->readptr, pchunk->length); + if (res==0) + errno = EAGAIN; + if (res<=0) + return -1; + pchunk->length -= res; + pchunk->readptr += res; + if (!pchunk->length) { + chunklist_unlink(l, pchunk); + free(pchunk->buffer); + free(pchunk); + } + return 0; +} + + + #define TEST_WRITE (1<<0) #define TEST_FLUSH (1<<1) @@ -604,6 +741,18 @@ static inline void dumpcommand(char * text, uint32_t command) #endif } +/* return an unused handle */ +uint64_t getrandomhandle(GHashTable *phash) { + uint64_t handle = 0; + int i; + do { + /* RAND_MAX may be as low as 2^15 */ + for (i= 1 ; i<=5; i++) + handle ^= random() ^ (handle << 15); + } while (g_hash_table_lookup(phash, &handle)); + return handle; +} + int integrity_test(gchar* hostname, int port, char* name, int sock, char sock_is_open, char close_sock, int testflags) { struct nbd_reply rep; @@ -625,9 +774,13 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, uint64_t seq=1; uint64_t processed=0; uint64_t printer=0; + uint64_t xfer=0; int readtransactionfile = 1; struct rclist txqueue={NULL, NULL, 0}; struct rclist inflight={NULL, NULL, 0}; + struct chunklist txbuf={NULL, NULL, 0}; + + GHashTable *handlehash = g_hash_table_new(g_int64_hash, g_int64_equal); size=0; if(!sock_is_open) { @@ -702,7 +855,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } - while (readtransactionfile || txqueue.numitems || inflight.numitems) { + while (readtransactionfile || txqueue.numitems || txbuf.numitems || inflight.numitems) { int ret; uint32_t magic; @@ -717,7 +870,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, FD_ZERO(&rset); if (readtransactionfile) FD_SET(logfd, &rset); - if (txqueue.numitems) + if (txqueue.numitems || txbuf.numitems) FD_SET(sock, &wset); if (inflight.numitems) FD_SET(sock, &rset); @@ -760,6 +913,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, "Could not read transaction log: %s", strerror(errno)); prc->req.magic = htonl(NBD_REQUEST_MAGIC); + memcpy(prc->orighandle, prc->req.handle, 8); prc->seq=seq++; if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) { /* no more to read; don't enqueue as no reply @@ -800,12 +954,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, /* See if we have a write we can do */ if (FD_ISSET(sock, &wset)) { - prc = txqueue.head; - if (!prc) + if (!(txqueue.head) && !(txbuf.head)) g_warning("Socket write FD set but we shouldn't have been interested"); - else + + /* If there is no buffered data, generate some */ + if (!(txbuf.head) && (NULL != (prc = txqueue.head))) { - rclist_unlink(&txqueue, prc); rclist_addtail(&inflight, prc); @@ -820,15 +974,12 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, from = ntohll(prc->req.from); len = ntohl(prc->req.len); /* we rewrite the handle as they otherwise may not be unique */ - *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc); - WRITE_ALL_ERRCHK(sock, - &(prc->req), - sizeof(struct nbd_request), - err_open, - "Could not write command: %s", - strerror(errno)); + *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash); + g_hash_table_insert(handlehash, prc->req.handle, prc); + addbuffer(&txbuf, &(prc->req), sizeof(struct nbd_request)); switch (command & NBD_CMD_MASK_COMMAND) { case NBD_CMD_WRITE: + xfer+=len; while (len > 0) { uint64_t blknum = from>>9; char dbuf[512]; @@ -839,18 +990,15 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, } /* work out what we should be writing */ makebuf(dbuf, prc->seq, blknum); - WRITE_ALL_ERRCHK(sock, - dbuf, - 512, - err_open, - "Could not write data: %s", - strerror(errno)); + addbuffer(&txbuf, dbuf, 512); from += 512; len -= 512; } - - case NBD_CMD_DISC: + break; case NBD_CMD_READ: + xfer+=len; + break; + case NBD_CMD_DISC: case NBD_CMD_FLUSH: break; default: @@ -862,6 +1010,13 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, prc = NULL; } + + /* there should be some now */ + if (writebuffer(sock, &txbuf)<0) { + retval=-1; + snprintf(errstr, errstr_len, "Failed to write to socket buffer: %s", strerror(errno)); + goto err_open; + } } @@ -889,7 +1044,18 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } - prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle)); + prc = g_hash_table_lookup(handlehash, rep.handle); + if (!prc) { + retval=-1; + snprintf(errstr, errstr_len, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + if (!g_hash_table_remove(handlehash, rep.handle)) { + retval=-1; + snprintf(errstr, errstr_len, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep.handle)); + goto err_open; + } + if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) { retval=-1; snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic); @@ -964,7 +1130,7 @@ int integrity_test(gchar* hostname, int port, char* name, int sock, goto err_open; } timespan=timeval_diff_to_double(&stop, &start); - speed=size/timespan; + speed=xfer/timespan; if(speed>1024) { speed=speed/1024.0; speedchar[0]='K'; @@ -999,6 +1165,8 @@ err: if (*errstr) g_warning("%s",errstr); + g_hash_table_destroy(handlehash); + return retval; } @@ -1015,10 +1183,13 @@ int main(int argc, char**argv) { int testflags=0; testfunc test = throughput_test; + /* Ignore SIGPIPE as we want to pick up the error from write() */ + signal (SIGPIPE, SIG_IGN); + if(argc<3) { g_message("%d: Not enough arguments", (int)getpid()); g_message("%d: Usage: %s ", (int)getpid(), argv[0]); - g_message("%d: Or: %s -N ", (int)getpid(), argv[0]); + g_message("%d: Or: %s -N []", (int)getpid(), argv[0]); exit(EXIT_FAILURE); } logging(); @@ -1031,7 +1202,6 @@ int main(int argc, char**argv) { nonopt++; break; case 1: - if(want_port) p=(strtol(argv[2], NULL, 0)); if(p==LONG_MIN||p==LONG_MAX) { g_critical("Could not parse port number: %s", strerror(errno)); @@ -1042,7 +1212,9 @@ int main(int argc, char**argv) { break; case 'N': name=g_strdup(optarg); - p = 10809; + if(!p) { + p = 10809; + } want_port = false; break; case 't':