管道是最初的Unix IPC形式,尽管对于许多操作系统来说管道很有用,但它的根本局限性在于没有名字,从而只能由有亲缘关系的进程使用。 这一点随FIFO的加入在System III Unix中得以改正。FIFO有时成为有名管道。 管道和FIFO都使用常用的read和write来进行访问。

管道

#include <unistd.h>

int pipe(int fds[2]); // 成功返回0,失败返回-1

该函数返回两个文件描述符:fd[0]和fd[1],前者用于读,后者用于写。

单进程利用管道作数据交互的示意图:

多进程利用管道作单向数据交互的示意图:

上图给出了一个在父子进程中利用管道进行单向数据通信的示例。 首先,由父进程创建一个管道后调用fork派生一个自身的副本; 接着,父进程关闭这个管道的读出端,子进程关闭这个管道的写入端。

多进程利用管道作双向数据交互的示意图:

父子进程利用管道进行双向数据传输需要创建两个管道,如上图所示。 然后,我们在父进程中关闭管道1的写入端、管道2的读出端; 在子进程中关闭管道1的读出端、管道2的写入端。

我们在某个Unix shell中输入一个像下面这样的命令时:

who | sort | lp

该shell将创建三个进程和其间的两个管道。 它还把每个管道的读出端复制到相应进程标准输入,把每个管道的写入端复制到相应进程的标准输出。 其管道线如下:

popen和pclose

#include <stdio.h>

// 成功则返回文件指针,失败则返回NULL
FILE* popen(const char *command, const char *type);

// 成功则返回shell的终止状态,失败则返回-1
int pclose(FILE *stream);
  • command参数是一个shell命令行,它是由shell程序处理的;
  • type可为r或w,当为r时:管道的写入端被复制为command进程的标准输出; 当为w时,管道的读出端被复制为command进程的标准输入。

FIFO(named pipe)

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode); // 成功返回0,失败返回-1

在创建一个FIFO后,该FIFO必须或者打开来读,或者打开来写,FIFO不能以读写方式打开,因为它是半双工的。 对管道或FIFO的write总是往末尾添加数据,对它们的read则总是从开头返回数据。 如果对管道或FIFO调用lseek,将会返回ESPIPE错误。

O_NONBLOCK对管道和FIFO的影响

  • 当前操作:open FIFO(只读)

    • 现有打开操作:FIFO打开来写

      • 阻塞模式:成功返回
      • 非阻塞模式:成功返回
    • 现有打开操作:FIFO不是打开来写

      • 阻塞模式:阻塞到FIFO打开来写为止
      • 非阻塞模式:成功返回
  • 当前操作:open FIFO(只写)

    • FIFO打开来读

      • 阻塞模式:成功返回
      • 非阻塞模式:成功返回
    • FIFO不是打开来读

      • 阻塞模式:阻塞到FIFO打开来读为止
      • 非阻塞模式:返回ENXIO
  • 当前操作:从空管道或空FIFO read

    • 管道或FIFO打开来写

      • 阻塞模式:阻塞到管道或FIFO中有数据或者管道或FIFO不再为写打开着为止
      • 非阻塞模式:返回EAGAIN
    • 管道或FIFO不是打开来写

      • 阻塞模式:read返回0(文件结束符)
      • 非阻塞模式:read返回0(文件结束符)
  • 当前操作:往管道或FIFO write

    • 管道或FIFO打开来读

      • 阻塞模式:(见下面描述)
      • 非阻塞模式:(见下面描述)
    • 管道或FIFO不是打开来读

      • 阻塞模式:给线程产生SIGPIPE信号
      • 非阻塞模式:给线程产生SIGPIPE信号

关于管道或FIFO的读出与写入操作的若干规则描述如下:

  • 如果请求读出的数据量多于管道或FIFO中当前可用的数据量,那么只返回这些可用的数据;

  • 如果请求写入的数据量小于或等于PIPE_BUF,那么write操作将保证是原子的。 这意味着,如果有两个不同的进程差不多同时往一个管道或FIFO写数据, 那么,或者先写入来自第一个进程的所有数据,然后再写入来自第二个进程的所有数据; 或者颠倒过来。系统不会相互混杂来自这两个进程的数据。 但是,如果请求写入的数据大于PIPE_BUF,那么write操作将不能保证是原子的。

    O_NONBLOCK标志的设置对write操作的原子性没有影响(原子性由且仅由所请求写入的字节数是否<=PIPE_BUF所决定)。 然而,当一个管道或FIFO设置成非阻塞时,write的返回值取决于待写入的数据以及该管道或FIFO当前的可用空间大小。

    • 如果待写的数据<=PIPE_BUF:

      • 如果待写入字节数<=当前管道或FIFO可用空间大小,那么所有数据写入;
      • 如果待写入字节数>当前管道或FIFO可用空间大小,那么立即返回EAGAIN错误。
    • 如果待写的数据>PIPE_BUF:

      • 如果该管道或FIFO至少有1字节空间,那么内核写入该管道或FIFO能容纳的最大数目的字节,该数目同时作为write的返回值;
      • 如果该管道或FIFO已满,则返回EAGAIN错误。

