X-Git-Url: http://git.liburcu.org/?p=userspace-rcu.git;a=blobdiff_plain;f=urcu%2Fworkqueue-fifo.h;h=13d9278a5beca9baa46ea9ef3cc7f3b1981091e0;hp=a2bbd909fdb1ae3923c8743d155752dceb016802;hb=8a2c74fefc366e219bc09c4a4ebd7bacd5cb83e5;hpb=0a14cd14bb8de4385c133f5cec3c4ec06f41ee8b diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index a2bbd90..13d9278 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -36,6 +36,11 @@ enum urcu_accept_ret { URCU_ACCEPT_SHUTDOWN = 1, }; +enum urcu_enqueue_ret { + URCU_ENQUEUE_OK = 0, + URCU_ENQUEUE_FULL = 1, +}; + /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors * need to be included before this header. All worker that participate @@ -59,6 +64,9 @@ struct urcu_workqueue { struct cds_list_head sibling_head; pthread_mutex_t sibling_lock; /* Protect sibling list updates */ + /* Maximum number of work entries (approximate). 0 means infinite. */ + unsigned long nr_work_max; + unsigned long nr_work; /* Current number of work items */ bool shutdown; /* Shutdown performed */ }; @@ -73,6 +81,7 @@ struct urcu_worker { struct urcu_wait_node wait_node; /* RCU linked list node of siblings for work stealing. */ struct cds_list_head sibling_node; + struct urcu_workqueue *queue; int flags; /* enum urcu_worker_flags */ }; @@ -81,20 +90,32 @@ enum urcu_worker_flags { }; static inline -void urcu_workqueue_init(struct urcu_workqueue *queue) +void urcu_workqueue_init(struct urcu_workqueue *queue, + unsigned long max_queue_len) { __cds_wfcq_init(&queue->head, &queue->tail); urcu_wait_queue_init(&queue->waitqueue); CDS_INIT_LIST_HEAD(&queue->sibling_head); pthread_mutex_init(&queue->sibling_lock, NULL); + queue->nr_work_max = max_queue_len; + queue->nr_work = 0; queue->shutdown = false; } static inline -void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) +enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue, + struct urcu_work *work) { bool was_empty; - + unsigned long nr_work_max; + + nr_work_max = queue->nr_work_max; + if (nr_work_max) { + /* Approximate max queue size. */ + if (uatomic_read(&queue->nr_work) >= nr_work_max) + return URCU_ENQUEUE_FULL; + uatomic_inc(&queue->nr_work); + } cds_wfcq_node_init(&work->node); /* Enqueue work. */ @@ -118,6 +139,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) (void) urcu_dequeue_wake_single(&queue->waitqueue); rcu_read_unlock(); /* Protect stack dequeue */ } + return URCU_ENQUEUE_OK; } static inline @@ -133,13 +155,15 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) } static inline -void urcu_worker_init(struct urcu_worker *worker, int flags) +void urcu_worker_init(struct urcu_workqueue *queue, + struct urcu_worker *worker, int flags) { cds_wfcq_init(&worker->head, &worker->tail); worker->flags = flags; urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING); worker->own = NULL; worker->wait_node.node.next = NULL; + worker->queue = queue; } static inline @@ -358,9 +382,9 @@ end: } static inline -enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue, - struct urcu_worker *worker) +enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker) { + struct urcu_workqueue *queue = worker->queue; enum cds_wfcq_ret wfcq_ret; bool has_work; @@ -430,15 +454,15 @@ do_work: static inline struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) { + struct urcu_workqueue *queue = worker->queue; struct cds_wfcq_node *node; + struct urcu_work *work; if (worker->own) { - struct urcu_work *work; - /* Process our own work entry. */ work = worker->own; worker->own = NULL; - return work; + goto end; } /* * If we are registered for work stealing, we need to dequeue @@ -459,7 +483,11 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) } if (!node) return NULL; - return caa_container_of(node, struct urcu_work, node); + work = caa_container_of(node, struct urcu_work, node); +end: + if (queue->nr_work_max) + uatomic_dec(&queue->nr_work); + return work; } static inline