diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2018-04-28 10:45:41 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2018-04-28 10:45:41 +0800 |
commit | af4bf7c93f3390a8013de18679f12b336d2a314b (patch) | |
tree | 9ceae4ffd03ec8ea116591fc153e4f79e2b1921f | |
parent | 5300aeef42f0d79dbed5703457620784d8e848a6 (diff) | |
download | nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.gz nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.zst nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.zip |
Move netlink socket initialization into a separate function
-rw-r--r-- | bin/nfcollect.c | 13 | ||||
-rw-r--r-- | include/collect.h | 2 | ||||
-rw-r--r-- | include/main.h | 14 | ||||
-rw-r--r-- | lib/collect.c | 77 |
4 files changed, 61 insertions, 45 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c index e168595..f59d68f 100644 --- a/bin/nfcollect.c +++ b/bin/nfcollect.c @@ -54,10 +54,13 @@ const char *help_text = "\n"; static uint32_t calculate_starting_trunk(const char *storage_dir); +static nfl_nl_t netlink_fd; static void sig_handler(int signo) { - if (signo == SIGHUP) + if (signo == SIGHUP) { puts("Terminated due to SIGHUP ..."); + nfl_close_netlink_fd(&netlink_fd); + } } int main(int argc, char *argv[]) { @@ -133,7 +136,6 @@ int main(int argc, char *argv[]) { max_commit_worker = max_commit_worker > 0 ? max_commit_worker : 1; } - g.nfl_group_id = nfl_group_id; g.storage_dir = storage_dir; // register signal handler @@ -149,7 +151,6 @@ int main(int argc, char *argv[]) { // Set up nflog receiver worker nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *)); - nfl_commit_init(trunk_cnt); info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir, storage_size); @@ -168,9 +169,12 @@ int main(int argc, char *argv[]) { free((char *)fn); } + nfl_open_netlink_fd(&netlink_fd, nfl_group_id); for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) { debug("Running receiver worker: id = %d", cur_trunk); nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g); + trunks[cur_trunk]->netlink_fd = &netlink_fd; + pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker, (void *)trunks[cur_trunk]); // wait for current receiver worker @@ -179,7 +183,8 @@ int main(int argc, char *argv[]) { // Won't reach here // We don't actually free trunks or the semaphore at all - // sem_destroy(&nfl_commit_queue); + sem_destroy(g.nfl_commit_queue); + nfl_close_netlink_fd(&netlink_fd); exit(0); } diff --git a/include/collect.h b/include/collect.h index 2f75f4a..1af0506 100644 --- a/include/collect.h +++ b/include/collect.h @@ -5,3 +5,5 @@ void *nfl_collect_worker(void *targs); void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max, nfl_global_t *g); void 
nfl_state_free(nfl_state_t *nf); +void nfl_open_netlink_fd(nfl_nl_t *nf, uint16_t group_id); +void nfl_close_netlink_fd(nfl_nl_t *nf); diff --git a/include/main.h b/include/main.h index d33d499..cbe0f28 100644 --- a/include/main.h +++ b/include/main.h @@ -134,17 +134,21 @@ typedef struct _nfl_global_t { enum nfl_compression_t compression_opt; } nfl_global_t; +typedef struct _nfl_nl_t { + struct nflog_handle *fd; + struct nflog_g_handle *group_fd; +} nfl_nl_t; + typedef struct _nfl_state_t { nfl_global_t *global; nfl_header_t *header; nfl_entry_t *store; + nfl_nl_t *netlink_fd; - struct nflog_handle *nfl_fd; - struct nflog_g_handle *nfl_group_fd; + bool has_finished_recv; + pthread_cond_t has_finished_recv_cond; + pthread_mutex_t has_finished_recv_lock; - bool has_finished; - pthread_cond_t has_finished_cond; - pthread_mutex_t has_finished_lock; pthread_t thread; } nfl_state_t; diff --git a/lib/collect.c b/lib/collect.c index 4b3dd3e..9bffd11 100644 --- a/lib/collect.c +++ b/lib/collect.c @@ -38,7 +38,6 @@ nfl_global_t g; -static void nfl_init(nfl_state_t *nf); static void *nfl_start_commit_worker(void *targs); static void nfl_commit(nfl_state_t *nf); static void nfl_state_free(nfl_state_t *nf); @@ -133,42 +132,46 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, return 0; } -static void nfl_init(nfl_state_t *nf) { +void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) { // open nflog - ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open") + ERR((nl->fd = nflog_open()) == NULL, "nflog_open") debug("Opening nflog communication file descriptor"); // monitor IPv4 packets only - ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf"); + ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf"); // bind to group - nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id); + nl->group_fd = nflog_bind_group(nl->fd, group_id); // If the returned group_fd is NULL, it's likely // that another process (like ulogd) has 
already // bound to the same NFLOD group. - if(!nf->nfl_group_fd) - FATAL("Cannot bind to NFLOG group %d, is it used by another process?", - nf->global->nfl_group_id); + if(!nl->group_fd) + FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id); - ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0, + ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0, "Could not set copy mode"); // Batch send 128 packets from kernel to userspace - ERR(nflog_set_qthresh(nf->nfl_group_fd, NF_NFLOG_QTHRESH), + ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH), "Could not set qthresh"); +} - nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf); - debug("Registering nflog callback"); - - memcpy(&g, nf->global, sizeof(nfl_global_t)); +void nfl_close_netlink_fd(nfl_nl_t *nl) { + nflog_unbind_group(nl->group_fd); + nflog_close(nl->fd); } void *nfl_collect_worker(void *targs) { nfl_state_t *nf = (nfl_state_t *)targs; - nfl_init(nf); + memcpy(&g, nf->global, sizeof(nfl_global_t)); - int fd = nflog_fd(nf->nfl_fd); + nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf); + debug("Registering nflog callback"); + + int fd = nflog_fd(nf->netlink_fd->fd); debug("Recv worker #%u: main loop starts", nf->header->id); + + // Write start time time(&nf->header->start_time); int rv; @@ -181,23 +184,23 @@ void *nfl_collect_worker(void *targs) { debug("Recv worker #%u: nflog packet received " "(len=%u, #entries=%u)", nf->header->id, rv, nf->header->n_entries); - nflog_handle_packet(nf->nfl_fd, buf, rv); + nflog_handle_packet(nf->netlink_fd->fd, buf, rv); } } - debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, + debug("Recv worker #%u: finished, received packets: %u", + nf->header->id, nf->header->max_n_entries); // write end time time(&nf->header->end_time); - nflog_unbind_group(nf->nfl_group_fd); - nflog_close(nf->nfl_fd); // write checksum nf->header->cksum = 
nfl_header_cksum(nf->header); // spawn commit thread nfl_commit(nf); + pthread_exit(NULL); } @@ -215,20 +218,21 @@ static void *nfl_start_commit_worker(void *targs) { nfl_state_t *nf = (nfl_state_t *)targs; const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); debug("Comm worker #%u: thread started.", nf->header->id); + bool truncate = true; sem_wait(g.nfl_commit_queue); debug("Comm worker #%u: commit started.", nf->header->id); - nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename); + nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename); debug("Comm worker #%u: commit done.", nf->header->id); sem_post(g.nfl_commit_queue); nfl_state_free(nf); free((char *)filename); - pthread_mutex_lock(&nf->has_finished_lock); - nf->has_finished = true; - pthread_cond_signal(&nf->has_finished_cond); - pthread_mutex_unlock(&nf->has_finished_lock); + pthread_mutex_lock(&nf->has_finished_recv_lock); + nf->has_finished_recv = true; + pthread_cond_signal(&nf->has_finished_recv_cond); + pthread_mutex_unlock(&nf->has_finished_recv_lock); pthread_exit(NULL); } @@ -240,35 +244,36 @@ static void *nfl_start_commit_worker(void *targs) { void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max, nfl_global_t *g) { assert(nf); + + // Check if nf has been allocated if (unlikely(*nf == NULL)) { *nf = (nfl_state_t *)malloc(sizeof(nfl_state_t)); (*nf)->global = g; (*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t)); (*nf)->header->id = id; - (*nf)->header->n_entries = 0; (*nf)->header->max_n_entries = entries_max; (*nf)->header->compression_opt = g->compression_opt; - (*nf)->has_finished = true; - pthread_mutex_init(&(*nf)->has_finished_lock, NULL); - pthread_cond_init(&(*nf)->has_finished_cond, NULL); + (*nf)->has_finished_recv = true; + pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL); + pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL); } // Ensure trunk with same id in previous run has finished to 
prevent reusing // a trunk which it's still being used. Furthermore, this hopefully - // alleviate us - // from bursty network traffic. - pthread_mutex_lock(&(*nf)->has_finished_lock); - while (!(*nf)->has_finished) - pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock); - (*nf)->has_finished = false; - pthread_mutex_unlock(&(*nf)->has_finished_lock); + // alleviate us from bursty network traffic. + pthread_mutex_lock(&(*nf)->has_finished_recv_lock); + while (!(*nf)->has_finished_recv) + pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock); + (*nf)->has_finished_recv = false; + pthread_mutex_unlock(&(*nf)->has_finished_recv_lock); // Don't use calloc here, as it will cause page fault and // consume physical memory before we fill the buffer. // Instead, fill entries with 0 on the fly, to squeeze // more space for compression. (*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max); + (*nf)->header->n_entries = 0; } static void nfl_state_free(nfl_state_t *nf) { |