Skip to content

Instantly share code, notes, and snippets.

@wgm89
Created March 24, 2015 07:43
Show Gist options
  • Select an option

  • Save wgm89/4c0d2caa374e9cb2849c to your computer and use it in GitHub Desktop.

Select an option

Save wgm89/4c0d2caa374e9cb2849c to your computer and use it in GitHub Desktop.
php ipc
<?php
// Bootstrap: create the shared System V message queue, fork the worker
// processes, then let the parent process run the UDP front end.
$msg_key = 0x3000111;   // key of the System V message queue
$worker_num = 2;        // number of worker processes to start
$worker_pid = array();  // PIDs of the workers forked so far (filled in the parent)

$queue = msg_get_queue($msg_key, 0666);
if ($queue === false) {
    exit("create queue fail\n");
}

for ($i = 0; $i < $worker_num; $i++) {
    $pid = pcntl_fork();

    if ($pid < 0) {
        // Fork failed: report it and move on to the next worker slot.
        echo "fork fail\n";
        continue;
    }

    if ($pid == 0) {
        // Child process: consume the queue, then exit so it never runs the parent's code.
        proc_worker($i);
        exit;
    }

    // Parent process: remember the child's PID and keep forking.
    $worker_pid[] = $pid;
    echo "create worker $i.pid = $pid\n";
}

// Only the parent process reaches this point.
proc_main();
/**
 * Parent-process loop: receive UDP datagrams and hand each one to the workers
 * through the message queue.
 *
 * Binds udp://0.0.0.0:9999 and then loops forever. Every received datagram is
 * pushed onto the global $queue as a type-1 message without serialization.
 * The sender is answered with "OK\n" when the message was queued and "ER\n"
 * when it was not.
 *
 * Terminates the script via die() if the socket cannot be bound; otherwise it
 * does not return.
 */
function proc_main()
{
    global $queue;

    $bind = "udp://0.0.0.0:9999";
    // Create a UDP server to receive requests.
    $socket = stream_socket_server($bind, $errno, $errstr, STREAM_SERVER_BIND);
    if (!$socket) {
        die("$errstr ($errno)");
    }
    stream_set_blocking($socket, 1);
    echo "stream_socket_server bind=$bind\n";

    while (1) {
        $errCode = 0;
        $peer = '';
        $pkt = stream_socket_recvfrom($socket, 8192, 0, $peer);

        // Strict comparison: only a real failure (false) is an error. A loose
        // `== false` would also reject valid payloads such as "" or "0".
        if ($pkt === false) {
            echo "udp error\n";
            // Nothing was received, so there is nothing to enqueue and no
            // reliable peer address to answer.
            continue;
        }

        // Blocks here if the queue is full.
        $ret = msg_send($queue, 1, $pkt, false, true, $errCode);
        if ($ret) {
            stream_socket_sendto($socket, "OK\n", 0, $peer);
        } else {
            // Surface the errno captured by msg_send before answering the peer.
            echo "ERROR: queue errno={$errCode}\n";
            stream_socket_sendto($socket, "ER\n", 0, $peer);
        }
    }
}
/**
 * Worker-process loop: receive messages from the shared message queue and
 * print each one, prefixed with the worker's index.
 *
 * Loops forever; the caller is expected to run it in a forked child.
 *
 * @param int $id Worker index, used only to label the output lines.
 */
function proc_worker($id)
{
    global $queue;

    $msg_type = 0;  // receives the type of the message that was read
    $msg_pkt = '';  // receives the raw message payload
    $errCode = 0;   // receives the errno when msg_receive() fails

    while (1) {
        // Desired type 0 reads the first message of any type; the payload is
        // taken as-is (no unserialize). The 7th argument is $flags and the
        // 8th is the by-reference error code: passing $errCode in the 7th
        // slot would leave the errno unset and always log 0.
        $ret = msg_receive($queue, 0, $msg_type, 8192, $msg_pkt, false, 0, $errCode);
        if ($ret) {
            //.... Code ....//
            echo "[Worker $id] ".$msg_pkt, PHP_EOL;
        } else {
            echo "ERROR: queue errno={$errCode}\n";
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment