-}
-
-int process_blkd_consumer_act(void *priv, int fd, short events)
-{
- int result;
- long consumed_old = 0;
- char *reply;
- struct blocked_consumer *bc = (struct blocked_consumer *) priv;
- char inbuf;
-
- result = read(bc->fd_producer, &inbuf, 1);
- if (result == -1) {
- PERROR("read");
- return -1;
- }
- if (result == 0) {
- int res;
- DBG("listener: got messsage that a buffer ended");
-
- res = close(bc->fd_producer);
- if (res == -1) {
- PERROR("close");
- }
-
- list_del(&bc->list);
-
- result = ustcomm_send_reply(&bc->server, "END", &bc->src);
- if (result < 0) {
- ERR("ustcomm_send_reply failed");
- return -1;
- }
-
- return 0;
- }
-
- result = ust_buffers_get_subbuf(bc->buf, &consumed_old);
- if (result == -EAGAIN) {
- WARN("missed buffer?");
- return 0;
- } else if (result < 0) {
- ERR("ust_buffers_get_subbuf: error: %s", strerror(-result));
- }
- if (asprintf(&reply, "%s %ld", "OK", consumed_old) < 0) {
- ERR("process_blkd_consumer_act : asprintf failed (OK %ld)",
- consumed_old);
- return -1;
- }
- result = ustcomm_send_reply(&bc->server, reply, &bc->src);
- if (result < 0) {
- ERR("ustcomm_send_reply failed");
- free(reply);
- return -1;
- }
- free(reply);
-
- list_del(&bc->list);
-
- return 0;
-}
-
-void blocked_consumers_add_to_mp(struct mpentries *ent)
-{
- struct blocked_consumer *bc;
-
- list_for_each_entry(bc, &blocked_consumers, list) {
- multipoll_add(ent, bc->fd_producer, POLLIN, process_blkd_consumer_act, bc, NULL);
- }
-
-}
-
-void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu)
-{
- const char *sep;
-
- sep = rindex(channel_and_cpu, '_');
- if (sep == NULL) {
- *cpu = -1;
- sep = channel_and_cpu + strlen(channel_and_cpu);
- } else {
- *cpu = atoi(sep+1);
- }