在类Unix操作系统中,管道(Pipeline)是一系列将标准输入输出链接起来的进程,其中每一个进程的输出被直接作为下一个进程的输入。

目录

  1. 系统调用 fork
  2. 管道
    1. 示例
    2. 管道的读写
      1. 读管道
      2. 写管道
    3. 管道的特点
    4. 管道的局限性
  3. 双向管道通信
  4. 参考

系统调用 fork

在linux系统中创建进程有两种方式

  • 一是由操作系统创建。
  • 二是由父进程创建进程。系统调用函数fork()是创建一个新进程的唯一方式。

fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程。

  1. 系统先给新的进程分配资源,例如存储数据和代码的空间。
  2. 然后把原来的进程(父进程)的所有值都复制到新的新进程(子进程)中,只有少数值与原来的进程的值不同。
  3. Linux的fork()采用写时拷贝实现,只有子进程发起写操作时才正真执行拷贝,在写时拷贝之前都是以只读的方式共享。这样可以避免发生拷贝大量数据而不被使用的情况。

fork是Linux系统中一个比较特殊的函数,其一次调用会有两个返回值。在fork函数执行完毕后,如果创建新进程成功,则出现两个进程,一个是子进程,一个是父进程。如果失败返回值是1。

  • 在子进程中,fork函数返回0。
  • 在父进程中,fork返回新创建子进程的进程ID。

因此我们可以通过fork返回的值来判断当前进程是子进程还是父进程。

管道

Linux中,每个管道允许两个进程交互数据,一个进程向管道写入数据,一个进程从管道读出数据。Linux并没有给管道定义一个新的数据结构,而是借用了文件系统中文件的数据结构。即管道实际是一个文件(但是与文件并不完全形同)。

操作系统在内存中为每个管道开辟一页内存(4KB),给这一页赋予了文件的属性。这一页内存由两个进程共享,但不会分配给任何进程,只由内核掌控。

示例

Linux pipe手册中的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

int main(int argc, char *argv[])
{
int pipefd[2];
pid_t cpid;
char buf;

if (argc != 2)
{
fprintf(stderr, "Usage: %s <string>\n", argv[0]);
exit(EXIT_FAILURE);
}

if (pipe(pipefd) == -1)
{
perror("pipe");
exit(EXIT_FAILURE);
}

cpid = fork();
if (cpid == -1)
{
perror("fork");
exit(EXIT_FAILURE);
}

if (cpid == 0)
{ /* Child reads from pipe */
close(pipefd[1]); /* Close unused write end */

while (read(pipefd[0], &buf, 1) > 0)
write(STDOUT_FILENO, &buf, 1);

write(STDOUT_FILENO, "\n", 1);
close(pipefd[0]);
_exit(EXIT_SUCCESS);
}
else
{ /* Parent writes argv[1] to pipe */
close(pipefd[0]); /* Close unused read end */
write(pipefd[1], argv[1], strlen(argv[1]));
close(pipefd[1]); /* Reader will see EOF */
wait(NULL); /* Wait for child */
exit(EXIT_SUCCESS);
}
}

管道的读写

  • 读管道进程执行时,如果管道中有未读数据,就读取数据,没有未读数据就挂起,这样就不会读取垃圾数据。
  • 写管道进程执行时,如果管道中有剩余空间,就写入数据,没有剩余空间了,就挂起,这样就不会覆盖尚未读取的数据。

读管道

对于读管道操作,数据是从管道尾读出,并使管道尾指针前移‘读取字节数’个位置。

Linux 0.11 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

//fs/pipe.c
int read_pipe(struct m_inode * inode, char * buf, int count)
{
int chars, size, read = 0;

while (count>0) {
while (!(size=PIPE_SIZE(*inode))) { //管道空
wake_up(&inode->i_wait); //唤醒等待写管道进程
if (inode->i_count != 2) /* are there any writers? */
return read; //没有writer,返回
sleep_on(&inode->i_wait); //挂起读管道进程
}
chars = PAGE_SIZE-PIPE_TAIL(*inode); //PAGE_SIZE: 4KB
if (chars > count)
chars = count;
if (chars > size)
chars = size;
count -= chars;
read += chars;
size = PIPE_TAIL(*inode);
PIPE_TAIL(*inode) += chars; // 读多少数据,指针就偏移多少
PIPE_TAIL(*inode) &= (PAGE_SIZE-1); // 指针超过一个页面,(&= )操作可以实现自动回滚

while (chars-- >0)
put_fs_byte(((char *)inode->i_size)[size++],buf++); //将管道中的数据拷贝至buf
}
wake_up(&inode->i_wait); //唤醒等待写管道进程
return read;
}

// include/linux/fs.h
#define PIPE_HEAD(inode) ((inode).i_zone[0])
#define PIPE_TAIL(inode) ((inode).i_zone[1])
//PIPE_HEAD(inode)-PIPE_TAIL(inode) < 0,和 (PAGE_SIZE-1) 相与能够计算出管道未读数据的长度
#define PIPE_SIZE(inode) ((PIPE_HEAD(inode)-PIPE_TAIL(inode))&(PAGE_SIZE-1))

写管道

对于写管道操作,数据是向管道头部写入,并使管道头指针前移‘写入字节数’个位置。

Linux 0.11 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//fs/pipe.c
int write_pipe(struct m_inode * inode, char * buf, int count)
{
int chars, size, written = 0;

while (count>0) {
while (!(size=(PAGE_SIZE-1)-PIPE_SIZE(*inode))) { //管道已满
wake_up(&inode->i_wait); //唤醒等待读管道进程
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return written?written:-1; //没有reader,返回
}
sleep_on(&inode->i_wait); //挂起写管道进程
}
chars = PAGE_SIZE-PIPE_HEAD(*inode);
if (chars > count)
chars = count;
if (chars > size)
chars = size;
count -= chars;
written += chars;
size = PIPE_HEAD(*inode);
PIPE_HEAD(*inode) += chars;
PIPE_HEAD(*inode) &= (PAGE_SIZE-1); // 指针超过一个页面,(&= )操作可以实现自动回滚

while (chars-- >0)
((char *)inode->i_size)[size++]=get_fs_byte(buf++); //读取buf中的数据写入管道
}
wake_up(&inode->i_wait); //唤醒等待读管道进程
return written;
}

// include/linux/fs.h
#define PIPE_HEAD(inode) ((inode).i_zone[0])
#define PIPE_TAIL(inode) ((inode).i_zone[1])
//PIPE_HEAD(inode)-PIPE_TAIL(inode) < 0,和 (PAGE_SIZE-1) 相与能够计算出管道未读数据的长度
#define PIPE_SIZE(inode) ((PIPE_HEAD(inode)-PIPE_TAIL(inode))&(PAGE_SIZE-1))

管道的特点

  • 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道;
  • 只能用于父子进程或者兄弟进程之间(具有亲缘关系的进程);
  • 单独构成一种独立的文件系统:管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在于内存中。
  • 数据的读出和写入:一个进程向管道中写的内容被管道另一端的进程读出。写入的内容每次都添加在管道缓冲区的末尾,并且每次都是从缓冲区的头部读出数据。

管道的局限性

  • 只支持单向数据流。
  • 只能用于具有亲缘关系的进程之间。
  • 没有名字(有名管道是 FIFO)。
  • 管道的缓冲区是有限的(管道制存在于内存中,在管道创建时,为缓冲区分配一个页面大小)。
  • 管道所传送的是无格式字节流,这就要求管道的读出方和写入方必须事先约定好数据的格式,比如多少字节算作一个消息(或命令、或记录)。

双向管道通信

  1. 父进程创建两个管道,pipe1和pipe2.
  2. 父进程创建子进程,调用fork()的过程中子进程会复制父进程创建的两个管道.
  3. 实现父进程向子进程通信:父进程关闭pipe1的读端,保留写端;而子进程关闭pipe1的写端,保留读端.
  4. 实现子进程向父进程通信:子进程关闭pipe2的读端,保留写端;而父进程关闭pipe2的写端,保留读端.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

int main(int argc, char *argv[])
{
int pipe_command[2]; //管道1 父进程 -> 子进程
int pipe_result[2]; //管道2 子进程 -> 父进程
pid_t cpid;

char buf[4];

//父进程创建管道
if (pipe(pipe_command) == -1)
{
perror("pipe_command");
exit(EXIT_FAILURE);
}

if(pipe(pipe_result) == -1)
{
perror("pipe_result");
exit(EXIT_FAILURE);
}

//创建子进程
cpid = fork();

if (cpid == -1)
{
perror("fork error");
exit(EXIT_FAILURE);
}

if (cpid == 0) /*子进程*/
{
printf("sub: pid %d\n", getpid()); //输出子进程ID

close(pipe_command[1]); //关闭管道写端
close(pipe_result[0]); //关闭管道读端

int read_status;
while (1)
{
read_status = read(pipe_command[0], buf, 4); //读取父进程的命令
if(read_status > 0)
{
printf("sub: command %s\n", buf);
if(strcmp(buf, "hell") == 0)
{
write(pipe_result[1], "okok", 4); //回应父进程
}
else if( strcmp(buf, "exit") == 0)
{
printf("sub: exit\n");
break;
}
}
else if(read_status < 0) //读取错误
{
perror("sub: read error!");
break;
}
}

close(pipe_command[0]); //关闭管道
close(pipe_result[1]);

exit(EXIT_SUCCESS);
}
else /*父进程*/
{
printf("parent: pid %d\n", getpid()); //输出父进程ID

close(pipe_command[0]); //关闭管道读端
close(pipe_result[1]); //关闭管道写端

write(pipe_command[1], "hell", 4); //向子进程发送命令

int read_status;
while (1)
{
read_status = read(pipe_result[0], buf, 4);
if(read_status > 0)
{
printf("parent: received %s\n", buf); //接收子进程回应
write(pipe_command[1], "exit", 4); //通知子进程退出
break;
}
else if(read_status < 0) //读取错误
{
perror("parent: read error!");
break;
}
}

close(pipe_command[1]);
close(pipe_result[0]);

/* 等待子进程退出,并判断状态吗码*/
int status;
waitpid(-1, &status , 0);

if(WIFEXITED(status))
{
printf("exited: %d\n", WEXITSTATUS(status));
}
else if(WIFSIGNALED(status))
{
printf("signaled: %d\n", WTERMSIG(status));
}

exit(EXIT_SUCCESS);
}
}

参考

  1. 《Linux内核设计的艺术》
  2. 《Linux内核设计与实现》
  3. Linux v0.11内核源码(https://github.com/karottc/linux-0.11)