// This is the entry for a created POSIX thread. It "wraps" // the function call of entry(id) to be compatible to the // pthread library's requirements: a thread takes a void * // pointer as argument, and returns a pointer. staticinline void *wrapper_(void *arg) { structthread *t = (struct thread *)arg; t->entry(t->id); returnNULL; }
// Create a thread that calls function fn. fn takes an integer // thread id as input argument. staticinline voidcreate(void *fn) { assert(n_ < LENGTH(threads_));
// Yes, we have resource leak here! threads_[n_] = (struct thread) { .id = n_ + 1, .status = T_LIVE, .entry = fn, }; pthread_create( &(threads_[n_].thread), // a pthread_t NULL, // options; all to default wrapper_, // the wrapper function &threads_[n_] // the argument to the wrapper ); n_++; }
// Wait until all threads return. staticinline voidjoin() { for (int i = 0; i < LENGTH(threads_); i++) { structthread *t = &threads_[i]; if (t->status == T_LIVE) { pthread_join(t->thread, NULL); t->status = T_DEAD; } } }
while (!CAN_PRODUCE) { cond_wait(&cv, &lk); // We are here if the thread is being waked up, with // the mutex being acquired. Then we check once again, // and move out of the loop if CAN_PRODUCE holds. }
// We still hold the mutex--and we check again. assert(CAN_PRODUCE);