static uint64_t size;
+static int looseordering = 0;
+
static gchar * transactionlog = "nbd-tester-client.tr";
typedef enum {
int numitems;
};
+struct blkitem { /* per-block tracking record; the blkhash array holds one per 512-byte block, indexed by (offset >> 9) */
+ uint32_t seq; /* sequence number stored when a write reply for this block is processed; read reply data is checked against it */
+ int32_t inflightr; /* reads in flight on this block: incremented when a read is sent, decremented on its reply; signed so a decrement below zero is detected as an error */
+ int32_t inflightw; /* writes in flight on this block: incremented when a write is sent, decremented on its reply; signed so a decrement below zero is detected as an error */
+};
+
void rclist_unlink(struct rclist * l, struct reqcontext * p) {
if (p && l) {
struct reqcontext * prev = p->prev;
pid_t G_GNUC_UNUSED mypid = getpid();
int blkhashfd = -1;
char *blkhashname=NULL;
- uint32_t *blkhash = NULL;
+ struct blkitem *blkhash = NULL;
int logfd=-1;
uint64_t seq=1;
uint64_t processed=0;
uint64_t printer=0;
uint64_t xfer=0;
int readtransactionfile = 1;
+ int blocked = 0;
struct rclist txqueue={NULL, NULL, 0};
struct rclist inflight={NULL, NULL, 0};
struct chunklist txbuf={NULL, NULL, 0};
goto err;
}
- if (-1 == lseek(blkhashfd, (off_t)((size>>9)<<2), SEEK_SET)) {
+ if (-1 == lseek(blkhashfd, (off_t)((size>>9)*sizeof(struct blkitem)), SEEK_SET)) {
g_warning("Could not llseek temp file: %s", strerror(errno));
retval=-1;
goto err;
}
if (NULL == (blkhash = mmap(NULL,
- (size>>9)<<2,
+ (size>>9)*sizeof(struct blkitem),
PROT_READ | PROT_WRITE,
MAP_SHARED,
blkhashfd,
FD_ZERO(&rset);
if (readtransactionfile)
FD_SET(logfd, &rset);
- if (txqueue.numitems || txbuf.numitems)
+ if ((!blocked && txqueue.numitems) || txbuf.numitems)
FD_SET(sock, &wset);
if (inflight.numitems)
FD_SET(sock, &rset);
/* See if we have a write we can do */
if (FD_ISSET(sock, &wset))
{
- if (!(txqueue.head) && !(txbuf.head))
+ if ((!(txqueue.head) && !(txbuf.head)) || blocked)
g_warning("Socket write FD set but we shouldn't have been interested");
/* If there is no buffered data, generate some */
- if (!(txbuf.head) && (NULL != (prc = txqueue.head)))
+ if (!blocked && !(txbuf.head) && (NULL != (prc = txqueue.head)))
{
- rclist_unlink(&txqueue, prc);
- rclist_addtail(&inflight, prc);
-
if (ntohl(prc->req.magic) != NBD_REQUEST_MAGIC) {
retval=-1;
- g_warning("Asked to write a reply without a magic number");
+ g_warning("Asked to write a request without a magic number");
goto err_open;
}
- dumpcommand("Sending command", prc->req.type);
command = ntohl(prc->req.type);
from = ntohll(prc->req.from);
len = ntohl(prc->req.len);
+
+ /* First check whether we can touch this command at all. If this
+ * command is a read, and there is an inflight write, OR if this
+ * command is a write, and there is an inflight read or write, then
+ * we need to leave the command alone and signal that we are blocked
+ */
+
+ if (!looseordering)
+ {
+ uint64_t cfrom;
+ uint32_t clen;
+ cfrom = from;
+ clen = len;
+ while (clen > 0) {
+ uint64_t blknum = cfrom>>9;
+ if (cfrom>=size) {
+ snprintf(errstr, errstr_len, "offset %llx beyond size %llx",
+ (long long int) cfrom, (long long int)size);
+ goto err_open;
+ }
+ if (blkhash[blknum].inflightw ||
+ (blkhash[blknum].inflightr &&
+ ((command & NBD_CMD_MASK_COMMAND)==NBD_CMD_WRITE))) {
+ blocked=1;
+ break;
+ }
+ cfrom += 512;
+ clen -= 512;
+ }
+ }
+
+ if (blocked)
+ goto skipdequeue;
+
+ rclist_unlink(&txqueue, prc);
+ rclist_addtail(&inflight, prc);
+
+ dumpcommand("Sending command", prc->req.type);
/* we rewrite the handle as they otherwise may not be unique */
*((uint64_t*)(prc->req.handle))=getrandomhandle(handlehash);
g_hash_table_insert(handlehash, prc->req.handle, prc);
(long long int) from, (long long int)size);
goto err_open;
}
+ (blkhash[blknum].inflightw)++;
/* work out what we should be writing */
makebuf(dbuf, prc->seq, blknum);
addbuffer(&txbuf, dbuf, 512);
break;
case NBD_CMD_READ:
xfer+=len;
+ while (len > 0) {
+ uint64_t blknum = from>>9;
+ if (from>=size) {
+ snprintf(errstr, errstr_len, "offset %llx beyond size %llx",
+ (long long int) from, (long long int)size);
+ goto err_open;
+ }
+ (blkhash[blknum].inflightr)++;
+ from += 512;
+ len -= 512;
+ }
break;
case NBD_CMD_DISC:
case NBD_CMD_FLUSH:
prc = NULL;
}
+ skipdequeue:
/* there should be some now */
if (writebuffer(sock, &txbuf)<0) {
err_open,
"Could not read data: %s",
strerror(errno));
+ if (--(blkhash[blknum].inflightr) <0 ) {
+ snprintf(errstr, errstr_len, "Received a read reply for offset %llx when not in flight",
+ (long long int) from);
+ goto err_open;
+ }
/* work out what we was written */
- if (checkbuf(dbuf, blkhash[blknum], blknum))
- {
+ if (checkbuf(dbuf, blkhash[blknum].seq, blknum)) {
retval=-1;
snprintf(errstr, errstr_len, "Bad reply data: I wanted blk %08x, seq %08x but I got (at a guess) blk %08x, seq %08x",
(unsigned int) blknum,
- blkhash[blknum],
+ blkhash[blknum].seq,
((uint32_t *)(dbuf))[0],
((uint32_t *)(dbuf))[1]
);
/* subsequent reads should get data with this seq*/
while (len > 0) {
uint64_t blknum = from>>9;
- blkhash[blknum]=(uint32_t)(prc->seq);
+ if (--(blkhash[blknum].inflightw) <0 ) {
+ snprintf(errstr, errstr_len, "Received a write reply for offset %llx when not in flight",
+ (long long int) from);
+ goto err_open;
+ }
+ blkhash[blknum].seq=(uint32_t)(prc->seq);
from += 512;
len -= 512;
}
default:
break;
}
-
+ blocked = 0;
processed++;
rclist_unlink(&inflight, prc);
prc->req.magic=0; /* so a duplicate reply is detected */
}
err:
if (size && blkhash)
- munmap(blkhash, (size>>9)<<2);
+ munmap(blkhash, (size>>9)*sizeof(struct blkitem));
if (blkhashfd != -1)
close (blkhashfd);
exit(EXIT_FAILURE);
}
logging();
- while((c=getopt(argc, argv, "-N:t:owfi"))>=0) {
+ while((c=getopt(argc, argv, "-N:t:owfil"))>=0) {
switch(c) {
case 1:
switch(nonopt) {
case 'o':
test=oversize_test;
break;
+ case 'l':
+ looseordering=1;
+ break;
case 'w':
testflags|=TEST_WRITE;
break;