ref: 655dad8f92c315c5c205b3c021b19ce2916e4c9b
dir: /lib/thread/rwlock+futex.myr/
use std

use "atomic"
use "futex"
use "tls"
use "types"

pkg thread =
	/*
	A reader/writer lock built on a single futex word.

	The 31 low bits of `_state` contain either the number of readers holding
	the lock, or `0x7fffffff` if the lock is held by a single writer.

	The high bit is set if there are any waiting readers or writers.

	`_owner` records the tid of the writer currently holding the lock, or -1
	when no writer holds it. It is only used to diagnose misuse (relocking
	and unlocking from the wrong thread).
	*/
	type rwlock = struct
		_state : ftxtag
		_owner : tid
	;;

	const mkrwlock  : (-> rwlock)
	const rdlock    : (rw : rwlock# -> void)
	const wrlock    : (rw : rwlock# -> void)
	const tryrdlock : (rw : rwlock# -> bool)
	const trywrlock : (rw : rwlock# -> bool)
	const rdunlock  : (rw : rwlock# -> void)
	const wrunlock  : (rw : rwlock# -> void)
;;

/* Mask selecting the reader count; also the count value meaning "writer held". */
const Nrmask  = 0x7fffffff
/* Flag recording that at least one thread has announced it is waiting. */
const Waitbit = 0x80000000

/* Returns an unlocked rwlock with no readers, no waiters and no owner. */
const mkrwlock = {
	-> [._state = 0, ._owner = -1]
}

/*
Acquires the lock for reading, waiting while a writer holds it.

Dies with "rwlock overflowed" if the reader count has reached `Nrmask - 1`,
since one more reader would make the count indistinguishable from the
writer-held marker.
*/
const rdlock = {rw
	for ; ;
		var s = xget(&rw._state)
		match s & Nrmask
		| Nrmask - 1:
			std.die("error: rwlock overflowed\n")
		| Nrmask:
			/*
			The lock is held by a writer so we attempt to CAS in the wait
			bit and wait. If the CAS fails, the state of the lock has
			changed so we can try once again to acquire it.
			*/
			if xcas(&rw._state, s, Nrmask | Waitbit) == s
				ftxwait(&rw._state, Nrmask | Waitbit, 0)
			;;
		| _:
			/*
			Otherwise the lock is either unlocked or held by some number of
			readers. Either way we simply increment the reader count via
			CAS. Any wait bit already present in `s` is carried over
			unchanged by `s + 1`.
			*/
			if xcas(&rw._state, s, s + 1) == s
				-> void
			;;
		;;
	;;
}

/*
Acquires the lock for writing, waiting until there are no readers and no
other writer.

If the calling thread is already recorded as the owner, an error is printed
and the process is killed rather than deadlocking.
*/
const wrlock = {rw
	for ; ;
		if rw._owner == tid()
			std.fput(std.Err, "error: thread {} attempted to relock an rwlock it already holds\n", tid())
			std.suicide()
		;;

		/*
		`_state` must be 0 for a writer to acquire the lock. Anything else
		means the lock is either held or in the process of being released
		by the last reader.
		*/
		var s = xcas(&rw._state, 0, Nrmask)
		if s == 0
			rw._owner = tid()
			-> void
		;;

		/*
		If we fail to acquire the lock, attempt to CAS in the wait bit and
		wait. If the CAS fails, the state of the lock has changed so we can
		try once again to acquire it.
		*/
		if xcas(&rw._state, s, s | Waitbit) == s
			ftxwait(&rw._state, s | Waitbit, 0)
		;;
	;;
}

/*
Attempts to acquire the lock for reading without waiting.

Returns true on success and false if a writer currently holds the lock.
A failed CAS caused by concurrent readers is retried, so false is only
returned when a writer is observed. Dies on reader-count overflow, like
`rdlock`.
*/
const tryrdlock = {rw
	for ; ;
		var s = xget(&rw._state)
		match s & Nrmask
		| Nrmask - 1:
			std.die("error: rwlock overflowed\n")
		| Nrmask:
			-> false
		| _:
			if xcas(&rw._state, s, s + 1) == s
				-> true
			;;
		;;
	;;
	-> false /* Unreachable */
}

/*
Attempts to acquire the lock for writing without waiting.

Returns true and records the caller as owner if `_state` was exactly 0;
returns false otherwise (held by anyone, or a wait bit is still set).
*/
const trywrlock = {rw
	if xcas(&rw._state, 0, Nrmask) == 0
		rw._owner = tid()
		-> true
	;;
	-> false
}

/*
Releases one read hold on the lock.

Asserts that the reader count was non-zero before the decrement.
*/
const rdunlock = {rw
	/*
	Only the last reader needs to potentially wake a writer so all other
	readers can get away with simply decrementing the reader count via FAA.
	*/
	var prev = xadd(&rw._state, -1)
	std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
	if prev & Nrmask == 1 && prev & Waitbit != 0
		/*
		If there are one or more waiting writers and no other readers have
		acquired the lock since we fetched the reader count, then the value
		of `_state` is guaranteed to be `Waitbit`. If the CAS fails, someone
		else changed the state and the wait bit is still recorded, so a
		later unlock will take care of the wakeup.

		The CAS clears the one wait bit shared by every waiter, so we must
		wake all of them, not just one. A writer left asleep would have no
		wait bit recorded on its behalf: the writer we woke acquires from
		state 0, and its `wrunlock` would then see no wait bit and wake
		nobody. Woken writers that lose the race set the wait bit again
		before going back to sleep.
		*/
		if xcas(&rw._state, Waitbit, 0) == Waitbit
			ftxwakeall(&rw._state)
		;;
	;;
}

/*
Releases the write hold on the lock.

If the calling thread is not the recorded owner, an error is printed and the
process is killed.
*/
const wrunlock = {rw
	if rw._owner != tid()
		std.fput(std.Err, "error: thread {} attempted to unlock an rwlock last held by {}\n", tid(), rw._owner)
		std.suicide()
	;;
	/* Clear ownership before publishing the unlocked state. */
	rw._owner = -1

	/*
	If the wait bit was set then there are one or more waiting readers,
	writers, or both. In the first and third cases, we need to wake
	everyone; in the second, we'd like to just wake one thread. However, we
	currently have no way of knowing which case we're in so we always have
	to wake everyone.
	*/
	if xchg(&rw._state, 0) & Waitbit != 0
		ftxwakeall(&rw._state)
	;;
}