diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2018-11-23 14:02:21 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2018-11-23 14:02:21 +0800 |
commit | f7074d5b66ab1872f3735eda1736e65c5cf40bc3 (patch) | |
tree | f8c8b69dbe44fa25800fe639209a925e7f235e8d | |
parent | c5b6d181707cade0785f84bbd2df17f8268f5a4a (diff) | |
download | nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.gz nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.zst nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.zip |
Old unfinished modifications
-rw-r--r-- | bin/nfcollect.c | 33 | ||||
-rw-r--r-- | include/commit.h | 2 | ||||
-rw-r--r-- | include/main.h | 9 | ||||
-rw-r--r-- | lib/collect.c | 6 | ||||
-rw-r--r-- | lib/commit.c | 41 |
5 files changed, 60 insertions, 31 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c index 160a21a..834819f 100644 --- a/bin/nfcollect.c +++ b/bin/nfcollect.c @@ -54,7 +54,7 @@ const char *help_text = " -v --version print version information\n" "\n"; -static uint32_t calculate_starting_trunk(const char *storage_dir); +static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size); static nfl_nl_t netlink_fd; static void sig_handler(int signo) { @@ -150,6 +150,10 @@ int main(int argc, char *argv[]) { g.nfl_commit_queue = malloc(sizeof(sem_t)); sem_init(g.nfl_commit_queue, 0, max_commit_worker); + // Calculate storage consumed + pthread_mutex_init(&g.nfl_storage_consumed_lock, NULL); + g.nfl_storage_consumed = 0; + // Set up nflog receiver worker nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *)); @@ -158,13 +162,13 @@ int main(int argc, char *argv[]) { info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d", entries_max, trunk_cnt); + calculate_starting_trunk(storage_dir, &cur_trunk, &g.nfl_storage_consumed); if (truncate_trunks) { cur_trunk = 0; info(PACKAGE ": requested to truncate (overwrite) trunks in %s", storage_dir); } else { - int calculated_trunk = calculate_starting_trunk(storage_dir); - cur_trunk = calculated_trunk < 0 ? 0: NEXT(calculated_trunk, trunk_cnt); + cur_trunk = cur_trunk < 0 ? 0: NEXT(cur_trunk, trunk_cnt); const char *fn = nfl_get_filename(storage_dir, cur_trunk); info(PACKAGE ": will start writing to trunk %s and onward", fn); free((char *)fn); @@ -186,20 +190,24 @@ int main(int argc, char *argv[]) { // We don't actually free trunks or the semaphore at all sem_destroy(g.nfl_commit_queue); nfl_close_netlink_fd(&netlink_fd); - exit(0); + xit(0); + uint32_t start_trunk; } /* - * Need to find a trunk to start with after a restart - * We choose the one with newest modification time. - * If no existing trunk is found, returns -1 + * traverse_storage_dir does 2 things: + * 1. Find starting trunk + * Find the trunk to start with after a restart + * We choose the one with newest modification time. + * If no existing trunk is found, set to -1 + * 2. Sum storage size consumed by adding up stored sizes. */ -static uint32_t calculate_starting_trunk(const char *storage_dir) { +static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size) { DIR *dp; struct stat stat; struct dirent *ep; time_t newest = (time_t)0; - uint32_t newest_index = -1; + uint32_t newest_index = -1, _storage_size; int index; char cwd[100]; @@ -215,12 +223,15 @@ static uint32_t calculate_starting_trunk(const char *storage_dir) { ERR(lstat(fn, &stat) < 0, fn); if (difftime(stat.st_mtime, newest) > 0) { newest = stat.st_mtime; - newest_index = (uint32_t)index; + _storage_size = (uint32_t)index; } + + *storage_size += stat.st_size } } closedir(dp); ERR(chdir(cwd) < 0, "chdir"); - return newest_index; + *starting_trunk = newest_index; + *storage_size = _storage_size; } diff --git a/include/commit.h b/include/commit.h index 7013bb1..2e16571 100644 --- a/include/commit.h +++ b/include/commit.h @@ -3,7 +3,7 @@ #include "common.h" void nfl_commit_init(); -void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, +int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, enum nfl_compression_t compression_opt, bool truncate, const char *filename); diff --git a/include/main.h b/include/main.h index 64ed558..417c6bc 100644 --- a/include/main.h +++ b/include/main.h @@ -133,9 +133,18 @@ typedef struct __attribute__((packed)) _nfl_entry_t { /* size: 24, cachelines: 1, members: 8 */ } nfl_entry_t; +typedef struct _store_manager_t { + uint32_t *trunk_size_map; + +} nfl_store_manager_t; + typedef struct _nfl_global_t { sem_t *nfl_commit_queue; uint16_t nfl_group_id; + + uint32_t nfl_storage_consumed; + pthread_mutex_t nfl_storage_consumed_lock; + const char *storage_dir; enum nfl_compression_t compression_opt; } nfl_global_t; diff --git a/lib/collect.c b/lib/collect.c index c69c9a9..bc32a93 100644 --- a/lib/collect.c +++ b/lib/collect.c @@ -221,12 +221,12 @@ static void *nfl_start_commit_worker(void *targs) { nfl_state_t *nf = (nfl_state_t *)targs; const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); debug("Comm worker #%u: thread started.", nf->header->id); - /* FIXME */ + /* truncate ? */ bool truncate = true; sem_wait(g.nfl_commit_queue); debug("Comm worker #%u: commit started.", nf->header->id); - nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename); + int ret = nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename); debug("Comm worker #%u: commit done.", nf->header->id); sem_post(g.nfl_commit_queue); @@ -238,7 +238,7 @@ static void *nfl_start_commit_worker(void *targs) { pthread_cond_signal(&nf->has_finished_recv_cond); pthread_mutex_unlock(&nf->has_finished_recv_lock); - pthread_exit(NULL); + pthread_exit(ret); } /* diff --git a/lib/commit.c b/lib/commit.c index 53dc493..22b92ad 100644 --- a/lib/commit.c +++ b/lib/commit.c @@ -3,43 +3,51 @@ #include <string.h> #include <zstd.h> -static void nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store, +static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store, uint32_t store_size) { uint32_t written; header->raw_size = store_size; // Write header written = fwrite(header, 1, sizeof(nfl_header_t), f); - ERR(written != sizeof(nfl_header_t), strerror(errno)); + WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno)); // Write store written = fwrite(store, 1, store_size, f); - ERR(written != store_size, strerror(errno)); + WARN_RETURN(written != store_size, "commit store: %s", strerror(errno)); + + return sizeof(nfl_header_t) + store_size; } -static void nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store, +static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store, uint32_t store_size) { /* TODO */ + return -1; } -static void nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store, +static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store, uint32_t store_size) { size_t const bufsize = ZSTD_compressBound(store_size); void *buf; - ERR(!(buf = malloc(bufsize)), "zstd: cannot malloc"); + WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc"); size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1); - if (ZSTD_isError(csize)) - FATAL("zstd: %s \n", ZSTD_getErrorName(csize)); + if (ZSTD_isError(csize)) { + WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize)); + free(buf); + return -1; + } - nfl_commit_default(f, header, buf, csize); + int ret = nfl_commit_default(f, header, buf, csize); free(buf); + return ret; } -void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, +int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, enum nfl_compression_t compression_opt, bool truncate, const char *filename) { + int ret; FILE *f; const char *mode = truncate ? "wb" : "ab"; @@ -50,16 +58,16 @@ void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries; switch (compression_opt) { case COMPRESS_NONE: - debug("Comm worker #%u: commit without compression\n", header->id) - nfl_commit_default(f, header, store, store_size); + debug("Comm worker #%u: commit without compression\n", header->id); + ret = nfl_commit_default(f, header, store, store_size); break; case COMPRESS_LZ4: - debug("Comm worker #%u: commit with compression algorithm: lz4", - header->id) nfl_commit_lz4(f, header, store, store_size); + debug("Comm worker #%u: commit with compression algorithm: lz4", header->id); + ret = nfl_commit_lz4(f, header, store, store_size); break; case COMPRESS_ZSTD: - debug("Comm worker #%u: commit with compression algorithm: zstd", - header->id) nfl_commit_zstd(f, header, store, store_size); + debug("Comm worker #%u: commit with compression algorithm: zstd", header->id); + ret = nfl_commit_zstd(f, header, store, store_size); break; // Must not reach here ... default: @@ -68,4 +76,5 @@ void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, // Do fsync ? fclose(f); + return ret; } |