From b655c8e74ad5d29db18660a677784f181f8e7590 Mon Sep 17 00:00:00 2001
From: Yunchih Chen <yunchih.cat@gmail.com>
Date: Fri, 7 Dec 2018 15:45:30 +0800
Subject: sqlite3 rewrite

This rewrite intends to simplifies previous design by
hosting the storage in sqlite database instead of counting
on individual log files.
---
 lib/commit.c | 102 +++++++++++++++++++++++++++++------------------------------
 1 file changed, 51 insertions(+), 51 deletions(-)

(limited to 'lib/commit.c')

diff --git a/lib/commit.c b/lib/commit.c
index 22b92ad..45d4d73 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -1,80 +1,80 @@
-#include "commit.h"
-#include <errno.h>
-#include <string.h>
-#include <zstd.h>
-
-static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
-                               uint32_t store_size) {
-    uint32_t written;
-    header->raw_size = store_size;
+#include "collect.h"
+#include "main.h"
+#include "sql.h"
 
-    // Write header
-    written = fwrite(header, 1, sizeof(nfl_header_t), f);
-    WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno));
+#include <zstd.h>
 
-    // Write store
-    written = fwrite(store, 1, store_size, f);
-    WARN_RETURN(written != store_size, "commit store: %s", strerror(errno));
+static void do_gc(sqlite3 *db, State *s) {
+    uint32_t cur_size = s->header->raw_size;
+    pthread_mutex_lock(&s->global->storage_consumed_lock);
+    uint32_t remain_size =
+        s->global->storage_budget - s->global->storage_consumed - cur_size;
+    uint32_t gc_size = -remain_size + cur_size * g_gc_rate;
+    if (gc_size >= s->global->storage_consumed)
+        gc_size = s->global->storage_consumed;
+    pthread_mutex_unlock(&s->global->storage_consumed_lock);
 
-    return sizeof(nfl_header_t) + store_size;
+    if (remain_size <= 0)
+        db_delete_oldest_bytes(db, gc_size);
 }
 
-static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
-                           uint32_t store_size) {
+static int commit_lz4(State *s, void **buf) {
     /* TODO */
+    (void)s;
+    (void)buf;
     return -1;
 }
 
-static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
-                            uint32_t store_size) {
-    size_t const bufsize = ZSTD_compressBound(store_size);
-    void *buf;
+static int commit_zstd(State *s, void **buf) {
+    size_t const bufsize = ZSTD_compressBound(s->header->raw_size);
+
+    if (!(*buf = malloc(bufsize)))
+        ERROR("zstd: cannot malloc");
 
-    WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc");
-    size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1);
+    size_t const csize =
+        ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0);
     if (ZSTD_isError(csize)) {
-        WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize));
-        free(buf);
+        ERROR("zstd: %s \n", ZSTD_getErrorName(csize));
+        free(*buf);
         return -1;
     }
 
-    int ret = nfl_commit_default(f, header, buf, csize);
-    free(buf);
-    return ret;
+    s->header->raw_size = csize;
+    return 0;
 }
 
-int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
-                       enum nfl_compression_t compression_opt,
-                       bool truncate,
-                       const char *filename) {
-    int ret;
-    FILE *f;
-    const char *mode = truncate ? "wb" : "ab";
+void *commit(void *targs) {
+    sqlite3 *db = NULL;
+    State *s = (State *)targs;
+    uint32_t size = s->header->raw_size;
+    DEBUG("Committing #%d packets", s->header->nr_entries);
 
-    debug("Comm worker #%u: commit to file %s\n", header->id, filename);
-    ERR((f = fopen(filename, mode)) == NULL, strerror(errno));
+    db_open(&db, s->global->storage_file);
+    db_create_table(db);
 
-    // commit store
-    uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries;
-    switch (compression_opt) {
+    void *buf = NULL;
+    switch (s->global->compression_type) {
     case COMPRESS_NONE:
-        debug("Comm worker #%u: commit without compression\n", header->id);
-        ret = nfl_commit_default(f, header, store, store_size);
         break;
     case COMPRESS_LZ4:
-        debug("Comm worker #%u: commit with compression algorithm: lz4", header->id);
-        ret = nfl_commit_lz4(f, header, store, store_size);
+        commit_lz4(s, &buf);
         break;
     case COMPRESS_ZSTD:
-        debug("Comm worker #%u: commit with compression algorithm: zstd", header->id);
-        ret = nfl_commit_zstd(f, header, store, store_size);
+        commit_zstd(s, &buf);
         break;
-    // Must not reach here ...
     default:
         FATAL("Unknown compression option detected");
     }
 
-    // Do fsync ?
-    fclose(f);
-    return ret;
+    do_gc(db, s);
+    db_insert(db, s->header, buf ? buf : s->store);
+    db_close(db);
+
+    DEBUG("Committed #%d packets, compressed size: %u/%u",
+          s->header->nr_entries, s->header->raw_size, size);
+    if (buf)
+        free(buf);
+    state_free(s);
+
+    return NULL;
 }
-- 
cgit