AsyncPool

Manages general-purpose worker threads used for processing asynchronous tasks, and the tasks submitted to those workers.

Table of Contents

WORKER_START_OPTIONS  = \PTHREADS_INHERIT_INI
$size  : int
$classLoader  : ClassLoader
$eventLoop  : SleeperHandler
$logger  : ThreadedLogger
$taskQueues  : array<string|int, SplQueue>|array<string|int, array<string|int, AsyncTask>>
$workerLastUsed  : array<string|int, int>
$workerMemoryLimit  : int
$workers  : array<string|int, AsyncWorker>
$workerStartHooks  : array<string|int, Closure>
__construct()  : mixed
addWorkerStartHook()  : void
Registers a Closure callback to be fired whenever a new worker is started by the pool.
collectTasks()  : bool
Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.
collectTasksFromWorker()  : bool
getRunningWorkers()  : array<string|int, int>
Returns an array of IDs of currently running workers.
getSize()  : int
Returns the maximum size of the pool. Note that there may be fewer active workers than this number.
getTaskQueueSizes()  : array<string|int, int>
Returns an array mapping each worker ID to the size of its task queue.
increaseSize()  : void
Increases the maximum size of the pool to the specified amount. This does not immediately start new workers.
removeWorkerStartHook()  : void
Removes a previously-registered callback listening for workers being started.
selectWorker()  : int
Selects a worker ID to run a task.
shutdown()  : void
Cancels all pending tasks and shuts down all the workers in the pool.
shutdownUnusedWorkers()  : int
submitTask()  : int
Submits an AsyncTask to the worker with the least load. If all workers are busy and the pool is not full, a new worker may be started.
submitTaskToWorker()  : void
Submits an AsyncTask to an arbitrary worker.
getWorker()  : AsyncWorker
Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker start hooks.

Constants

WORKER_START_OPTIONS

private mixed WORKER_START_OPTIONS = \PTHREADS_INHERIT_INI

Properties

$taskQueues

private array<string|int, SplQueue>|array<string|int, array<string|int, AsyncTask>> $taskQueues = []
Tags
phpstan-var

array<int, \SplQueue<AsyncTask>>

$workerLastUsed

private array<string|int, int> $workerLastUsed = []
Tags
phpstan-var

array<int, int>

$workerMemoryLimit

private int $workerMemoryLimit

$workers

private array<string|int, AsyncWorker> $workers = []
Tags
phpstan-var

array<int, AsyncWorker>

$workerStartHooks

private array<string|int, Closure> $workerStartHooks = []
Tags
phpstan-var

(\Closure(int $workerId) : void)[]

Methods

addWorkerStartHook()

Registers a Closure callback to be fired whenever a new worker is started by the pool.

public addWorkerStartHook(Closure $hook) : void

The hook's signature should be function(int $worker) : void.

On registration, the hook is also called once for every worker that is already running.

Parameters
$hook : Closure
Tags
phpstan-param

\Closure(int $workerId) : void $hook

Return values
void
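The registration behaviour described above can be illustrated with a minimal, self-contained sketch. StartHookSketch and its startWorker() method are hypothetical stand-ins written for this example; they are not part of AsyncPool, and keying hooks by spl_object_id() is an assumption rather than something this page documents.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the hook-related part of a pool (not the real AsyncPool).
final class StartHookSketch{
    /** @var array<int, Closure> hooks keyed by object ID (assumed storage scheme) */
    private array $hooks = [];
    /** @var int[] IDs of workers that are already running */
    private array $runningWorkers;

    /** @param int[] $runningWorkers */
    public function __construct(array $runningWorkers){
        $this->runningWorkers = $runningWorkers;
    }

    public function addWorkerStartHook(Closure $hook) : void{
        $this->hooks[spl_object_id($hook)] = $hook;
        // Replay the hook for every worker that is already running.
        foreach($this->runningWorkers as $workerId){
            $hook($workerId);
        }
    }

    // Hypothetical helper standing in for "the pool starts a new worker".
    public function startWorker(int $workerId) : void{
        $this->runningWorkers[] = $workerId;
        foreach($this->hooks as $hook){
            $hook($workerId);
        }
    }
}

$seen = [];
$pool = new StartHookSketch([0, 1]);
$pool->addWorkerStartHook(function(int $worker) use (&$seen) : void{
    $seen[] = $worker;
});
$pool->startWorker(2);
// $seen now holds the two pre-existing worker IDs followed by the new one.
```

Replaying the hook for running workers means a caller does not need to care whether it registers before or after the pool has started any workers.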

collectTasks()

Collects finished and/or crashed tasks from the workers, firing their on-completion hooks where appropriate.

public collectTasks() : bool
Tags
throws
ReflectionException
Return values
bool

whether there are tasks left to be collected
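One possible reading of the return value is sketched below. TaskSketch and collectTasksSketch() are hypothetical names, and the sketch assumes that "tasks left to be collected" means at least one worker queue still has an unfinished task at its head; the real method may differ in detail.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an AsyncTask; only the state needed here is modelled.
final class TaskSketch{
    public bool $finished = false;
    public bool $completionFired = false;
}

/**
 * Collects finished tasks from the head of each queue.
 *
 * @param array<int, SplQueue> $taskQueues worker ID => queue of TaskSketch
 * @return bool true if some queue still holds an unfinished task
 */
function collectTasksSketch(array $taskQueues) : bool{
    $more = false;
    foreach($taskQueues as $queue){
        while(!$queue->isEmpty()){
            $task = $queue->bottom(); // peek at the oldest task without removing it
            if(!$task->finished){
                $more = true; // still running: leave it queued, move to the next worker
                break;
            }
            $queue->dequeue();
            $task->completionFired = true; // stands in for the on-completion hook
        }
    }
    return $more;
}

$done = new TaskSketch();
$done->finished = true;
$running = new TaskSketch();

$queue = new SplQueue();
$queue->enqueue($done);
$queue->enqueue($running);

$first = collectTasksSketch([0 => $queue]);  // one task is still running
$running->finished = true;
$second = collectTasksSketch([0 => $queue]); // everything has been collected
```

A caller that needs to drain the pool can therefore keep calling the method until it returns false.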

collectTasksFromWorker()

public collectTasksFromWorker(int $worker) : bool
Parameters
$worker : int
Return values
bool

getRunningWorkers()

Returns an array of IDs of currently running workers.

public getRunningWorkers() : array<string|int, int>
Return values
array<string|int, int>

getSize()

Returns the maximum size of the pool. Note that there may be fewer active workers than this number.

public getSize() : int
Return values
int

getTaskQueueSizes()

Returns an array mapping each worker ID to the size of its task queue.

public getTaskQueueSizes() : array<string|int, int>
Tags
phpstan-return

array<int, int>

Return values
array<string|int, int>

increaseSize()

Increases the maximum size of the pool to the specified amount. This does not immediately start new workers.

public increaseSize(int $newSize) : void
Parameters
$newSize : int
Return values
void
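The distinction between the pool's maximum size and its running workers can be shown with a small sketch. PoolSizeSketch is a hypothetical class written for this example, and ignoring values that are not larger than the current size is an assumption inferred from the word "Increases", not documented behaviour.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in tracking only the size limit and the started workers.
final class PoolSizeSketch{
    private int $size;
    /** @var array<int, bool> worker ID => started flag; empty until a worker is needed */
    private array $workers = [];

    public function __construct(int $size){
        $this->size = $size;
    }

    public function increaseSize(int $newSize) : void{
        // Assumption: a value that is not larger than the current size is ignored.
        if($newSize > $this->size){
            $this->size = $newSize;
        }
        // No worker is started here; only the upper limit changes.
    }

    public function getSize() : int{
        return $this->size;
    }

    public function getRunningWorkerCount() : int{
        return count($this->workers);
    }
}

$pool = new PoolSizeSketch(2);
$pool->increaseSize(4);
$sizeAfterGrow = $pool->getSize();
$pool->increaseSize(3); // not an increase, so nothing changes in this sketch
$sizeAfterNoop = $pool->getSize();
$running = $pool->getRunningWorkerCount();
```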

removeWorkerStartHook()

Removes a previously-registered callback listening for workers being started.

public removeWorkerStartHook(Closure $hook) : void
Parameters
$hook : Closure
Tags
phpstan-param

\Closure(int $workerId) : void $hook

Return values
void

selectWorker()

Selects a worker ID to run a task.

public selectWorker() : int
  • If an idle worker is found, it is selected.
  • Otherwise, if the worker pool is not full, a new worker is selected.
  • Otherwise, the worker with the smallest backlog is chosen.
Return values
int
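The three selection rules above can be sketched as a standalone function. selectWorkerSketch() and its parameters are hypothetical: the function works on a plain array of queue sizes instead of real workers, and picking the lowest unused ID for a new worker is an assumption, not something this page states.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration of the documented selection rules.
 *
 * @param array<int, int> $queueSizes worker ID => queued task count, running workers only
 * @param int $poolSize maximum number of workers in the pool
 */
function selectWorkerSketch(array $queueSizes, int $poolSize) : int{
    // Find the running worker with the smallest backlog.
    $best = null;
    $bestLoad = PHP_INT_MAX;
    foreach($queueSizes as $id => $load){
        if($load < $bestLoad){
            $best = $id;
            $bestLoad = $load;
        }
    }

    // Rule 1: an idle worker is selected straight away.
    if($best !== null && $bestLoad === 0){
        return $best;
    }

    // Rule 2: the pool is not full, so pick the first ID that has no worker yet.
    if(count($queueSizes) < $poolSize){
        for($id = 0; $id < $poolSize; ++$id){
            if(!isset($queueSizes[$id])){
                return $id;
            }
        }
    }

    // Rule 3: fall back to the busy worker with the smallest backlog.
    if($best === null){
        throw new InvalidArgumentException("Pool size must be at least 1");
    }
    return $best;
}

$idle = selectWorkerSketch([0 => 2, 1 => 0], 4);    // worker 1 is idle
$fresh = selectWorkerSketch([0 => 2, 1 => 1], 4);   // all busy, pool not full
$busiest = selectWorkerSketch([0 => 3, 1 => 1], 2); // all busy, pool full
```

Preferring an idle worker over starting a new one keeps the number of running threads low when the existing workers can keep up.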

shutdown()

Cancels all pending tasks and shuts down all the workers in the pool.

public shutdown() : void
Return values
void

shutdownUnusedWorkers()

public shutdownUnusedWorkers() : int
Return values
int

submitTask()

Submits an AsyncTask to the worker with the least load. If all workers are busy and the pool is not full, a new worker may be started.

public submitTask(AsyncTask $task) : int
Parameters
$task : AsyncTask
Return values
int

submitTaskToWorker()

Submits an AsyncTask to an arbitrary worker.

public submitTaskToWorker(AsyncTask $task, int $worker) : void
Parameters
$task : AsyncTask
$worker : int
Return values
void

getWorker()

Fetches the worker with the specified ID, starting it if it does not exist, and firing any registered worker start hooks.

private getWorker(int $worker) : AsyncWorker
Parameters
$worker : int
Return values
AsyncWorker
