Diffstat (limited to 'lib/collect.c')
 lib/collect.c | 77 +++++++++++++++++++++++++++++++++++++++++------------------------------------
 1 file changed, 41 insertions(+), 36 deletions(-)
diff --git a/lib/collect.c b/lib/collect.c
index 4b3dd3e..9bffd11 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -38,7 +38,6 @@
nfl_global_t g;
-static void nfl_init(nfl_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nfl_state_t *nf);
static void nfl_state_free(nfl_state_t *nf);
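
The static nfl_init() declaration goes away because its netlink setup is promoted to a public API; these are the signatures a header would now export, taken from the hunks below (the header placement itself is an assumption):

    void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id);
    void nfl_close_netlink_fd(nfl_nl_t *nl);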
@@ -133,42 +132,46 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
return 0;
}
-static void nfl_init(nfl_state_t *nf) {
+void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) {
// open nflog
- ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open")
+ ERR((nl->fd = nflog_open()) == NULL, "nflog_open")
debug("Opening nflog communication file descriptor");
// monitor IPv4 packets only
- ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");
+ ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf");
// bind to group
- nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);
+ nl->group_fd = nflog_bind_group(nl->fd, group_id);
// If the returned group_fd is NULL, it's likely
// that another process (like ulogd) has already
// bound to the same NFLOG group.
- if(!nf->nfl_group_fd)
- FATAL("Cannot bind to NFLOG group %d, is it used by another process?",
- nf->global->nfl_group_id);
+ if(!nl->group_fd)
+ FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id);
- ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
+ ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
"Could not set copy mode");
// Batch send 128 packets from kernel to userspace
- ERR(nflog_set_qthresh(nf->nfl_group_fd, NF_NFLOG_QTHRESH),
+ ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH),
"Could not set qthresh");
+}
- nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
- debug("Registering nflog callback");
-
- memcpy(&g, nf->global, sizeof(nfl_global_t));
+void nfl_close_netlink_fd(nfl_nl_t *nl) {
+ nflog_unbind_group(nl->group_fd);
+ nflog_close(nl->fd);
}
void *nfl_collect_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
- nfl_init(nf);
+ memcpy(&g, nf->global, sizeof(nfl_global_t));
- int fd = nflog_fd(nf->nfl_fd);
+ nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf);
+ debug("Registering nflog callback");
+
+ int fd = nflog_fd(nf->netlink_fd->fd);
debug("Recv worker #%u: main loop starts", nf->header->id);
+
+ // Write start time
time(&nf->header->start_time);
int rv;
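
The nfl_open_netlink_fd()/nfl_close_netlink_fd() pair moves ownership of the netlink handles out of nfl_state_t. A minimal sketch of the call sequence this implies; the nfl_nl_t layout below is inferred from the nl->fd and nl->group_fd accesses above, not copied from the project's headers:

    #include <libnetfilter_log/libnetfilter_log.h>

    /* Assumed layout, matching the field accesses in the hunk above. */
    typedef struct {
        struct nflog_handle *fd;         /* from nflog_open() */
        struct nflog_g_handle *group_fd; /* from nflog_bind_group() */
    } nfl_nl_t;

    /* Hypothetical caller: open once, share with the recv workers,
     * close only after the last worker has exited. */
    nfl_nl_t nl;
    nfl_open_netlink_fd(&nl, group_id);
    /* ... run collect workers ... */
    nfl_close_netlink_fd(&nl);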
@@ -181,23 +184,23 @@ void *nfl_collect_worker(void *targs) {
debug("Recv worker #%u: nflog packet received "
"(len=%u, #entries=%u)",
nf->header->id, rv, nf->header->n_entries);
- nflog_handle_packet(nf->nfl_fd, buf, rv);
+ nflog_handle_packet(nf->netlink_fd->fd, buf, rv);
}
}
- debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id,
+ debug("Recv worker #%u: finished, received packets: %u",
+ nf->header->id,
nf->header->max_n_entries);
// write end time
time(&nf->header->end_time);
- nflog_unbind_group(nf->nfl_group_fd);
- nflog_close(nf->nfl_fd);
// write checksum
nf->header->cksum = nfl_header_cksum(nf->header);
// spawn commit thread
nfl_commit(nf);
+
pthread_exit(NULL);
}
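
nfl_collect_worker() now expects nf->netlink_fd to be wired up before the thread starts, since nfl_init() no longer runs inside it. A hypothetical launch site, using only the signatures visible above:

    pthread_t recv_thread;
    nfl_state_t *nf = NULL;

    nfl_state_init(&nf, trunk_id, entries_max, &global);
    nf->netlink_fd = &nl; /* handle opened by nfl_open_netlink_fd() */

    pthread_create(&recv_thread, NULL, nfl_collect_worker, nf);
    pthread_join(recv_thread, NULL); /* the worker ends with pthread_exit(NULL) */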
@@ -215,20 +218,21 @@ static void *nfl_start_commit_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
debug("Comm worker #%u: thread started.", nf->header->id);
+ bool truncate = true;
sem_wait(g.nfl_commit_queue);
debug("Comm worker #%u: commit started.", nf->header->id);
- nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename);
+ nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename);
debug("Comm worker #%u: commit done.", nf->header->id);
sem_post(g.nfl_commit_queue);
nfl_state_free(nf);
free((char *)filename);
- pthread_mutex_lock(&nf->has_finished_lock);
- nf->has_finished = true;
- pthread_cond_signal(&nf->has_finished_cond);
- pthread_mutex_unlock(&nf->has_finished_lock);
+ pthread_mutex_lock(&nf->has_finished_recv_lock);
+ nf->has_finished_recv = true;
+ pthread_cond_signal(&nf->has_finished_recv_cond);
+ pthread_mutex_unlock(&nf->has_finished_recv_lock);
pthread_exit(NULL);
}
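
The sem_wait()/sem_post() pair around nfl_commit_worker() caps how many commit workers hit the disk at once. A sketch of the setup this implies; the queue depth of 4 is an arbitrary assumption:

    #include <semaphore.h>

    #define MAX_PARALLEL_COMMITS 4 /* hypothetical bound */

    sem_t commit_queue;
    sem_init(&commit_queue, 0, MAX_PARALLEL_COMMITS);
    g.nfl_commit_queue = &commit_queue; /* consumed by sem_wait()/sem_post() above */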
@@ -240,35 +244,36 @@ static void *nfl_start_commit_worker(void *targs) {
void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
nfl_global_t *g) {
assert(nf);
+
+ // Check if nf has been allocated
if (unlikely(*nf == NULL)) {
*nf = (nfl_state_t *)malloc(sizeof(nfl_state_t));
(*nf)->global = g;
(*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t));
(*nf)->header->id = id;
- (*nf)->header->n_entries = 0;
(*nf)->header->max_n_entries = entries_max;
(*nf)->header->compression_opt = g->compression_opt;
- (*nf)->has_finished = true;
- pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
- pthread_cond_init(&(*nf)->has_finished_cond, NULL);
+ (*nf)->has_finished_recv = true;
+ pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL);
+ pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL);
}
// Ensure trunk with same id in previous run has finished to prevent reusing
// a trunk which is still being used. Furthermore, this hopefully
- // alleviate us
- // from bursty network traffic.
- pthread_mutex_lock(&(*nf)->has_finished_lock);
- while (!(*nf)->has_finished)
- pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
- (*nf)->has_finished = false;
- pthread_mutex_unlock(&(*nf)->has_finished_lock);
+ // alleviates the impact of bursty network traffic.
+ pthread_mutex_lock(&(*nf)->has_finished_recv_lock);
+ while (!(*nf)->has_finished_recv)
+ pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock);
+ (*nf)->has_finished_recv = false;
+ pthread_mutex_unlock(&(*nf)->has_finished_recv_lock);
// Don't use calloc here, as it will cause page faults and
// consume physical memory before we fill the buffer.
// Instead, fill entries with 0 on the fly, to squeeze
// more space for compression.
(*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max);
+ (*nf)->header->n_entries = 0;
}
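
Moving (*nf)->header->n_entries = 0 out of the one-time allocation branch resets the counter on every reuse of a trunk, not only on first allocation. A hypothetical reuse sequence shows the difference:

    nfl_state_t *nf = NULL;

    nfl_state_init(&nf, 0, entries_max, &global); /* first call: allocates, n_entries = 0 */
    /* ... recv worker fills nf->store; header->n_entries grows ... */

    nfl_state_init(&nf, 0, entries_max, &global); /* reuse: *nf != NULL, the branch is
                                                     skipped, yet n_entries is reset to 0 */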
static void nfl_state_free(nfl_state_t *nf) {