#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
+#include <linux/kthread.h>
/*
* The per-CPU workqueue.
struct workqueue_struct *wq;
task_t *thread;
- struct completion exit;
} ____cacheline_aligned;
struct cpu_workqueue_struct cpu_wq[NR_CPUS];
};
+/*
+ * Add @work to the tail of @cwq's worklist, record @cwq in
+ * work->wq_data, bump insert_sequence and wake anyone sleeping on
+ * cwq->more_work. The wakeup is done within the spinlock, so that
+ * flushing can be done in a guaranteed way.
+ *
+ * Preempt must be disabled.
+ */
+static void __queue_work(struct cpu_workqueue_struct *cwq,
+ struct work_struct *work)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&cwq->lock, flags);
+ work->wq_data = cwq;
+ list_add_tail(&work->entry, &cwq->worklist);
+ cwq->insert_sequence++;
+ wake_up(&cwq->more_work);
+ spin_unlock_irqrestore(&cwq->lock, flags);
+}
+
/*
* Queue work on a workqueue. Return non-zero if it was successfully
* added.
*/
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
- unsigned long flags;
int ret = 0, cpu = get_cpu();
- struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
if (!test_and_set_bit(0, &work->pending)) {
BUG_ON(!list_empty(&work->entry));
- work->wq_data = cwq;
-
- spin_lock_irqsave(&cwq->lock, flags);
- list_add_tail(&work->entry, &cwq->worklist);
- cwq->insert_sequence++;
- wake_up(&cwq->more_work);
- spin_unlock_irqrestore(&cwq->lock, flags);
+ __queue_work(wq->cpu_wq + cpu, work);
ret = 1;
}
put_cpu();
static void delayed_work_timer_fn(unsigned long __data)
{
struct work_struct *work = (struct work_struct *)__data;
- struct cpu_workqueue_struct *cwq = work->wq_data;
- unsigned long flags;
+ struct workqueue_struct *wq = work->wq_data; /* wq, not cwq: see queue_delayed_work() */
- /*
- * Do the wakeup within the spinlock, so that flushing
- * can be done in a guaranteed way.
- */
- spin_lock_irqsave(&cwq->lock, flags);
- list_add_tail(&work->entry, &cwq->worklist);
- cwq->insert_sequence++;
- wake_up(&cwq->more_work);
- spin_unlock_irqrestore(&cwq->lock, flags);
+ __queue_work(wq->cpu_wq + smp_processor_id(), work);
}
int queue_delayed_work(struct workqueue_struct *wq,
struct work_struct *work, unsigned long delay)
{
- int ret = 0, cpu = get_cpu();
+ int ret = 0;
struct timer_list *timer = &work->timer;
- struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
if (!test_and_set_bit(0, &work->pending)) {
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
- work->wq_data = cwq;
+ /*
+ * This stores wq (not a cwq) for the moment, for the timer_fn:
+ * delayed_work_timer_fn() reads it back, picks the cwq of the
+ * CPU it is running on, and __queue_work() then overwrites
+ * wq_data with that cwq.
+ */
+ work->wq_data = wq;
timer->expires = jiffies + delay;
timer->data = (unsigned long)work;
timer->function = delayed_work_timer_fn;
add_timer(timer);
ret = 1;
}
- put_cpu();
return ret;
}
spin_unlock_irqrestore(&cwq->lock, flags);
}
-typedef struct startup_s {
- struct cpu_workqueue_struct *cwq;
- struct completion done;
- const char *name;
-} startup_t;
-
-static int worker_thread(void *__startup)
+static int worker_thread(void *__cwq)
{
- startup_t *startup = __startup;
- struct cpu_workqueue_struct *cwq = startup->cwq;
+ struct cpu_workqueue_struct *cwq = __cwq;
int cpu = cwq - cwq->wq->cpu_wq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
+ sigset_t blocked;
- daemonize("%s/%d", startup->name, cpu);
current->flags |= PF_IOTHREAD;
- cwq->thread = current;
set_user_nice(current, -10);
- set_cpus_allowed(current, cpumask_of_cpu(cpu));
+ BUG_ON(smp_processor_id() != cpu); /* creator kthread_bind()s us before wakeup */
- complete(&startup->done);
+ /* Block and flush all signals */
+ sigfillset(&blocked);
+ sigprocmask(SIG_BLOCK, &blocked, NULL);
+ flush_signals(current);
/* SIG_IGN makes children autoreap: see do_notify_parent(). */
sa.sa.sa_handler = SIG_IGN;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
- for (;;) {
+ while (!kthread_should_stop()) { /* ends once kthread_stop() is called on us */
set_task_state(current, TASK_INTERRUPTIBLE);
add_wait_queue(&cwq->more_work, &wait);
- if (!cwq->thread)
- break;
if (list_empty(&cwq->worklist))
schedule();
else
if (!list_empty(&cwq->worklist))
run_workqueue(cwq);
}
- remove_wait_queue(&cwq->more_work, &wait);
- complete(&cwq->exit);
-
return 0;
}
const char *name,
int cpu)
{
- startup_t startup;
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
- int ret;
+ struct task_struct *p;
spin_lock_init(&cwq->lock);
cwq->wq = wq;
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);
- init_completion(&cwq->exit);
-
- init_completion(&startup.done);
- startup.cwq = cwq;
- startup.name = name;
- ret = kernel_thread(worker_thread, &startup, CLONE_FS | CLONE_FILES);
- if (ret >= 0) {
- wait_for_completion(&startup.done);
- BUG_ON(!cwq->thread);
- }
- return ret;
+
+ p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu);
+ if (IS_ERR(p))
+ return PTR_ERR(p);
+ cwq->thread = p;
+ kthread_bind(p, cpu);
+ return 0;
}
struct workqueue_struct *create_workqueue(const char *name)
continue;
if (create_workqueue_thread(wq, name, cpu) < 0)
destroy = 1;
+ else
+ wake_up_process(wq->cpu_wq[cpu].thread);
}
/*
* Was there any error during startup? If yes then clean up:
struct cpu_workqueue_struct *cwq;
cwq = wq->cpu_wq + cpu;
- if (cwq->thread) {
- /* Tell thread to exit and wait for it. */
- cwq->thread = NULL;
- wake_up(&cwq->more_work);
-
- wait_for_completion(&cwq->exit);
- }
+ if (cwq->thread)
+ kthread_stop(cwq->thread);
}
void destroy_workqueue(struct workqueue_struct *wq)
flush_workqueue(keventd_wq);
}
+int keventd_up(void)
+{
+ return keventd_wq != NULL; /* non-zero iff keventd_wq is set; NOTE(review): assignment site not visible here */
+}
+
int current_is_keventd(void)
{
struct cpu_workqueue_struct *cwq;