finish_consuming_dead_subbuffer: cleanup
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 3 Mar 2011 18:17:16 +0000 (13:17 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 3 Mar 2011 18:17:16 +0000 (13:17 -0500)
Use the same iterator as error printing to iterate on the last subbuffers.
Removes a lot of iterator duplication and extra unneeded modulo operations.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
libustconsumer/lowlevel.c

index a65ed09c59a3d453d2b4b35d7252a17b268e74c7..ec1ef05ca4d18dba3c4f07b216187c7cb5a8c235 100644 (file)
@@ -124,59 +124,52 @@ size_t subbuffer_data_size(void *subbuf)
 
 void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf)
 {
-       struct ust_buffer *ustbuf = buf->bufstruct_mem;
-       long write_offset = uatomic_read(&ustbuf->offset);
-
-       long i_subbuf;
+       struct ust_buffer *ust_buf = buf->bufstruct_mem;
+       unsigned long n_subbufs_order = get_count_order(buf->n_subbufs);
+       unsigned long commit_seq_mask = (~0UL >> n_subbufs_order);
+       unsigned long cons_off;
        int ret;
 
        DBG("processing dead buffer (%s)", buf->name);
-       DBG("consumed offset is %ld (%s)", uatomic_read(&ustbuf->consumed),
-           buf->name);
-       DBG("write offset is %ld (%s)", write_offset, buf->name);
-
-       /* First subbuf that we need to consume now. It is not modulo'd.
-        * Consumed_offset is the next byte to consume.  */
-       long first_subbuf = uatomic_read(&ustbuf->consumed) / buf->subbuf_size;
-       /* Last subbuf that we need to consume now. It is not modulo'd. 
-        * Write_offset is the next place to write so write_offset-1 is the
-        * last place written. */
-       long last_subbuf = (write_offset - 1) / buf->subbuf_size;
-
-       DBG("first_subbuf=%ld", first_subbuf);
-       DBG("last_subbuf=%ld", last_subbuf);
-
-       if(last_subbuf - first_subbuf >= buf->n_subbufs) {
-               DBG("an overflow has occurred, nothing can be recovered");
-               return;
-       }
+       DBG("consumed offset is %ld (%s)", uatomic_read(&ust_buf->consumed),
+                                          buf->name);
+       DBG("write offset is %ld (%s)", uatomic_read(&ust_buf->offset),
+                                       buf->name);
 
-       /* Iterate on subbuffers to recover. */
-       for(i_subbuf = first_subbuf % buf->n_subbufs; ; i_subbuf++, i_subbuf %= buf->n_subbufs) {
-               /* commit_seq is the offset in the buffer of the end of the last sequential commit.
-                * Bytes beyond this limit cannot be recovered. This is a free-running counter. */
-               long commit_seq = uatomic_read(&ustbuf->commit_seq[i_subbuf]);
-
-               unsigned long valid_length = buf->subbuf_size;
-               long n_subbufs_order = get_count_order(buf->n_subbufs);
-               long commit_seq_mask = (~0UL >> n_subbufs_order);
-
-               struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size);
+       /*
+        * Iterate on subbuffers to recover, including the one the writer
+        * just wrote data into. Using write position - 1 since the writer
+        * position points into the position that is going to be written.
+        */
+       for (cons_off = uatomic_read(&ust_buf->consumed);
+                       (long) (SUBBUF_TRUNC(uatomic_read(&ust_buf->offset) - 1, buf)
+                               - cons_off) >= 0;
+                       cons_off = SUBBUF_ALIGN(cons_off, buf)) {
+               /*
+                * commit_seq is the offset in the buffer of the end of the last sequential commit.
+                * Bytes beyond this limit cannot be recovered. This is a free-running counter.
+                */
+               unsigned long commit_seq =
+                       uatomic_read(&ust_buf->commit_seq[SUBBUF_INDEX(cons_off, buf)]);
+               struct ltt_subbuffer_header *header =
+                       (struct ltt_subbuffer_header *)((char *) buf->mem
+                               + SUBBUF_INDEX(cons_off, buf) * buf->subbuf_size);
+               unsigned long valid_length;
 
                /* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */
                if (((commit_seq - buf->subbuf_size) & commit_seq_mask)
-                   - (USTD_BUFFER_TRUNC(uatomic_read(&ustbuf->consumed), buf) >> n_subbufs_order) == 0
+                   - (USTD_BUFFER_TRUNC(uatomic_read(&ust_buf->consumed), buf) >> n_subbufs_order) == 0
                     && header->data_size != 0xffffffff) {
                        assert(header->sb_size != 0xffffffff);
                        /*
-                        * If it was, we only check the data_size. This is the
-                        * amount of valid data at the beginning of the
-                        * subbuffer.
+                        * If it was fully written, we only check the data_size.
+                        * This is the amount of valid data at the beginning of
+                        * the subbuffer.
                         */
                        valid_length = header->data_size;
-                       DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
-               }
-               else {
+                       DBG("writing full subbuffer (%ld) with valid_length = %ld",
+                           SUBBUF_INDEX(cons_off, buf), valid_length);
+               else {
                        /*
                         * If the subbuffer was not fully written, then we don't
                         * check data_size because it hasn't been written yet.
@@ -192,25 +185,23 @@ void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, st
                         * writing data_size that commit_seq is updated to
                         * include the end-of-buffer padding.
                         */
-                       valid_length = commit_seq & (buf->subbuf_size-1);
-                       DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
+                       valid_length = commit_seq & (buf->subbuf_size - 1);
+                       DBG("writing unfull subbuffer (%ld) with valid_length = %ld",
+                           SUBBUF_INDEX(cons_off, buf), valid_length);
                        header->data_size = valid_length;
                        header->sb_size = PAGE_ALIGN(valid_length);
                }
 
                if (callbacks->on_read_partial_subbuffer) {
-                       ret = callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length);
-                       /* Increment the consumed offset */
-                       if (ret >= 0)
-                               uatomic_add(&ustbuf->consumed, buf->subbuf_size);
-                       else
+                       ret = callbacks->on_read_partial_subbuffer(callbacks, buf,
+                                                                  SUBBUF_INDEX(cons_off, buf),
+                                                                  valid_length);
+                       if (ret < 0)
                                break;  /* Error happened */
-               } else
-                       uatomic_add(&ustbuf->consumed, buf->subbuf_size);
-               if(i_subbuf == last_subbuf % buf->n_subbufs)
-                       break;
+               }
        }
-
+       /* Increment the consumed offset */
+       uatomic_set(&ust_buf->consumed, cons_off);
        ltt_relay_print_buffer_errors(buf, buf->channel_cpu);
 }
 
This page took 0.026136 seconds and 4 git commands to generate.