示例程序(并发模式下的回射程序)

common.h

#ifndef __COMMON_H__
#define __COMMON_H__

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define FIFO_PATH "/tmp/ServerFIFO"
#define FIFO_ECHO "/tmp/ECHOFIFO."

#define MAX_BUF 256

extern void errQuit();

#endif

common.cpp

#include "common.h"

void errQuit()
{
    printf("%s\n", strerror(errno));
    exit(1);
}

client.cpp

#include "common.h"

int main(int argc, char *argv[])
{
    char echoRPath[MAX_BUF];
    char echoWPath[MAX_BUF];
    snprintf(echoRPath, MAX_BUF, "%s%dr", FIFO_ECHO, (int)getpid());
    snprintf(echoWPath, MAX_BUF, "%s%dw", FIFO_ECHO, (int)getpid());

    if (mkfifo(echoRPath, 0644) < 0 && errno != EEXIST)
        errQuit();
    if (mkfifo(echoWPath, 0644) < 0 && errno != EEXIST)
        errQuit();

    int servFd = open(FIFO_PATH, O_WRONLY);
    if (servFd < 0)
        errQuit();

    char buf[MAX_BUF];
    snprintf(buf, MAX_BUF, "%10d", (int)getpid());
    write(servFd, buf, 10);

    int readFd = open(echoRPath, O_RDONLY);
    if (readFd < 0)
        errQuit();

    int n = read(readFd, buf, 2);
    if (n != 2 || buf[0] != 'o' || buf[1] != 'k')
        errQuit();

    int writeFd = open(echoWPath, O_WRONLY);
    if (writeFd < 0)
        errQuit();

    while (fgets(buf, MAX_BUF, stdin) != NULL)
    {
        int len = strlen(buf);
        write(writeFd, buf, len);

        n = read(readFd, buf, MAX_BUF-1);
        if (n > 0)
        {
            buf[n] = '\0';
            printf("server ret:%s", buf);
        }
    }

    exit(0);
}

server.cpp

#include "common.h"

int main(int argc, char *argv[])
{
    if (mkfifo(FIFO_PATH, 0644) < 0 && errno != EEXIST)
        errQuit();

    int servFd = open(FIFO_PATH, O_RDONLY);
    if (servFd < 0)
        errQuit();

    int dummyfd = open(FIFO_PATH, O_WRONLY);
    if (dummyfd < 0)
        errQuit();

    char buf[11] = {0};
    int n = 0;
    memset(buf, 0, sizeof(buf));
    while ((n = read(servFd, buf, 10)) > 0)
    {
        if (n != 10)
        {
            printf("client write len illegal len = %d\n", n);
            continue;
        }

        int clientpid = atoi(buf);
        if (clientpid > 0)
        {
            if (fork() == 0)
            {
                char echoRPath[MAX_BUF];
                char echoWPath[MAX_BUF];
                snprintf(echoRPath, MAX_BUF, "%s%dw", FIFO_ECHO, clientpid);
                snprintf(echoWPath, MAX_BUF, "%s%dr", FIFO_ECHO, clientpid);

                int writeFd = open(echoWPath, O_WRONLY);
                if (writeFd < 0)
                    errQuit();
                write(writeFd, "ok", 2);

                int readFd = open(echoRPath, O_RDONLY);
                if (readFd < 0)
                    errQuit();

                char echoBuf[MAX_BUF];
                int n = 0;
                while((n = read(readFd, echoBuf, MAX_BUF)) > 0)
                {
                    write(writeFd, echoBuf, n);
                }

                printf("client %u stoped.\n");

                exit(0);
            }
        }
    }

    exit(0);
}

本文作者ruleless, 欢迎评论、交流。
转载请务必标注出处: 管道、FIFO


«Previous:   I/O多路转接(I/O复用)

»Next:         Posix 消息队列