X-Git-Url: http://git.liburcu.org/?p=ust.git;a=blobdiff_plain;f=libustconsumer%2Flowlevel.c;h=a65ed09c59a3d453d2b4b35d7252a17b268e74c7;hp=ec3cf89cd539e9bca6670b5f17c93df57bb5537f;hb=c74e3560aa89ea4dac242ceae06355f5a1ddc2c9;hpb=f3d96ee020d41cbc252adb71d80fe548fb8b103c diff --git a/libustconsumer/lowlevel.c b/libustconsumer/lowlevel.c index ec3cf89..a65ed09 100644 --- a/libustconsumer/lowlevel.c +++ b/libustconsumer/lowlevel.c @@ -125,20 +125,19 @@ size_t subbuffer_data_size(void *subbuf) void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf) { struct ust_buffer *ustbuf = buf->bufstruct_mem; - long write_offset = uatomic_read(&ustbuf->offset); - long consumed_offset = uatomic_read(&ustbuf->consumed); long i_subbuf; int ret; DBG("processing dead buffer (%s)", buf->name); - DBG("consumed offset is %ld (%s)", consumed_offset, buf->name); + DBG("consumed offset is %ld (%s)", uatomic_read(&ustbuf->consumed), + buf->name); DBG("write offset is %ld (%s)", write_offset, buf->name); /* First subbuf that we need to consume now. It is not modulo'd. * Consumed_offset is the next byte to consume. */ - long first_subbuf = consumed_offset / buf->subbuf_size; + long first_subbuf = uatomic_read(&ustbuf->consumed) / buf->subbuf_size; /* Last subbuf that we need to consume now. It is not modulo'd. * Write_offset is the next place to write so write_offset-1 is the * last place written. */ @@ -165,36 +164,49 @@ void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, st struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size); /* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */ - /* FIXME: not sure data_size = 0xffffffff when the buffer is not full. It might - * take the value of the header size initially */ if (((commit_seq - buf->subbuf_size) & commit_seq_mask) - - (USTD_BUFFER_TRUNC(consumed_offset, buf) >> n_subbufs_order) == 0 - && header->data_size != 0xffffffff && header->sb_size != 0xffffffff) { - /* If it was, we only check the data_size. This is the amount of valid data at - * the beginning of the subbuffer. */ + - (USTD_BUFFER_TRUNC(uatomic_read(&ustbuf->consumed), buf) >> n_subbufs_order) == 0 + && header->data_size != 0xffffffff) { + assert(header->sb_size != 0xffffffff); + /* + * If it was, we only check the data_size. This is the + * amount of valid data at the beginning of the + * subbuffer. + */ valid_length = header->data_size; DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length); } else { - /* If the subbuffer was not fully written, then we don't check data_size because - * it hasn't been written yet. Instead we check commit_seq and use it to choose - * a value for data_size. The viewer will need this value when parsing. + /* + * If the subbuffer was not fully written, then we don't + * check data_size because it hasn't been written yet. + * Instead we check commit_seq and use it to choose a + * value for data_size. The viewer will need this value + * when parsing. Generally, this will happen only for + * the last subbuffer. However, if we have threads still + * holding reserved slots in the previous subbuffers, + * which could happen for other subbuffers prior to the + * last one. Note that when data_size is set, the + * commit_seq count is still at a value that shows the + * amount of valid data to read. It's only _after_ + * writing data_size that commit_seq is updated to + * include the end-of-buffer padding. */ - valid_length = commit_seq & (buf->subbuf_size-1); DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length); header->data_size = valid_length; header->sb_size = PAGE_ALIGN(valid_length); - assert(i_subbuf == (last_subbuf % buf->n_subbufs)); } - if(callbacks->on_read_partial_subbuffer) + if (callbacks->on_read_partial_subbuffer) { ret = callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length); - - /* Increment the consumed offset */ - if (ret >= 0) + /* Increment the consumed offset */ + if (ret >= 0) + uatomic_add(&ustbuf->consumed, buf->subbuf_size); + else + break; /* Error happened */ + } else uatomic_add(&ustbuf->consumed, buf->subbuf_size); - if(i_subbuf == last_subbuf % buf->n_subbufs) break; }