// SPDX-License-Identifier: GPL-2.0
/*
 * padata.c - generic interface to process data streams in parallel
 *
 * See Documentation/core-api/padata.rst for more information.
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * Copyright (c) 2020 Oracle and/or its affiliates.
 * Author: Daniel Jordan <daniel.m.jordan@oracle.com>
 */
/*
 * padata_cpu_hash - hash a job's sequence number to an index into the
 * parallel cpumask (seq_nr modulo the number of CPUs in use).
 *
 * NOTE(review): this definition is truncated in this copy — the return
 * statement (presumably mapping cpu_index to an actual CPU id) and the
 * closing brace are missing. Verify against the complete source.
 */
staticint padata_cpu_hash(struct parallel_data *pd, unsignedint seq_nr)
{ /* * Hash the sequence numbers to the cpus by taking * seq_nr mod. number of cpus in use.
*/ int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
/*
 * This function is marked __ref because this function may be optimized in
 * such a way that it directly refers to work_fn's address, which causes
 * modpost to complain when work_fn is marked __init. This scenario was
 * observed with clang LTO, where padata_work_init() was optimized to refer
 * directly to padata_mt_helper() because the calls to padata_work_init()
 * with other work_fn values were eliminated or inlined.
 */
static void __ref padata_work_init(struct padata_work *pw, work_func_t work_fn,
				   void *data, int flags)
{
	/* Stash the caller's payload; the worker retrieves it via pw_data. */
	pw->pw_data = data;

	/* PADATA_WORK_ONSTACK selects the on-stack variant of the init macro. */
	if (flags & PADATA_WORK_ONSTACK)
		INIT_WORK_ONSTACK(&pw->pw_work, work_fn);
	else
		INIT_WORK(&pw->pw_work, work_fn);
}
/*
 * NOTE(review): orphaned fragment — this looks like the interior of a
 * padata-work allocation loop. 'i' and 'nworks' are not declared here and
 * the for-body never closes; the enclosing function was lost in this copy.
 */
spin_lock_bh(&padata_works_lock); /* Start at 1 because the current task participates in the job. */ for (i = 1; i < nworks; ++i) { struct padata_work *pw = padata_work_alloc();
/** * padata_do_parallel - padata parallelization function * * @ps: padatashell * @padata: object to be parallelized * @cb_cpu: pointer to the CPU that the serialization callback function should * run on. If it's not in the serial cpumask of @pinst * (i.e. cpumask.cbcpu), this function selects a fallback CPU and if * none found, returns -EINVAL. * * The parallelization callback function will run with BHs off. * Note: Every object which is parallelized by padata_do_parallel * must be seen by padata_do_serial. * * Return: 0 on success or else negative error code.
 *
 * NOTE(review): the body below is truncated in this copy — the code that
 * takes rcu_read_lock_bh(), looks up the parallel_data, validates @cb_cpu
 * and assigns 'pw' and 'err' is missing. As shown, 'pw' and 'err' are read
 * uninitialized and the 'out' label is unreachable. Compare against the
 * full kernel source before relying on this copy.
*/ int padata_do_parallel(struct padata_shell *ps, struct padata_priv *padata, int *cb_cpu)
{ struct padata_instance *pinst = ps->pinst; struct parallel_data *pd; struct padata_work *pw; int cpu_index, err;
/* No work item available: run the job synchronously in the current task. */
if (!pw) { /* Maximum works limit exceeded, run in the current task. */
padata->parallel(padata);
}
rcu_read_unlock_bh();
/* Otherwise hand the object to a worker on the parallel workqueue. */
if (pw) {
padata_work_init(pw, padata_parallel_worker, padata, 0);
queue_work(pinst->parallel_wq, &pw->pw_work);
}
return 0;
/* Error path; reached via gotos that are missing from this truncated copy. */
out:
rcu_read_unlock_bh();
return err;
}
EXPORT_SYMBOL(padata_do_parallel);
/* * padata_find_next - Find the next object that needs serialization. * * Return: * * A pointer to the control struct of the next object that needs * serialization, if present in one of the percpu reorder queues. * * NULL, if the next object that needs serialization will * be parallel processed by another cpu and is not yet present in * the cpu's reorder queue.
 *
 * NOTE(review): this copy is garbled — 'padata' is tested below before it
 * is ever assigned (a list_entry() lookup of the queue head appears to be
 * missing), and the do/while loop further down looks like it belongs to a
 * different function (the reorder loop); verify against the full source.
*/ staticstruct padata_priv *padata_find_next(struct parallel_data *pd, int cpu, unsignedint processed)
{ struct padata_priv *padata; struct padata_list *reorder;
/* Each CPU that parallelized a job has its own per-CPU reorder queue. */
reorder = per_cpu_ptr(pd->reorder_list, cpu);
spin_lock(&reorder->lock); if (list_empty(&reorder->list)) goto notfound;
/* * Checks the rare case where two or more parallel jobs have hashed to * the same CPU and one of the later ones finishes first.
*/ if (padata->seq_nr != processed) goto notfound;
/* NOTE(review): displaced loop — 'squeue' and 'cb_cpu' are never initialized in this copy. */
do { struct padata_serial_queue *squeue; int cb_cpu;
processed++; /* When sequence wraps around, reset to the first CPU. */ if (unlikely(processed == 0))
cpu = cpumask_first(pd->cpumask.pcpu); else
cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu);
/* * If the next object that needs serialization is parallel * processed by another cpu and is still on it's way to the * cpu's reorder queue, end the loop.
*/
padata = padata_find_next(pd, cpu, processed);
spin_unlock(&squeue->serial.lock);
} while (padata);
}
/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
	struct parallel_data *pd = padata->pd;
	int hashed_cpu = padata_cpu_hash(pd, padata->seq_nr);
	struct padata_list *reorder = per_cpu_ptr(pd->reorder_list, hashed_cpu);
	struct list_head *insert_pos;
	bool run_reorder;

	spin_lock(&reorder->lock);

	/*
	 * Walk the queue backwards to find the first entry with a smaller
	 * sequence number, keeping the list sorted in ascending order.
	 */
	list_for_each_prev(insert_pos, &reorder->list) {
		struct padata_priv *iter =
			list_entry(insert_pos, struct padata_priv, list);

		/* Compare by difference to consider integer wrap around */
		if ((signed int)(iter->seq_nr - padata->seq_nr) < 0)
			break;
	}

	/* Only the object that is next in line triggers reordering itself. */
	run_reorder = (padata->seq_nr == pd->processed);
	if (!run_reorder)
		list_add(&padata->list, insert_pos);

	spin_unlock(&reorder->lock);

	if (run_reorder)
		padata_reorder(padata);
}
EXPORT_SYMBOL(padata_do_serial);
/*
 * padata_setup_cpumasks - NOTE(review): truncated in this copy; only the
 * workqueue_attrs allocation survives and the function never closes.
 * Verify against the complete source.
 */
