* Please read Documentation/workqueue.txt for details.
*/
-#include <linux/module.h>
+#include <linux/export.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
- MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */
+ MAYDAY_INITIAL_TIMEOUT = HZ / 100 >= 2 ? HZ / 100 : 2,
+ /* call for help after 10ms
+ (min two ticks) */
MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
CREATE_COOLDOWN = HZ, /* time to breath after fail */
TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
* per-CPU workqueues:
*/
struct workqueue_struct {
- unsigned int flags; /* I: WQ_* flags */
+ unsigned int flags; /* W: WQ_* flags */
union {
struct cpu_workqueue_struct __percpu *pcpu;
struct cpu_workqueue_struct *single;
mayday_mask_t mayday_mask; /* cpus requesting rescue */
struct worker *rescuer; /* I: rescue worker */
+ int nr_drainers; /* W: drain in progress */
int saved_max_active; /* W: saved cwq max_active */
- const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
+ char name[]; /* I: workqueue name */
};
struct workqueue_struct *system_wq __read_mostly;
struct workqueue_struct *system_long_wq __read_mostly;
struct workqueue_struct *system_nrt_wq __read_mostly;
struct workqueue_struct *system_unbound_wq __read_mostly;
+struct workqueue_struct *system_freezable_wq __read_mostly;
+struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
EXPORT_SYMBOL_GPL(system_long_wq);
EXPORT_SYMBOL_GPL(system_nrt_wq);
EXPORT_SYMBOL_GPL(system_unbound_wq);
+EXPORT_SYMBOL_GPL(system_freezable_wq);
+EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
(cpu) < WORK_CPU_NONE; \
(cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
-#ifdef CONFIG_LOCKDEP
-/**
- * in_workqueue_context() - in context of specified workqueue?
- * @wq: the workqueue of interest
- *
- * Checks lockdep state to see if the current task is executing from
- * within a workqueue item. This function exists only if lockdep is
- * enabled.
- */
-int in_workqueue_context(struct workqueue_struct *wq)
-{
- return lock_is_held(&wq->lockdep_map);
-}
-#endif
-
#ifdef CONFIG_DEBUG_OBJECTS_WORK
static struct debug_obj_descr work_debug_descr;
+static void *work_debug_hint(void *addr)
+{
+ return ((struct work_struct *) addr)->func;
+}
+
/*
* fixup_init is called when:
* - an active object is initialized
static struct debug_obj_descr work_debug_descr = {
.name = "work_struct",
+ .debug_hint = work_debug_hint,
.fixup_init = work_fixup_init,
.fixup_activate = work_fixup_activate,
.fixup_free = work_fixup_free,
struct workqueue_struct *wq)
{
if (!(wq->flags & WQ_UNBOUND)) {
- if (likely(cpu < nr_cpu_ids)) {
-#ifdef CONFIG_SMP
+ if (likely(cpu < nr_cpu_ids))
return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
-#else
- return wq->cpu_wq.single;
-#endif
- }
} else if (likely(cpu == WORK_CPU_UNBOUND))
return wq->cpu_wq.single;
return NULL;
{
atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
- return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
+ return !list_empty(&gcwq->worklist) &&
+ (atomic_read(nr_running) <= 1 ||
+ gcwq->flags & GCWQ_HIGHPRI_PENDING);
}
/* Do we need a new worker? Called from manager. */
{
struct worker *worker = kthread_data(task);
- if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
+ if (!(worker->flags & WORKER_NOT_RUNNING))
atomic_inc(get_gcwq_nr_running(cpu));
}
struct global_cwq *gcwq = get_gcwq(cpu);
atomic_t *nr_running = get_gcwq_nr_running(cpu);
- if (unlikely(worker->flags & WORKER_NOT_RUNNING))
+ if (worker->flags & WORKER_NOT_RUNNING)
return NULL;
/* this can only happen on the local cpu */
worker->flags &= ~flags;
- /* if transitioning out of NOT_RUNNING, increment nr_running */
+ /*
+ * If transitioning out of NOT_RUNNING, increment nr_running. Note
+ * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask
+ * of multiple flags, not a single flag.
+ */
if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
if (!(worker->flags & WORKER_NOT_RUNNING))
atomic_inc(get_gcwq_nr_running(gcwq->cpu));
wake_up_worker(gcwq);
}
+/*
+ * Test whether @work is being queued from another work executing on the
+ * same workqueue. This is rather expensive and should only be used from
+ * cold paths.
+ */
+static bool is_chained_work(struct workqueue_struct *wq)
+{
+ unsigned long flags;
+ unsigned int cpu;
+
+ for_each_gcwq_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ struct worker *worker;
+ struct hlist_node *pos;
+ int i;
+
+ spin_lock_irqsave(&gcwq->lock, flags);
+ for_each_busy_worker(worker, i, pos, gcwq) {
+ if (worker->task != current)
+ continue;
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+ /*
+ * I'm @worker, no locking necessary. See if @work
+ * is headed to the same workqueue.
+ */
+ return worker->current_cwq->wq == wq;
+ }
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+ }
+ return false;
+}
+
static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
debug_work_activate(work);
- if (WARN_ON_ONCE(wq->flags & WQ_DYING))
+ /* if dying, only works from the same workqueue are allowed */
+ if (unlikely(wq->flags & WQ_DRAINING) &&
+ WARN_ON_ONCE(!is_chained_work(wq)))
return;
/* determine gcwq to use */
/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);
+ trace_workqueue_queue_work(cpu, cwq, work);
BUG_ON(!list_empty(&work->entry));
work_flags = work_color_to_flags(cwq->work_color);
if (likely(cwq->nr_active < cwq->max_active)) {
+ trace_workqueue_activate_work(work);
cwq->nr_active++;
worklist = gcwq_determine_ins_pos(gcwq, cwq);
} else {
return true;
spin_unlock_irq(&gcwq->lock);
- /* CPU has come up inbetween, retry migration */
+ /*
+ * We've raced with CPU hot[un]plug. Give it a breather
+ * and retry migration. cond_resched() is required here;
+ * otherwise, we might deadlock against cpu_stop trying to
+ * bring down the CPU on non-preemptive kernel.
+ */
cpu_relax();
+ cond_resched();
}
}
worker->id = id;
if (!on_unbound_cpu)
- worker->task = kthread_create(worker_thread, worker,
- "kworker/%u:%d", gcwq->cpu, id);
+ worker->task = kthread_create_on_node(worker_thread,
+ worker,
+ cpu_to_node(gcwq->cpu),
+ "kworker/%u:%d", gcwq->cpu, id);
else
worker->task = kthread_create(worker_thread, worker,
"kworker/u:%d", id);
struct work_struct, entry);
struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
+ trace_workqueue_activate_work(work);
move_linked_works(work, pos, NULL);
__clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
cwq->nr_active++;
spin_unlock_irq(&gcwq->lock);
work_clear_pending(work);
- lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_acquire_read(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
trace_workqueue_execute_start(work);
f(work);
move_linked_works(work, scheduled, &n);
process_scheduled_works(rescuer);
+
+ /*
+ * Leave this gcwq. If keep_working() is %true, notify a
+ * regular worker; otherwise, we end up with 0 concurrency
+ * and stalling the execution.
+ */
+ if (keep_working(gcwq))
+ wake_up_worker(gcwq);
+
spin_unlock_irq(&gcwq->lock);
}
* checks and call back into the fixup functions where we
* might deadlock.
*/
- INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
+ INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
+/**
+ * drain_workqueue - drain a workqueue
+ * @wq: workqueue to drain
+ *
+ * Wait until the workqueue becomes empty. While draining is in progress,
+ * only chain queueing is allowed. IOW, only currently pending or running
+ * work items on @wq can queue further work items on it. @wq is flushed
+ * repeatedly until it becomes empty. The number of flushing is detemined
+ * by the depth of chaining and should be relatively short. Whine if it
+ * takes too long.
+ */
+void drain_workqueue(struct workqueue_struct *wq)
+{
+ unsigned int flush_cnt = 0;
+ unsigned int cpu;
+
+ /*
+ * __queue_work() needs to test whether there are drainers, is much
+ * hotter than drain_workqueue() and already looks at @wq->flags.
+ * Use WQ_DRAINING so that queue doesn't have to check nr_drainers.
+ */
+ spin_lock(&workqueue_lock);
+ if (!wq->nr_drainers++)
+ wq->flags |= WQ_DRAINING;
+ spin_unlock(&workqueue_lock);
+reflush:
+ flush_workqueue(wq);
+
+ for_each_cwq_cpu(cpu, wq) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ bool drained;
+
+ spin_lock_irq(&cwq->gcwq->lock);
+ drained = !cwq->nr_active && list_empty(&cwq->delayed_works);
+ spin_unlock_irq(&cwq->gcwq->lock);
+
+ if (drained)
+ continue;
+
+ if (++flush_cnt == 10 ||
+ (flush_cnt % 100 == 0 && flush_cnt <= 1000))
+ pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n",
+ wq->name, flush_cnt);
+ goto reflush;
+ }
+
+ spin_lock(&workqueue_lock);
+ if (!--wq->nr_drainers)
+ wq->flags &= ~WQ_DRAINING;
+ spin_unlock(&workqueue_lock);
+}
+EXPORT_SYMBOL_GPL(drain_workqueue);
+
static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
bool wait_executing)
{
insert_wq_barrier(cwq, barr, work, worker);
spin_unlock_irq(&gcwq->lock);
- lock_map_acquire(&cwq->wq->lockdep_map);
+ /*
+ * If @max_active is 1 or rescuer is in use, flushing another work
+ * item on the same workqueue may lead to deadlock. Make sure the
+ * flusher is not running on the same workqueue by verifying write
+ * access.
+ */
+ if (cwq->wq->saved_max_active == 1 || cwq->wq->flags & WQ_RESCUER)
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ else
+ lock_map_acquire_read(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
+
return true;
already_gone:
spin_unlock_irq(&gcwq->lock);
EXPORT_SYMBOL(schedule_delayed_work_on);
/**
- * schedule_on_each_cpu - call a function on each online CPU from keventd
+ * schedule_on_each_cpu - execute a function synchronously on each online CPU
* @func: the function to call
*
- * Returns zero on success.
- * Returns -ve errno on failure.
- *
+ * schedule_on_each_cpu() executes @func on each online CPU using the
+ * system workqueue and blocks until all CPUs have completed.
* schedule_on_each_cpu() is very slow.
+ *
+ * RETURNS:
+ * 0 on success, -errno on failure.
*/
int schedule_on_each_cpu(work_func_t func)
{
const size_t size = sizeof(struct cpu_workqueue_struct);
const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
__alignof__(unsigned long long));
-#ifdef CONFIG_SMP
- bool percpu = !(wq->flags & WQ_UNBOUND);
-#else
- bool percpu = false;
-#endif
- if (percpu)
+ if (!(wq->flags & WQ_UNBOUND))
wq->cpu_wq.pcpu = __alloc_percpu(size, align);
else {
void *ptr;
static void free_cwqs(struct workqueue_struct *wq)
{
-#ifdef CONFIG_SMP
- bool percpu = !(wq->flags & WQ_UNBOUND);
-#else
- bool percpu = false;
-#endif
-
- if (percpu)
+ if (!(wq->flags & WQ_UNBOUND))
free_percpu(wq->cpu_wq.pcpu);
else if (wq->cpu_wq.single) {
/* the pointer to free is stored right after the cwq */
return clamp_val(max_active, 1, lim);
}
-struct workqueue_struct *__alloc_workqueue_key(const char *name,
+struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
- const char *lock_name)
+ const char *lock_name, ...)
{
+ va_list args, args1;
struct workqueue_struct *wq;
unsigned int cpu;
+ size_t namelen;
+
+ /* determine namelen, allocate wq and format name */
+ va_start(args, lock_name);
+ va_copy(args1, args);
+ namelen = vsnprintf(NULL, 0, fmt, args) + 1;
+
+ wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);
+ if (!wq)
+ goto err;
+
+ vsnprintf(wq->name, namelen, fmt, args1);
+ va_end(args);
+ va_end(args1);
+
+ /*
+ * Workqueues which may be used during memory reclaim should
+ * have a rescuer to guarantee forward progress.
+ */
+ if (flags & WQ_MEM_RECLAIM)
+ flags |= WQ_RESCUER;
/*
* Unbound workqueues aren't concurrency managed and should be
flags |= WQ_HIGHPRI;
max_active = max_active ?: WQ_DFL_ACTIVE;
- max_active = wq_clamp_max_active(max_active, flags, name);
-
- wq = kzalloc(sizeof(*wq), GFP_KERNEL);
- if (!wq)
- goto err;
+ max_active = wq_clamp_max_active(max_active, flags, wq->name);
+ /* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
- wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
if (!rescuer)
goto err;
- rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
+ rescuer->task = kthread_create(rescuer_thread, wq, "%s",
+ wq->name);
if (IS_ERR(rescuer->task))
goto err;
*/
spin_lock(&workqueue_lock);
- if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
+ if (workqueue_freezing && wq->flags & WQ_FREEZABLE)
for_each_cwq_cpu(cpu, wq)
get_cwq(cpu, wq)->max_active = 0;
{
unsigned int cpu;
- wq->flags |= WQ_DYING;
- flush_workqueue(wq);
+ /* drain it before proceeding with destruction */
+ drain_workqueue(wq);
/*
* wq list is used to freeze wq, remove from list after
spin_lock_irq(&gcwq->lock);
- if (!(wq->flags & WQ_FREEZEABLE) ||
+ if (!(wq->flags & WQ_FREEZABLE) ||
!(gcwq->flags & GCWQ_FREEZING))
get_cwq(gcwq->cpu, wq)->max_active = max_active;
* want to get it over with ASAP - spam rescuers, wake up as
* many idlers as necessary and create new ones till the
* worklist is empty. Note that if the gcwq is frozen, there
- * may be frozen works in freezeable cwqs. Don't declare
+ * may be frozen works in freezable cwqs. Don't declare
* completion while frozen.
*/
while (gcwq->nr_workers != gcwq->nr_idle ||
/**
* freeze_workqueues_begin - begin freezing workqueues
*
- * Start freezing workqueues. After this function returns, all
- * freezeable workqueues will queue new works to their frozen_works
- * list instead of gcwq->worklist.
+ * Start freezing workqueues. After this function returns, all freezable
+ * workqueues will queue new works to their frozen_works list instead of
+ * gcwq->worklist.
*
* CONTEXT:
* Grabs and releases workqueue_lock and gcwq->lock's.
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (cwq && wq->flags & WQ_FREEZEABLE)
+ if (cwq && wq->flags & WQ_FREEZABLE)
cwq->max_active = 0;
}
}
/**
- * freeze_workqueues_busy - are freezeable workqueues still busy?
+ * freeze_workqueues_busy - are freezable workqueues still busy?
*
* Check whether freezing is complete. This function must be called
* between freeze_workqueues_begin() and thaw_workqueues().
* Grabs and releases workqueue_lock.
*
* RETURNS:
- * %true if some freezeable workqueues are still busy. %false if
- * freezing is complete.
+ * %true if some freezable workqueues are still busy. %false if freezing
+ * is complete.
*/
bool freeze_workqueues_busy(void)
{
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (!cwq || !(wq->flags & WQ_FREEZEABLE))
+ if (!cwq || !(wq->flags & WQ_FREEZABLE))
continue;
BUG_ON(cwq->nr_active < 0);
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (!cwq || !(wq->flags & WQ_FREEZEABLE))
+ if (!cwq || !(wq->flags & WQ_FREEZABLE))
continue;
/* restore max_active and repopulate worklist */
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
- BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
+ system_freezable_wq = alloc_workqueue("events_freezable",
+ WQ_FREEZABLE, 0);
+ system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",
+ WQ_NON_REENTRANT | WQ_FREEZABLE, 0);
+ BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
+ !system_unbound_wq || !system_freezable_wq ||
+ !system_nrt_freezable_wq);
return 0;
}
early_initcall(init_workqueues);