diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2018-12-07 15:45:30 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2019-03-07 15:02:17 +0800 |
commit | b655c8e74ad5d29db18660a677784f181f8e7590 (patch) | |
tree | a227b77bdb13610871ec1394794cb55c103b2429 /lib/commit.c | |
parent | af48bcec8be1f4b0cc55ce47bd6eb7c7d977f4d1 (diff) | |
download | nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.gz nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.zst nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.zip |
sqlite3 rewrite
This rewrite intends to simplifies previous design by
hosting the storage in sqlite database instead of counting
on individual log files.
Diffstat (limited to 'lib/commit.c')
-rw-r--r-- | lib/commit.c | 102 |
1 files changed, 51 insertions, 51 deletions
diff --git a/lib/commit.c b/lib/commit.c index 22b92ad..45d4d73 100644 --- a/lib/commit.c +++ b/lib/commit.c @@ -1,80 +1,80 @@ -#include "commit.h" -#include <errno.h> -#include <string.h> -#include <zstd.h> - -static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { - uint32_t written; - header->raw_size = store_size; +#include "collect.h" +#include "main.h" +#include "sql.h" - // Write header - written = fwrite(header, 1, sizeof(nfl_header_t), f); - WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno)); +#include <zstd.h> - // Write store - written = fwrite(store, 1, store_size, f); - WARN_RETURN(written != store_size, "commit store: %s", strerror(errno)); +static void do_gc(sqlite3 *db, State *s) { + uint32_t cur_size = s->header->raw_size; + pthread_mutex_lock(&s->global->storage_consumed_lock); + uint32_t remain_size = + s->global->storage_budget - s->global->storage_consumed - cur_size; + uint32_t gc_size = -remain_size + cur_size * g_gc_rate; + if (gc_size >= s->global->storage_consumed) + gc_size = s->global->storage_consumed; + pthread_mutex_unlock(&s->global->storage_consumed_lock); - return sizeof(nfl_header_t) + store_size; + if (remain_size <= 0) + db_delete_oldest_bytes(db, gc_size); } -static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { +static int commit_lz4(State *s, void **buf) { /* TODO */ + (void)s; + (void)buf; return -1; } -static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { - size_t const bufsize = ZSTD_compressBound(store_size); - void *buf; +static int commit_zstd(State *s, void **buf) { + size_t const bufsize = ZSTD_compressBound(s->header->raw_size); + + if (!(*buf = malloc(bufsize))) + ERROR("zstd: cannot malloc"); - WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc"); - size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1); + size_t const csize = + ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0); if (ZSTD_isError(csize)) { - WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize)); - free(buf); + ERROR("zstd: %s \n", ZSTD_getErrorName(csize)); + free(*buf); return -1; } - int ret = nfl_commit_default(f, header, buf, csize); - free(buf); - return ret; + s->header->raw_size = csize; + return 0; } -int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, - enum nfl_compression_t compression_opt, - bool truncate, - const char *filename) { - int ret; - FILE *f; - const char *mode = truncate ? "wb" : "ab"; +void *commit(void *targs) { + sqlite3 *db = NULL; + State *s = (State *)targs; + uint32_t size = s->header->raw_size; + DEBUG("Committing #%d packets", s->header->nr_entries); - debug("Comm worker #%u: commit to file %s\n", header->id, filename); - ERR((f = fopen(filename, mode)) == NULL, strerror(errno)); + db_open(&db, s->global->storage_file); + db_create_table(db); - // commit store - uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries; - switch (compression_opt) { + void *buf = NULL; + switch (s->global->compression_type) { case COMPRESS_NONE: - debug("Comm worker #%u: commit without compression\n", header->id); - ret = nfl_commit_default(f, header, store, store_size); break; case COMPRESS_LZ4: - debug("Comm worker #%u: commit with compression algorithm: lz4", header->id); - ret = nfl_commit_lz4(f, header, store, store_size); + commit_lz4(s, &buf); break; case COMPRESS_ZSTD: - debug("Comm worker #%u: commit with compression algorithm: zstd", header->id); - ret = nfl_commit_zstd(f, header, store, store_size); + commit_zstd(s, &buf); break; - // Must not reach here ... default: FATAL("Unknown compression option detected"); } - // Do fsync ? - fclose(f); - return ret; + do_gc(db, s); + db_insert(db, s->header, buf ? buf : s->store); + db_close(db); + + DEBUG("Committed #%d packets, compressed size: %u/%u", + s->header->nr_entries, s->header->raw_size, size); + if (buf) + free(buf); + state_free(s); + + return NULL; } |