The parallel\Channel class
(0.9.0)
Unbuffered Channels
An unbuffered channel will block on calls to parallel\Channel::send() until there is a receiver, and block on calls to parallel\Channel::recv() until there is a sender. This means an unbuffered channel is not only a way to share data among tasks but also a simple method of synchronization.
An unbuffered channel is the fastest way to share data among tasks, requiring the least copying.
Buffered Channels
A buffered channel will not block on calls to parallel\Channel::send() until capacity is reached; calls to parallel\Channel::recv() will block until there is data in the buffer.
Closures over Channels
A powerful feature of parallel channels is that they allow the exchange of closures between tasks (and runtimes).
When a closure is sent over a channel, the closure is buffered. This doesn't change the buffering of the channel transmitting the closure, but it does affect the static scope inside the closure: the same closure sent to different runtimes, or to the same runtime, will not share its static scope.
This means that whenever a closure is executed that was transmitted by a channel, static state will be as it was when the closure was buffered.
Anonymous Channels
The anonymous channel constructor allows the programmer to avoid assigning names to every channel: parallel will generate a unique name for anonymous channels.
クラス概要
目次
- parallel\Channel::__construct — Channel Construction
- parallel\Channel::make — Access
- parallel\Channel::open — Access
- parallel\Channel::recv — Sharing
- parallel\Channel::send — Sharing
- parallel\Channel::close — Closing
User Contributed Notes 5 notes
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time, in seconds, for which each consumer shall sleep.
<?php
use parallel\{Runtime, Channel};
// Script entry point; main() is declared further down in this file.
main($argv);
/**
 * Command-line entry point.
 *
 * Expects exactly two arguments after the script name: the number of tasks
 * and the maximum sleep time in seconds. Prints a usage message and stops
 * the script otherwise; on success runs parallelize() and reports the
 * elapsed wall-clock time.
 *
 * @param array $argv Raw command-line arguments, script name first.
 */
function main(array $argv)
{
    // Guard clause: anything other than "script + 2 arguments" is a usage error.
    if (count($argv) !== 3) {
        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo "Example: hello-parallel.php 5 3" . PHP_EOL;
        die;
    }

    $numberOfTasks = (int) $argv[1];
    $maximumTimeOfSleep = (int) $argv[2];

    $startedAt = microtime(true);
    parallelize($numberOfTasks, $maximumTimeOfSleep);
    $endTime = microtime(true) - $startedAt;

    echo PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;
}
/**
 * Starts one producer task and $numberOfTasks consumer tasks that talk over
 * a single channel, then waits for all of them to finish.
 *
 * The producer sends one random sleep duration (picked with
 * rand(1, $maximumTimeOfSleep)) per consumer. Each consumer receives one
 * duration, sleeps for it and returns it as its task result.
 *
 * @param int $numberOfTasks      Number of consumer tasks; values below 1 start nothing.
 * @param int $maximumTimeOfSleep Upper bound, in seconds, handed to rand().
 */
function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
    // range(0, $numberOfTasks - 1) counts downwards once $numberOfTasks drops
    // below 1 (range(0, -1) is [0, -1]), which would start tasks nobody asked
    // for. Bail out instead and build the lists with a plain counted loop.
    if ($numberOfTasks < 1) {
        return;
    }

    $channel = new Channel();

    $taskIds = [];
    $timesToSleep = [];
    for ($i = 0; $i < $numberOfTasks; $i++) {
        $taskIds[] = uniqid("task::");
        $timesToSleep[] = rand(1, $maximumTimeOfSleep);
    }

    // Producer: pushes every sleep duration into the channel, one per consumer.
    $producer = new Runtime();
    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach ($timesToSleep as $timeToSleep) {
            $channel->send($timeToSleep);
        }
    }, [$channel, $timesToSleep]);

    // Consumers: each one gets its own Runtime and takes exactly one value.
    $consumerFutures = array_map(function (string $id) use ($channel) {
        $runtime = new Runtime();
        return $runtime->run(function (string $id, Channel $channel) {
            $timeToSleep = $channel->recv();
            echo "Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;
            sleep($timeToSleep);
            echo "$id slept for $timeToSleep second(s).".PHP_EOL;
            return $timeToSleep;
        }, [$id, $channel]);
    }, $taskIds);

    wait($consumerFutures);
    wait([$producerFuture]);
}
/**
 * Resolves every future in turn by calling value() on it.
 *
 * @param array $futures Objects exposing a value() method.
 *
 * @return array The value() results, stored under the same keys as the input.
 */
function wait(array $futures)
{
    $values = [];
    foreach ($futures as $key => $future) {
        $values[$key] = $future->value();
    }

    return $values;
}
An example using an unbuffered channel.
<?php
use parallel\{Channel,Runtime};
// Task body: adds up the values in $a and sends the total over $ch.
$sum = function (array $a, Channel $ch) {
    $sum = 0;
    foreach ($a as $v) {
        $sum += $v;
    }
    $ch->send($sum);
};

