shithub: fossil

ref: 333ae58f37c2c8f79f7d7078283a30e42c4d7a27
dir: /disk.c/

View raw version
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
#include "error.h"

/* I/O proc body; defined at the end of this file and started by diskAlloc. */
static void diskThread(void *a);

/* Compile-time settings for the disk I/O queue. */
enum {
	/*
	 * disable measurement since it gets alignment faults on BG
	 * and the guts used to be commented out.
	 */
	Timing	= 0,			/* flag: nonzero enables the I/O timing report in diskThread */
	QueueSize = 100,		/* maximum number of blocks to queue; diskQueue sleeps while nqueue >= QueueSize */
};

/*
 * State for one open disk: the file descriptor, the unpacked header,
 * and the queue of blocks waiting for diskThread to read or write.
 * lk is the lock behind all four Rendez below and is held around
 * every access to the queue fields in this file.
 */
struct Disk {
	QLock lk;		/* held while touching nqueue, cur, next and while sleeping on the Rendez */
	int ref;		/* set to 2 by diskAlloc; diskThread decrements it on exit */

	int fd;			/* descriptor used for pread/pwrite; closed by diskFree */
	Header h;		/* header unpacked by diskAlloc */

	Rendez flow;		/* diskQueue sleeps here while the queue is full */
	Rendez starve;		/* diskThread sleeps here while the queue is empty */
	Rendez flush;		/* diskFlush sleeps here until nqueue reaches 0 */
	Rendez die;		/* diskFree sleeps here; a non-nil die.l tells diskThread to exit */

	int nqueue;		/* blocks queued or being processed; decremented after each I/O completes */

	Block *cur;		/* block to do on current scan */
	Block *next;		/* blocks to do next scan */
};

/*
 * Printable partition names indexed by Part* value; used in the
 * error messages printed by diskThread.
 * Keep in sync with Part* enum in dat.h.
 */
static char *partname[] = {
	[PartError]	"error",
	[PartSuper]	"super",
	[PartLabel]	"label",
	[PartData]	"data",
	[PartVenti]	"venti",
};

/*
 * Set up a Disk on top of the already-open descriptor fd.
 * Reads HeaderSize bytes at HeaderOffset, unpacks them, allocates
 * the Disk and starts diskThread on it.
 * Returns the new Disk, or nil with the error string set when the
 * header cannot be read in full or does not unpack.
 */
Disk *
diskAlloc(int fd)
{
	Disk *d;
	Header hdr;
	u8int raw[HeaderSize];

	if(pread(fd, raw, HeaderSize, HeaderOffset) < HeaderSize){
		werrstr("short read: %r");
		return nil;
	}
	if(!headerUnpack(&hdr, raw)){
		werrstr("bad disk header");
		return nil;
	}

	d = vtmallocz(sizeof *d);
	d->fd = fd;
	d->h = hdr;

	/* die.l is deliberately left nil; diskFree sets it to request shutdown */
	d->flow.l = &d->lk;
	d->starve.l = &d->lk;
	d->flush.l = &d->lk;

	/* diskThread drops one of these when it exits; diskFree waits for that */
	d->ref = 2;
	proccreate(diskThread, d, STACK);
	return d;
}

/*
 * Shut down and release a Disk: wait for queued I/O to finish,
 * tell diskThread to exit and wait until it has, then close the
 * descriptor and free the structure.
 */
void
diskFree(Disk *disk)
{
	/* wait for the queue to drain before asking the worker to stop */
	diskFlush(disk);

	qlock(&disk->lk);
	/* a non-nil die.l is what diskThread tests before sleeping on starve */
	disk->die.l = &disk->lk;
	rwakeup(&disk->starve);
	for(;;){
		if(disk->ref <= 1)
			break;
		rsleep(&disk->die);
	}
	qunlock(&disk->lk);

	close(disk->fd);
	vtfree(disk);
}

/*
 * First block number of partition part, taken from the disk header.
 * Only PartSuper, PartLabel and PartData are valid; anything else
 * trips assert(0).
 */
