workqueue: implement per-cwq active work limit
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h>
36 #include <linux/idr.h>
37
38 /*
39  * Structure fields follow one of the following exclusion rules.
40  *
41  * I: Set during initialization and read-only afterwards.
42  *
43  * L: cwq->lock protected.  Access with cwq->lock held.
44  *
45  * F: wq->flush_mutex protected.
46  *
47  * W: workqueue_lock protected.
48  */
49
50 struct cpu_workqueue_struct;
51
52 struct worker {
53         struct work_struct      *current_work;  /* L: work being processed */
54         struct list_head        scheduled;      /* L: scheduled works */
55         struct task_struct      *task;          /* I: worker task */
56         struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
57         int                     id;             /* I: worker id */
58 };
59
60 /*
61  * The per-CPU workqueue (if single thread, we always use the first
62  * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
63  * work_struct->data are used for flags and thus cwqs need to be
64  * aligned to 1 << WORK_STRUCT_FLAG_BITS.
65  */
66 struct cpu_workqueue_struct {
67
68         spinlock_t lock;
69
70         struct list_head worklist;
71         wait_queue_head_t more_work;
72         unsigned int            cpu;
73         struct worker           *worker;
74
75         struct workqueue_struct *wq;            /* I: the owning workqueue */
76         int                     work_color;     /* L: current color */
77         int                     flush_color;    /* L: flushing color */
78         int                     nr_in_flight[WORK_NR_COLORS];
79                                                 /* L: nr of in_flight works */
80         int                     nr_active;      /* L: nr of active works */
81         int                     max_active;     /* I: max active works */
82         struct list_head        delayed_works;  /* L: delayed works */
83 };
84
85 /*
86  * Structure used to wait for workqueue flush.
87  */
88 struct wq_flusher {
89         struct list_head        list;           /* F: list of flushers */
90         int                     flush_color;    /* F: flush color waiting for */
91         struct completion       done;           /* flush completion */
92 };
93
94 /*
95  * The externally visible workqueue abstraction is an array of
96  * per-CPU workqueues:
97  */
98 struct workqueue_struct {
99         unsigned int            flags;          /* I: WQ_* flags */
100         struct cpu_workqueue_struct *cpu_wq;    /* I: cwq's */
101         struct list_head        list;           /* W: list of all workqueues */
102
103         struct mutex            flush_mutex;    /* protects wq flushing */
104         int                     work_color;     /* F: current work color */
105         int                     flush_color;    /* F: current flush color */
106         atomic_t                nr_cwqs_to_flush; /* flush in progress */
107         struct wq_flusher       *first_flusher; /* F: first flusher */
108         struct list_head        flusher_queue;  /* F: flush waiters */
109         struct list_head        flusher_overflow; /* F: flush overflow list */
110
111         const char              *name;          /* I: workqueue name */
112 #ifdef CONFIG_LOCKDEP
113         struct lockdep_map      lockdep_map;
114 #endif
115 };
116
117 #ifdef CONFIG_DEBUG_OBJECTS_WORK
118
119 static struct debug_obj_descr work_debug_descr;
120
121 /*
122  * fixup_init is called when:
123  * - an active object is initialized
124  */
125 static int work_fixup_init(void *addr, enum debug_obj_state state)
126 {
127         struct work_struct *work = addr;
128
129         switch (state) {
130         case ODEBUG_STATE_ACTIVE:
131                 cancel_work_sync(work);
132                 debug_object_init(work, &work_debug_descr);
133                 return 1;
134         default:
135                 return 0;
136         }
137 }
138
139 /*
140  * fixup_activate is called when:
141  * - an active object is activated
142  * - an unknown object is activated (might be a statically initialized object)
143  */
144 static int work_fixup_activate(void *addr, enum debug_obj_state state)
145 {
146         struct work_struct *work = addr;
147
148         switch (state) {
149
150         case ODEBUG_STATE_NOTAVAILABLE:
151                 /*
152                  * This is not really a fixup. The work struct was
153                  * statically initialized. We just make sure that it
154                  * is tracked in the object tracker.
155                  */
156                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
157                         debug_object_init(work, &work_debug_descr);
158                         debug_object_activate(work, &work_debug_descr);
159                         return 0;
160                 }
161                 WARN_ON_ONCE(1);
162                 return 0;
163
164         case ODEBUG_STATE_ACTIVE:
165                 WARN_ON(1);
166
167         default:
168                 return 0;
169         }
170 }
171
172 /*
173  * fixup_free is called when:
174  * - an active object is freed
175  */
176 static int work_fixup_free(void *addr, enum debug_obj_state state)
177 {
178         struct work_struct *work = addr;
179
180         switch (state) {
181         case ODEBUG_STATE_ACTIVE:
182                 cancel_work_sync(work);
183                 debug_object_free(work, &work_debug_descr);
184                 return 1;
185         default:
186                 return 0;
187         }
188 }
189
190 static struct debug_obj_descr work_debug_descr = {
191         .name           = "work_struct",
192         .fixup_init     = work_fixup_init,
193         .fixup_activate = work_fixup_activate,
194         .fixup_free     = work_fixup_free,
195 };
196
197 static inline void debug_work_activate(struct work_struct *work)
198 {
199         debug_object_activate(work, &work_debug_descr);
200 }
201
202 static inline void debug_work_deactivate(struct work_struct *work)
203 {
204         debug_object_deactivate(work, &work_debug_descr);
205 }
206
207 void __init_work(struct work_struct *work, int onstack)
208 {
209         if (onstack)
210                 debug_object_init_on_stack(work, &work_debug_descr);
211         else
212                 debug_object_init(work, &work_debug_descr);
213 }
214 EXPORT_SYMBOL_GPL(__init_work);
215
216 void destroy_work_on_stack(struct work_struct *work)
217 {
218         debug_object_free(work, &work_debug_descr);
219 }
220 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
221
222 #else
223 static inline void debug_work_activate(struct work_struct *work) { }
224 static inline void debug_work_deactivate(struct work_struct *work) { }
225 #endif
226
227 /* Serializes the accesses to the list of workqueues. */
228 static DEFINE_SPINLOCK(workqueue_lock);
229 static LIST_HEAD(workqueues);
230 static DEFINE_PER_CPU(struct ida, worker_ida);
231
232 static int worker_thread(void *__worker);
233
234 static int singlethread_cpu __read_mostly;
235
236 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
237                                             struct workqueue_struct *wq)
238 {
239         return per_cpu_ptr(wq->cpu_wq, cpu);
240 }
241
242 static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
243                                                struct workqueue_struct *wq)
244 {
245         if (unlikely(wq->flags & WQ_SINGLE_THREAD))
246                 cpu = singlethread_cpu;
247         return get_cwq(cpu, wq);
248 }
249
250 static unsigned int work_color_to_flags(int color)
251 {
252         return color << WORK_STRUCT_COLOR_SHIFT;
253 }
254
255 static int get_work_color(struct work_struct *work)
256 {
257         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
258                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
259 }
260
261 static int work_next_color(int color)
262 {
263         return (color + 1) % WORK_NR_COLORS;
264 }
265
266 /*
267  * Set the workqueue on which a work item is to be run
268  * - Must *only* be called if the pending flag is set
269  */
270 static inline void set_wq_data(struct work_struct *work,
271                                struct cpu_workqueue_struct *cwq,
272                                unsigned long extra_flags)
273 {
274         BUG_ON(!work_pending(work));
275
276         atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
277                         WORK_STRUCT_PENDING | extra_flags);
278 }
279
280 /*
281  * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
282  */
283 static inline void clear_wq_data(struct work_struct *work)
284 {
285         atomic_long_set(&work->data, work_static(work));
286 }
287
288 static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
289 {
290         return (void *)(atomic_long_read(&work->data) &
291                         WORK_STRUCT_WQ_DATA_MASK);
292 }
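
/*
 * Illustrative sketch of the encoding handled by the helpers above: a
 * work's data word packs the owning cwq pointer together with the
 * WORK_STRUCT_* flag bits, which is why alloc_cwqs() forces cwqs to
 * 1 << WORK_STRUCT_FLAG_BITS alignment.
 *
 *	unsigned long v = atomic_long_read(&work->data);
 *	struct cpu_workqueue_struct *cwq =
 *		(void *)(v & WORK_STRUCT_WQ_DATA_MASK);	  pointer part
 *	unsigned long flags = v & WORK_STRUCT_FLAG_MASK;  PENDING etc.
 */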
293
294 /**
295  * insert_work - insert a work into cwq
296  * @cwq: cwq @work belongs to
297  * @work: work to insert
298  * @head: insertion point
299  * @extra_flags: extra WORK_STRUCT_* flags to set
300  *
301  * Insert @work into @cwq after @head.
302  *
303  * CONTEXT:
304  * spin_lock_irq(cwq->lock).
305  */
306 static void insert_work(struct cpu_workqueue_struct *cwq,
307                         struct work_struct *work, struct list_head *head,
308                         unsigned int extra_flags)
309 {
310         /* we own @work, set data and link */
311         set_wq_data(work, cwq, extra_flags);
312
313         /*
314          * Ensure that we get the right work->data if we see the
315          * result of list_add() below, see try_to_grab_pending().
316          */
317         smp_wmb();
318
319         list_add_tail(&work->entry, head);
320         wake_up(&cwq->more_work);
321 }
322
323 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
324                          struct work_struct *work)
325 {
326         struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
327         struct list_head *worklist;
328         unsigned long flags;
329
330         debug_work_activate(work);
331
332         spin_lock_irqsave(&cwq->lock, flags);
333         BUG_ON(!list_empty(&work->entry));
334
335         cwq->nr_in_flight[cwq->work_color]++;
336
337         if (likely(cwq->nr_active < cwq->max_active)) {
338                 cwq->nr_active++;
339                 worklist = &cwq->worklist;
340         } else
341                 worklist = &cwq->delayed_works;
342
343         insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
344
345         spin_unlock_irqrestore(&cwq->lock, flags);
346 }
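
/*
 * Illustrative trace of the nr_active/max_active test above, which is
 * what implements the per-cwq active work limit.  With max_active == 2,
 * queueing three works W1..W3 back to back yields:
 *
 *	queue W1: nr_active 0->1, W1 -> cwq->worklist
 *	queue W2: nr_active 1->2, W2 -> cwq->worklist
 *	queue W3: limit hit,      W3 -> cwq->delayed_works
 *
 * W3 is moved to the worklist by cwq_activate_first_delayed() only
 * after cwq_dec_nr_in_flight() drops nr_active below max_active again.
 */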
347
348 /**
349  * queue_work - queue work on a workqueue
350  * @wq: workqueue to use
351  * @work: work to queue
352  *
353  * Returns 0 if @work was already on a queue, non-zero otherwise.
354  *
355  * We queue the work to the CPU on which it was submitted, but if the CPU dies
356  * it can be processed by another CPU.
357  */
358 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
359 {
360         int ret;
361
362         ret = queue_work_on(get_cpu(), wq, work);
363         put_cpu();
364
365         return ret;
366 }
367 EXPORT_SYMBOL_GPL(queue_work);
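
/*
 * Example usage (illustrative sketch; my_dev, dev->wq and my_work_fn
 * are hypothetical): a caller typically embeds the work_struct in its
 * own object, initializes it once and queues it as needed.
 *
 *	struct my_dev {
 *		struct workqueue_struct *wq;
 *		struct work_struct work;
 *	};
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev, work);
 *		... runs in process context, may sleep ...
 *	}
 *
 *	... at setup time ...
 *	INIT_WORK(&dev->work, my_work_fn);
 *
 *	... later, e.g. from an interrupt handler ...
 *	queue_work(dev->wq, &dev->work);
 */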
368
369 /**
370  * queue_work_on - queue work on specific cpu
371  * @cpu: CPU number to execute work on
372  * @wq: workqueue to use
373  * @work: work to queue
374  *
375  * Returns 0 if @work was already on a queue, non-zero otherwise.
376  *
377  * We queue the work to a specific CPU, the caller must ensure it
378  * can't go away.
379  */
380 int
381 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
382 {
383         int ret = 0;
384
385         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
386                 __queue_work(cpu, wq, work);
387                 ret = 1;
388         }
389         return ret;
390 }
391 EXPORT_SYMBOL_GPL(queue_work_on);
392
393 static void delayed_work_timer_fn(unsigned long __data)
394 {
395         struct delayed_work *dwork = (struct delayed_work *)__data;
396         struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
397
398         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
399 }
400
401 /**
402  * queue_delayed_work - queue work on a workqueue after delay
403  * @wq: workqueue to use
404  * @dwork: delayable work to queue
405  * @delay: number of jiffies to wait before queueing
406  *
407  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
408  */
409 int queue_delayed_work(struct workqueue_struct *wq,
410                         struct delayed_work *dwork, unsigned long delay)
411 {
412         if (delay == 0)
413                 return queue_work(wq, &dwork->work);
414
415         return queue_delayed_work_on(-1, wq, dwork, delay);
416 }
417 EXPORT_SYMBOL_GPL(queue_delayed_work);
418
419 /**
420  * queue_delayed_work_on - queue work on specific CPU after delay
421  * @cpu: CPU number to execute work on
422  * @wq: workqueue to use
423  * @dwork: work to queue
424  * @delay: number of jiffies to wait before queueing
425  *
426  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
427  */
428 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
429                         struct delayed_work *dwork, unsigned long delay)
430 {
431         int ret = 0;
432         struct timer_list *timer = &dwork->timer;
433         struct work_struct *work = &dwork->work;
434
435         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
436                 BUG_ON(timer_pending(timer));
437                 BUG_ON(!list_empty(&work->entry));
438
439                 timer_stats_timer_set_start_info(&dwork->timer);
440
441                 /* This stores cwq for the moment, for the timer_fn */
442                 set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
443                 timer->expires = jiffies + delay;
444                 timer->data = (unsigned long)dwork;
445                 timer->function = delayed_work_timer_fn;
446
447                 if (unlikely(cpu >= 0))
448                         add_timer_on(timer, cpu);
449                 else
450                         add_timer(timer);
451                 ret = 1;
452         }
453         return ret;
454 }
455 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
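
/*
 * Example usage (illustrative sketch; my_poll_fn and my_wq are
 * hypothetical): a delayed work bundles a timer with the work item and
 * INIT_DELAYED_WORK()/DECLARE_DELAYED_WORK() set up both.
 *
 *	static void my_poll_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_poll, my_poll_fn);
 *
 *	... run on the kernel-global workqueue ~100ms from now ...
 *	schedule_delayed_work(&my_poll, msecs_to_jiffies(100));
 *
 *	... or on a specific workqueue and CPU, one second from now ...
 *	queue_delayed_work_on(1, my_wq, &my_poll, HZ);
 */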
456
457 static struct worker *alloc_worker(void)
458 {
459         struct worker *worker;
460
461         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
462         if (worker)
463                 INIT_LIST_HEAD(&worker->scheduled);
464         return worker;
465 }
466
467 /**
468  * create_worker - create a new workqueue worker
469  * @cwq: cwq the new worker will belong to
470  * @bind: whether to bind the worker to cwq->cpu or not
471  *
472  * Create a new worker which is bound to @cwq.  The returned worker
473  * can be started by calling start_worker() or destroyed using
474  * destroy_worker().
475  *
476  * CONTEXT:
477  * Might sleep.  Does GFP_KERNEL allocations.
478  *
479  * RETURNS:
480  * Pointer to the newly created worker or %NULL on failure.
481  */
482 static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
483 {
484         int id = -1;
485         struct worker *worker = NULL;
486
487         spin_lock(&workqueue_lock);
488         while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
489                 spin_unlock(&workqueue_lock);
490                 if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
491                         goto fail;
492                 spin_lock(&workqueue_lock);
493         }
494         spin_unlock(&workqueue_lock);
495
496         worker = alloc_worker();
497         if (!worker)
498                 goto fail;
499
500         worker->cwq = cwq;
501         worker->id = id;
502
503         worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
504                                       cwq->cpu, id);
505         if (IS_ERR(worker->task))
506                 goto fail;
507
508         if (bind)
509                 kthread_bind(worker->task, cwq->cpu);
510
511         return worker;
512 fail:
513         if (id >= 0) {
514                 spin_lock(&workqueue_lock);
515                 ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
516                 spin_unlock(&workqueue_lock);
517         }
518         kfree(worker);
519         return NULL;
520 }
521
522 /**
523  * start_worker - start a newly created worker
524  * @worker: worker to start
525  *
526  * Start @worker.
527  *
528  * CONTEXT:
529  * spin_lock_irq(cwq->lock).
530  */
531 static void start_worker(struct worker *worker)
532 {
533         wake_up_process(worker->task);
534 }
535
536 /**
537  * destroy_worker - destroy a workqueue worker
538  * @worker: worker to be destroyed
539  *
540  * Destroy @worker.
541  */
542 static void destroy_worker(struct worker *worker)
543 {
544         int cpu = worker->cwq->cpu;
545         int id = worker->id;
546
547         /* sanity check frenzy */
548         BUG_ON(worker->current_work);
549         BUG_ON(!list_empty(&worker->scheduled));
550
551         kthread_stop(worker->task);
552         kfree(worker);
553
554         spin_lock(&workqueue_lock);
555         ida_remove(&per_cpu(worker_ida, cpu), id);
556         spin_unlock(&workqueue_lock);
557 }
558
559 /**
560  * move_linked_works - move linked works to a list
561  * @work: start of series of works to be scheduled
562  * @head: target list to append @work to
563  * @nextp: out parameter for nested worklist walking
564  *
565  * Schedule linked works starting from @work to @head.  Work series to
566  * be scheduled starts at @work and includes any consecutive work with
567  * WORK_STRUCT_LINKED set in its predecessor.
568  *
569  * If @nextp is not NULL, it's updated to point to the next work of
570  * the last scheduled work.  This allows move_linked_works() to be
571  * nested inside outer list_for_each_entry_safe().
572  *
573  * CONTEXT:
574  * spin_lock_irq(cwq->lock).
575  */
576 static void move_linked_works(struct work_struct *work, struct list_head *head,
577                               struct work_struct **nextp)
578 {
579         struct work_struct *n;
580
581         /*
582          * Linked worklist will always end before the end of the list,
583          * use NULL for list head.
584          */
585         list_for_each_entry_safe_from(work, n, NULL, entry) {
586                 list_move_tail(&work->entry, head);
587                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
588                         break;
589         }
590
591         /*
592          * If we're already inside safe list traversal and have moved
593          * multiple works to the scheduled queue, the next position
594          * needs to be updated.
595          */
596         if (nextp)
597                 *nextp = n;
598 }
599
600 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
601 {
602         struct work_struct *work = list_first_entry(&cwq->delayed_works,
603                                                     struct work_struct, entry);
604
605         move_linked_works(work, &cwq->worklist, NULL);
606         cwq->nr_active++;
607 }
608
609 /**
610  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
611  * @cwq: cwq of interest
612  * @color: color of work which left the queue
613  *
614  * A work has either completed or been removed from the pending queue;
615  * decrement nr_in_flight of its cwq and handle workqueue flushing.
616  *
617  * CONTEXT:
618  * spin_lock_irq(cwq->lock).
619  */
620 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
621 {
622         /* ignore uncolored works */
623         if (color == WORK_NO_COLOR)
624                 return;
625
626         cwq->nr_in_flight[color]--;
627         cwq->nr_active--;
628
629         /* one down, submit a delayed one */
630         if (!list_empty(&cwq->delayed_works) &&
631             cwq->nr_active < cwq->max_active)
632                 cwq_activate_first_delayed(cwq);
633
634         /* is flush in progress and are we at the flushing tip? */
635         if (likely(cwq->flush_color != color))
636                 return;
637
638         /* are there still in-flight works? */
639         if (cwq->nr_in_flight[color])
640                 return;
641
642         /* this cwq is done, clear flush_color */
643         cwq->flush_color = -1;
644
645         /*
646          * If this was the last cwq, wake up the first flusher.  It
647          * will handle the rest.
648          */
649         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
650                 complete(&cwq->wq->first_flusher->done);
651 }
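
/*
 * Illustrative trace of the flush bookkeeping above, for one cwq whose
 * flush color C has two works in flight when a flush is armed:
 *
 *	flush armed:	cwq->flush_color = C, nr_in_flight[C] == 2
 *	1st work done:	nr_in_flight[C] 2 -> 1, still flushing
 *	2nd work done:	nr_in_flight[C] 1 -> 0, flush_color = -1,
 *			atomic_dec_and_test(&wq->nr_cwqs_to_flush) and,
 *			if this was the last such cwq, first_flusher->done
 *			is completed
 */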
652
653 /**
654  * process_one_work - process single work
655  * @worker: self
656  * @work: work to process
657  *
658  * Process @work.  This function contains all the logic necessary to
659  * process a single work including synchronization against and
660  * interaction with other workers on the same cpu, queueing and
661  * flushing.  As long as the context requirement is met, any worker can
662  * call this function to process a work.
663  *
664  * CONTEXT:
665  * spin_lock_irq(cwq->lock) which is released and regrabbed.
666  */
667 static void process_one_work(struct worker *worker, struct work_struct *work)
668 {
669         struct cpu_workqueue_struct *cwq = worker->cwq;
670         work_func_t f = work->func;
671         int work_color;
672 #ifdef CONFIG_LOCKDEP
673         /*
674          * It is permissible to free the struct work_struct from
675          * inside the function that is called from it, this we need to
676          * take into account for lockdep too.  To avoid bogus "held
677          * lock freed" warnings as well as problems when looking into
678          * work->lockdep_map, make a copy and use that here.
679          */
680         struct lockdep_map lockdep_map = work->lockdep_map;
681 #endif
682         /* claim and process */
683         debug_work_deactivate(work);
684         worker->current_work = work;
685         work_color = get_work_color(work);
686         list_del_init(&work->entry);
687
688         spin_unlock_irq(&cwq->lock);
689
690         BUG_ON(get_wq_data(work) != cwq);
691         work_clear_pending(work);
692         lock_map_acquire(&cwq->wq->lockdep_map);
693         lock_map_acquire(&lockdep_map);
694         f(work);
695         lock_map_release(&lockdep_map);
696         lock_map_release(&cwq->wq->lockdep_map);
697
698         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
699                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
700                        "%s/0x%08x/%d\n",
701                        current->comm, preempt_count(), task_pid_nr(current));
702                 printk(KERN_ERR "    last function: ");
703                 print_symbol("%s\n", (unsigned long)f);
704                 debug_show_held_locks(current);
705                 dump_stack();
706         }
707
708         spin_lock_irq(&cwq->lock);
709
710         /* we're done with it, release */
711         worker->current_work = NULL;
712         cwq_dec_nr_in_flight(cwq, work_color);
713 }
714
715 /**
716  * process_scheduled_works - process scheduled works
717  * @worker: self
718  *
719  * Process all scheduled works.  Please note that the scheduled list
720  * may change while processing a work, so this function repeatedly
721  * fetches a work from the top and executes it.
722  *
723  * CONTEXT:
724  * spin_lock_irq(cwq->lock) which may be released and regrabbed
725  * multiple times.
726  */
727 static void process_scheduled_works(struct worker *worker)
728 {
729         while (!list_empty(&worker->scheduled)) {
730                 struct work_struct *work = list_first_entry(&worker->scheduled,
731                                                 struct work_struct, entry);
732                 process_one_work(worker, work);
733         }
734 }
735
736 /**
737  * worker_thread - the worker thread function
738  * @__worker: self
739  *
740  * The cwq worker thread function.
741  */
742 static int worker_thread(void *__worker)
743 {
744         struct worker *worker = __worker;
745         struct cpu_workqueue_struct *cwq = worker->cwq;
746         DEFINE_WAIT(wait);
747
748         if (cwq->wq->flags & WQ_FREEZEABLE)
749                 set_freezable();
750
751         for (;;) {
752                 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
753                 if (!freezing(current) &&
754                     !kthread_should_stop() &&
755                     list_empty(&cwq->worklist))
756                         schedule();
757                 finish_wait(&cwq->more_work, &wait);
758
759                 try_to_freeze();
760
761                 if (kthread_should_stop())
762                         break;
763
764                 if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
765                                             get_cpu_mask(cwq->cpu))))
766                         set_cpus_allowed_ptr(worker->task,
767                                              get_cpu_mask(cwq->cpu));
768
769                 spin_lock_irq(&cwq->lock);
770
771                 while (!list_empty(&cwq->worklist)) {
772                         struct work_struct *work =
773                                 list_first_entry(&cwq->worklist,
774                                                  struct work_struct, entry);
775
776                         if (likely(!(*work_data_bits(work) &
777                                      WORK_STRUCT_LINKED))) {
778                                 /* optimization path, not strictly necessary */
779                                 process_one_work(worker, work);
780                                 if (unlikely(!list_empty(&worker->scheduled)))
781                                         process_scheduled_works(worker);
782                         } else {
783                                 move_linked_works(work, &worker->scheduled,
784                                                   NULL);
785                                 process_scheduled_works(worker);
786                         }
787                 }
788
789                 spin_unlock_irq(&cwq->lock);
790         }
791
792         return 0;
793 }
794
795 struct wq_barrier {
796         struct work_struct      work;
797         struct completion       done;
798 };
799
800 static void wq_barrier_func(struct work_struct *work)
801 {
802         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
803         complete(&barr->done);
804 }
805
806 /**
807  * insert_wq_barrier - insert a barrier work
808  * @cwq: cwq to insert barrier into
809  * @barr: wq_barrier to insert
810  * @target: target work to attach @barr to
811  * @worker: worker currently executing @target, NULL if @target is not executing
812  *
813  * @barr is linked to @target such that @barr is completed only after
814  * @target finishes execution.  Please note that the ordering
815  * guarantee is observed only with respect to @target and on the local
816  * cpu.
817  *
818  * Currently, a queued barrier can't be canceled.  This is because
819  * try_to_grab_pending() can't determine whether the work to be
820  * grabbed is at the head of the queue and thus can't clear LINKED
821  * flag of the previous work while there must be a valid next work
822  * after a work with LINKED flag set.
823  *
824  * Note that when @worker is non-NULL, @target may be modified
825  * underneath us, so we can't reliably determine cwq from @target.
826  *
827  * CONTEXT:
828  * spin_lock_irq(cwq->lock).
829  */
830 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
831                               struct wq_barrier *barr,
832                               struct work_struct *target, struct worker *worker)
833 {
834         struct list_head *head;
835         unsigned int linked = 0;
836
837         /*
838          * debugobject calls are safe here even with cwq->lock locked
839          * as we know for sure that this will not trigger any of the
840          * checks and call back into the fixup functions where we
841          * might deadlock.
842          */
843         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
844         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
845         init_completion(&barr->done);
846
847         /*
848          * If @target is currently being executed, schedule the
849          * barrier to the worker; otherwise, put it after @target.
850          */
851         if (worker)
852                 head = worker->scheduled.next;
853         else {
854                 unsigned long *bits = work_data_bits(target);
855
856                 head = target->entry.next;
857                 /* there can already be other linked works, inherit and set */
858                 linked = *bits & WORK_STRUCT_LINKED;
859                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
860         }
861
862         debug_work_activate(&barr->work);
863         insert_work(cwq, &barr->work, head,
864                     work_color_to_flags(WORK_NO_COLOR) | linked);
865 }
866
867 /**
868  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
869  * @wq: workqueue being flushed
870  * @flush_color: new flush color, < 0 for no-op
871  * @work_color: new work color, < 0 for no-op
872  *
873  * Prepare cwqs for workqueue flushing.
874  *
875  * If @flush_color is non-negative, flush_color on all cwqs should be
876  * -1.  If no cwq has in-flight works at the specified color, all
877  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
878  * has in-flight works, its cwq->flush_color is set to
879  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
880  * wakeup logic is armed and %true is returned.
881  *
882  * The caller should have initialized @wq->first_flusher prior to
883  * calling this function with non-negative @flush_color.  If
884  * @flush_color is negative, no flush color update is done and %false
885  * is returned.
886  *
887  * If @work_color is non-negative, all cwqs should have the same
888  * work_color which is previous to @work_color and all will be
889  * advanced to @work_color.
890  *
891  * CONTEXT:
892  * mutex_lock(wq->flush_mutex).
893  *
894  * RETURNS:
895  * %true if @flush_color >= 0 and there's something to flush.  %false
896  * otherwise.
897  */
898 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
899                                       int flush_color, int work_color)
900 {
901         bool wait = false;
902         unsigned int cpu;
903
904         if (flush_color >= 0) {
905                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
906                 atomic_set(&wq->nr_cwqs_to_flush, 1);
907         }
908
909         for_each_possible_cpu(cpu) {
910                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
911
912                 spin_lock_irq(&cwq->lock);
913
914                 if (flush_color >= 0) {
915                         BUG_ON(cwq->flush_color != -1);
916
917                         if (cwq->nr_in_flight[flush_color]) {
918                                 cwq->flush_color = flush_color;
919                                 atomic_inc(&wq->nr_cwqs_to_flush);
920                                 wait = true;
921                         }
922                 }
923
924                 if (work_color >= 0) {
925                         BUG_ON(work_color != work_next_color(cwq->work_color));
926                         cwq->work_color = work_color;
927                 }
928
929                 spin_unlock_irq(&cwq->lock);
930         }
931
932         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
933                 complete(&wq->first_flusher->done);
934
935         return wait;
936 }
937
938 /**
939  * flush_workqueue - ensure that any scheduled work has run to completion.
940  * @wq: workqueue to flush
941  *
942  * Forces execution of the workqueue and blocks until its completion.
943  * This is typically used in driver shutdown handlers.
944  *
945  * We sleep until all works which were queued on entry have been handled,
946  * but we are not livelocked by new incoming ones.
947  */
948 void flush_workqueue(struct workqueue_struct *wq)
949 {
950         struct wq_flusher this_flusher = {
951                 .list = LIST_HEAD_INIT(this_flusher.list),
952                 .flush_color = -1,
953                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
954         };
955         int next_color;
956
957         lock_map_acquire(&wq->lockdep_map);
958         lock_map_release(&wq->lockdep_map);
959
960         mutex_lock(&wq->flush_mutex);
961
962         /*
963          * Start-to-wait phase
964          */
965         next_color = work_next_color(wq->work_color);
966
967         if (next_color != wq->flush_color) {
968                 /*
969                  * Color space is not full.  The current work_color
970                  * becomes our flush_color and work_color is advanced
971                  * by one.
972                  */
973                 BUG_ON(!list_empty(&wq->flusher_overflow));
974                 this_flusher.flush_color = wq->work_color;
975                 wq->work_color = next_color;
976
977                 if (!wq->first_flusher) {
978                         /* no flush in progress, become the first flusher */
979                         BUG_ON(wq->flush_color != this_flusher.flush_color);
980
981                         wq->first_flusher = &this_flusher;
982
983                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
984                                                        wq->work_color)) {
985                                 /* nothing to flush, done */
986                                 wq->flush_color = next_color;
987                                 wq->first_flusher = NULL;
988                                 goto out_unlock;
989                         }
990                 } else {
991                         /* wait in queue */
992                         BUG_ON(wq->flush_color == this_flusher.flush_color);
993                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
994                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
995                 }
996         } else {
997                 /*
998                  * Oops, color space is full, wait on overflow queue.
999                  * The next flush completion will assign us
1000                  * flush_color and transfer to flusher_queue.
1001                  */
1002                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
1003         }
1004
1005         mutex_unlock(&wq->flush_mutex);
1006
1007         wait_for_completion(&this_flusher.done);
1008
1009         /*
1010          * Wake-up-and-cascade phase
1011          *
1012          * First flushers are responsible for cascading flushes and
1013          * handling overflow.  Non-first flushers can simply return.
1014          */
1015         if (wq->first_flusher != &this_flusher)
1016                 return;
1017
1018         mutex_lock(&wq->flush_mutex);
1019
1020         wq->first_flusher = NULL;
1021
1022         BUG_ON(!list_empty(&this_flusher.list));
1023         BUG_ON(wq->flush_color != this_flusher.flush_color);
1024
1025         while (true) {
1026                 struct wq_flusher *next, *tmp;
1027
1028                 /* complete all the flushers sharing the current flush color */
1029                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
1030                         if (next->flush_color != wq->flush_color)
1031                                 break;
1032                         list_del_init(&next->list);
1033                         complete(&next->done);
1034                 }
1035
1036                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
1037                        wq->flush_color != work_next_color(wq->work_color));
1038
1039                 /* this flush_color is finished, advance by one */
1040                 wq->flush_color = work_next_color(wq->flush_color);
1041
1042                 /* one color has been freed, handle overflow queue */
1043                 if (!list_empty(&wq->flusher_overflow)) {
1044                         /*
1045                          * Assign the same color to all overflowed
1046                          * flushers, advance work_color and append to
1047                          * flusher_queue.  This is the start-to-wait
1048                          * phase for these overflowed flushers.
1049                          */
1050                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
1051                                 tmp->flush_color = wq->work_color;
1052
1053                         wq->work_color = work_next_color(wq->work_color);
1054
1055                         list_splice_tail_init(&wq->flusher_overflow,
1056                                               &wq->flusher_queue);
1057                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
1058                 }
1059
1060                 if (list_empty(&wq->flusher_queue)) {
1061                         BUG_ON(wq->flush_color != wq->work_color);
1062                         break;
1063                 }
1064
1065                 /*
1066                  * Need to flush more colors.  Make the next flusher
1067                  * the new first flusher and arm cwqs.
1068                  */
1069                 BUG_ON(wq->flush_color == wq->work_color);
1070                 BUG_ON(wq->flush_color != next->flush_color);
1071
1072                 list_del_init(&next->list);
1073                 wq->first_flusher = next;
1074
1075                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
1076                         break;
1077
1078                 /*
1079                  * Meh... this color is already done, clear first
1080                  * flusher and repeat cascading.
1081                  */
1082                 wq->first_flusher = NULL;
1083         }
1084
1085 out_unlock:
1086         mutex_unlock(&wq->flush_mutex);
1087 }
1088 EXPORT_SYMBOL_GPL(flush_workqueue);
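
/*
 * Example of the driver-shutdown pattern mentioned above (my_dev,
 * dev->stopping and dev->wq are hypothetical).  Submissions are
 * stopped first so nothing keeps getting requeued behind the flush:
 *
 *	dev->stopping = true;		   checked by the work functions
 *	flush_workqueue(dev->wq);	   wait for already-queued works
 *	destroy_workqueue(dev->wq);
 */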
1089
1090 /**
1091  * flush_work - block until a work_struct's callback has terminated
1092  * @work: the work which is to be flushed
1093  *
1094  * Returns false if @work has already terminated.
1095  *
1096  * It is expected that, prior to calling flush_work(), the caller has
1097  * arranged for the work to not be requeued, otherwise it doesn't make
1098  * sense to use this function.
1099  */
1100 int flush_work(struct work_struct *work)
1101 {
1102         struct worker *worker = NULL;
1103         struct cpu_workqueue_struct *cwq;
1104         struct wq_barrier barr;
1105
1106         might_sleep();
1107         cwq = get_wq_data(work);
1108         if (!cwq)
1109                 return 0;
1110
1111         lock_map_acquire(&cwq->wq->lockdep_map);
1112         lock_map_release(&cwq->wq->lockdep_map);
1113
1114         spin_lock_irq(&cwq->lock);
1115         if (!list_empty(&work->entry)) {
1116                 /*
1117                  * See the comment near try_to_grab_pending()->smp_rmb().
1118                  * If it was re-queued under us we are not going to wait.
1119                  */
1120                 smp_rmb();
1121                 if (unlikely(cwq != get_wq_data(work)))
1122                         goto already_gone;
1123         } else {
1124                 if (cwq->worker && cwq->worker->current_work == work)
1125                         worker = cwq->worker;
1126                 if (!worker)
1127                         goto already_gone;
1128         }
1129
1130         insert_wq_barrier(cwq, &barr, work, worker);
1131         spin_unlock_irq(&cwq->lock);
1132         wait_for_completion(&barr.done);
1133         destroy_work_on_stack(&barr.work);
1134         return 1;
1135 already_gone:
1136         spin_unlock_irq(&cwq->lock);
1137         return 0;
1138 }
1139 EXPORT_SYMBOL_GPL(flush_work);
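
/*
 * Example usage (hypothetical names): unlike flush_workqueue(), this
 * waits for a single item only.  As noted above, the caller must first
 * make sure the work won't be requeued:
 *
 *	dev->stopping = true;		   my_work_fn() stops requeueing
 *	if (!flush_work(&dev->work))
 *		pr_debug("work was already idle\n");
 */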
1140
1141 /*
1142  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
1143  * so this work can't be re-armed in any way.
1144  */
1145 static int try_to_grab_pending(struct work_struct *work)
1146 {
1147         struct cpu_workqueue_struct *cwq;
1148         int ret = -1;
1149
1150         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1151                 return 0;
1152
1153         /*
1154          * The queueing is in progress, or it is already queued. Try to
1155          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1156          */
1157
1158         cwq = get_wq_data(work);
1159         if (!cwq)
1160                 return ret;
1161
1162         spin_lock_irq(&cwq->lock);
1163         if (!list_empty(&work->entry)) {
1164                 /*
1165                  * This work is queued, but perhaps we locked the wrong cwq.
1166                  * In that case we must see the new value after rmb(), see
1167                  * insert_work()->wmb().
1168                  */
1169                 smp_rmb();
1170                 if (cwq == get_wq_data(work)) {
1171                         debug_work_deactivate(work);
1172                         list_del_init(&work->entry);
1173                         cwq_dec_nr_in_flight(cwq, get_work_color(work));
1174                         ret = 1;
1175                 }
1176         }
1177         spin_unlock_irq(&cwq->lock);
1178
1179         return ret;
1180 }
1181
1182 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
1183                                 struct work_struct *work)
1184 {
1185         struct wq_barrier barr;
1186         struct worker *worker;
1187
1188         spin_lock_irq(&cwq->lock);
1189
1190         worker = NULL;
1191         if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
1192                 worker = cwq->worker;
1193                 insert_wq_barrier(cwq, &barr, work, worker);
1194         }
1195
1196         spin_unlock_irq(&cwq->lock);
1197
1198         if (unlikely(worker)) {
1199                 wait_for_completion(&barr.done);
1200                 destroy_work_on_stack(&barr.work);
1201         }
1202 }
1203
1204 static void wait_on_work(struct work_struct *work)
1205 {
1206         struct cpu_workqueue_struct *cwq;
1207         struct workqueue_struct *wq;
1208         int cpu;
1209
1210         might_sleep();
1211
1212         lock_map_acquire(&work->lockdep_map);
1213         lock_map_release(&work->lockdep_map);
1214
1215         cwq = get_wq_data(work);
1216         if (!cwq)
1217                 return;
1218
1219         wq = cwq->wq;
1220
1221         for_each_possible_cpu(cpu)
1222                 wait_on_cpu_work(get_cwq(cpu, wq), work);
1223 }
1224
1225 static int __cancel_work_timer(struct work_struct *work,
1226                                 struct timer_list* timer)
1227 {
1228         int ret;
1229
1230         do {
1231                 ret = (timer && likely(del_timer(timer)));
1232                 if (!ret)
1233                         ret = try_to_grab_pending(work);
1234                 wait_on_work(work);
1235         } while (unlikely(ret < 0));
1236
1237         clear_wq_data(work);
1238         return ret;
1239 }
1240
1241 /**
1242  * cancel_work_sync - block until a work_struct's callback has terminated
1243  * @work: the work which is to be flushed
1244  *
1245  * Returns true if @work was pending.
1246  *
1247  * cancel_work_sync() will cancel the work if it is queued. If the work's
1248  * callback appears to be running, cancel_work_sync() will block until it
1249  * has completed.
1250  *
1251  * It is possible to use this function if the work re-queues itself. It can
1252  * cancel the work even if it migrates to another workqueue, however in that
1253  * case it only guarantees that work->func() has completed on the last queued
1254  * workqueue.
1255  *
1256  * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
1257  * pending, otherwise it goes into a busy-wait loop until the timer expires.
1258  *
1259  * The caller must ensure that workqueue_struct on which this work was last
1260  * queued can't be destroyed before this function returns.
1261  */
1262 int cancel_work_sync(struct work_struct *work)
1263 {
1264         return __cancel_work_timer(work, NULL);
1265 }
1266 EXPORT_SYMBOL_GPL(cancel_work_sync);
1267
1268 /**
1269  * cancel_delayed_work_sync - reliably kill off a delayed work.
1270  * @dwork: the delayed work struct
1271  *
1272  * Returns true if @dwork was pending.
1273  *
1274  * It is possible to use this function if @dwork rearms itself via queue_work()
1275  * or queue_delayed_work(). See also the comment for cancel_work_sync().
1276  */
1277 int cancel_delayed_work_sync(struct delayed_work *dwork)
1278 {
1279         return __cancel_work_timer(&dwork->work, &dwork->timer);
1280 }
1281 EXPORT_SYMBOL(cancel_delayed_work_sync);
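
/*
 * Example usage (hypothetical names): canceling a self-rearming delayed
 * work at teardown.  Per the comments above, no "stop requeueing" flag
 * is needed because the PENDING bit is grabbed before waiting:
 *
 *	static void my_poll_fn(struct work_struct *work)
 *	{
 *		struct my_dev *dev =
 *			container_of(work, struct my_dev, poll.work);
 *		my_dev_poll(dev);
 *		schedule_delayed_work(&dev->poll, HZ);	   rearm
 *	}
 *
 *	... teardown ...
 *	cancel_delayed_work_sync(&dev->poll);
 */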
1282
1283 static struct workqueue_struct *keventd_wq __read_mostly;
1284
1285 /**
1286  * schedule_work - put work task in global workqueue
1287  * @work: job to be done
1288  *
1289  * Returns zero if @work was already on the kernel-global workqueue and
1290  * non-zero otherwise.
1291  *
1292  * This puts a job in the kernel-global workqueue if it was not already
1293  * queued and leaves it in the same position on the kernel-global
1294  * workqueue otherwise.
1295  */
1296 int schedule_work(struct work_struct *work)
1297 {
1298         return queue_work(keventd_wq, work);
1299 }
1300 EXPORT_SYMBOL(schedule_work);
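
/*
 * Example usage (hypothetical names): one-off items are often declared
 * statically and pushed onto the kernel-global workqueue:
 *
 *	static void my_async_fn(struct work_struct *unused);
 *	static DECLARE_WORK(my_async, my_async_fn);
 *
 *	... from interrupt or process context ...
 *	schedule_work(&my_async);
 */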
1301
1302 /*
1303  * schedule_work_on - put work task on a specific cpu
1304  * @cpu: cpu to put the work task on
1305  * @work: job to be done
1306  *
1307  * This puts a job on a specific cpu
1308  */
1309 int schedule_work_on(int cpu, struct work_struct *work)
1310 {
1311         return queue_work_on(cpu, keventd_wq, work);
1312 }
1313 EXPORT_SYMBOL(schedule_work_on);
1314
1315 /**
1316  * schedule_delayed_work - put work task in global workqueue after delay
1317  * @dwork: job to be done
1318  * @delay: number of jiffies to wait or 0 for immediate execution
1319  *
1320  * After waiting for a given time this puts a job in the kernel-global
1321  * workqueue.
1322  */
1323 int schedule_delayed_work(struct delayed_work *dwork,
1324                                         unsigned long delay)
1325 {
1326         return queue_delayed_work(keventd_wq, dwork, delay);
1327 }
1328 EXPORT_SYMBOL(schedule_delayed_work);
1329
1330 /**
1331  * flush_delayed_work - block until a delayed_work's callback has terminated
1332  * @dwork: the delayed work which is to be flushed
1333  *
1334  * Any timeout is cancelled, and any pending work is run immediately.
1335  */
1336 void flush_delayed_work(struct delayed_work *dwork)
1337 {
1338         if (del_timer_sync(&dwork->timer)) {
1339                 __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
1340                              &dwork->work);
1341                 put_cpu();
1342         }
1343         flush_work(&dwork->work);
1344 }
1345 EXPORT_SYMBOL(flush_delayed_work);
1346
1347 /**
1348  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
1349  * @cpu: cpu to use
1350  * @dwork: job to be done
1351  * @delay: number of jiffies to wait
1352  *
1353  * After waiting for a given time this puts a job in the kernel-global
1354  * workqueue on the specified CPU.
1355  */
1356 int schedule_delayed_work_on(int cpu,
1357                         struct delayed_work *dwork, unsigned long delay)
1358 {
1359         return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
1360 }
1361 EXPORT_SYMBOL(schedule_delayed_work_on);
1362
1363 /**
1364  * schedule_on_each_cpu - call a function on each online CPU from keventd
1365  * @func: the function to call
1366  *
1367  * Returns zero on success.
1368  * Returns a negative errno on failure.
1369  *
1370  * schedule_on_each_cpu() is very slow.
1371  */
1372 int schedule_on_each_cpu(work_func_t func)
1373 {
1374         int cpu;
1375         int orig = -1;
1376         struct work_struct *works;
1377
1378         works = alloc_percpu(struct work_struct);
1379         if (!works)
1380                 return -ENOMEM;
1381
1382         get_online_cpus();
1383
1384         /*
1385          * When running in keventd don't schedule a work item on
1386          * itself.  Can just call directly because the work queue is
1387          * already bound.  This also is faster.
1388          */
1389         if (current_is_keventd())
1390                 orig = raw_smp_processor_id();
1391
1392         for_each_online_cpu(cpu) {
1393                 struct work_struct *work = per_cpu_ptr(works, cpu);
1394
1395                 INIT_WORK(work, func);
1396                 if (cpu != orig)
1397                         schedule_work_on(cpu, work);
1398         }
1399         if (orig >= 0)
1400                 func(per_cpu_ptr(works, orig));
1401
1402         for_each_online_cpu(cpu)
1403                 flush_work(per_cpu_ptr(works, cpu));
1404
1405         put_online_cpus();
1406         free_percpu(works);
1407         return 0;
1408 }
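
/*
 * Example usage (hypothetical names): the callback runs once on every
 * online CPU in process context, and schedule_on_each_cpu() returns
 * only after all invocations have finished:
 *
 *	static void my_drain_fn(struct work_struct *unused)
 *	{
 *		... drain this CPU's per-cpu cache ...
 *	}
 *
 *	int err = schedule_on_each_cpu(my_drain_fn);
 */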
1409
1410 /**
1411  * flush_scheduled_work - ensure that any scheduled work has run to completion.
1412  *
1413  * Forces execution of the kernel-global workqueue and blocks until its
1414  * completion.
1415  *
1416  * Think twice before calling this function!  It's very easy to get into
1417  * trouble if you don't take great care.  Either of the following situations
1418  * will lead to deadlock:
1419  *
1420  *      One of the work items currently on the workqueue needs to acquire
1421  *      a lock held by your code or its caller.
1422  *
1423  *      Your code is running in the context of a work routine.
1424  *
1425  * They will be detected by lockdep when they occur, but the first might not
1426  * occur very often.  It depends on what work items are on the workqueue and
1427  * what locks they need, which you have no control over.
1428  *
1429  * In most situations flushing the entire workqueue is overkill; you merely
1430  * need to know that a particular work item isn't queued and isn't running.
1431  * In such cases you should use cancel_delayed_work_sync() or
1432  * cancel_work_sync() instead.
1433  */
1434 void flush_scheduled_work(void)
1435 {
1436         flush_workqueue(keventd_wq);
1437 }
1438 EXPORT_SYMBOL(flush_scheduled_work);
1439
1440 /**
1441  * execute_in_process_context - reliably execute the routine with user context
1442  * @fn:         the function to execute
1443  * @ew:         guaranteed storage for the execute work structure (must
1444  *              be available when the work executes)
1445  *
1446  * Executes the function immediately if process context is available,
1447  * otherwise schedules the function for delayed execution.
1448  *
1449  * Returns:     0 - function was executed
1450  *              1 - function was scheduled for execution
1451  */
1452 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
1453 {
1454         if (!in_interrupt()) {
1455                 fn(&ew->work);
1456                 return 0;
1457         }
1458
1459         INIT_WORK(&ew->work, fn);
1460         schedule_work(&ew->work);
1461
1462         return 1;
1463 }
1464 EXPORT_SYMBOL_GPL(execute_in_process_context);
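
/*
 * Example usage (hypothetical names): the caller provides the
 * execute_work storage, typically embedded in an object that outlives
 * the call:
 *
 *	struct my_dev {
 *		struct execute_work release_ew;
 *		...
 *	};
 *
 *	static void my_dev_release(struct work_struct *work)
 *	{
 *		struct my_dev *dev =
 *			container_of(work, struct my_dev, release_ew.work);
 *		kfree(dev);
 *	}
 *
 *	execute_in_process_context(my_dev_release, &dev->release_ew);
 */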
1465
1466 int keventd_up(void)
1467 {
1468         return keventd_wq != NULL;
1469 }
1470
1471 int current_is_keventd(void)
1472 {
1473         struct cpu_workqueue_struct *cwq;
1474         int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
1475         int ret = 0;
1476
1477         BUG_ON(!keventd_wq);
1478
1479         cwq = get_cwq(cpu, keventd_wq);
1480         if (current == cwq->worker->task)
1481                 ret = 1;
1482
1483         return ret;
1484
1485 }
1486
1487 static struct cpu_workqueue_struct *alloc_cwqs(void)
1488 {
1489         /*
1490          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
1491          * Make sure that the alignment isn't lower than that of
1492          * unsigned long long.
1493          */
1494         const size_t size = sizeof(struct cpu_workqueue_struct);
1495         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
1496                                    __alignof__(unsigned long long));
1497         struct cpu_workqueue_struct *cwqs;
1498 #ifndef CONFIG_SMP
1499         void *ptr;
1500
1501         /*
1502          * On UP, percpu allocator doesn't honor alignment parameter
1503          * and simply uses arch-dependent default.  Allocate enough
1504          * room to align cwq and put an extra pointer at the end
1505          * pointing back to the originally allocated pointer which
1506          * will be used for free.
1507          *
1508          * FIXME: This really belongs to UP percpu code.  Update UP
1509          * percpu code to honor alignment and remove this ugliness.
1510          */
1511         ptr = __alloc_percpu(size + align + sizeof(void *), 1);
1512         cwqs = PTR_ALIGN(ptr, align);
1513         *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
1514 #else
1515         /* On SMP, percpu allocator can do it itself */
1516         cwqs = __alloc_percpu(size, align);
1517 #endif
1518         /* just in case, make sure it's actually aligned */
1519         BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
1520         return cwqs;
1521 }
1522
1523 static void free_cwqs(struct cpu_workqueue_struct *cwqs)
1524 {
1525 #ifndef CONFIG_SMP
1526         /* on UP, the pointer to free is stored right after the cwq */
1527         if (cwqs)
1528                 free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
1529 #else
1530         free_percpu(cwqs);
1531 #endif
1532 }
1533
1534 struct workqueue_struct *__create_workqueue_key(const char *name,
1535                                                 unsigned int flags,
1536                                                 int max_active,
1537                                                 struct lock_class_key *key,
1538                                                 const char *lock_name)
1539 {
1540         bool singlethread = flags & WQ_SINGLE_THREAD;
1541         struct workqueue_struct *wq;
1542         bool failed = false;
1543         unsigned int cpu;
1544
1545         max_active = clamp_val(max_active, 1, INT_MAX);
1546
1547         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1548         if (!wq)
1549                 goto err;
1550
1551         wq->cpu_wq = alloc_cwqs();
1552         if (!wq->cpu_wq)
1553                 goto err;
1554
1555         wq->flags = flags;
1556         mutex_init(&wq->flush_mutex);
1557         atomic_set(&wq->nr_cwqs_to_flush, 0);
1558         INIT_LIST_HEAD(&wq->flusher_queue);
1559         INIT_LIST_HEAD(&wq->flusher_overflow);
1560         wq->name = name;
1561         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1562         INIT_LIST_HEAD(&wq->list);
1563
1564         cpu_maps_update_begin();
1565         /*
1566          * We must initialize cwqs for each possible cpu even if we
1567          * are going to call destroy_workqueue() finally. Otherwise
1568          * cpu_up() can hit the uninitialized cwq once we drop the
1569          * lock.
1570          */
1571         for_each_possible_cpu(cpu) {
1572                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1573
1574                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1575                 cwq->cpu = cpu;
1576                 cwq->wq = wq;
1577                 cwq->flush_color = -1;
1578                 cwq->max_active = max_active;
1579                 spin_lock_init(&cwq->lock);
1580                 INIT_LIST_HEAD(&cwq->worklist);
1581                 INIT_LIST_HEAD(&cwq->delayed_works);
1582                 init_waitqueue_head(&cwq->more_work);
1583
1584                 if (failed)
1585                         continue;
1586                 cwq->worker = create_worker(cwq,
1587                                             cpu_online(cpu) && !singlethread);
1588                 if (cwq->worker)
1589                         start_worker(cwq->worker);
1590                 else
1591                         failed = true;
1592         }
1593
1594         spin_lock(&workqueue_lock);
1595         list_add(&wq->list, &workqueues);
1596         spin_unlock(&workqueue_lock);
1597
1598         cpu_maps_update_done();
1599
1600         if (failed) {
1601                 destroy_workqueue(wq);
1602                 wq = NULL;
1603         }
1604         return wq;
1605 err:
1606         if (wq) {
1607                 free_cwqs(wq->cpu_wq);
1608                 kfree(wq);
1609         }
1610         return NULL;
1611 }
1612 EXPORT_SYMBOL_GPL(__create_workqueue_key);
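
/*
 * Example usage: callers normally go through the create_workqueue()
 * family of wrapper macros in workqueue.h rather than calling
 * __create_workqueue_key() directly; the wrappers are assumed here to
 * forward (name, flags, max_active).  Hypothetical sketch:
 *
 *	struct workqueue_struct *wq;
 *
 *	wq = create_workqueue("my_wq");
 *	if (!wq)
 *		return -ENOMEM;
 *	...
 *	queue_work(wq, &dev->work);
 *	...
 *	destroy_workqueue(wq);
 */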
1613
1614 /**
1615  * destroy_workqueue - safely terminate a workqueue
1616  * @wq: target workqueue
1617  *
1618  * Safely destroy a workqueue. All work currently pending will be done first.
1619  */
1620 void destroy_workqueue(struct workqueue_struct *wq)
1621 {
1622         int cpu;
1623
1624         cpu_maps_update_begin();
1625         spin_lock(&workqueue_lock);
1626         list_del(&wq->list);
1627         spin_unlock(&workqueue_lock);
1628         cpu_maps_update_done();
1629
1630         flush_workqueue(wq);
1631
1632         for_each_possible_cpu(cpu) {
1633                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1634                 int i;
1635
1636                 if (cwq->worker) {
1637                         destroy_worker(cwq->worker);
1638                         cwq->worker = NULL;
1639                 }
1640
1641                 for (i = 0; i < WORK_NR_COLORS; i++)
1642                         BUG_ON(cwq->nr_in_flight[i]);
1643                 BUG_ON(cwq->nr_active);
1644                 BUG_ON(!list_empty(&cwq->delayed_works));
1645         }
1646
1647         free_cwqs(wq->cpu_wq);
1648         kfree(wq);
1649 }
1650 EXPORT_SYMBOL_GPL(destroy_workqueue);
1651
1652 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
1653                                                 unsigned long action,
1654                                                 void *hcpu)
1655 {
1656         unsigned int cpu = (unsigned long)hcpu;
1657         struct cpu_workqueue_struct *cwq;
1658         struct workqueue_struct *wq;
1659
1660         action &= ~CPU_TASKS_FROZEN;
1661
1662         list_for_each_entry(wq, &workqueues, list) {
1663                 if (wq->flags & WQ_SINGLE_THREAD)
1664                         continue;
1665
1666                 cwq = get_cwq(cpu, wq);
1667
1668                 switch (action) {
1669                 case CPU_POST_DEAD:
1670                         flush_workqueue(wq);
1671                         break;
1672                 }
1673         }
1674
1675         return notifier_from_errno(0);
1676 }
1677
1678 #ifdef CONFIG_SMP
1679
1680 struct work_for_cpu {
1681         struct completion completion;
1682         long (*fn)(void *);
1683         void *arg;
1684         long ret;
1685 };
1686
1687 static int do_work_for_cpu(void *_wfc)
1688 {
1689         struct work_for_cpu *wfc = _wfc;
1690         wfc->ret = wfc->fn(wfc->arg);
1691         complete(&wfc->completion);
1692         return 0;
1693 }
1694
1695 /**
1696  * work_on_cpu - run a function in user context on a particular cpu
1697  * @cpu: the cpu to run on
1698  * @fn: the function to run
1699  * @arg: the function arg
1700  *
1701  * This will return the value @fn returns.
1702  * It is up to the caller to ensure that the cpu doesn't go offline.
1703  * The caller must not hold any locks which would prevent @fn from completing.
1704  */
1705 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
1706 {
1707         struct task_struct *sub_thread;
1708         struct work_for_cpu wfc = {
1709                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
1710                 .fn = fn,
1711                 .arg = arg,
1712         };
1713
1714         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
1715         if (IS_ERR(sub_thread))
1716                 return PTR_ERR(sub_thread);
1717         kthread_bind(sub_thread, cpu);
1718         wake_up_process(sub_thread);
1719         wait_for_completion(&wfc.completion);
1720         return wfc.ret;
1721 }
1722 EXPORT_SYMBOL_GPL(work_on_cpu);
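
/*
 * Example usage (hypothetical names): run a function on a particular
 * CPU and collect its return value; the caller must keep that CPU
 * online for the duration:
 *
 *	static long my_read_local_state(void *arg)
 *	{
 *		... read something CPU-local, return it ...
 *	}
 *
 *	long val = work_on_cpu(2, my_read_local_state, NULL);
 */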
1723 #endif /* CONFIG_SMP */
1724
1725 void __init init_workqueues(void)
1726 {
1727         unsigned int cpu;
1728
1729         for_each_possible_cpu(cpu)
1730                 ida_init(&per_cpu(worker_ida, cpu));
1731
1732         singlethread_cpu = cpumask_first(cpu_possible_mask);
1733         hotcpu_notifier(workqueue_cpu_callback, 0);
1734         keventd_wq = create_workqueue("events");
1735         BUG_ON(!keventd_wq);
1736 }