#include "collect.h" #include "main.h" #include "sql.h" #include "util.h" #include <zstd.h> static void do_gc(sqlite3 *db, State *s) { int64_t cur_size = (int64_t)s->header->raw_size; pthread_mutex_lock(&s->global->storage_consumed_lock); int64_t remain_size = s->global->storage_budget - s->global->storage_consumed - cur_size; int64_t gc_size = 0; if (remain_size <= 0) { gc_size = -remain_size + cur_size * g_gc_rate; if (gc_size >= s->global->storage_consumed) gc_size = s->global->storage_consumed * g_gc_cap; else if (gc_size >= s->global->storage_budget * g_gc_cap) gc_size = s->global->storage_budget * g_gc_cap; } DEBUG("do_gc: gc_size %.2f KB, remain %.2f KB, cur_size, %.2f KB\n", gc_size / 1024.0, remain_size / 1024.0, cur_size / 1024.0); pthread_mutex_unlock(&s->global->storage_consumed_lock); uint32_t gc_count = 0; if (gc_size > 0) { gc_count = db_delete_oldest_bytes(db, gc_size); db_vacuum(db); } int64_t dbsize = check_file_size(s->global->storage_file); pthread_mutex_lock(&s->global->storage_consumed_lock); s->global->storage_consumed = dbsize; pthread_mutex_unlock(&s->global->storage_consumed_lock); if (gc_count) { INFO("gc: storage budget: %.2f MB, storage consumed: %.2f MB, (%.2f " "MB/%d entries) vacuumed", s->global->storage_budget / 1024.0 / 1024.0, s->global->storage_consumed / 1024.0 / 1024.0, gc_size / 1024.0 / 1024.0, gc_count); } else { DEBUG("gc: storage budget: %.2f MB, storage consumed: %.2f MB, skip " "vacuuming", s->global->storage_budget / 1024.0 / 1024.0, s->global->storage_consumed / 1024.0 / 1024.0); } } static int commit_lz4(State *s, void **buf) { /* TODO */ (void)s; (void)buf; return -1; } static int commit_zstd(State *s, void **buf) { size_t const bufsize = ZSTD_compressBound(s->header->raw_size); if (!(*buf = malloc(bufsize))) ERROR("zstd: cannot malloc"); size_t const csize = ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0); if (ZSTD_isError(csize)) { ERROR("zstd: %s \n", ZSTD_getErrorName(csize)); free(*buf); return -1; } s->header->raw_size = csize; return 0; } void *commit(void *targs) { sqlite3 *db = NULL; State *s = (State *)targs; uint32_t size = s->header->raw_size; DEBUG("Committing #%d packets", s->header->nr_entries); db_open(&db, s->global->storage_file); db_create_table(db); void *buf = NULL; switch (s->global->compression_type) { case COMPRESS_NONE: break; case COMPRESS_LZ4: commit_lz4(s, &buf); break; case COMPRESS_ZSTD: commit_zstd(s, &buf); break; default: FATAL("Unknown compression option detected"); } do_gc(db, s); db_insert(db, s->header, buf ? buf : s->store); db_close(db); DEBUG("Committed #%d packets, compressed size: %u/%u", s->header->nr_entries, s->header->raw_size, size); if (buf) free(buf); state_free(s); return NULL; }