/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@redhat.com>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue.
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        long remove_sequence;   /* Least-recently added (next to run) */
        long insert_sequence;   /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        task_t *thread;

} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct cpu_wq[NR_CPUS];
};
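
/*
 * Illustrative sketch, not part of the original file: the number of
 * works still outstanding on one per-CPU queue is simply the gap
 * between the two sequence counters above.  flush_workqueue() below
 * waits for exactly this gap, sampled at flush time, to drain to zero.
 * The helper name is hypothetical, and cwq->lock would have to be held
 * for a stable answer.
 */
static inline long example_works_outstanding(struct cpu_workqueue_struct *cwq)
{
        /* insert_sequence advances on queueing, remove_sequence on completion */
        return cwq->insert_sequence - cwq->remove_sequence;
}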

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        work->wq_data = cwq;
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work on the CPU it was submitted from, but there is no
 * guarantee that it will be processed by that CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(!list_empty(&work->entry));
                __queue_work(wq->cpu_wq + cpu, work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
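
/*
 * Illustrative usage sketch, not part of the original file: queueing a
 * statically initialised work item on a caller-owned workqueue.  The
 * names my_work_fn, my_work and my_submit are hypothetical; the work
 * function signature matches the void (*)(void *) this file invokes.
 */
static void my_work_fn(void *data)
{
        printk(KERN_DEBUG "my_work_fn: ran in process context, data=%p\n", data);
}

static DECLARE_WORK(my_work, my_work_fn, NULL);

static void my_submit(struct workqueue_struct *my_wq)
{
        /* queue_work() returns 1 if queued, 0 if the work was already pending */
        if (!queue_work(my_wq, &my_work))
                printk(KERN_DEBUG "my_work was already pending\n");
}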

static void delayed_work_timer_fn(unsigned long __data)
{
        struct work_struct *work = (struct work_struct *)__data;
        struct workqueue_struct *wq = work->wq_data;

        __queue_work(wq->cpu_wq + smp_processor_id(), work);
}

int queue_delayed_work(struct workqueue_struct *wq,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}
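
/*
 * Illustrative sketch, not part of the original file: a delayed
 * submission arms the timer above, which then queues the work on
 * whichever CPU the timer fires on.  my_poll_fn, my_poll_work and
 * my_start_poll are hypothetical names.
 */
static void my_poll_fn(void *data)
{
        printk(KERN_DEBUG "my_poll_fn: ran about a second after submission\n");
}

static DECLARE_WORK(my_poll_work, my_poll_fn, NULL);

static void my_start_poll(struct workqueue_struct *my_wq)
{
        /* HZ jiffies is roughly one second; returns 0 if already pending */
        queue_delayed_work(my_wq, &my_poll_work, HZ);
}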

static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                void (*f) (void *) = work->func;
                void *data = work->data;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(work->wq_data != cwq);
                clear_bit(0, &work->pending);
                f(data);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * The per-CPU worker thread: sleeps until more_work is signalled,
 * then drains the worklist via run_workqueue().
 */
static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        int cpu = cwq - cwq->wq->cpu_wq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        current->flags |= PF_IOTHREAD;

        set_user_nice(current, -10);
        BUG_ON(smp_processor_id() != cpu);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        while (!kthread_should_stop()) {
                set_task_state(current, TASK_INTERRUPTIBLE);

                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        set_task_state(current, TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
        }
        return 0;
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each per-CPU workqueue's current insert_sequence
 * number and will sleep until its remove_sequence is greater than or equal to
 * that.  This means that we sleep until all works which were queued on entry
 * have been handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
        struct cpu_workqueue_struct *cwq;
        int cpu;

        might_sleep();

        for (cpu = 0; cpu < NR_CPUS; cpu++) {
                DEFINE_WAIT(wait);
                long sequence_needed;

                if (!cpu_online(cpu))
                        continue;
                cwq = wq->cpu_wq + cpu;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}
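
/*
 * Illustrative sketch, not part of the original file: a typical driver
 * shutdown path stops queueing new work, then flushes so every
 * already-queued item has finished before its data is torn down.
 * my_shutdown and my_wq are hypothetical names.
 */
static void my_shutdown(struct workqueue_struct *my_wq)
{
        /* ... first make sure nothing will queue new work on my_wq ... */

        /* may sleep, so never call this from interrupt context */
        flush_workqueue(my_wq);

        /* now it is safe to free whatever the work functions were using */
}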

static int create_workqueue_thread(struct workqueue_struct *wq,
                                   const char *name,
                                   int cpu)
{
        struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu);
        if (IS_ERR(p))
                return PTR_ERR(p);
        cwq->thread = p;
        kthread_bind(p, cpu);
        return 0;
}

struct workqueue_struct *create_workqueue(const char *name)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;

        BUG_ON(strlen(name) > 10);

        wq = kmalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        for (cpu = 0; cpu < NR_CPUS; cpu++) {
                if (!cpu_online(cpu))
                        continue;
                if (create_workqueue_thread(wq, name, cpu) < 0)
                        destroy = 1;
                else
                        wake_up_process(wq->cpu_wq[cpu].thread);
        }
        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;

        cwq = wq->cpu_wq + cpu;
        if (cwq->thread)
                kthread_stop(cwq->thread);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        for (cpu = 0; cpu < NR_CPUS; cpu++) {
                if (cpu_online(cpu))
                        cleanup_workqueue_thread(wq, cpu);
        }
        kfree(wq);
}
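
/*
 * Illustrative sketch, not part of the original file: the usual
 * lifecycle of a private workqueue.  Note that the name passed to
 * create_workqueue() must be at most 10 characters (see the BUG_ON
 * above).  my_private_wq, my_init and my_exit are hypothetical names.
 */
static struct workqueue_struct *my_private_wq;

static int my_init(void)
{
        my_private_wq = create_workqueue("mydriver");   /* one thread per online CPU */
        if (!my_private_wq)
                return -ENOMEM;
        return 0;
}

static void my_exit(void)
{
        destroy_workqueue(my_private_wq);       /* flushes, stops the threads, frees wq */
}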

static struct workqueue_struct *keventd_wq;

/* Queue work on the shared keventd ("events") workqueue. */
int schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}

/* As schedule_work(), but run the work only after @delay jiffies. */
int schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
        return queue_delayed_work(keventd_wq, work, delay);
}

/* Wait until all work queued on keventd at the time of the call has run. */
void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}
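
/*
 * Illustrative sketch, not part of the original file: most callers do
 * not create a private workqueue at all and simply defer work to the
 * shared keventd threads.  my_deferred_fn, my_deferred and
 * my_irq_bottom_half are hypothetical names.
 */
static void my_deferred_fn(void *data)
{
        /* runs in process context in a keventd thread, so it may sleep */
}

static DECLARE_WORK(my_deferred, my_deferred_fn, NULL);

static void my_irq_bottom_half(void)
{
        /* safe from interrupt context: only atomic ops and irq-safe locking */
        schedule_work(&my_deferred);
}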

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu;

        BUG_ON(!keventd_wq);

        for_each_cpu(cpu) {
                cwq = keventd_wq->cpu_wq + cpu;
                if (current == cwq->thread)
                        return 1;
        }
        return 0;
}

void init_workqueues(void)
{
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(flush_scheduled_work);