staticint padata_setup_cpumasks(struct padata_instance *pinst)
{ struct workqueue_attrs *attrs; int err;
attrs = alloc_workqueue_attrs(); if (!attrs) return -ENOMEM;
/** * padata_do_multithreaded - run a multithreaded job * @job: Description of the job. * * See the definition of struct padata_mt_job for more details.
 *
 * NOTE(review): the body below is missing pieces in this copy —
 * ps.nworks is read before it is ever assigned, the padata_mt_job_state
 * fields (completion, lock, position) are never initialized, the code
 * that allocates works onto the 'works' list is absent, and the
 * function's closing brace is missing. Verify against the full source.
*/ void __init padata_do_multithreaded(struct padata_mt_job *job)
{ /* In case threads finish at different times. */ staticconstunsignedlong load_balance_factor = 4; struct padata_work my_work, *pw; struct padata_mt_job_state ps;
LIST_HEAD(works); int nworks, nid; static atomic_t last_used_nid __initdata;
/* Nothing to do for an empty job. */
if (job->size == 0) return;
/* Ensure at least one thread when size < min_chunk. */
nworks = max(job->size / max(job->min_chunk, job->align), 1ul);
nworks = min(nworks, job->max_threads);
if (nworks == 1) { /* Single thread, no coordination needed, cut to the chase. */
job->thread_fn(job->start, job->start + job->size, job->fn_arg); return;
}
/* * Chunk size is the amount of work a helper does per call to the * thread function. Load balance large jobs between threads by * increasing the number of chunks, guarantee at least the minimum * chunk size from the caller, and honor the caller's alignment. * Ensure chunk_size is at least 1 to prevent divide-by-0 * panic in padata_mt_helper().
*/
/* NOTE(review): ps.nworks is uninitialized at this point in this copy. */
ps.chunk_size = job->size / (ps.nworks * load_balance_factor);
ps.chunk_size = max(ps.chunk_size, job->min_chunk);
ps.chunk_size = max(ps.chunk_size, 1ul);
ps.chunk_size = roundup(ps.chunk_size, job->align);
/* Queue each pre-allocated work; spread across NUMA nodes when requested. */
list_for_each_entry(pw, &works, pw_list) if (job->numa_aware) { int old_node = atomic_read(&last_used_nid);
do {
nid = next_node_in(old_node, node_states[N_CPU]);
} while (!atomic_try_cmpxchg(&last_used_nid, &old_node, nid));
queue_work_node(nid, system_unbound_wq, &pw->pw_work);
} else {
queue_work(system_unbound_wq, &pw->pw_work);
}
/* Use the current thread, which saves starting a workqueue worker. */
padata_work_init(&my_work, padata_mt_helper, &ps, PADATA_WORK_ONSTACK);
padata_mt_helper(&my_work.pw_work);
/* Wait for all the helpers to finish. */
wait_for_completion(&ps.completion);
/*
 * NOTE(review): the statements below do not match this function's name —
 * they overwrite the 'pd' parameter with a fresh kzalloc(), reference an
 * undeclared 'ps', and jump to cleanup labels that are not defined here.
 * They appear to belong to the parallel_data allocation routine; this
 * copy is garbled and the function never closes. Verify against the
 * complete source.
 */
/* Initialize all percpu queues used by serial workers */ staticvoid padata_init_squeues(struct parallel_data *pd)
{ int cpu; struct padata_serial_queue *squeue;
pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL); if (!pd) goto err;
pd->reorder_list = alloc_percpu(struct padata_list); if (!pd->reorder_list) goto err_free_pd;
pd->squeue = alloc_percpu(struct padata_serial_queue); if (!pd->squeue) goto err_free_reorder_list;
pd->ps = ps;
if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) goto err_free_squeue; if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) goto err_free_pcpu;
/* If cpumask contains no active cpu, we mark the instance as invalid. */ staticbool padata_validate_cpumask(struct padata_instance *pinst, conststruct cpumask *cpumask)
{ if (!cpumask_intersects(cpumask, cpu_online_mask)) {
pinst->flags |= PADATA_INVALID; returnfalse;
}
pinst->flags &= ~PADATA_INVALID; returntrue;
}
/*
 * __padata_set_cpumasks - NOTE(review): truncated in this copy; only the
 * signature and local declarations survive. Verify against the complete
 * source.
 */