try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];

    // Split point between the two halves. It has to be defined before the
    // array_slice() calls below; intdiv() keeps it an integer for any length.
    $num = intdiv(count($a), 2);

    //unbuffered channel
    $runtime = new Runtime;
    $ch2 = new Channel;

    // Each task sums one half of $a and reports its total on $ch2.
    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
    $runtime->run($sum, [array_slice($a, $num), $ch2]);

    //receive from channel
    $x = $ch2->recv();
    $y = $ch2->recv();
    $ch2->close();

    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
    echo "\nError:", $err->getMessage();
} catch(Exception $e) {
    echo "\nException:", $e->getMessage();
}
//output
//ch2:18 23 41
<?php
// a very weird way to calculate a factorial ^_^
// we create one thread per multiplier and sync them with a buffered channel
// in fact only one thread is executing at a time
use parallel\{Channel, Future, Runtime};
// Print the factorial of every integer from 0 through 10, one per line.
$n = 0;
while ($n <= 10) {
    echo "!$n = " . factorial($n) . PHP_EOL;
    $n++;
}
/**
 * Computes $n! by passing a running product through a buffered channel.
 *
 * One Runtime is started for every multiplier from 2 to $n, so $n - 1 threads
 * for $n >= 2 and none for $n < 2. Each task receives the current product
 * from the channel, multiplies it by its own multiplier and sends it back.
 *
 * @param int $n Value whose factorial is wanted.
 *
 * @return int The last value left in the channel once every task has finished.
 */
function factorial(int $n): int
{
    // buffered channel - used to sync the threads ^_^
    $channel = new Channel(1);

    $futureList = [];
    for ($i = 2; $i <= $n; $i++) {
        $runtime = new Runtime();
        $futureList[] = $runtime->run(
            static function (Channel $channel, $multiplier): void {
                $f = $channel->recv();
                $channel->send($f * $multiplier);
            },
            [$channel, $i]
        );
    }

    // Seed the chain with the multiplicative identity.
    $channel->send(1);

    // Wait until all threads are done. value() blocks until the task has
    // finished, so there is no need to spin on done() in a polling loop.
    foreach ($futureList as $future) {
        $future->value();
    }

    return $channel->recv();
}
// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
<?php
use parallel\Channel;
/**
 * Adds up the values of $a and sends the total over $ch.
 *
 * @param array   $a  Values to add together.
 * @param Channel $ch Channel that receives the total.
 */
function sum(array $a, Channel $ch) {
    $total = array_reduce(
        $a,
        static function ($carry, $value) {
            return $carry + $value;
        },
        0
    );

    $ch->send($total);
}
try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];

    // Buffered with capacity 2: both sum() calls below run on this same
    // thread, so both send() calls must complete before anything is received.
    $ch1 = Channel::make('sum', 2);

    // intdiv() keeps the split point an integer even for an odd-length array,
    // where count($a) / 2 would hand array_slice() a float offset.
    $num = intdiv(count($a), 2);

    sum(array_slice($a, 0, $num), $ch1);
    sum(array_slice($a, $num), $ch1);

    //receive from channel
    $x = $ch1->recv();
    $y = $ch1->recv();
    $ch1->close();

    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
    echo "\nError:", $err->getMessage();
} catch(Exception $e) {
    echo "\nException:", $e->getMessage();
}
<?php
/**
 * Bzz reloaded!
 * Two small tasks run in parallel and are kept in step through a channel.
 *
 * parallel\Channel(int $capacity): Buffered channel
 * Creates a buffered channel for communication between tasks
 * @ref https://www.php.net/manual/en/class.parallel-channel.php
 */
echo "zzz... " . PHP_EOL;

// Buffered channel created with a capacity of 2.
$channel = new \parallel\Channel(2);

// Sender: on every snap it pushes a sleep duration for the other task, naps
// itself, and closes the channel once all snaps are done.
$sender = function($channel) {
    $snaps_count = rand (8, 12);
    echo "Number of snaps: $snaps_count" . PHP_EOL;

    $snap = 1;
    while ($snap <= $snaps_count) {
        $sleepForOther = rand(3, 5);
        $my_sleep_time = rand(1, 3);

        echo "Send sleep time to buffer" . PHP_EOL;

        // Time the send() call; a noticeable delay is reported as a full buffer.
        $sentAt = microtime(true);
        $channel->send($sleepForOther);
        $blockedFor = microtime(true) - $sentAt;
        if ($blockedFor > 0.1) {
            echo "Buffer was full. I waited " . round($blockedFor) . "s" . PHP_EOL;
        }

        echo "I sleep for {$my_sleep_time}s" . PHP_EOL;
        sleep($my_sleep_time);

        $snap++;
    }

    echo "I finished sleeping. Closing channel" . PHP_EOL;
    $channel->close();
};

// Receiver: keeps taking sleep durations off the channel and sleeping for
// them until recv() reports that the channel has been closed.
$receiver = function($channel) {
    try {
        for (;;) {
            $my_sleep_time = $channel->recv();
            echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;
            sleep($my_sleep_time);
        }
    } catch(\parallel\Channel\Error\Closed $e) {
        echo "Channel is closed. Other die.";
        die;
    }
};

\parallel\run($sender, [$channel]);
\parallel\run($receiver, [$channel]);