static u32int
partStart(Disk *disk, int part)
{
	if(part == PartLabel)
		return disk->h.label;
	if(part == PartData)
		return disk->h.data;
	if(part != PartSuper)
		assert(0);
	return disk->h.super;
}


/*
 * Block number one past the end of partition part.
 * The super partition is a single block, the label partition runs
 * up to the start of the data partition, and the data partition
 * runs to h.end.  Any other part trips assert(0).
 */
static u32int
partEnd(Disk *disk, int part)
{
	if(part == PartLabel)
		return disk->h.data;
	if(part == PartData)
		return disk->h.end;
	if(part != PartSuper)
		assert(0);
	return disk->h.super+1;
}

/*
 * Read one block synchronously, bypassing the queue.
 * addr is a block number relative to the start of partition part;
 * buf must have room for h.blockSize bytes.
 * Keeps calling pread until the whole block has arrived.
 * Returns 1 on success; returns 0 with the error string set if addr
 * is outside the partition, pread fails, or end of file is hit.
 */
int
diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf)
{
	ulong first, limit;
	u64int off;
	int left, got;

	first = partStart(disk, part);
	limit = partEnd(disk, part);
	if(addr >= limit-first){
		werrstr(EBadAddr);
		return 0;
	}

	/* widen before multiplying so the byte offset is computed in 64 bits */
	off = ((u64int)(addr + first))*disk->h.blockSize;
	for(left = disk->h.blockSize; left > 0; left -= got){
		got = pread(disk->fd, buf, left, off);
		if(got < 0){
			werrstr("%r");
			return 0;
		}
		if(got == 0){
			werrstr("eof reading disk");
			return 0;
		}
		off += got;
		buf += got;
	}
	return 1;
}

/*
 * Write one block synchronously, bypassing the queue.
 * addr is a block number relative to the start of partition part;
 * buf supplies h.blockSize bytes.
 * Issues a single pwrite; a partial write counts as failure.
 * Returns 1 on success; returns 0 with the error string set if addr
 * is outside the partition or the write fails or comes up short.
 */
int
diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf)
{
	ulong first, limit;
	u64int off;
	int wrote;

	first = partStart(disk, part);
	limit = partEnd(disk, part);
	if(addr >= limit - first){
		werrstr(EBadAddr);
		return 0;
	}

	/* widen before multiplying so the byte offset is computed in 64 bits */
	off = ((u64int)(addr + first))*disk->h.blockSize;
	wrote = pwrite(disk->fd, buf, disk->h.blockSize, off);
	if(wrote < 0)
		werrstr("%r");
	else if(wrote < disk->h.blockSize)
		werrstr("short write");
	else
		return 1;
	return 0;
}

/*
 * Hand block b to diskThread.
 * Sleeps on flow while the queue already holds QueueSize blocks.
 * Blocks with an address beyond the head of the current scan (or
 * any block when the current scan is empty) join disk->cur; the
 * rest join disk->next.  Either list is kept in ascending address
 * order, with b placed after any entries of equal address.
 * Wakes diskThread if the queue was empty.
 */
static void
diskQueue(Disk *disk, Block *b)
{
	Block **link;

	qlock(&disk->lk);
	while(disk->nqueue >= QueueSize)
		rsleep(&disk->flow);

	/* choose which scan list b belongs on */
	link = &disk->next;
	if(disk->cur == nil || b->addr > disk->cur->addr)
		link = &disk->cur;

	/* advance to the first entry whose address is greater than b's */
	while(*link != nil && b->addr >= (*link)->addr)
		link = &(*link)->ionext;
	b->ionext = *link;
	*link = b;

	if(disk->nqueue == 0)
		rwakeup(&disk->starve);
	disk->nqueue++;
	qunlock(&disk->lk);
}


/*
 * Queue block b to be read by diskThread.
 * b must be in state BioEmpty or BioLabel; it is moved to
 * BioReading before being queued.  Returns once b is on the queue,
 * which may involve sleeping in diskQueue while the queue is full.
 */
