Skip to content

Instantly share code, notes, and snippets.

@johnidevo
Forked from KendallHopkins/thread.class.php
Created November 21, 2022 15:05
Show Gist options
  • Save johnidevo/cd8f4febba50f30f9397f46bf0b32800 to your computer and use it in GitHub Desktop.
Save johnidevo/cd8f4febba50f30f9397f46bf0b32800 to your computer and use it in GitHub Desktop.
<?php
/**
 * Runs a closure in a forked child process and exchanges length-prefixed
 * messages with it over a UNIX socket pair.
 *
 * The constructor forks: the parent keeps one end of the pair and the child's
 * pid, the child keeps the other end, invokes the closure with this object
 * and then exits. Every message on the wire is a 4-byte pack("L") length
 * header followed by that many payload bytes.
 *
 * Requires the pcntl and sockets extensions.
 */
class Thread
{
    /** This process's end of the socket pair (resource, or Socket object on PHP 8+). */
    private $_socket;
    /** Pid of the forked child; stays NULL inside the child itself. */
    private $_pid = NULL;
    /** Pid of the process that called the constructor; only it may reap the child. */
    private $_owner_pid = NULL;
    /** TRUE once pcntl_waitpid() has been called for the child. */
    private $_joined = FALSE;

    /**
     * Forks and runs $closure( $this ) in the child process.
     *
     * @param Closure $closure Work to run in the child; receives this Thread
     *                         so it can read()/write() to the parent.
     * @throws RuntimeException If the socket pair or the fork cannot be created.
     */
    public function __construct( Closure $closure )
    {
        $ary = array();
        if( socket_create_pair( AF_UNIX, SOCK_STREAM, 0, $ary ) === false ) {
            throw new RuntimeException( "socket_create_pair() failed. Reason: ".socket_strerror( socket_last_error() ) );
        }
        $this->_owner_pid = getmypid();
        $pid = pcntl_fork();
        if( $pid == -1 ) {
            // Do not leak the pair when no child was created.
            socket_close( $ary[0] );
            socket_close( $ary[1] );
            throw new RuntimeException( 'Could not fork Process.' );
        }
        if( $pid ) {
            // Parent: remember the child and keep our end of the pair.
            $this->_pid = $pid;
            socket_close( $ary[0] );
            $this->_socket = $ary[1];
            return;
        }
        // Child: run the closure, then always terminate. An exception must not
        // escape, otherwise the child would continue executing the caller's
        // code as a duplicate of the parent process.
        socket_close( $ary[1] );
        $this->_socket = $ary[0];
        $exit_code = 0;
        try {
            $closure( $this );
        } catch( Exception $e ) {
            error_log( "Thread child ".getmypid()." aborted by uncaught ".get_class( $e ).": ".$e->getMessage() );
            $exit_code = 1;
        } catch( Throwable $e ) {
            // PHP 7+ engine errors; this clause simply never matches on PHP 5.
            error_log( "Thread child ".getmypid()." aborted by uncaught ".get_class( $e ).": ".$e->getMessage() );
            $exit_code = 1;
        }
        exit( $exit_code );
    }

    /**
     * Reaps the child (if this process created it and has not joined yet) and
     * closes the socket.
     */
    public function __destruct()
    {
        // Copies of this object inherited by later-forked children carry a pid
        // that is not theirs to wait on, and must not consume the parent's data.
        $must_reap = $this->_pid && ! $this->_joined && getmypid() === $this->_owner_pid;
        if( $must_reap && $this->_socket ) {
            try {
                // Discard unread output first so a child blocked on a full
                // socket buffer can finish instead of deadlocking the waitpid.
                while( $this->read() !== NULL ) {
                    continue;
                }
            } catch( Exception $e ) {
                // A destructor must not throw; closing the socket below still
                // releases a child that is blocked on it.
            }
        }
        if( $this->_socket ) {
            socket_close( $this->_socket );
            $this->_socket = NULL;
        }
        if( $must_reap ) {
            $this->reap();
        }
    }

    /**
     * Sends one message to the other process.
     *
     * @param string $string Payload; an empty string is a valid message.
     * @throws RuntimeException If socket_write() fails.
     */
    public function write( $string )
    {
        $string = (string)$string;
        $buffer = pack( "L", strlen( $string ) ).$string;
        $remaining = strlen( $buffer );
        // socket_write() may accept fewer bytes than requested, so keep
        // sending until the whole frame is on the wire.
        while( $remaining > 0 ) {
            $written = socket_write( $this->_socket, $buffer, $remaining );
            if( $written === FALSE ) {
                throw new RuntimeException( "socket_write() failed. Reason: ".socket_strerror( socket_last_error( $this->_socket ) ) );
            }
            $buffer = substr( $buffer, $written );
            $remaining -= $written;
        }
    }

