aboutsummaryrefslogtreecommitdiffstats
path: root/lib/commit.c
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-12-07 15:45:30 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2019-03-07 15:02:17 +0800
commitb655c8e74ad5d29db18660a677784f181f8e7590 (patch)
treea227b77bdb13610871ec1394794cb55c103b2429 /lib/commit.c
parentaf48bcec8be1f4b0cc55ce47bd6eb7c7d977f4d1 (diff)
downloadnfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.gz
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.zst
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.zip
sqlite3 rewrite
This rewrite intends to simplifies previous design by hosting the storage in sqlite database instead of counting on individual log files.
Diffstat (limited to 'lib/commit.c')
-rw-r--r--lib/commit.c102
1 files changed, 51 insertions, 51 deletions
diff --git a/lib/commit.c b/lib/commit.c
index 22b92ad..45d4d73 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -1,80 +1,80 @@
-#include "commit.h"
-#include <errno.h>
-#include <string.h>
-#include <zstd.h>
-
-static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
- uint32_t written;
- header->raw_size = store_size;
+#include "collect.h"
+#include "main.h"
+#include "sql.h"
- // Write header
- written = fwrite(header, 1, sizeof(nfl_header_t), f);
- WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno));
+#include <zstd.h>
- // Write store
- written = fwrite(store, 1, store_size, f);
- WARN_RETURN(written != store_size, "commit store: %s", strerror(errno));
+static void do_gc(sqlite3 *db, State *s) {
+ uint32_t cur_size = s->header->raw_size;
+ pthread_mutex_lock(&s->global->storage_consumed_lock);
+ uint32_t remain_size =
+ s->global->storage_budget - s->global->storage_consumed - cur_size;
+ uint32_t gc_size = -remain_size + cur_size * g_gc_rate;
+ if (gc_size >= s->global->storage_consumed)
+ gc_size = s->global->storage_consumed;
+ pthread_mutex_unlock(&s->global->storage_consumed_lock);
- return sizeof(nfl_header_t) + store_size;
+ if (remain_size <= 0)
+ db_delete_oldest_bytes(db, gc_size);
}
-static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
+static int commit_lz4(State *s, void **buf) {
/* TODO */
+ (void)s;
+ (void)buf;
return -1;
}
-static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
- size_t const bufsize = ZSTD_compressBound(store_size);
- void *buf;
+static int commit_zstd(State *s, void **buf) {
+ size_t const bufsize = ZSTD_compressBound(s->header->raw_size);
+
+ if (!(*buf = malloc(bufsize)))
+ ERROR("zstd: cannot malloc");
- WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc");
- size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1);
+ size_t const csize =
+ ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0);
if (ZSTD_isError(csize)) {
- WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize));
- free(buf);
+ ERROR("zstd: %s \n", ZSTD_getErrorName(csize));
+ free(*buf);
return -1;
}
- int ret = nfl_commit_default(f, header, buf, csize);
- free(buf);
- return ret;
+ s->header->raw_size = csize;
+ return 0;
}
-int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
- enum nfl_compression_t compression_opt,
- bool truncate,
- const char *filename) {
- int ret;
- FILE *f;
- const char *mode = truncate ? "wb" : "ab";
+void *commit(void *targs) {
+ sqlite3 *db = NULL;
+ State *s = (State *)targs;
+ uint32_t size = s->header->raw_size;
+ DEBUG("Committing #%d packets", s->header->nr_entries);
- debug("Comm worker #%u: commit to file %s\n", header->id, filename);
- ERR((f = fopen(filename, mode)) == NULL, strerror(errno));
+ db_open(&db, s->global->storage_file);
+ db_create_table(db);
- // commit store
- uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries;
- switch (compression_opt) {
+ void *buf = NULL;
+ switch (s->global->compression_type) {
case COMPRESS_NONE:
- debug("Comm worker #%u: commit without compression\n", header->id);
- ret = nfl_commit_default(f, header, store, store_size);
break;
case COMPRESS_LZ4:
- debug("Comm worker #%u: commit with compression algorithm: lz4", header->id);
- ret = nfl_commit_lz4(f, header, store, store_size);
+ commit_lz4(s, &buf);
break;
case COMPRESS_ZSTD:
- debug("Comm worker #%u: commit with compression algorithm: zstd", header->id);
- ret = nfl_commit_zstd(f, header, store, store_size);
+ commit_zstd(s, &buf);
break;
- // Must not reach here ...
default:
FATAL("Unknown compression option detected");
}
- // Do fsync ?
- fclose(f);
- return ret;
+ do_gc(db, s);
+ db_insert(db, s->header, buf ? buf : s->store);
+ db_close(db);
+
+ DEBUG("Committed #%d packets, compressed size: %u/%u",
+ s->header->nr_entries, s->header->raw_size, size);
+ if (buf)
+ free(buf);
+ state_free(s);
+
+ return NULL;
}