#include <unistd.h>
#include "config.h"
#include "lfs.h"
-#define MY_NAME "nbd-tester-client"
-#include "cliserv.h"
-
#include <netinet/in.h>
#include <glib.h>
+#define MY_NAME "nbd-tester-client"
+#include "cliserv.h"
+
static gchar errstr[1024];
const static int errstr_len=1024;
struct reqcontext {
uint64_t seq;
+ char orighandle[8];
struct nbd_request req;
struct reqcontext * next;
struct reqcontext * prev;
int numitems;
};
+/*
+ * One segment of buffered outgoing data, linked into a struct chunklist.
+ * addbuffer() fills it through writeptr; writebuffer() drains it through
+ * readptr and frees it once length reaches zero.
+ */
+struct chunk {
+ /* start of the malloc()ed storage; this is the pointer passed to free() */
+ char * buffer;
+ /* next byte to hand to write(); advanced by the number of bytes written */
+ char * readptr;
+ /* where the next addbuffer() copy lands; advanced by the bytes copied in */
+ char * writeptr;
+ /* bytes still free after writeptr; starts at the allocation size */
+ uint64_t space;
+ /* bytes copied in but not yet written out (starting at readptr) */
+ uint64_t length;
+ /* neighbours in the owning chunklist; NULL at either end */
+ struct chunk * next;
+ struct chunk * prev;
+};
+
+/*
+ * Doubly-linked queue of chunks: writebuffer() consumes from head,
+ * addbuffer() appends at tail.
+ */
+struct chunklist {
+ /* oldest chunk, the next one to be written out; NULL when empty */
+ struct chunk * head;
+ /* newest chunk, the one addbuffer() tries to fill first; NULL when empty */
+ struct chunk * tail;
+ /* count of linked chunks, maintained by the addtail/unlink helpers */
+ int numitems;
+};
+
void rclist_unlink(struct rclist * l, struct reqcontext * p) {
if (p && l) {
struct reqcontext * prev = p->prev;
}
/* Add a new list item to the tail */
-void rclist_addtail(struct rclist * l, struct reqcontext * p)
-{
+void rclist_addtail(struct rclist * l, struct reqcontext * p) {
+	struct reqcontext * last;
+
+	/* Without both a list and an item there is nothing to link */
+	if (l == NULL || p == NULL)
+		return;
+
+	last = l->tail;
+	if (last == NULL) {
+		/* Empty list: a head with no tail means the list is inconsistent */
+		if (l->head != NULL)
+			g_warning("addtail found no list tail but a list head");
+		l->head = p;
+	} else {
+		/* The current tail should be the end of the chain */
+		if (last->next != NULL)
+			g_warning("addtail found list tail has a next pointer");
+		last->next = p;
+	}
+
+	/* Common to both cases: p becomes the new tail, hanging off the old one */
+	p->prev = last;
+	p->next = NULL;
+	l->tail = p;
+	l->numitems += 1;
+}
+
+/*
+ * Detach chunk p from list l, patching up its neighbours (or the list's
+ * head/tail when p sits at an end) and decrementing the item count.
+ * The chunk itself is not freed. A NULL list or chunk is a no-op.
+ */
+void chunklist_unlink(struct chunklist * l, struct chunk * p) {
+	struct chunk * before;
+	struct chunk * after;
+
+	if (!l || !p)
+		return;
+
+	before = p->prev;
+	after = p->next;
+
+	/* Route the predecessor (or the list head) past p */
+	if (before != NULL)
+		before->next = after;
+	else
+		l->head = after;
+
+	/* Route the successor (or the list tail) past p */
+	if (after != NULL)
+		after->prev = before;
+	else
+		l->tail = before;
+
+	/* Leave p fully detached */
+	p->prev = NULL;
+	p->next = NULL;
+	l->numitems -= 1;
+}
+
+/* Add a new list item to the tail */
+void chunklist_addtail(struct chunklist * l, struct chunk * p) {
if (!p || !l)
return;
if (l->tail) {
l->numitems++;
}
+/*
+ * Add some new bytes to a chunklist.
+ *
+ * Copies len bytes from data onto the end of l. The tail chunk is topped
+ * up first; whenever more room is needed a fresh 64KiB chunk is allocated
+ * and appended. The data is copied, so the caller keeps ownership of its
+ * own buffer.
+ *
+ * A NULL list or NULL data pointer is a no-op, in line with the other
+ * list helpers. Allocation failure is fatal: it is logged and the process
+ * exits with status 1.
+ */
+void addbuffer(struct chunklist * l, void * data, uint64_t len) {
+	const uint64_t size = 64*1024;
+	/* Byte-typed cursor: arithmetic on a void * is a GNU extension, not ISO C */
+	const char * src = data;
+
+	if (!l || !data)
+		return;
+
+	while (len > 0) {
+		struct chunk * tail = l->tail;
+
+		/* First see if there is a current chunk, and if it has space */
+		if (tail && tail->space) {
+			uint64_t towrite = (len < tail->space) ? len : tail->space;
+
+			memcpy(tail->writeptr, src, towrite);
+			tail->length += towrite;
+			tail->space -= towrite;
+			tail->writeptr += towrite;
+			len -= towrite;
+			src += towrite;
+		}
+
+		if (len > 0) {
+			/* We still need to write more, so prepare a new chunk */
+			char * buf = malloc(size);
+			struct chunk * pchunk = calloc(1, sizeof *pchunk);
+
+			if (!buf || !pchunk) {
+				g_critical("Out of memory");
+				exit (1);
+			}
+
+			/* Empty chunk: both cursors at the start, all of it free */
+			pchunk->buffer = buf;
+			pchunk->readptr = buf;
+			pchunk->writeptr = buf;
+			pchunk->space = size;
+			chunklist_addtail(l, pchunk);
+		}
+	}
+}
+
+/*
+ * Write buffered data from chunklist l to fd.
+ *
+ * Each call issues a single write() for the first chunk that holds data,
+ * so at most one chunk (possibly only part of it) is sent per call; the
+ * caller is expected to call again while the list still has items. Empty
+ * chunks at the head, and a chunk that has just been fully sent, are
+ * unlinked and freed.
+ *
+ * returns 0 on success (including a NULL or empty list), -1 on failure
+ * with errno set. A write() that transfers zero bytes is reported as a
+ * failure with errno set to EAGAIN. A write() interrupted by a signal
+ * (EINTR) is retried rather than reported.
+ */
+int writebuffer(int fd, struct chunklist * l) {
+	struct chunk * pchunk;
+	ssize_t res;
+
+	if (!l)
+		return 0;
+
+	/* Skip over (and release) chunks with nothing left to send */
+	for (;;) {
+		pchunk = l->head;
+		if (!pchunk)
+			return 0;
+		if (pchunk->length && pchunk->readptr)
+			break;
+		chunklist_unlink(l, pchunk);
+		free(pchunk->buffer);
+		free(pchunk);
+	}
+
+	/* OK we have a chunk with some data in */
+	do {
+		res = write(fd, pchunk->readptr, pchunk->length);
+	} while (res < 0 && errno == EINTR);
+
+	if (res == 0)
+		errno = EAGAIN;
+	if (res <= 0)
+		return -1;
+
+	/* Account for a possibly partial write */
+	pchunk->length -= (uint64_t)res;
+	pchunk->readptr += res;
+	if (!pchunk->length) {
+		chunklist_unlink(l, pchunk);
+		free(pchunk->buffer);
+		free(pchunk);
+	}
+	return 0;
+}
+
+
+
#define TEST_WRITE (1<<0)
#define TEST_FLUSH (1<<1)
int retval=0;
struct nbd_request req;
struct nbd_reply rep;
- int request=0;
int i=0;
int serverflags = 0;
- pid_t mypid = getpid();
+ pid_t G_GNUC_UNUSED mypid = getpid();
char buf[((1024*1024)+sizeof(struct nbd_request)/2)<<1];
bool got_err;
int throughput_test(gchar* hostname, int port, char* name, int sock,
char sock_is_open, char close_sock, int testflags) {
long long int i;
- char buf[1024];
char writebuf[1024];
struct nbd_request req;
int requests=0;
char speedchar[2] = { '\0', '\0' };
int retval=0;
int serverflags = 0;
- size_t tmp;
signed int do_write=TRUE;
pid_t mypid = getpid();
#endif
}
+/*
+ * Return an unused handle.
+ *
+ * Builds a 64-bit value from random() and retries until
+ * g_hash_table_lookup() finds no entry for it in phash. The handle is not
+ * inserted here; that is left to the caller.
+ */
+uint64_t getrandomhandle(GHashTable *phash) {
+	uint64_t handle = 0;
+
+	for (;;) {
+		int round;
+
+		/*
+		 * Five rounds, each shifting the previous bits up by 15 before
+		 * folding in a new random value (RAND_MAX may be as low as 2^15).
+		 */
+		for (round = 0; round < 5; round++) {
+			uint64_t shifted = handle << 15;
+			handle ^= random() ^ shifted;
+		}
+
+		/* Not present in the table: this one is free to use */
+		if (!g_hash_table_lookup(phash, &handle))
+			return handle;
+	}
+}
+
int integrity_test(gchar* hostname, int port, char* name, int sock,
char sock_is_open, char close_sock, int testflags) {
- struct nbd_request req;
struct nbd_reply rep;
fd_set rset;
fd_set wset;
char speedchar[2] = { '\0', '\0' };
int retval=0;
int serverflags = 0;
- pid_t mypid = getpid();
+ pid_t G_GNUC_UNUSED mypid = getpid();
int blkhashfd = -1;
char *blkhashname=NULL;
uint32_t *blkhash = NULL;
uint64_t seq=1;
uint64_t processed=0;
uint64_t printer=0;
+ uint64_t xfer=0;
int readtransactionfile = 1;
struct rclist txqueue={NULL, NULL, 0};
struct rclist inflight={NULL, NULL, 0};
+ struct chunklist txbuf={NULL, NULL, 0};
+
+ GHashTable *handlehash = g_hash_table_new(g_int64_hash, g_int64_equal);
size=0;
if(!sock_is_open) {
goto err_open;
}
- while (readtransactionfile || txqueue.numitems || inflight.numitems) {
+ while (readtransactionfile || txqueue.numitems || txbuf.numitems || inflight.numitems) {
int ret;
uint32_t magic;
- uint64_t hand;
uint32_t command;
uint64_t from;
uint32_t len;
FD_ZERO(&rset);
if (readtransactionfile)
FD_SET(logfd, &rset);
- if (txqueue.numitems)
+ if (txqueue.numitems || txbuf.numitems)
FD_SET(sock, &wset);
if (inflight.numitems)
FD_SET(sock, &rset);
"Could not read transaction log: %s",
strerror(errno));
prc->req.magic = htonl(NBD_REQUEST_MAGIC);
+ memcpy(prc->orighandle, prc->req.handle, 8);
prc->seq=seq++;
if ((ntohl(prc->req.type) & NBD_CMD_MASK_COMMAND) == NBD_CMD_DISC) {
/* no more to read; don't enqueue as no reply
/* See if we have a write we can do */
if (FD_ISSET(sock, &wset))
{
- prc = txqueue.head;
- if (!prc)
+ if (!(txqueue.head) && !(txbuf.head))
g_warning("Socket write FD set but we shouldn't have been interested");
- else
+
+ /* If there is no buffered data, generate some */
+ if (!(txbuf.head) && (NULL != (prc = txqueue.head)))
{
-
rclist_unlink(&txqueue, prc);
rclist_addtail(&inflight, prc);
from = ntohll(prc->req.from);
len = ntohl(prc->req.len);
/* we rewrite the handle as they otherwise may not be unique */
- *((uint64_t*)(prc->req.handle))=htonll((uint64_t)prc);
- WRITE_ALL_ERRCHK(sock,
- &(prc->req),
- sizeof(struct nbd_request),
- err_open,
- "Could not write command: %s",
- strerror(errno));
+ *((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash);
+ g_hash_table_insert(handlehash, prc->req.handle, prc);
+ addbuffer(&txbuf, &(prc->req), sizeof(struct nbd_request));
switch (command & NBD_CMD_MASK_COMMAND) {
case NBD_CMD_WRITE:
+ xfer+=len;
while (len > 0) {
uint64_t blknum = from>>9;
char dbuf[512];
}
/* work out what we should be writing */
makebuf(dbuf, prc->seq, blknum);
- WRITE_ALL_ERRCHK(sock,
- dbuf,
- 512,
- err_open,
- "Could not write data: %s",
- strerror(errno));
+ addbuffer(&txbuf, dbuf, 512);
from += 512;
len -= 512;
}
-
- case NBD_CMD_DISC:
+ break;
case NBD_CMD_READ:
+ xfer+=len;
+ break;
+ case NBD_CMD_DISC:
case NBD_CMD_FLUSH:
break;
default:
prc = NULL;
}
+
+ /* there should be some now */
+ if (writebuffer(sock, &txbuf)<0) {
+ retval=-1;
+ snprintf(errstr, errstr_len, "Failed to write to socket buffer: %s", strerror(errno));
+ goto err_open;
+ }
}
goto err_open;
}
- prc=(struct reqcontext *)ntohll(*((uint64_t *)rep.handle));
+ prc = g_hash_table_lookup(handlehash, rep.handle);
+ if (!prc) {
+ retval=-1;
+ snprintf(errstr, errstr_len, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep.handle));
+ goto err_open;
+ }
+ if (!g_hash_table_remove(handlehash, rep.handle)) {
+ retval=-1;
+ snprintf(errstr, errstr_len, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep.handle));
+ goto err_open;
+ }
+
if (prc->req.magic != htonl(NBD_REQUEST_MAGIC)) {
retval=-1;
snprintf(errstr, errstr_len, "Bad magic in inflight data: %08x", prc->req.magic);
goto err_open;
}
timespan=timeval_diff_to_double(&stop, &start);
- speed=size/timespan;
+ speed=xfer/timespan;
if(speed>1024) {
speed=speed/1024.0;
speedchar[0]='K';
if (*errstr)
g_warning("%s",errstr);
+ g_hash_table_destroy(handlehash);
+
return retval;
}
if(argc<3) {
g_message("%d: Not enough arguments", (int)getpid());
g_message("%d: Usage: %s <hostname> <port>", (int)getpid(), argv[0]);
- g_message("%d: Or: %s <hostname> -N <exportname>", (int)getpid(), argv[0]);
+ g_message("%d: Or: %s <hostname> -N <exportname> [<port>]", (int)getpid(), argv[0]);
exit(EXIT_FAILURE);
}
logging();
nonopt++;
break;
case 1:
- if(want_port)
p=(strtol(argv[2], NULL, 0));
if(p==LONG_MIN||p==LONG_MAX) {
g_critical("Could not parse port number: %s", strerror(errno));
break;
case 'N':
name=g_strdup(optarg);
- p = 10809;
+ if(!p) {
+ p = 10809;
+ }
want_port = false;
break;
case 't':