    /**
     * Receives one message from the other process, blocking until it is complete.
     *
     * @return string|null The payload, or NULL once the peer has closed its end.
     * @throws RuntimeException If socket_read() fails or the stream ends in
     *                          the middle of a message.
     */
    public function read()
    {
        $header = $this->readExact( 4 );
        if( $header === NULL ) {
            return NULL;
        }
        list( ,$length ) = unpack( "L", $header );
        if( $length === 0 ) {
            // write( "" ) sends a header only; socket_read() rejects length 0.
            return "";
        }
        $payload = $this->readExact( $length );
        if( $payload === NULL ) {
            throw new RuntimeException( "socket_read() failed. Reason: stream ended before the message body arrived" );
        }
        return $payload;
    }

    /**
     * Waits for the child to exit.
     *
     * @param bool $read_buffer When TRUE, collect every remaining message first.
     * @return array|null The collected messages when $read_buffer is TRUE, otherwise NULL.
     * @throws RuntimeException If this object has no child pid (i.e. inside the child).
     */
    public function join( $read_buffer = FALSE )
    {
        if( ! $this->_pid ) {
            throw new RuntimeException( "no pid to join" );
        }
        $output_array = NULL;
        if( $read_buffer ) {
            // Drain BEFORE waiting: a child whose output exceeds the socket
            // buffer blocks in write() and would never exit if we waited first.
            $output_array = array();
            while( ( $read_data = $this->read() ) !== NULL ) {
                $output_array[] = $read_data;
            }
        }
        $this->reap();
        return $output_array;
    }

    /**
     * Multiplexes over several threads until each one has closed its socket.
     *
     * @param array        $thread_array Thread objects; the array keys identify them in the callbacks.
     * @param Closure      $read_closure Called as ( $key, $data ) for every message received.
     * @param Closure|null $join_closure Optional; called as ( $key ) after a thread has been joined.
     * @throws RuntimeException If socket_select() fails or flags a socket as exceptional.
     */
    public static function selectUntilJoin( array $thread_array, Closure $read_closure, Closure $join_closure = NULL )
    {
        $socket_array = array();
        $socket_to_key_array = array();
        foreach( $thread_array as $key => $thread ) {
            $socket_id = self::socketId( $thread->_socket );
            $socket_array[$socket_id] = $thread->_socket;
            $socket_to_key_array[$socket_id] = $key;
        }
        while( $socket_array ) {
            $read_socket_array = $exception_socket_array = $socket_array;
            $write_socket_array = array();
            if( socket_select( $read_socket_array, $write_socket_array, $exception_socket_array, NULL ) === FALSE ) {
                throw new RuntimeException( "socket_select() failed. Reason: ".socket_strerror( socket_last_error() ) );
            }
            foreach( $read_socket_array as $read_socket ) {
                $socket_id = self::socketId( $read_socket );
                $thread_key = $socket_to_key_array[$socket_id];
                $read_data = $thread_array[$thread_key]->read();
                if( ! is_null( $read_data ) ) {
                    $read_closure( $thread_key, $read_data );
                } else {
                    // End of stream: stop watching this socket and reap the child.
                    unset( $socket_array[$socket_id] );
                    $thread_array[$thread_key]->join();
                    if( $join_closure ) {
                        $join_closure( $thread_key );
                    }
                }
            }
            foreach( $exception_socket_array as $exception_socket ) {
                throw new RuntimeException( "socket_select() failed. Reason: ".socket_strerror( socket_last_error( $exception_socket ) ) );
            }
        }
    }

    /**
     * Reads exactly $length bytes, looping over short reads.
     *
     * @return string|null The bytes, or NULL when the stream ended before any byte arrived.
     * @throws RuntimeException On a socket error or when the stream ends part-way through.
     */
    private function readExact( $length )
    {
        $buffer = "";
        $remaining = $length;
        while( $remaining > 0 ) {
            $chunk = socket_read( $this->_socket, $remaining, PHP_BINARY_READ );
            if( $chunk === FALSE ) {
                throw new RuntimeException( "socket_read() failed. Reason: ".socket_strerror( socket_last_error( $this->_socket ) ) );
            }
            if( $chunk === "" ) {
                if( $buffer === "" ) {
                    return NULL;
                }
                throw new RuntimeException( "socket_read() failed. Reason: stream ended in the middle of a message" );
            }
            $buffer .= $chunk;
            $remaining -= strlen( $chunk );
        }
        return $buffer;
    }

    /** Calls pcntl_waitpid() for the child at most once per object. */
    private function reap()
    {
        if( ! $this->_joined ) {
            pcntl_waitpid( $this->_pid, $status );
            $this->_joined = TRUE;
        }
    }

    /**
     * Returns an array key that is unique per socket. PHP 8 sockets are
     * objects, for which an (int) cast does not produce a distinct value.
     */
    private static function socketId( $socket )
    {
        return is_object( $socket ) ? spl_object_id( $socket ) : (int)$socket;
    }
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment