From 93bf289d04b1f029787f51824fbc42136a8285b0 Mon Sep 17 00:00:00 2001 From: Mitya Selivanov Date: Fri, 15 Sep 2023 02:02:57 +0200 Subject: shared_memory posix impl --- source/kit/_static.c | 2 + source/kit/atomic.h | 3 - source/kit/shared_memory.h | 30 ++++++++ source/kit/shared_memory.posix.c | 144 +++++++++++++++++++++++++++++++++++++ source/kit/shared_memory.win32.c | 4 ++ source/kit/shared_mutex.h | 150 +++++++++++++++++++++++++++++++++++++++ source/kit/status.h | 28 +++++--- source/kit/string_ref.h | 4 +- source/tests/test_interprocess.c | 121 ++++++++++++------------------- 9 files changed, 395 insertions(+), 91 deletions(-) create mode 100644 source/kit/shared_memory.h create mode 100644 source/kit/shared_memory.posix.c create mode 100644 source/kit/shared_memory.win32.c create mode 100644 source/kit/shared_mutex.h (limited to 'source') diff --git a/source/kit/_static.c b/source/kit/_static.c index 4d820ad..eb66879 100644 --- a/source/kit/_static.c +++ b/source/kit/_static.c @@ -10,3 +10,5 @@ #include "sha256.c" #include "threads.posix.c" #include "threads.win32.c" +#include "shared_memory.posix.c" +#include "shared_memory.win32.c" diff --git a/source/kit/atomic.h b/source/kit/atomic.h index 9d7f284..a91cb4c 100644 --- a/source/kit/atomic.h +++ b/source/kit/atomic.h @@ -5,8 +5,6 @@ #ifndef _MSC_VER # include - -# define KIT_ATOMIC(type_) type_ _Atomic #else # include @@ -14,7 +12,6 @@ extern "C" { # endif -# define KIT_ATOMIC(type_) type_ volatile # define _Atomic volatile enum { diff --git a/source/kit/shared_memory.h b/source/kit/shared_memory.h new file mode 100644 index 0000000..5eb4fc4 --- /dev/null +++ b/source/kit/shared_memory.h @@ -0,0 +1,30 @@ +#ifndef KIT_SHARED_MEMORY_H +#define KIT_SHARED_MEMORY_H + +#include "status.h" +#include "allocator.h" +#include "string_ref.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + kit_status_t status; + i64 size; + u8 *bytes; + struct { + u8 internal[512]; + } _state; +} kit_shared_memory_t; + +kit_shared_memory_t kit_shared_memory_create(kit_str_t name, + i64 size); +kit_shared_memory_t kit_shared_memory_open(kit_str_t name, i64 size); +kit_status_t kit_shared_memory_close(kit_shared_memory_t *mem); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/kit/shared_memory.posix.c b/source/kit/shared_memory.posix.c new file mode 100644 index 0000000..d2c4973 --- /dev/null +++ b/source/kit/shared_memory.posix.c @@ -0,0 +1,144 @@ +#include "shared_memory.h" + +#if !defined(_WIN32) || defined(__CYGWIN__) +# include +# include +# include +# include +# include +# include +# include +# include + +typedef struct { + i8 owned; + i32 fd; + char name[NAME_MAX + 1]; +} kit_shared_memory_internal_t; + +kit_shared_memory_t kit_shared_memory_create(kit_str_t name, + i64 size) { + kit_shared_memory_t mem; + kit_shared_memory_internal_t *internal = + (kit_shared_memory_internal_t *) &mem._state; + + memset(&mem, 0, sizeof mem); + + assert(name.size + 1 <= NAME_MAX); + if (name.size + 1 > NAME_MAX) { + mem.status = KIT_ERROR_NAME_TOO_LONG; + return mem; + } + + for (i64 i = 0; i < name.size; i++) + if (name.values[i] == '/') { + mem.status = KIT_ERROR_INVALID_NAME; + return mem; + } + + internal->name[0] = '/'; + memcpy(internal->name + 1, name.values, name.size); + internal->name[1 + name.size] = '\0'; + + i32 fd = shm_open(internal->name, O_RDWR | O_CREAT | O_EXCL, 0660); + + assert(fd != -1); + if (fd == -1) { + mem.status = KIT_ERROR_SHM_OPEN_FAILED; + return mem; + } + + if (ftruncate(fd, size) == -1) { + shm_unlink(internal->name); + assert(0); + mem.status = KIT_ERROR_FTRUNCATE_FAILED; + return mem; + } + + void *p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, + 0); + + if (p == MAP_FAILED) { + shm_unlink(internal->name); + assert(0); + mem.status = KIT_ERROR_MMAP_FAILED; + return mem; + } + + internal->owned = 1; + internal->fd = fd; + + mem.status = KIT_OK; + mem.size = size; + mem.bytes = (u8 *) p; + return mem; +} + +kit_shared_memory_t kit_shared_memory_open(kit_str_t name, i64 size) { + kit_shared_memory_t mem; + kit_shared_memory_internal_t *internal = + (kit_shared_memory_internal_t *) &mem._state; + + memset(&mem, 0, sizeof mem); + + assert(name.size + 1 <= NAME_MAX); + if (name.size + 1 > NAME_MAX) { + mem.status = KIT_ERROR_NAME_TOO_LONG; + return mem; + } + + for (i64 i = 0; i < name.size; i++) + if (name.values[i] == '/') { + mem.status = KIT_ERROR_INVALID_NAME; + return mem; + } + + internal->name[0] = '/'; + memcpy(internal->name + 1, name.values, name.size); + internal->name[1 + name.size] = '\0'; + + i32 fd = shm_open(internal->name, O_RDWR, 0660); + + assert(fd != -1); + if (fd == -1) { + mem.status = KIT_ERROR_SHM_OPEN_FAILED; + return mem; + } + + void *p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, + 0); + + if (p == MAP_FAILED) { + assert(0); + mem.status = KIT_ERROR_MMAP_FAILED; + return mem; + } + + internal->owned = 0; + internal->fd = fd; + + mem.status = KIT_OK; + mem.size = size; + mem.bytes = (u8 *) p; + return mem; +} + +kit_status_t kit_shared_memory_close(kit_shared_memory_t *mem) { + assert(mem != NULL); + + if (mem == NULL) + return KIT_ERROR_INVALID_ARGUMENT; + + kit_shared_memory_internal_t *internal = + (kit_shared_memory_internal_t *) &mem->_state; + + kit_status_t status = KIT_OK; + + if (munmap(mem->bytes, mem->size) != 0) + status |= KIT_ERROR_MUNMAP_FAILED; + if (internal->owned && shm_unlink(internal->name) != 0) + status |= KIT_ERROR_UNLINK_FAILED; + + return status; +} +#endif diff --git a/source/kit/shared_memory.win32.c b/source/kit/shared_memory.win32.c new file mode 100644 index 0000000..f23d3a8 --- /dev/null +++ b/source/kit/shared_memory.win32.c @@ -0,0 +1,4 @@ +#include "shared_memory.h" + +#if defined(_WIN32) && !defined(__CYGWIN__) +#endif diff --git a/source/kit/shared_mutex.h b/source/kit/shared_mutex.h new file mode 100644 index 0000000..13aae1b --- /dev/null +++ b/source/kit/shared_mutex.h @@ -0,0 +1,150 @@ +#ifndef KIT_SHARED_MUTEX_H +#define KIT_SHARED_MUTEX_H + +#include "atomic.h" +#include "threads.h" + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#if defined(__GNUC__) || defined(__clang__) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wunused-function" +# pragma GCC diagnostic ignored "-Wunknown-pragmas" +# pragma GCC push_options +# pragma GCC optimize("O3") +#endif + +enum { + KIT_SHARED_MUTEX_READY, + KIT_SHARED_MUTEX_LOCKED, + KIT_SHARED_MUTEX_WRITER +}; + +typedef struct { + i8 _Atomic state; + i32 readers; +} kit_shared_mutex_t; + +static void kit_shared_mutex_init(kit_shared_mutex_t *m) { + assert(m != NULL); + + atomic_store_explicit(&m->state, KIT_SHARED_MUTEX_READY, + memory_order_relaxed); + m->readers = 0; +} + +static i8 kit_shared_try_lock(kit_shared_mutex_t *m) { + assert(m != NULL); + + for (;;) { + i8 prev_state = KIT_SHARED_MUTEX_READY; + if (atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_LOCKED, + memory_order_seq_cst, memory_order_seq_cst)) + break; + if (prev_state == KIT_SHARED_MUTEX_WRITER) + return 0; + // FIXME + // Check performance + thrd_yield(); + } + + assert(m->readers >= 0); + m->readers++; + + i8 prev_state = KIT_SHARED_MUTEX_LOCKED; + if (!atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_READY, + memory_order_seq_cst, memory_order_seq_cst)) + assert(0); + + return 0; +} + +static void kit_shared_lock(kit_shared_mutex_t *m) { + assert(m != NULL); + + while (!kit_shared_try_lock(m)) + // FIXME + // Check performance + thrd_yield(); +} + +static void kit_shared_unlock(kit_shared_mutex_t *m) { + assert(m != NULL); + + for (;;) { + i8 prev_state = KIT_SHARED_MUTEX_READY; + if (atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_LOCKED, + memory_order_seq_cst, memory_order_seq_cst)) + break; + // FIXME + // Check performance + thrd_yield(); + } + + assert(m->readers > 0); + m->readers--; + + i8 prev_state = KIT_SHARED_MUTEX_LOCKED; + if (!atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_READY, + memory_order_seq_cst, memory_order_seq_cst)) + assert(0); +} + +static i8 kit_unique_try_lock(kit_shared_mutex_t *m) { + assert(m != NULL); + + i8 prev_state = KIT_SHARED_MUTEX_READY; + if (!atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_WRITER, + memory_order_seq_cst, memory_order_seq_cst)) + return 0; + + i8 is_locked = m->readers == 0; + + prev_state = KIT_SHARED_MUTEX_WRITER; + if (!is_locked && + !atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_READY, + memory_order_seq_cst, memory_order_seq_cst)) + assert(0); + + return is_locked; +} + +static void kit_lock(kit_shared_mutex_t *m) { + assert(m != NULL); + + while (!kit_unique_try_lock(m)) + // FIXME + // Check performance + thrd_yield(); +} + +static void kit_unique_unlock(kit_shared_mutex_t *m) { + assert(m != NULL); + + i8 prev_state = KIT_SHARED_MUTEX_WRITER; + if (!atomic_compare_exchange_strong_explicit( + &m->state, &prev_state, KIT_SHARED_MUTEX_READY, + memory_order_seq_cst, memory_order_seq_cst)) + assert(0); +} + +#if defined(__GNUC__) || defined(__clang__) +# pragma GCC pop_options +# pragma GCC diagnostic pop +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/kit/status.h b/source/kit/status.h index a0f209e..e38a36b 100644 --- a/source/kit/status.h +++ b/source/kit/status.h @@ -5,16 +5,24 @@ enum { KIT_OK, - KIT_ERROR_NOT_IMPLEMENTED, - KIT_ERROR_BAD_ALLOC, - KIT_ERROR_MKDIR_FAILED, - KIT_ERROR_RMDIR_FAILED, - KIT_ERROR_UNLINK_FAILED, - KIT_ERROR_FILE_ALREADY_EXISTS, - KIT_ERROR_FILE_DO_NOT_EXIST, - KIT_ERROR_PATH_TOO_LONG, - KIT_ERROR_SOCKETS_STARTUP_FAILED, - KIT_ERROR_SOCKET_CONTROL_FAILED, + KIT_ERROR_NOT_IMPLEMENTED = (1), + KIT_ERROR_BAD_ALLOC = (1 << 1), + KIT_ERROR_INVALID_ARGUMENT = (1 << 2), + KIT_ERROR_MKDIR_FAILED = (1 << 3), + KIT_ERROR_RMDIR_FAILED = (1 << 4), + KIT_ERROR_UNLINK_FAILED = (1 << 5), + KIT_ERROR_FILE_ALREADY_EXISTS = (1 << 6), + KIT_ERROR_FILE_DO_NOT_EXIST = (1 << 7), + KIT_ERROR_PATH_TOO_LONG = (1 << 8), + KIT_ERROR_SOCKETS_STARTUP_FAILED = (1 << 9), + KIT_ERROR_SOCKET_CONTROL_FAILED = (1 << 10), + KIT_ERROR_NAME_TOO_LONG = (1 << 11), + KIT_ERROR_INVALID_NAME = (1 << 12), + KIT_ERROR_SHM_OPEN_FAILED = (1 << 13), + KIT_ERROR_SHM_UNLINK_FAILED = (1 << 14), + KIT_ERROR_FTRUNCATE_FAILED = (1 << 15), + KIT_ERROR_MMAP_FAILED = (1 << 16), + KIT_ERROR_MUNMAP_FAILED = (1 << 17), }; typedef i32 kit_status_t; diff --git a/source/kit/string_ref.h b/source/kit/string_ref.h index 2ed6f46..89c319b 100644 --- a/source/kit/string_ref.h +++ b/source/kit/string_ref.h @@ -19,8 +19,8 @@ typedef KIT_AR(char) kit_str_t; # pragma GCC optimize("O3") #endif -static kit_str_t kit_str(i64 size, char *static_string) { - kit_str_t s = { .size = size, .values = static_string }; +static kit_str_t kit_str(i64 size, char const *static_string) { + kit_str_t s = { .size = size, .values = (char *) static_string }; return s; } diff --git a/source/tests/test_interprocess.c b/source/tests/test_interprocess.c index df2a375..30a8779 100644 --- a/source/tests/test_interprocess.c +++ b/source/tests/test_interprocess.c @@ -1,90 +1,60 @@ +#include "../kit/shared_memory.h" +#include "../kit/threads.h" + +#include + #ifdef _WIN32 int main() { return 0; } #else -# include "../kit/threads.h" +# define NAME "kit_test_interprocess" -# include -# include -# include -# include -# include -# include - -# define NAME "/kit_test_interprocess" +enum { SIZE = 64 }; enum { STATE_INIT, STATE_READY, STATE_DONE }; -enum { SIZE = 64 }; - int run_writer() { - int f = shm_open(NAME, O_RDWR | O_CREAT, 0660); + kit_shared_memory_t mem = kit_shared_memory_create(SZ(NAME), SIZE); - if (f == -1) { - printf("%s: shm_open failed.\n", __FUNCTION__); - return 1; - } - - if (ftruncate(f, SIZE) == -1) { - printf("%s: ftruncate failed.\n", __FUNCTION__); + if (mem.status != KIT_OK) { + printf("%s: kit_shared_memory_create failed.\n", __FUNCTION__); fflush(stdout); - return 1; - } - - void *p = mmap(NULL, SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, f, - 0); - - if (p == MAP_FAILED) { - printf("%s: mmap failed.\n", __FUNCTION__); - fflush(stdout); - return 1; + return mem.status; } - unsigned char *bytes = (unsigned char *) p; + mem.bytes[0] = STATE_INIT; + for (int i = 1; i < SIZE; i++) mem.bytes[i] = i; + mem.bytes[0] = STATE_READY; - bytes[0] = STATE_INIT; - for (int i = 1; i < SIZE; i++) bytes[i] = i; - bytes[0] = STATE_READY; + while (mem.bytes[0] != STATE_DONE) thrd_yield(); - while (bytes[0] != STATE_DONE) thrd_yield(); - - shm_unlink(NAME); - - return 0; + return kit_shared_memory_close(&mem); } int run_reader() { - int f = -1; - - while (f == -1) { - f = shm_open(NAME, O_RDWR, 0); - thrd_yield(); - } - - void *p = mmap(NULL, SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, f, - 0); + kit_shared_memory_t mem = kit_shared_memory_open(SZ(NAME), SIZE); - if (p == MAP_FAILED) { - printf("%s: mmap failed.\n", __FUNCTION__); + if (mem.status != KIT_OK) { + printf("%s: kit_shared_memory_open failed.\n", __FUNCTION__); fflush(stdout); - return 1; + return mem.status; } - unsigned char *bytes = (unsigned char *) p; - - while (bytes[0] != STATE_READY) thrd_yield(); + while (mem.bytes[0] != STATE_READY) thrd_yield(); - int status = 0; + i32 status = 0; - for (int i = 1; i < SIZE; i++) - if (bytes[i] != i) { + for (i32 i = 1; i < SIZE; i++) + if (mem.bytes[i] != i) { printf("%s: wrong byte %d\n", __FUNCTION__, i); fflush(stdout); status = 1; } - bytes[0] = STATE_DONE; + mem.bytes[0] = STATE_DONE; + + status |= kit_shared_memory_close(&mem); return status; } @@ -95,30 +65,29 @@ int main(int argc, char **argv) { return 1; } - int status = 0; + if (strcmp(argv[1], "writer") == 0) { + struct timespec t0; + timespec_get(&t0, TIME_UTC); - struct timespec t0; - timespec_get(&t0, TIME_UTC); + i32 status = run_writer(); - if (strcmp(argv[1], "writer") == 0) - status = run_writer(); - else if (strcmp(argv[1], "reader") == 0) - status = run_reader(); - else { - printf("Invalid command line argument \"%s\"\n", argv[1]); - return 1; - } + struct timespec t1; + timespec_get(&t1, TIME_UTC); - struct timespec t1; - timespec_get(&t1, TIME_UTC); + i64 sec = t1.tv_sec - t0.tv_sec; + i64 nsec = t1.tv_nsec - t0.tv_nsec; - long long sec = t1.tv_sec - t0.tv_sec; - long long nsec = t1.tv_nsec - t0.tv_nsec; + printf("Done in %.2lf msec\n", + (sec * 1000000000 + nsec) * 0.000001); + fflush(stdout); - printf("%s: done in %.2lf msec\n", argv[1], - (sec * 1000000000 + nsec) * 0.000001); - fflush(stdout); + return status; + } - return status; + if (strcmp(argv[1], "reader") == 0) + return run_reader(); + + printf("Invalid command line argument \"%s\"\n", argv[1]); + return 1; } #endif -- cgit v1.2.3