/*
* padata.c - generic interface to process data streams in parallel
*
+ * See Documentation/padata.txt for an api documentation.
+ *
* Copyright (C) 2008, 2009 secunet Security Networks AG
* Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
*
* 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#include <linux/module.h>
+#include <linux/export.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/sysfs.h>
#include <linux/rcupdate.h>
-#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM 1000
static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
return target_cpu;
}
-static int padata_cpu_hash(struct padata_priv *padata)
+static int padata_cpu_hash(struct parallel_data *pd)
{
int cpu_index;
- struct parallel_data *pd;
-
- pd = padata->pd;
/*
* Hash the sequence numbers to the cpus by taking
* seq_nr mod. number of cpus in use.
*/
- cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask.pcpu);
+
+ spin_lock(&pd->seq_lock);
+ cpu_index = pd->seq_nr % cpumask_weight(pd->cpumask.pcpu);
+ pd->seq_nr++;
+ spin_unlock(&pd->seq_lock);
return padata_index_to_cpu(pd, cpu_index);
}
pd = rcu_dereference(pinst->pd);
err = -EINVAL;
- if (!(pinst->flags & PADATA_INIT))
+ if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
goto out;
if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
padata->pd = pd;
padata->cb_cpu = cb_cpu;
- if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
- atomic_set(&pd->seq_nr, -1);
-
- padata->seq_nr = atomic_inc_return(&pd->seq_nr);
-
- target_cpu = padata_cpu_hash(padata);
+ target_cpu = padata_cpu_hash(pd);
queue = per_cpu_ptr(pd->pqueue, target_cpu);
spin_lock(&queue->parallel.lock);
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
int cpu, num_cpus;
- int next_nr, next_index;
+ unsigned int next_nr, next_index;
struct padata_parallel_queue *queue, *next_queue;
struct padata_priv *padata;
struct padata_list *reorder;
cpu = padata_index_to_cpu(pd, next_index);
next_queue = per_cpu_ptr(pd->pqueue, cpu);
- if (unlikely(next_nr > pd->max_seq_nr)) {
- next_nr = next_nr - pd->max_seq_nr - 1;
- next_index = next_nr % num_cpus;
- cpu = padata_index_to_cpu(pd, next_index);
- next_queue = per_cpu_ptr(pd->pqueue, cpu);
- pd->processed = 0;
- }
-
padata = NULL;
reorder = &next_queue->reorder;
padata = list_entry(reorder->list.next,
struct padata_priv, list);
- BUG_ON(next_nr != padata->seq_nr);
-
spin_lock(&reorder->lock);
list_del_init(&padata->list);
atomic_dec(&pd->reorder_objects);
static void padata_reorder(struct parallel_data *pd)
{
+ int cb_cpu;
struct padata_priv *padata;
struct padata_serial_queue *squeue;
struct padata_instance *pinst = pd->pinst;
/*
* This cpu has to do the parallel processing of the next
* object. It's waiting in the cpu's parallelization queue,
- * so exit imediately.
+ * so exit immediately.
*/
if (PTR_ERR(padata) == -ENODATA) {
del_timer(&pd->timer);
return;
}
- squeue = per_cpu_ptr(pd->squeue, padata->cb_cpu);
+ cb_cpu = padata->cb_cpu;
+ squeue = per_cpu_ptr(pd->squeue, cb_cpu);
spin_lock(&squeue->serial.lock);
list_add_tail(&padata->list, &squeue->serial.list);
spin_unlock(&squeue->serial.lock);
- queue_work_on(padata->cb_cpu, pinst->wq, &squeue->work);
+ queue_work_on(cb_cpu, pinst->wq, &squeue->work);
}
spin_unlock_bh(&pd->lock);
/*
* The next object that needs serialization might have arrived to
* the reorder queues in the meantime, we will be called again
- * from the timer function if noone else cares for it.
+ * from the timer function if no one else cares for it.
*/
if (atomic_read(&pd->reorder_objects)
&& !(pinst->flags & PADATA_RESET))
if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
return -ENOMEM;
- cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_active_mask);
+ cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
free_cpumask_var(pd->cpumask.cbcpu);
return -ENOMEM;
}
- cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_active_mask);
+ cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
return 0;
}
/* Initialize all percpu queues used by parallel workers */
static void padata_init_pqueues(struct parallel_data *pd)
{
- int cpu_index, num_cpus, cpu;
+ int cpu_index, cpu;
struct padata_parallel_queue *pqueue;
cpu_index = 0;
INIT_WORK(&pqueue->work, padata_parallel_worker);
atomic_set(&pqueue->num_obj, 0);
}
-
- num_cpus = cpumask_weight(pd->cpumask.pcpu);
- pd->max_seq_nr = num_cpus ? (MAX_SEQ_NR / num_cpus) * num_cpus - 1 : 0;
}
/* Allocate and initialize the internal cpumask dependend resources. */
padata_init_pqueues(pd);
padata_init_squeues(pd);
setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
- atomic_set(&pd->seq_nr, -1);
+ pd->seq_nr = 0;
atomic_set(&pd->reorder_objects, 0);
atomic_set(&pd->refcnt, 0);
pd->pinst = pinst;
put_online_cpus();
}
-/* Replace the internal control stucture with a new one. */
+/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
struct parallel_data *pd_new)
{
if (notification_mask)
blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
- notification_mask, pinst);
+ notification_mask,
+ &pd_new->cpumask);
pinst->flags &= ~PADATA_RESET;
}
static bool padata_validate_cpumask(struct padata_instance *pinst,
const struct cpumask *cpumask)
{
- if (!cpumask_intersects(cpumask, cpu_active_mask)) {
+ if (!cpumask_intersects(cpumask, cpu_online_mask)) {
pinst->flags |= PADATA_INVALID;
return false;
}
return true;
}
+static int __padata_set_cpumasks(struct padata_instance *pinst,
+ cpumask_var_t pcpumask,
+ cpumask_var_t cbcpumask)
+{
+ int valid;
+ struct parallel_data *pd;
+
+ valid = padata_validate_cpumask(pinst, pcpumask);
+ if (!valid) {
+ __padata_stop(pinst);
+ goto out_replace;
+ }
+
+ valid = padata_validate_cpumask(pinst, cbcpumask);
+ if (!valid)
+ __padata_stop(pinst);
+
+out_replace:
+ pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
+ if (!pd)
+ return -ENOMEM;
+
+ cpumask_copy(pinst->cpumask.pcpu, pcpumask);
+ cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
+
+ padata_replace(pinst, pd);
+
+ if (valid)
+ __padata_start(pinst);
+
+ return 0;
+}
+
/**
- * padata_get_cpumask: Fetch serial or parallel cpumask from the
- * given padata instance and copy it to @out_mask
+ * padata_set_cpumasks - Set both parallel and serial cpumasks. The first
+ * one is used by parallel workers and the second one
+ * by the wokers doing serialization.
*
- * @pinst: A pointer to padata instance
- * @cpumask_type: Specifies which cpumask will be copied.
- * Possible values are PADATA_CPU_SERIAL *or* PADATA_CPU_PARALLEL
- * corresponding to serial and parallel cpumask respectively.
- * @out_mask: A pointer to cpumask structure where selected
- * cpumask will be copied.
+ * @pinst: padata instance
+ * @pcpumask: the cpumask to use for parallel workers
+ * @cbcpumask: the cpumsak to use for serial workers
*/
-int padata_get_cpumask(struct padata_instance *pinst,
- int cpumask_type, struct cpumask *out_mask)
+int padata_set_cpumasks(struct padata_instance *pinst, cpumask_var_t pcpumask,
+ cpumask_var_t cbcpumask)
{
- struct parallel_data *pd;
- int ret = 0;
+ int err;
- rcu_read_lock_bh();
- pd = rcu_dereference(pinst->pd);
- switch (cpumask_type) {
- case PADATA_CPU_SERIAL:
- cpumask_copy(out_mask, pd->cpumask.cbcpu);
- break;
- case PADATA_CPU_PARALLEL:
- cpumask_copy(out_mask, pd->cpumask.pcpu);
- break;
- default:
- ret = -EINVAL;
- }
+ mutex_lock(&pinst->lock);
+ get_online_cpus();
+
+ err = __padata_set_cpumasks(pinst, pcpumask, cbcpumask);
+
+ put_online_cpus();
+ mutex_unlock(&pinst->lock);
+
+ return err;
- rcu_read_unlock_bh();
- return ret;
}
-EXPORT_SYMBOL(padata_get_cpumask);
+EXPORT_SYMBOL(padata_set_cpumasks);
/**
* padata_set_cpumask: Sets specified by @cpumask_type cpumask to the value
cpumask_var_t cpumask)
{
struct cpumask *serial_mask, *parallel_mask;
+ int err = -EINVAL;
+
+ mutex_lock(&pinst->lock);
+ get_online_cpus();
switch (cpumask_type) {
case PADATA_CPU_PARALLEL:
serial_mask = cpumask;
break;
default:
- return -EINVAL;
+ goto out;
}
- return __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
-}
-EXPORT_SYMBOL(padata_set_cpumask);
-
-/**
- * __padata_set_cpumasks - Set both parallel and serial cpumasks. The first
- * one is used by parallel workers and the second one
- * by the wokers doing serialization.
- *
- * @pinst: padata instance
- * @pcpumask: the cpumask to use for parallel workers
- * @cbcpumask: the cpumsak to use for serial workers
- */
-int __padata_set_cpumasks(struct padata_instance *pinst,
- cpumask_var_t pcpumask, cpumask_var_t cbcpumask)
-{
- int valid;
- int err = 0;
- struct parallel_data *pd = NULL;
-
- mutex_lock(&pinst->lock);
- get_online_cpus();
-
- valid = padata_validate_cpumask(pinst, pcpumask);
- if (!valid) {
- __padata_stop(pinst);
- goto out_replace;
- }
-
- valid = padata_validate_cpumask(pinst, cbcpumask);
- if (!valid)
- __padata_stop(pinst);
-
-out_replace:
- pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
- if (!pd) {
- err = -ENOMEM;
- goto out;
- }
-
- cpumask_copy(pinst->cpumask.pcpu, pcpumask);
- cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
-
- padata_replace(pinst, pd);
-
- if (valid)
- __padata_start(pinst);
+ err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
out:
put_online_cpus();
mutex_unlock(&pinst->lock);
return err;
-
}
-EXPORT_SYMBOL(__padata_set_cpumasks);
+EXPORT_SYMBOL(padata_set_cpumask);
static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
struct parallel_data *pd;
- if (cpumask_test_cpu(cpu, cpu_active_mask)) {
+ if (cpumask_test_cpu(cpu, cpu_online_mask)) {
pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
pinst->cpumask.cbcpu);
if (!pd)
return -ENOMEM;
padata_replace(pinst, pd);
+
+ cpumask_clear_cpu(cpu, pd->cpumask.cbcpu);
+ cpumask_clear_cpu(cpu, pd->cpumask.pcpu);
}
return 0;
}
/**
- * padata_remove_cpu - remove a cpu from the one or both(serial and paralell)
+ * padata_remove_cpu - remove a cpu from the one or both(serial and parallel)
* padata cpumasks.
*
* @pinst: padata instance
err = __padata_add_cpu(pinst, cpu);
mutex_unlock(&pinst->lock);
if (err)
- return NOTIFY_BAD;
+ return notifier_from_errno(err);
break;
case CPU_DOWN_PREPARE:
err = __padata_remove_cpu(pinst, cpu);
mutex_unlock(&pinst->lock);
if (err)
- return NOTIFY_BAD;
+ return notifier_from_errno(err);
break;
case CPU_UP_CANCELED:
};
/**
- * padata_alloc - Allocate and initialize padata instance.
- * Use default cpumask(cpu_possible_mask)
- * for serial and parallel workes.
+ * padata_alloc_possible - Allocate and initialize padata instance.
+ * Use the cpu_possible_mask for serial and
+ * parallel workers.
*
* @wq: workqueue to use for the allocated padata instance
*/
-struct padata_instance *padata_alloc(struct workqueue_struct *wq)
+struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq)
{
- return __padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
+ return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
}
-EXPORT_SYMBOL(padata_alloc);
+EXPORT_SYMBOL(padata_alloc_possible);
/**
- * __padata_alloc - allocate and initialize a padata instance
- * and specify cpumasks for serial and parallel workers.
+ * padata_alloc - allocate and initialize a padata instance and specify
+ * cpumasks for serial and parallel workers.
*
* @wq: workqueue to use for the allocated padata instance
* @pcpumask: cpumask that will be used for padata parallelization
* @cbcpumask: cpumask that will be used for padata serialization
*/
-struct padata_instance *__padata_alloc(struct workqueue_struct *wq,
- const struct cpumask *pcpumask,
- const struct cpumask *cbcpumask)
+struct padata_instance *padata_alloc(struct workqueue_struct *wq,
+ const struct cpumask *pcpumask,
+ const struct cpumask *cbcpumask)
{
struct padata_instance *pinst;
struct parallel_data *pd = NULL;
err:
return NULL;
}
-EXPORT_SYMBOL(__padata_alloc);
+EXPORT_SYMBOL(padata_alloc);
/**
* padata_free - free a padata instance