workqueue: add approximate upper bound to queue length
[userspace-rcu.git] / urcu / workqueue-fifo.h
index 883f41f3feec4c0869f27c794f44a420ba80fdb5..13d9278a5beca9baa46ea9ef3cc7f3b1981091e0 100644 (file)
@@ -36,6 +36,11 @@ enum urcu_accept_ret {
        URCU_ACCEPT_SHUTDOWN    = 1,
 };
 
+enum urcu_enqueue_ret {
+       URCU_ENQUEUE_OK         = 0,
+       URCU_ENQUEUE_FULL       = 1,
+};
+
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
  * need to be included before this header. All worker that participate
@@ -59,6 +64,9 @@ struct urcu_workqueue {
        struct cds_list_head sibling_head;
        pthread_mutex_t sibling_lock;   /* Protect sibling list updates */
 
+       /* Maximum number of work entries (approximate). 0 means infinite. */
+       unsigned long nr_work_max;
+       unsigned long nr_work;          /* Current number of work items */
        bool shutdown;                  /* Shutdown performed */
 };
 
@@ -73,6 +81,7 @@ struct urcu_worker {
        struct urcu_wait_node wait_node;
        /* RCU linked list node of siblings for work stealing. */
        struct cds_list_head sibling_node;
+       struct urcu_workqueue *queue;
        int flags;      /* enum urcu_worker_flags */
 };
 
@@ -81,19 +90,32 @@ enum urcu_worker_flags {
 };
 
 static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+               unsigned long max_queue_len)
 {
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
        CDS_INIT_LIST_HEAD(&queue->sibling_head);
+       pthread_mutex_init(&queue->sibling_lock, NULL);
+       queue->nr_work_max = max_queue_len;
+       queue->nr_work = 0;
        queue->shutdown = false;
 }
 
 static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+               struct urcu_work *work)
 {
        bool was_empty;
-
+       unsigned long nr_work_max;
+
+       nr_work_max = queue->nr_work_max;
+       if (nr_work_max) {
+               /* Approximate max queue size. */
+               if (uatomic_read(&queue->nr_work) >= nr_work_max)
+                       return URCU_ENQUEUE_FULL;
+               uatomic_inc(&queue->nr_work);
+       }
        cds_wfcq_node_init(&work->node);
 
        /* Enqueue work. */
@@ -117,6 +139,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
                rcu_read_unlock();      /* Protect stack dequeue */
        }
+       return URCU_ENQUEUE_OK;
 }
 
 static inline
@@ -132,12 +155,15 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 }
 
 static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+               struct urcu_worker *worker, int flags)
 {
        cds_wfcq_init(&worker->head, &worker->tail);
        worker->flags = flags;
        urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
        worker->own = NULL;
+       worker->wait_node.node.next = NULL;
+       worker->queue = queue;
 }
 
 static inline
@@ -204,7 +230,7 @@ bool ___urcu_grab_work(struct urcu_worker *worker,
                bool steal)
 {
        enum cds_wfcq_ret splice_ret;
-       struct cds_wfcq_head tmp_head;
+       struct __cds_wfcq_head tmp_head;
        struct cds_wfcq_tail tmp_tail;
        struct cds_wfcq_node *node;
 
@@ -213,7 +239,7 @@ bool ___urcu_grab_work(struct urcu_worker *worker,
         */
        if (cds_wfcq_empty(src_head, src_tail))
                return false;
-       cds_wfcq_init(&tmp_head, &tmp_tail);
+       __cds_wfcq_init(&tmp_head, &tmp_tail);
 
        /* Ensure that we preserve FIFO work order. */
        assert(!steal || worker->own == NULL);
@@ -254,7 +280,7 @@ got_node:
        }
 
        /* Splice into worker workqueue. */
-       splice_ret = cds_wfcq_splice_blocking(&worker->head,
+       splice_ret = __cds_wfcq_splice_blocking(&worker->head,
                        &worker->tail,
                        &tmp_head,
                        &tmp_tail);
@@ -356,15 +382,16 @@ end:
 }
 
 static inline
-enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
-               struct urcu_worker *worker)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
 {
+       struct urcu_workqueue *queue = worker->queue;
        enum cds_wfcq_ret wfcq_ret;
        bool has_work;
 
        has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
        /* Don't wait if we have work to do. */
-       if (has_work || !cds_wfcq_empty(&worker->head, &worker->tail))
+       if (has_work || worker->own
+                       || !cds_wfcq_empty(&worker->head, &worker->tail))
                goto do_work;
        /* Try to steal work from sibling instead of blocking */
        if (__urcu_steal_work(queue, worker))
@@ -427,15 +454,15 @@ do_work:
 static inline
 struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
 {
+       struct urcu_workqueue *queue = worker->queue;
        struct cds_wfcq_node *node;
+       struct urcu_work *work;
 
        if (worker->own) {
-               struct urcu_work *work;
-
                /* Process our own work entry. */
                work = worker->own;
                worker->own = NULL;
-               return work;
+               goto end;
        }
        /*
         * If we are registered for work stealing, we need to dequeue
@@ -456,7 +483,11 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
        }
        if (!node)
                return NULL;
-       return caa_container_of(node, struct urcu_work, node);
+       work = caa_container_of(node, struct urcu_work, node);
+end:
+       if (queue->nr_work_max)
+               uatomic_dec(&queue->nr_work);
+       return work;
 }
 
 static inline
This page took 0.024406 seconds and 4 git commands to generate.