void
diskRead(Disk *disk, Block *b)
{
	assert(b->iostate == BioEmpty || b->iostate == BioLabel);
	blockSetIOState(b, BioReading);
	diskQueue(disk, b);
}

/*
 * Queue block b to be written by diskThread.
 * b must be in state BioDirty with b->nlock == 1; it is moved to
 * BioWriting before being queued.  Returns once b is on the queue,
 * which may involve sleeping in diskQueue while the queue is full.
 */
void
diskWrite(Disk *disk, Block *b)
{
	assert(b->nlock == 1);
	assert(b->iostate == BioDirty);
	blockSetIOState(b, BioWriting);
	diskQueue(disk, b);
}

/*
 * Queue block b for writing and sleep on b->ioready until its
 * state becomes BioClean.
 */
void
diskWriteAndWait(Disk *disk, Block *b)
{
	int saved;

	/*
	 * diskWrite asserts b->nlock == 1, and its handling of the
	 * QLock and blockPut assumes the same.  A larger count means
	 * the block is aliased within one thread, namely this one,
	 * so pretend the count is 1 for the duration and restore it
	 * afterwards.  This needs to be revisited.
	 */
	saved = b->nlock;
	if(saved > 1)
		b->nlock = 1;

	diskWrite(disk, b);
	for(;;){
		if(b->iostate == BioClean)
			break;
		rsleep(&b->ioready);
	}

	b->nlock = saved;
}

/* Return the block size recorded in the disk header. */
int
diskBlockSize(Disk *disk)
{
	return disk->h.blockSize;	/* immutable, so read without taking disk->lk */
}

/*
 * Wait until every queued block has been processed, then flush the
 * underlying descriptor.
 * Returns 1 on success, 0 with the error string set if the flush
 * request fails.
 */
int
diskFlush(Disk *disk)
{
	Dir nochange;

	/* diskThread wakes flush when nqueue drops to 0 */
	qlock(&disk->lk);
	for(;;){
		if(disk->nqueue <= 0)
			break;
		rsleep(&disk->flush);
	}
	qunlock(&disk->lk);

	/*
	 * A wstat with a nulldir is what is used here to flush the fd;
	 * there really should be a cleaner interface for that.
	 */
	nulldir(&nochange);
	if(dirfwstat(disk->fd, &nochange) >= 0)
		return 1;
	werrstr("%r");
	return 0;
}

/*
 * Number of blocks in partition part.
 * part must be one accepted by partStart and partEnd.
 */
u32int
diskSize(Disk *disk, int part)
{
	u32int first, limit;

	first = partStart(disk, part);
	limit = partEnd(disk, part);
	return limit - first;
}

/*
 * Return the result of getcallerpc for this call; diskThread records
 * it in b->pc when it locks a block.
 * x exists only so there is a first argument whose address can be
 * passed to getcallerpc; its value is unused.
 * NOTE(review): presumably this yields an address inside the caller
 * of mypc; confirm against getcallerpc's documentation.
 */
static uintptr
mypc(int x)
{
	return getcallerpc(&x);
}

/*
 * Return the path name behind disk->fd for use in diagnostics, or
 * "GOK" when fd2path fails.
 * The result lives in a static buffer that is overwritten by the
 * next call.
 */
static char *
disk2file(Disk *disk)
{
	static char path[256];

	if(fd2path(disk->fd, path, sizeof path) >= 0)
		return path;
	strncpy(path, "GOK", sizeof path);
	return path;
}

/*
 * Body of the per-disk I/O proc started by diskAlloc; a is the Disk.
 *
 * Repeatedly takes the next block off disk->cur (refilling cur from
 * disk->next when cur runs out), performs the read or write with
 * disk->lk released, then decrements nqueue and wakes sleepers in
 * diskQueue (flow) and diskFlush (flush).
 *
 * Exits when the queue is empty and diskFree has set disk->die.l:
 * it drops its reference, wakes die, and frees its scratch buffer.
 */
static void
diskThread(void *a)
{
	Disk *disk = a;
	Block *b;
	uchar *buf, *p;
	double t;
	int nio;

	threadsetname("disk");

//fprint(2, "diskThread %d\n", getpid());

	/* scratch block handed to blockRollback for writes */
	buf = vtmalloc(disk->h.blockSize);

	qlock(&disk->lk);
	/*
	 * With Timing set, t accumulates nanoseconds spent outside the
	 * idle sleep and nio counts completed I/Os.  Both are only
	 * initialized and used when Timing is nonzero.
	 */
	if (Timing) {
		nio = 0;
		t = -nsec();
	}
	for(;;){
		/* disk->lk is held here on every iteration */
		while(disk->nqueue == 0){
			if (Timing) {
				t += nsec();
				/* report the average milliseconds per I/O every 10000 I/Os */
				if(nio >= 10000){
					fprint(2, "disk: io=%d at %.3fms\n",
						nio, t*1e-6/nio);
					nio = 0;
					t = 0;
				}
			}
			/* diskFree sets die.l; only honored once the queue is empty */
			if(disk->die.l != nil)
				goto Done;
			rsleep(&disk->starve);
			if (Timing)
				t -= nsec();
		}
		assert(disk->cur != nil || disk->next != nil);

		/* current scan finished: start over with the blocks saved for the next one */
		if(disk->cur == nil){
			disk->cur = disk->next;
			disk->next = nil;
		}
		b = disk->cur;
		disk->cur = b->ionext;
		/* do the I/O without holding the queue lock so diskQueue can proceed */
		qunlock(&disk->lk);

		/*
		 * no one should hold onto blocks in the
		 * reading or writing state, so this lock should
		 * not cause deadlock.
		 */
if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr);
		bwatchLock(b);
		qlock(&b->lk);
		b->pc = mypc(0);
		assert(b->nlock == 1);
		switch(b->iostate){
		default:
			/* only diskRead and diskWrite queue blocks, so nothing else is expected */
			abort();
		case BioReading:
			if(!diskReadRaw(disk, b->part, b->addr, b->data)){
				fprint(2, "fossil: diskReadRaw failed: %s: "
					"score %V: part=%s block %ud: %r\n",
					disk2file(disk), b->score,
					partname[b->part], b->addr);
				blockSetIOState(b, BioReadError);
			}else
				blockSetIOState(b, BioClean);
			break;
		case BioWriting:
			/*
			 * blockRollback returns the data to write: either buf
			 * or some other pointer (presumably the block's own
			 * data; see blockRollback).
			 */
			p = blockRollback(b, buf);
			/* NB: ctime result ends with a newline */
			if(!diskWriteRaw(disk, b->part, b->addr, p)){
				fprint(2, "fossil: diskWriteRaw failed: %s: "
				    "score %V: date %s part=%s block %ud: %r\n",
					disk2file(disk), b->score,
					ctime(time(0)),
					partname[b->part], b->addr);
				/*
				 * NOTE(review): the iostate is not changed on this
				 * path, so the block stays in BioWriting, and
				 * diskWriteAndWait only stops waiting on BioClean.
				 * Confirm this is intended.
				 */
				break;
			}
			/*
			 * If the scratch copy was written, the block is left
			 * dirty; otherwise it is now clean.
			 */
			if(p != buf)
				blockSetIOState(b, BioClean);
			else
				blockSetIOState(b, BioDirty);
			break;
		}

		blockPut(b);		/* remove extra reference, unlock */
		qlock(&disk->lk);
		disk->nqueue--;
		/* the queue just went from full to having one free slot */
		if(disk->nqueue == QueueSize-1)
			rwakeup(&disk->flow);
		if(disk->nqueue == 0)
			rwakeup(&disk->flush);
		if(Timing)
			nio++;
	}
Done:
//fprint(2, "diskThread done\n");
	/* still holding disk->lk: drop our reference and let diskFree continue */
	disk->ref--;
	rwakeup(&disk->die);
	qunlock(&disk->lk);
	vtfree(buf);
}