staticint __padata_set_cpumasks(struct padata_instance *pinst,
cpumask_var_t pcpumask,
cpumask_var_t cbcpumask)
{ int valid; int err;
/** * padata_set_cpumask - Sets specified by @cpumask_type cpumask to the value * equivalent to @cpumask. * @pinst: padata instance * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding * to parallel and serial cpumasks respectively. * @cpumask: the cpumask to use * * Return: 0 on success or negative error code
 *
 * NOTE(review): truncated in this copy — only the signature and local
 * declarations survive. Verify against the complete source.
*/ int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
cpumask_var_t cpumask)
{ struct cpumask *serial_mask, *parallel_mask; int err = -EINVAL;
/** * padata_alloc - allocate and initialize a padata instance * @name: used to identify the instance * * Return: new instance on success, NULL on error
 *
 * NOTE(review): truncated in this copy — the body ends mid-function after
 * the parallel workqueue allocation, and the 'err*' cleanup labels it
 * jumps to are not present here. Verify against the complete source.
*/ struct padata_instance *padata_alloc(constchar *name)
{ struct padata_instance *pinst;
pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL); if (!pinst) goto err;
pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
name); if (!pinst->parallel_wq) goto err_free_inst;
/*
 * NOTE(review): the following German-language website disclaimer is
 * extraction residue unrelated to padata; it is commented out here so the
 * file stays parseable. Roughly translated: "The information on this
 * website was compiled carefully to the best of our knowledge. However,
 * neither completeness, correctness, nor quality of the provided
 * information is guaranteed. Remark: the colored syntax display is still
 * experimental."
 *
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereit gestellten Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung ist noch experimentell.
 */