diff options
Diffstat (limited to 'lib/collect.c')
-rw-r--r-- | lib/collect.c | 77 |
1 files changed, 41 insertions, 36 deletions
diff --git a/lib/collect.c b/lib/collect.c index 4b3dd3e..9bffd11 100644 --- a/lib/collect.c +++ b/lib/collect.c @@ -38,7 +38,6 @@ nfl_global_t g; -static void nfl_init(nfl_state_t *nf); static void *nfl_start_commit_worker(void *targs); static void nfl_commit(nfl_state_t *nf); static void nfl_state_free(nfl_state_t *nf); @@ -133,42 +132,46 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, return 0; } -static void nfl_init(nfl_state_t *nf) { +void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) { // open nflog - ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open") + ERR((nl->fd = nflog_open()) == NULL, "nflog_open") debug("Opening nflog communication file descriptor"); // monitor IPv4 packets only - ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf"); + ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf"); // bind to group - nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id); + nl->group_fd = nflog_bind_group(nl->fd, group_id); // If the returned group_fd is NULL, it's likely // that another process (like ulogd) has already // bound to the same NFLOD group. - if(!nf->nfl_group_fd) - FATAL("Cannot bind to NFLOG group %d, is it used by another process?", - nf->global->nfl_group_id); + if(!nl->group_fd) + FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id); - ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0, + ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0, "Could not set copy mode"); // Batch send 128 packets from kernel to userspace - ERR(nflog_set_qthresh(nf->nfl_group_fd, NF_NFLOG_QTHRESH), + ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH), "Could not set qthresh"); +} - nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf); - debug("Registering nflog callback"); - - memcpy(&g, nf->global, sizeof(nfl_global_t)); +void nfl_close_netlink_fd(nfl_nl_t *nl) { + nflog_unbind_group(nl->group_fd); + nflog_close(nl->fd); } void *nfl_collect_worker(void *targs) { nfl_state_t *nf = (nfl_state_t *)targs; - nfl_init(nf); + memcpy(&g, nf->global, sizeof(nfl_global_t)); - int fd = nflog_fd(nf->nfl_fd); + nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf); + debug("Registering nflog callback"); + + int fd = nflog_fd(nf->netlink_fd->fd); debug("Recv worker #%u: main loop starts", nf->header->id); + + // Write start time time(&nf->header->start_time); int rv; @@ -181,23 +184,23 @@ void *nfl_collect_worker(void *targs) { debug("Recv worker #%u: nflog packet received " "(len=%u, #entries=%u)", nf->header->id, rv, nf->header->n_entries); - nflog_handle_packet(nf->nfl_fd, buf, rv); + nflog_handle_packet(nf->netlink_fd->fd, buf, rv); } } - debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, + debug("Recv worker #%u: finished, received packets: %u", + nf->header->id, nf->header->max_n_entries); // write end time time(&nf->header->end_time); - nflog_unbind_group(nf->nfl_group_fd); - nflog_close(nf->nfl_fd); // write checksum nf->header->cksum = nfl_header_cksum(nf->header); // spawn commit thread nfl_commit(nf); + pthread_exit(NULL); } @@ -215,20 +218,21 @@ static void *nfl_start_commit_worker(void *targs) { nfl_state_t *nf = (nfl_state_t *)targs; const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); debug("Comm worker #%u: thread started.", nf->header->id); + bool truncate = true; sem_wait(g.nfl_commit_queue); debug("Comm worker #%u: commit started.", nf->header->id); - nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename); + nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename); debug("Comm worker #%u: commit done.", nf->header->id); sem_post(g.nfl_commit_queue); nfl_state_free(nf); free((char *)filename); - pthread_mutex_lock(&nf->has_finished_lock); - nf->has_finished = true; - pthread_cond_signal(&nf->has_finished_cond); - pthread_mutex_unlock(&nf->has_finished_lock); + pthread_mutex_lock(&nf->has_finished_recv_lock); + nf->has_finished_recv = true; + pthread_cond_signal(&nf->has_finished_recv_cond); + pthread_mutex_unlock(&nf->has_finished_recv_lock); pthread_exit(NULL); } @@ -240,35 +244,36 @@ static void *nfl_start_commit_worker(void *targs) { void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max, nfl_global_t *g) { assert(nf); + + // Check if nf has been allocated if (unlikely(*nf == NULL)) { *nf = (nfl_state_t *)malloc(sizeof(nfl_state_t)); (*nf)->global = g; (*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t)); (*nf)->header->id = id; - (*nf)->header->n_entries = 0; (*nf)->header->max_n_entries = entries_max; (*nf)->header->compression_opt = g->compression_opt; - (*nf)->has_finished = true; - pthread_mutex_init(&(*nf)->has_finished_lock, NULL); - pthread_cond_init(&(*nf)->has_finished_cond, NULL); + (*nf)->has_finished_recv = true; + pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL); + pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL); } // Ensure trunk with same id in previous run has finished to prevent reusing // a trunk which it's still being used. Furthermore, this hopefully - // alleviate us - // from bursty network traffic. - pthread_mutex_lock(&(*nf)->has_finished_lock); - while (!(*nf)->has_finished) - pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock); - (*nf)->has_finished = false; - pthread_mutex_unlock(&(*nf)->has_finished_lock); + // alleviate us from bursty network traffic. + pthread_mutex_lock(&(*nf)->has_finished_recv_lock); + while (!(*nf)->has_finished_recv) + pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock); + (*nf)->has_finished_recv = false; + pthread_mutex_unlock(&(*nf)->has_finished_recv_lock); // Don't use calloc here, as it will cause page fault and // consume physical memory before we fill the buffer. // Instead, fill entries with 0 on the fly, to squeeze // more space for compression. (*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max); + (*nf)->header->n_entries = 0; } static void nfl_state_free(nfl_state_t *nf) { |