From 7a889fd7fb01023b6ad4f673a4693742c9e05cdd Mon Sep 17 00:00:00 2001 From: Mitya Selivanov Date: Tue, 19 Sep 2023 14:44:12 +0200 Subject: shared mutex fixes --- source/kit/shared_memory.h | 1 + source/kit/shared_mutex.h | 33 ++++++++++---- source/tests/test_interprocess.c | 97 ++++++++++++++++++++++++++++++++-------- 3 files changed, 105 insertions(+), 26 deletions(-) (limited to 'source') diff --git a/source/kit/shared_memory.h b/source/kit/shared_memory.h index 594d4a4..bd910cb 100644 --- a/source/kit/shared_memory.h +++ b/source/kit/shared_memory.h @@ -40,6 +40,7 @@ kit_status_t kit_shared_memory_clean(kit_str_t name); #ifndef KIT_DISABLE_SHORT_NAMES # define shared_memory_t kit_shared_memory_t +# define shared_memory_clean kit_shared_memory_clean # define shared_memory_open kit_shared_memory_open # define shared_memory_close kit_shared_memory_close diff --git a/source/kit/shared_mutex.h b/source/kit/shared_mutex.h index de5958d..3626939 100644 --- a/source/kit/shared_mutex.h +++ b/source/kit/shared_mutex.h @@ -9,6 +9,7 @@ #include "threads.h" #include +#include #ifdef __cplusplus extern "C" { @@ -23,22 +24,27 @@ extern "C" { #endif enum { - KIT_SHARED_MUTEX_READY, + KIT_SHARED_MUTEX_READY = 1, KIT_SHARED_MUTEX_LOCKED, KIT_SHARED_MUTEX_WRITER }; -typedef struct { - i8 _Atomic state; - i32 readers; +typedef union { + struct { + i8 _Atomic state; + i32 readers; + }; + struct { + u8 _pad[16]; + }; } kit_shared_mutex_t; static void kit_shared_mutex_init(kit_shared_mutex_t *m) { assert(m != NULL); + memset(m, 0, sizeof *m); atomic_store_explicit(&m->state, KIT_SHARED_MUTEX_READY, memory_order_relaxed); - m->readers = 0; } static i8 kit_shared_try_lock(kit_shared_mutex_t *m) { @@ -58,7 +64,7 @@ static i8 kit_shared_try_lock(kit_shared_mutex_t *m) { } assert(m->readers >= 0); - m->readers++; + ++m->readers; i8 prev_state = KIT_SHARED_MUTEX_LOCKED; if (!atomic_compare_exchange_strong_explicit( @@ -66,7 +72,7 @@ static i8 kit_shared_try_lock(kit_shared_mutex_t *m) { memory_order_seq_cst, memory_order_seq_cst)) assert(0); - return 0; + return 1; } static void kit_shared_lock(kit_shared_mutex_t *m) { @@ -111,7 +117,7 @@ static i8 kit_unique_try_lock(kit_shared_mutex_t *m) { memory_order_seq_cst, memory_order_seq_cst)) return 0; - i8 is_locked = m->readers == 0; + i8 is_locked = (m->readers == 0); prev_state = KIT_SHARED_MUTEX_WRITER; if (!is_locked && @@ -147,6 +153,17 @@ static void kit_unique_unlock(kit_shared_mutex_t *m) { # pragma GCC diagnostic pop #endif +#ifndef KIT_DISABLE_SHORT_NAMES +# define shared_mutex_t kit_shared_mutex_t +# define shared_mutex_init kit_shared_mutex_init +# define shared_try_lock kit_shared_try_lock +# define shared_lock kit_shared_lock +# define shared_unlock kit_shared_unlock +# define unique_try_lock kit_unique_try_lock +# define unique_lock kit_unique_lock +# define unique_unlock kit_unique_unlock +#endif + #ifdef __cplusplus } #endif diff --git a/source/tests/test_interprocess.c b/source/tests/test_interprocess.c index cf33086..b41f71f 100644 --- a/source/tests/test_interprocess.c +++ b/source/tests/test_interprocess.c @@ -1,19 +1,24 @@ #include "../kit/shared_memory.h" +#include "../kit/shared_mutex.h" #include "../kit/threads.h" #include #define NAME "kit_test_interprocess" -enum { DATA_SIZE = 64 }; +enum { DATA_SIZE = 64, TIMEOUT = 3 }; + +typedef struct { + shared_mutex_t m; + i8 state; + i8 bytes[DATA_SIZE]; +} shared_data_t; enum { STATE_INIT, STATE_READY, STATE_DONE }; int run_writer() { - kit_shared_memory_clean(SZ(NAME)); - - kit_shared_memory_t mem = kit_shared_memory_open( - SZ(NAME), DATA_SIZE, KIT_SHARED_MEMORY_CREATE); + shared_memory_t mem = shared_memory_open( + SZ(NAME), sizeof(shared_data_t), SHARED_MEMORY_CREATE); if (mem.status != KIT_OK) { printf("%s: kit_shared_memory_open failed.\n", __FUNCTION__); @@ -21,13 +26,41 @@ int run_writer() { return 1; } - mem.bytes[0] = STATE_INIT; - for (int i = 1; i < DATA_SIZE; i++) mem.bytes[i] = i; - mem.bytes[0] = STATE_READY; + shared_data_t *p = (shared_data_t *) mem.bytes; + + shared_mutex_init(&p->m); + + unique_lock(&p->m); + p->state = STATE_INIT; + unique_unlock(&p->m); + + unique_lock(&p->m); + for (int i = 0; i < DATA_SIZE; i++) p->bytes[i] = i; + p->state = STATE_READY; + unique_unlock(&p->m); + + struct timespec t0; + timespec_get(&t0, TIME_UTC); + + shared_lock(&p->m); + while (p->state != STATE_DONE) { + shared_unlock(&p->m); - while (mem.bytes[0] != STATE_DONE) thrd_yield(); + struct timespec t1; + timespec_get(&t1, TIME_UTC); + + if (t1.tv_sec - t0.tv_sec > TIMEOUT) { + printf("%s: timeout.\n", __FUNCTION__); + shared_memory_close(&mem); + return 1; + } + + thrd_yield(); + shared_lock(&p->m); + } + shared_unlock(&p->m); - if (kit_shared_memory_close(&mem) != KIT_OK) { + if (shared_memory_close(&mem) != KIT_OK) { printf("%s: kit_shared_memory_close failed.\n", __FUNCTION__); fflush(stdout); return 1; @@ -37,12 +70,24 @@ int run_writer() { } int run_reader() { - kit_shared_memory_t mem; + struct timespec t0; + timespec_get(&t0, TIME_UTC); + + shared_memory_t mem; for (;;) { - mem = kit_shared_memory_open(SZ(NAME), DATA_SIZE, - KIT_SHARED_MEMORY_OPEN); + mem = shared_memory_open(SZ(NAME), sizeof(shared_data_t), + SHARED_MEMORY_OPEN); if (mem.status == KIT_OK) break; + + struct timespec t1; + timespec_get(&t1, TIME_UTC); + + if (t1.tv_sec - t0.tv_sec > TIMEOUT) { + printf("%s: timeout.\n", __FUNCTION__); + return 1; + } + thrd_yield(); } @@ -52,20 +97,31 @@ int run_reader() { return 1; } - while (mem.bytes[0] != STATE_READY) thrd_yield(); + shared_data_t *p = (shared_data_t *) mem.bytes; + + shared_lock(&p->m); + while (p->state != STATE_READY) { + shared_unlock(&p->m); + thrd_yield(); + shared_lock(&p->m); + } i32 status = 0; - for (i32 i = 1; i < DATA_SIZE; i++) - if (mem.bytes[i] != i) { + for (i32 i = 0; i < DATA_SIZE; i++) + if (p->bytes[i] != i) { printf("%s: wrong byte %d\n", __FUNCTION__, i); fflush(stdout); status = 1; } - mem.bytes[0] = STATE_DONE; + shared_unlock(&p->m); - if (kit_shared_memory_close(&mem) != KIT_OK) { + unique_lock(&p->m); + p->state = STATE_DONE; + unique_unlock(&p->m); + + if (shared_memory_close(&mem) != KIT_OK) { printf("%s: kit_shared_memory_close failed.\n", __FUNCTION__); fflush(stdout); status = 1; @@ -102,6 +158,11 @@ int main(int argc, char **argv) { if (strcmp(argv[1], "reader") == 0) return run_reader(); + if (strcmp(argv[1], "clean") == 0) { + shared_memory_clean(SZ(NAME)); + return 0; + } + printf("Invalid command line argument \"%s\"\n", argv[1]); return 1; } -- cgit v1.2.3