当前位置:  开发笔记 > 编程语言 > 正文

swoole_process之进程间通信

swoole_process之进程间通信
博客好久没更新了,这大半年来主要精力放在了折腾swoole上,公司项目也上线了几个swoole的服务于中间件,最近马上也要上线一个中间件。swoole_server、swoole_process等都用的比较多了,现在就来慢慢总结。

从 swoole_process文档中可以看出,swoole_process进程间支持3种通信方式:

  • 1、管道pipe
  • 2、IPC msgqueue
  • 3、信号

接下来就详细介绍下每一种通信的原理以及实现。

1、管道pipe

关于管道文档中是这么说的

int swoole_process::__construct(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true)

* $redirect_stdin_stdout,重定向子进程的标准输入和输出。 启用此选项后,在进程内echo将不是打印屏幕,而是写入到管道。读取键盘输入将变为从管道中读取数据。 默认为阻塞读取。* $create_pipe,是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true 如果子进程内没有进程间通信,可以设置为false

* $process对象在销毁时会自动关闭管道,子进程内如果监听了管道会收到CLOSE事件

* 1.7.22或更高版本允许设置管道的类型,默认为SOCK_STREAM流式

* 参数$create_pipe为2时,管道类型将设置为SOCK_DGRAM

int swoole_process->write(string $data)

* swoole底层使用Unix Socket实现通信,UnixSocket是内核实现的全内存通信,无任何IO消耗。在1进程write,1进程read,每次读写1024字节数据的测试中,100万次通信仅需1.02秒。* 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并。可以设置swoole_process构造函数的第三个参数为2改变为数据报式。

从上面摘自文档中的部分就可以看出,管道有两种类型: SOCK_STREAMSOCK_DGRAM,而这两个正是建立socket时候需要的类型参数。我们最常见的linux命令中使用管道操作符“|”,而在标准unix编程中,管道的创建也是通过函数 int pipe(int filedes[2])创建的匿名管道;或者通过 int mkfifo(const char *pathname, mode_t mode)创建的命名管道,这两种方式创建的管道都没有SOCK_*参数的。

下面是摘自swoole扩展中创建pipe的代码

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE){        RETURN_FALSE;} ...... if (pipe_type > 0){    swPipe *_pipe = emalloc(sizeof(swWorker));    int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;    if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)    {        RETURN_FALSE;    }     process->pipe_object = _pipe;    process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);    process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);    process->pipe = process->pipe_master;     zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_masterTSRMLS_CC);}

从上面的代码中可以看出,这里的pipe是通过函数 swPipeUnsock_create创建的,下面是这个函数的完整实现

int swPipeUnsock_create(swPipe *p, int blocking, int protocol){    int ret;    swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock));    if (object == NULL)    {        swWarn("malloc() failed.");        return SW_ERR;    }    p->blocking = blocking;    ret = socketpair(AF_UNIX, protocol, 0, object->socks);    if (ret < 0)    {        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);        return SW_ERR;    }    else    {        //Nonblock        if (blocking == 0)        {            swSetNonBlock(object->socks[0]);            swSetNonBlock(object->socks[1]);        }         int sbsize = SwooleG.socket_buffer_size;        swSocket_set_buffer_size(object->socks[0], sbsize);        swSocket_set_buffer_size(object->socks[1], sbsize);         p->object = object;        p->read = swPipeUnsock_read;        p->write = swPipeUnsock_write;        p->getFd = swPipeUnsock_getFd;        p->close = swPipeUnsock_close;    }    return 0;}

从上面的代码就可以很直观的看到,是通过函数 socketpair创建了一对已连接的(UNIX族)无名socket。在Linux中,完全可以把这一对socket当成pipe返回的文件描述符一样使用,唯一的区别就是这一对文件描述符中的任何一个都可读和可写。所以这样主进程和子进程间的通信就完全是在进行socket通信。

相关资料:

Linux 上实现双向进程间通信管道

socketpair

2、IPC msgqueue

bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2);

* $msgkey是消息队列的key,默认会使用ftok(FILE)* $mode通信模式,默认为2,表示争抢模式,所有创建的子进程都会从队列中取数据

* 使用模式2后,创建的子进程无法进行单独通信,比如发给特定子进程。

* $process对象并未执行start,也可以执行push/pop向队列推送/提取数据

* 消息队列通信方式与管道不可公用。消息队列不支持EventLoop,使用消息队列后只能使用同步阻塞模式

上面是摘自文档中的一些描述。还是直接看源码怎么实现的

