// sema.goc 6.48 KB
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Semaphore implementation exposed to Go.
// Intended use is to provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// http://swtch.com/semaphore.pdf

package sync
#include "runtime.h"
#include "arch.h"

// SemaWaiter is the descriptor of one waiting goroutine. In this file it
// is always a local variable of the waiting function, and it is linked
// either into a SemaRoot queue (by semqueue) or into a SyncSema queue.
typedef struct SemaWaiter SemaWaiter;
struct SemaWaiter
{
	uint32 volatile*	addr;	// semaphore word waited on; set by semqueue, matched by runtime_semrelease
	G*	g;	// waiting goroutine; handed to runtime_ready by the waker
	int64	releasetime;	// 0: no timestamp wanted; nonzero: waker overwrites it with runtime_cputicks()
	int32	nrelease;	// -1 for acquire; for a queued Syncsemrelease, the releases still unpaired
	SemaWaiter*	prev;	// maintained only by semqueue/semdequeue; the SyncSema code never touches it
	SemaWaiter*	next;
};

// SemaRoot is one bucket of the semaphore table. It holds a doubly linked
// queue of waiters for every semaphore address that semroot maps to this
// bucket: semqueue appends at the tail and runtime_semrelease scans from
// the head for the first waiter with a matching address.
typedef struct SemaRoot SemaRoot;
struct SemaRoot
{
	Lock		lock;	// held around every semqueue/semdequeue call in this file
	SemaWaiter*	head;
	SemaWaiter*	tail;
	// Number of waiters. Read w/o the lock.
	// Updated with runtime_xadd; runtime_semrelease reads it with
	// runtime_atomicload before deciding whether to take the lock.
	uint32 volatile	nwait;
};

// Prime to not correlate with any user patterns.
#define SEMTABLESZ 251

// One table entry: a SemaRoot followed by padding that brings the entry
// up to CacheLineSize bytes.
// NOTE(review): presumably this keeps neighbouring buckets on separate
// cache lines to avoid false sharing — confirm against CacheLineSize in arch.h.
// The pad length is only meaningful while sizeof(SemaRoot) is smaller
// than CacheLineSize.
struct semtable
{
	SemaRoot root;
	uint8 pad[CacheLineSize-sizeof(SemaRoot)];
};
// The global bucket array indexed by semroot.
static struct semtable semtable[SEMTABLESZ];

// semroot maps a semaphore address to its bucket in semtable.
// The three low-order address bits are discarded before the address is
// reduced modulo SEMTABLESZ.
static SemaRoot*
semroot(uint32 volatile *addr)
{
	uintptr slot;

	slot = ((uintptr)addr >> 3) % SEMTABLESZ;
	return &semtable[slot].root;
}

// semqueue fills in waiter s for the current goroutine and the semaphore
// at addr, then appends it to the tail of root's queue.
// The callers in this file hold root->lock.
static void
semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
	SemaWaiter *last;

	s->g = runtime_g();
	s->addr = addr;

	// Link s behind the current tail (if any) and make it the new tail.
	last = root->tail;
	s->prev = last;
	s->next = nil;
	if(last == nil)
		root->head = s;
	else
		last->next = s;
	root->tail = s;
}

// semdequeue unlinks waiter s from root's queue, fixing up head/tail when
// s sits at either end, and clears s's own links.
// The caller in this file holds root->lock.
static void
semdequeue(SemaRoot *root, SemaWaiter *s)
{
	SemaWaiter *before, *after;

	before = s->prev;
	after = s->next;

	// Repair the backward link (or the tail if s was last).
	if(after == nil)
		root->tail = before;
	else
		after->prev = before;

	// Repair the forward link (or the head if s was first).
	if(before == nil)
		root->head = after;
	else
		before->next = after;

	s->next = nil;
	s->prev = nil;
}

// cansemacquire tries to decrement *addr without blocking.
// It returns 1 after a successful compare-and-swap from a nonzero value
// to that value minus one, and 0 as soon as it observes *addr == 0.
// A failed compare-and-swap simply reloads and retries.
static int32
cansemacquire(uint32 volatile *addr)
{
	uint32 cur;

	for(;;) {
		cur = runtime_atomicload(addr);
		if(cur == 0)
			return 0;
		if(runtime_cas(addr, cur, cur-1))
			return 1;
	}
}

// runtime_semacquire returns once it has decremented *addr from a nonzero
// value (via cansemacquire), parking the calling goroutine on the bucket
// for addr while the count is zero.
//
// addr:    the semaphore word.
// profile: when true and runtime_blockprofilerate > 0, a wait that
//          actually parked is reported through runtime_blockevent.
//          Acquisitions that succeed without parking are not reported.
void
runtime_semacquire(uint32 volatile *addr, bool profile)
{
	SemaWaiter s;	// Needs to be allocated on stack, otherwise garbage collector could deallocate it
	SemaRoot *root;
	int64 t0;
	
	// Easy case.
	if(cansemacquire(addr))
		return;

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	root = semroot(addr);
	// t0 != 0 doubles as the "this wait is being profiled" flag below.
	// A nonzero s.releasetime asks runtime_semrelease to overwrite it with
	// the wakeup timestamp.
	t0 = 0;
	s.releasetime = 0;
	if(profile && runtime_blockprofilerate > 0) {
		t0 = runtime_cputicks();
		s.releasetime = -1;
	}
	for(;;) {

		runtime_lock(&root->lock);
		// Add ourselves to nwait to disable "easy case" in semrelease.
		runtime_xadd(&root->nwait, 1);
		// Check cansemacquire to avoid missed wakeup.
		if(cansemacquire(addr)) {
			runtime_xadd(&root->nwait, -1);
			runtime_unlock(&root->lock);
			return;
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		semqueue(root, addr, &s);
		runtime_parkunlock(&root->lock, "semacquire");
		// Execution resumes here once the goroutine has been made runnable
		// again; runtime_semrelease dequeues s and decrements nwait before
		// calling runtime_ready. If the count cannot be taken now, loop
		// and queue up again.
		if(cansemacquire(addr)) {
			// NOTE(review): the literal 3 is the second argument of
			// runtime_blockevent — presumably a stack-skip count; confirm.
			if(t0)
				runtime_blockevent(s.releasetime - t0, 3);
			return;
		}
	}
}

// runtime_semrelease atomically increments *addr and, if the bucket for
// addr reports waiters, dequeues and readies the first queued waiter
// whose address equals addr (at most one per call).
void
runtime_semrelease(uint32 volatile *addr)
{
	SemaWaiter *s;
	SemaRoot *root;

	root = semroot(addr);
	runtime_xadd(addr, 1);

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if(runtime_atomicload(&root->nwait) == 0)
		return;

	// Harder case: search for a waiter and wake it.
	runtime_lock(&root->lock);
	if(runtime_atomicload(&root->nwait) == 0) {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		runtime_unlock(&root->lock);
		return;
	}
	// The bucket is shared by every address that hashes to it, so the
	// queue may hold waiters for other semaphores; skip those.
	for(s = root->head; s; s = s->next) {
		if(s->addr == addr) {
			runtime_xadd(&root->nwait, -1);
			semdequeue(root, s);
			break;
		}
	}
	runtime_unlock(&root->lock);
	// s is nil here when the scan found no waiter for addr.
	// The waiter is readied only after the bucket lock has been dropped.
	if(s) {
		// A nonzero releasetime means the waiter asked for a wakeup
		// timestamp (see runtime_semacquire).
		if(s->releasetime)
			s->releasetime = runtime_cputicks();
		runtime_ready(s->g);
	}
}

// TODO(dvyukov): move to netpoll.goc once it's used by all OSes.
// The declaration gives this function the assembler name
// GOSYM_PREFIX "net.runtime_Semacquire" through the __asm__ label.
void net_runtime_Semacquire(uint32 *addr)
  __asm__ (GOSYM_PREFIX "net.runtime_Semacquire");

// net_runtime_Semacquire forwards to runtime_semacquire with profile
// set to true.
void net_runtime_Semacquire(uint32 *addr)
{
	runtime_semacquire(addr, true);
}

// The declaration gives this function the assembler name
// GOSYM_PREFIX "net.runtime_Semrelease" through the __asm__ label.
void net_runtime_Semrelease(uint32 *addr)
  __asm__ (GOSYM_PREFIX "net.runtime_Semrelease");

// net_runtime_Semrelease forwards to runtime_semrelease.
void net_runtime_Semrelease(uint32 *addr)
{
	runtime_semrelease(addr);
}

// runtime_Semacquire is declared with the .goc `func` syntax under the
// `package sync` clause of this file. It forwards to runtime_semacquire
// with profile set to true.
func runtime_Semacquire(addr *uint32) {
	runtime_semacquire(addr, true);
}

// runtime_Semrelease is declared with the .goc `func` syntax under the
// `package sync` clause of this file. It forwards to runtime_semrelease.
func runtime_Semrelease(addr *uint32) {
	runtime_semrelease(addr);
}

// SyncSema is the semaphore operated on by runtime_Syncsemacquire and
// runtime_Syncsemrelease. runtime_Syncsemcheck compares sizeof(SyncSema)
// with a size supplied by its caller (labelled "sync" in its error
// message), so the field set must stay in step with that other definition.
typedef struct SyncSema SyncSema;
struct SyncSema
{
	Lock		lock;	// held while head/tail and the waiters' next links are changed
	SemaWaiter*	head;	// oldest queued waiter; its nrelease sign tells acquirer (<0) from releaser (>0)
	SemaWaiter*	tail;	// newest queued waiter; nil when the queue is empty
};

// Syncsemcheck verifies that the size passed by the caller equals this
// file's sizeof(SyncSema). On a mismatch it prints both sizes and throws.
func runtime_Syncsemcheck(size uintptr) {
	if(size == sizeof(SyncSema))
		return;

	runtime_printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
	runtime_throw("bad SyncSema size");
}

// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
//
// If the head of s's queue is a releaser (nrelease > 0), one of its
// pending releases is consumed and the call returns without parking. The
// releaser is dequeued and readied once its count reaches zero.
// Otherwise the caller appends its own stack-allocated waiter to the
// queue and parks.
//
// When runtime_blockprofilerate > 0, a wait that parked is reported
// through runtime_blockevent; the non-parking path reports nothing.
func runtime_Syncsemacquire(s *SyncSema) {
	SemaWaiter w, *wake;
	int64 t0;

	// Only g, nrelease, next and releasetime of w are initialised; addr and
	// prev are not used by the SyncSema queue code in this file.
	w.g = runtime_g();
	w.nrelease = -1;
	w.next = nil;
	w.releasetime = 0;
	// t0 != 0 marks the wait as profiled; the nonzero releasetime asks the
	// releaser to overwrite it with the wakeup timestamp.
	t0 = 0;
	if(runtime_blockprofilerate > 0) {
		t0 = runtime_cputicks();
		w.releasetime = -1;
	}

	runtime_lock(&s->lock);
	if(s->head && s->head->nrelease > 0) {
		// have pending release, consume it
		wake = nil;
		s->head->nrelease--;
		if(s->head->nrelease == 0) {
			// That was the releaser's last outstanding release:
			// unlink it so it can be readied below.
			wake = s->head;
			s->head = wake->next;
			if(s->head == nil)
				s->tail = nil;
		}
		runtime_unlock(&s->lock);
		// Ready the releaser only after the lock has been dropped.
		if(wake)
			runtime_ready(wake->g);
	} else {
		// enqueue itself
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		runtime_parkunlock(&s->lock, "semacquire");
		// Resumed: runtime_Syncsemrelease dequeues w and stamps
		// w.releasetime before calling runtime_ready.
		// NOTE(review): the literal 2 is the second argument of
		// runtime_blockevent — presumably a stack-skip count; confirm.
		if(t0)
			runtime_blockevent(w.releasetime - t0, 2);
	}
}

// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
//
// Queued acquirers (nrelease < 0) are dequeued from the head and readied
// one by one, each consuming one of the n releases. If releases remain
// once no acquirer is left at the head, the caller appends its own
// stack-allocated waiter carrying the remaining count and parks;
// runtime_Syncsemacquire readies it after decrementing that count to zero.
func runtime_Syncsemrelease(s *SyncSema, n uint32) {
	SemaWaiter w, *wake;

	// Only g, nrelease, next and releasetime of w are initialised; addr and
	// prev are not used by the SyncSema queue code in this file.
	w.g = runtime_g();
	w.nrelease = (int32)n;
	w.next = nil;
	w.releasetime = 0;

	runtime_lock(&s->lock);
	while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
		// have pending acquire, satisfy it
		wake = s->head;
		s->head = wake->next;
		if(s->head == nil)
			s->tail = nil;
		// The timestamp is written before runtime_ready, i.e. before the
		// acquirer is made runnable. wake points at a local variable of
		// the acquirer's runtime_Syncsemacquire call.
		if(wake->releasetime)
			wake->releasetime = runtime_cputicks();
		runtime_ready(wake->g);
		w.nrelease--;
	}
	if(w.nrelease > 0) {
		// enqueue itself
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		// NOTE(review): "semarelease" looks like a typo for "semrelease";
		// it is a runtime-visible string, so it is left unchanged here.
		runtime_parkunlock(&s->lock, "semarelease");
	} else
		runtime_unlock(&s->lock);
}