static PHP_METHOD(swoole_process, useQueue){    long msgkey = 0;    long mode = 2;     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE)    {        RETURN_FALSE;    }     swWorker *process = swoole_get_object(getThis());     if (msgkey <= 0)    {#if PHP_MAJOR_VERSION == 7        msgkey = ftok(execute_data->func->op_array.filename->val, 0);#else        msgkey = ftok(EG(active_op_array)->filename, 0);#endif    }     swMsgQueue *queue = emalloc(sizeof(swMsgQueue));    if (swMsgQueue_create(queue, 1, msgkey, 0) < 0)    {        RETURN_FALSE;    }    queue->delete = 0;    process->queue = queue;    process->ipc_mode = mode;    RETURN_TRUE;}

从源码可以看到,$msgkey的默认值为0,且当它的值小于等于0的时候,就会用使用默认值 ftok(FILE)。而mode在这里并没有直接使用,而是将它赋值给了process对象的ipc_mode属性。

但是这里使用 swMsgQueue_create创建了队列,下面是这个函数的原型

int swMsgQueue_create(swMsgQueue *q, int blocking, key_tmsg_key, long type){    int msg_id;    if (blocking == 0)    {        q->ipc_wait = IPC_NOWAIT;    }    else    {        q->ipc_wait = 0;    }    q->blocking = blocking;    msg_id = msgget(msg_key, IPC_CREAT | O_EXCL | 0666);    if (msg_id < 0)    {        swWarn("msgget() failed. Error: %s[%d]", strerror(errno), errno);        return SW_ERR;    }    else    {        q->msg_id = msg_id;        q->type = type;    }    return 0;}

这个代码很简单,创建了一个key为$msg_key并且是阻塞性的msgqueue。这里就是完整的msgqueue的创建逻辑,当你没有给它设置默认的key的时候,系统取一个默认的值作为key,当然你也能根据你的需要设置任意的key来创建一个或者多个msgqueue。

上面说了第一个参数$msgkey,下面来看看第二个参数$mode的作用。上面的代码中将mode赋值给了process对象的ipc_mode属性,即系看它都在什么地方被用到。

首先看数据如队列 pop

static PHP_METHOD(swoole_process, push){    ......     struct    {        long type;        char data[65536];    } message;     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE)    {        RETURN_FALSE;    }     ......     message.type = process->id;    memcpy(message.data, data, length);     if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0)    {        php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgsnd() failed. Error: %s[%d]", strerror(errno), errno);        RETURN_FALSE;    }    RETURN_TRUE;}

从上面的代码可以看出,我们PHP调用push时候传入的值被放入了一个 message结构体中,然后再调用 swMsgQueue_push将数据入队列的。

下面是 swMsgQueue_push 的实现

int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length){    int ret;     while (1)    {        ret = msgsnd(q->msg_id, in, length, q->ipc_wait);         if (ret < 0)        {            if (errno == EINTR)            {                continue;            }            else if (errno == EAGAIN)            {                swYield();                continue;            }            else            {                return -1;            }        }        else        {            return ret;        }    }    return 0;}

这里的代码很简单,就是调用系统调用 msgsnd将数据放入队列中。

现在看数据出队列 pop

static PHP_METHOD(swoole_process, pop){    ......     struct    {        long type;        char data[SW_MSGMAX];    } message;     if (process->ipc_mode == 2)    {        message.type = 0;    }    else    {        message.type = process->id;    }     int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize);    if (n < 0)    {        php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgrcv() failed. Error: %s[%d]", strerror(errno), errno);        RETURN_FALSE;    }    SW_RETURN_STRINGL(message.data, n, 1);}

看到这里,终于看到我们之前传入的 $mode参数的作用了。如果我们如文档中所说默认值为2的话, message结构体的type就被设置为0,否则的话取当前进程的ID。说到这里有个要说的是,每个process进程在它的对象被new的时候,它的构造函数会将它自己的属性ID设置一个值,这个值是一个自增计数器,也就是会将一次初始化的process对象排队,所以每个process的id的值是不一样的。

重点是看 swMsgQueue_pop

int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length){    int flag = q->ipc_wait;    long type = data->mtype;     return msgrcv(q->msg_id, data, length, type, flag);}

看到了吧,type是 msgrcv系统调用需要的第4个参数。下面是对这个参数的解释:

1. =0:接收第一个消息

2. >0:接收类型等于msgtyp的第一个消息

3.<0:接收类型等于或者小于msgtyp绝对值的第一个消息

所以现在分析来看,swoole_process使用msgqueue的话是相当灵活的,让你能随心所欲实现你的需求。多队列、单队列、单队列不同类型,这样的组合带来的收益就是只有你想不到,没有swoole_process实现不了的通信模型。

参考文档:

msgget

msgsnd & msgrcv

3、信号

信号这个是最常见的了。所以这里讲的就不如上面两个详细了。swoole_process给了一个设置异步监听信号的函数

bool swoole_process::signal(int $signo, mixed $callback);

* 此方法基于signalfd和eventloop是异步IO,不能用于同步程序中

* 同步阻塞的程序可以使用pcntl扩展提供的pcntl_signal

* $callback如果为null,表示移除信号监听

文档中已经说明了,这个函数只能用于异步程序中,所以就不多做说明了。而关于同步程序中怎么使用pcntl_signal,rango的blog中已经有说明了,所以直接看就行 《 PHP官方的pcntl_signal性能极差》

好了,整理完了,我的思维也更清晰了。

推荐阅读
小妖694_807
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有