【Linux】进程聊天室:管道传纸条、共享内存开黑,内核如何当裁判?(进程间通信)
目录
一、进程间通信介绍
1. 什么是进程间通信
2. 进程间通信的发展和分类(简单介绍)
3. 进程间通信的目的
二、管道
1. 什么是管道?
2. 匿名管道
(1)核心特点
(2)创建匿名管道
(3)fork共享管道原理
(4)内核角度 - 管道本质
(5)管道代码样例
(6)管道文件的特点
(7)基于匿名管道 --- 进程池
① Task.hpp
② ProcessPool.hpp
③ Main.cc
④ Makefile
⑤ BUG --- 文件描述符的继承和泄漏
3.命名管道
(1)创建命名管道
① 命令行创建
② 系统调用函数
(2)实例
① 示例:简单实现一个客户端与服务端的通信
② 优化:用类封装
三、system V
1. system V 共享内存
(1)原理
① 物理内存共享
② 虚拟地址映射
③ 直接访问
④ 工作流程
(2)内核数据结构
① struct shmid_ds(用户可见的元信息)
② struct shmid_kernel(内核实际使用的扩展结构)
③ struct file 与物理页帧
④ 操作系统如何跟踪共享内存的使用状态?
(3)标识符和键
① 标识符shmid
② 键key
③ 总结
(4)共享内存接口函数
① shmget()
② shmctl()
③ shmat()
④ shmdt()
(5)共享内存使用全流程
① comm.hpp
② server.cc
(6)基于共享内存实现进程间通信
① 无同步机制的进程间通信
② 共享内存+管道
2. system V消息队列
(1)基本概念
(2)接口函数
① msgget()
② msgctl()
③ msgsnd()
④ msgrcv()
3.systerm V信号量
(1)并发编程,概念铺垫
(2)原子性
(3)什么是信号量?
(4)信号量接口函数
① semget()
② semctl()
③ semop()
(5)内核信号量集属性
四、内核是如何组织管理IPC资源的?
(1)核心数据结构物理布局
(2)内核源代码关系图示
简介:在多任务操作系统(如Linux)中,进程间通信(IPC)是协作与数据交换的基石。无论是简单的管道传输,还是高性能的共享内存,其背后均依赖内核精密的资源管理机制。本文将从用户态接口到内核实现,逐层拆解管道、System V IPC的工作流程,结合代码示例与内核数据结构图示,为你呈现进程间通信的全景视角,最后深入探讨内核如何通过命名空间和权限控制隔离IPC资源。
一、进程间通信介绍
1. 什么是进程间通信
进程间通信(Inter-Process Communication, IPC)是指在不同进程之间传播或交换信息的技术方法。由于操作系统中的进程通常拥有独立的地址空间,一个进程不能直接访问另一个进程的变量或数据结构,因此需要专门的机制来实现进程间的数据共享和通信。
进程间通信的本质:是让不同的进程先看到同一份资源(内存),然后才有通信的条件。
2. 进程间通信的发展和分类(简单介绍)
(1)早期IPC:管道(Pipe)是最早的IPC机制之一。
匿名管道(无名管道):单向通信,只能用于有亲缘关系的进程(如父子进程),通过pipe( )系统调用创建。
命名管道(FIFO):有名称的管道文件,可用于无亲缘关系的进程间通信,通过mkfifo()创建。
(2)System V IPC
三种主要机制:消息队列、信号量、共享内存
使用键值(Key)来标识IPC对象,需要显式删除IPC对象,否则会一直存在于系统中。
权限控制通过类似文件权限的机制
(3)POSIX IPC:对System V IPC的改进和标准化
主要包括以下组件:消息队列、共享内存、信号量、互斥量、条件变量、读写锁。
(4)发展对比
特性 | 管道 | System V IPC | POSIX IPC |
---|---|---|---|
标准化 | Unix传统 | System V Unix | POSIX标准 |
对象标识 | 文件描述符(匿名管道) | 键值(Key) | 文件系统路径名 |
生命周期 | 随进程结束 | 需显式删除 | 可配置为随进程结束 |
访问控制 | 文件权限 | IPC权限 | 文件权限 |
跨平台性 | 有限 | 有限 | 较好 |
性能 | 中等 | 高(特别是共享内存) | 高 |
3. 进程间通信的目的
• 数据传输:一个进程需要将它的数据发送给另一个进程。
• 资源共享:多个进程之间共享同样的资源。
• 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
• 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
二、管道
1. 什么是管道?
管道(Pipe)是 Unix 系统最古老的进程间通信(IPC)方式,其核心思想是:
将一个进程的输出数据流(stdout)直接连接到另一个进程的输入数据流(stdin),形成一个单向的、基于字节流(无消息边界,二进制传输)的通信通道(内核级)。
2. 匿名管道
(1)核心特点
• 单向通信:数据只能从写端流向读端(半双工)
• 仅限亲缘进程:通过 fork() 创建的父子/兄弟进程间使用
• 内存级通信:由内核管理缓冲区,不依赖磁盘文件
• 随进程销毁:当所有相关进程终止时,管道自动释放
(2)创建匿名管道
#include
int pipe(int pipefd[2]);
// 成功:返回 0,并填充 pipefd
// 失败:返回 -1,并设置 errno 表示错误原因
参数:pipefd[2]表示一个长度为 2 的整型数组,用于存储管道的两个文件描述符:
• pipefd[0]:管道的读端(用于从管道读取数据)。
• pipefd[1]:管道的写端(用于向管道写入数据)。
注:调用pipe创建管道不要文件路径,没有文件名。是内存级的,被OS单独设计,称之为匿名管道。
(3)fork共享管道原理
单个进程中的管道几乎没有任何用处,通常,进程会先调用pipe(),接着调用fork(),从而创建父进程到子进程的IPC通道。
我们怎么保证两个进程打开的是同一个管道文件?fork之后,子进程会继承父进程的管道文件描述符,父子进程通过fd访问同一管道,内核确保数据同步和生命周期管理。
fork之后做什么取决于我们想要的数据流的方向。对于父进程到子进程的管道,父进程关闭管道的读端(fd[0]),子进程关闭写端(fd[1])。
(4)内核角度 - 管道本质
管道在内核中通过 inode 表示,不同于磁盘文件的inode,它关联的是内存中的 pipe_inode_info(管道核心)。
管道是通过 文件描述符→file→inode→pipe_inode_info→数据页 的链式关系实现的,多个进程通过不同层级的共享(file 结构独立,但底层 pipe_inode_info 共享)完成通信。
匿名管道的“匿名性”体现在其 inode 不关联文件系统,仅存于内存。
管道的本质也是文件!
(5)管道代码样例
创建一个管道,用于父子进程间单向通信,子进程每隔1秒向管道写入数据,父进程从管道读取数据并打印。
#include
#include
#include
#include
#include
#include
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
while(true)
{
// 将字符串格式化到数组
// 注: C语言库函数处理字符串时会默认加' ', 所以这里用的是sizeof(buffer),没-1。
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1); // 每隔一秒写入一行
}
}
void FatherRead(int rfd)
{
char buffer[1024];
while(true)
{
buffer[0] = 0;
// write是系统调用,不对字符串处理,所以要用sizeof(buffer)-1
size_t n = read(rfd, buffer, sizeof(buffer)-1); // 读过来的是字符串,所以要预留一个位置空间存放' '
if(n > 0)
{
buffer[n] = 0;
std::cout << "Child say: " << buffer << std::endl;
}
}
}
int main()
{
// 1. 创建管道
int fds[2] = {0};
int n = pipe(fds);
if (n < 0)
{
std::cerr << "pipe error!" << std::endl;
return -1;
}
// 若成功,参数fd返回两个文件描述符
// fd[0]表示读端, fd[1]表示写端
std::cout << "fds[0]: " << fds[0] << std::endl;
std::cout << "fds[1]: " << fds[1] << std::endl;
// 2. 创建子进程
pid_t id = fork();
if (id == 0)
{
// child
// 3. 关闭不需要的读写端
close(fds[0]); // 子进程关闭读端
ChildWrite(fds[1]);
close(fds[1]); // 子进程结束关闭写端
exit(0);
}
// father
// 3. 关闭不需要的读写端
close(fds[1]); // 父进程关闭写端
FatherRead(fds[0]);
waitpid(id, nullptr, 0);
close(fds[0]); // 父进程结束关闭读端
return 0;
}
(6)管道文件的特点
① 只能用于具有共同祖先的进程(亲缘关系进程)
• 管道通常由父进程创建,然后调用 fork(),使父子进程共享管道的文件描述符。
• 非亲缘关系的进程无法直接使用管道通信(但可以通过其他方式,如命名管道 FIFO,后面讲)。
② 管道提供流式服务(面向字节流)
• 管道是字节流(stream),没有消息边界,数据以字节序列的形式传输。
• 不同于消息队列(如 msgqueue),不会自动分隔消息,需要应用层自行处理(如用 分隔)。
③ 进程退出,管道释放(生命周期随进程)
• 管道的生命周期依赖于进程,当所有引用该管道的进程都关闭文件描述符后,管道会被内核回收。
• 如果父进程先退出,子进程仍可继续使用管道,但若所有进程都关闭管道,数据会丢失。
④ 内核会对管道操作进行同步与互斥
• 同步(Synchronization):
当管道空时,读端 read() 会阻塞,直到有数据写入。
当管道满时(默认缓冲区大小通常为 64KB),写端 write() 会阻塞,直到有空间可写。
• 互斥(Mutual Exclusion):
内核保证多个进程同时读写管道时不会发生数据竞争(read 和 write 是原子的)。
例如:如果两个进程同时写管道,内核会确保数据不会交错(一次 write() 不会被另一个 write() 打断)。如果两个进程同时读管道,内核会确保数据不会被重复读取(每个 read() 获取不同的数据)。
⑤ 管道是半双工的(数据只能单向流动)
• 半双工(Half-Duplex):数据只能单向传输(要么父写子读,要么子写父读)。
• 如果需要双向通信(全双工),必须建立两个管道
⑥ 管道的默认缓冲区大小(影响读写阻塞)
• 在 Linux 中,管道的默认缓冲区大小通常是 64KB(PIPE_BUF,定义在
)。 • 如果写入的数据超过 PIPE_BUF,write() 可能会部分写入或阻塞,取决于是否设置 O_NONBLOCK。
⑦ 管道的读写行为(四种通信情况)
情况 | 读端 read() | 写端 write() |
---|---|---|
管道空 | 阻塞(直到有数据) | 正常写入 |
管道满 | 正常读取 | 阻塞(直到有空间) |
所有写端关闭 | read() 返回 0 (EOF) | - |
所有读端关闭 | - | 44 |
(7)基于匿名管道 --- 进程池
这是一个基于C++实现的简单进程池系统,主要用于管理多个子进程并通过管道进行任务分发。
① Task.hpp
● TaskManager类(任务管理器):管理可执行任务
• 关键特性:
使用函数指针数组存储任务
支持任务注册(Register)
随机选择任务(Code)
执行指定任务(Execute)
• 任务类型:typedef void (*task_t)()定义的无参数无返回值函数
#pragma once
#include
#include
#include
typedef void (*task_t)(); // 指向“无参数且返回void的函数”函数指针类型
Debug
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << std::endl;
}
void DownLoad()
{
std::cout << "我是一个下载的任务" << std::endl;
}
void Upload()
{
std::cout << "我是一个上传的任务" << std::endl;
}
Debug
// 管理任务的类
class TaskManager
{
private:
std::vector _tasks; // 存放各类任务, 但必须是task_t类型的
public:
TaskManager()
{
srand(time(nullptr)); // 初始化伪随机数生成器种子, 让程序每次运行时生成不同的随机数序列
}
void Register(task_t t)
{
_tasks.push_back(t);
}
// 返回一个任务
int Code()
{
return rand() % _tasks.size(); // 随机选取一个任务列表里面的任务
}
// 执行一个任务
void Execute(int code)
{
if(code >= 0 && code < _tasks.size())
{
_tasks[code](); // 执行
}
}
~TaskManager()
{}
};
② ProcessPool.hpp
● Channel类(通道/管道):封装了父子进程间的单向通信管道
• 关键成员:
_wfd:管道写端文件描述符
_subid:子进程PID
_name:通道名称(用于标识)
• 主要方法:
Send():向管道写入任务码
Close():关闭管道
Wait():等待子进程结束
● ChannelManager类(通道管理器):集中管理所有Channel对象
• 关键特性:
使用vector存储Channel对象
采用轮询(round-robin)方式选择Channel
提供批量关闭和等待子进程的方法
• 负载均衡:通过Select()方法实现简单的轮询调度
● ProcessPool类(进程池):主管理类,整合上述组件
• 工作流程:
初始化时注册任务
启动时创建指定数量的子进程
通过Run()方法分发任务
通过Stop()方法关闭和回收子进程
// .hpp文件支持头源混编
#pragma once // 防止头文件被多次包含
#include
#include
#include
#include // stdlib.h stdio.h -> cstdlib cstdio
#include
#include
#include "Task.hpp"
class Channel // 通道(管道)
{
private:
int _wfd;
pid_t _subid;
std::string _name; // 方便打印信道
public:
Channel(int wfd, int subid)
: _wfd(wfd),
_subid(subid)
{
_name = "channel- " + std::to_string(wfd) + " - " + std::to_string(subid);
}
// get方法
int Fd() { return _wfd; }
pid_t Subid() { return _subid; }
std::string Name() { return _name; }
// 发送任务
void Send(int code)
{
// 向_wfd写入整数code, 实现单向发送
int n = write(_wfd, &code, sizeof(code));
(void)n; // 绕过编译器对未使用变量的检查
}
void Close()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid;
}
~Channel() {};
};
class ChannelManager
{
private:
std::vector _channels; // 管理管道
int _next; // 表示选择的信道
public:
ChannelManager() : _next(0) {};
void InsertChannel(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid); // emplace_back()可以直接在容器内构造对象
// Channel c(wfd, subid); // 先构建管道s
// _channels.push_back(std::move(c)); // 再将管道用vector管理起来
}
void PrintChannel()
{
for (auto &channel : _channels)
{
std::cout << channel.Name() << std::endl;
}
}
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size(); // 防止越界
return c;
}
void StopSubProcess()
{
for(auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
}
}
void WaitSubProcess()
{
for(auto &channel : _channels)
{
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
~ChannelManager() {};
};
const int defaultnum = 5;
class ProcessPool
{
private:
ChannelManager _cm; // 进程池
int _process_num; // 进程池中进程的个数
TaskManager _tm; // 对任务的管理模块
public:
ProcessPool(int num) : _process_num(num)
{
// 初始化时注册任务
_tm.Register(PrintLog);
_tm.Register(DownLoad);
_tm.Register(Upload);
}
// 子进程要执行的工作
void Work(int rfd)
{
// std::cout << "我是子进程, 我的rfd是: " << rfd << std::endl;
// sleep(5);
while (true)
{
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0)
{ // 读成功了
if (n != sizeof(code))
{
continue; // 读到不规范的, 继续读
}
std::cout << "子进程[" << getpid() << "]收到一个任务码:" << code << std::endl; // 读到规范的
_tm.Execute(code); // 子进程收到任务码后要执行
}
else if (n == 0)
{
std::cout << "进程退出" << std::endl;
break;
}
else
{
std::cout << "读取错误" << std::endl;
break;
}
}
}
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2.创建子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// child
// 3. 关闭不用的文件描述符
close(pipefd[1]);
Work(pipefd[0]);
close(pipefd[0]); // 完成工作,关闭读端
exit(0);
}
else
{
// father
// 3. 关闭不用的文件描述符
close(pipefd[0]);
_cm.InsertChannel(pipefd[1], subid); // 在父进程处管理管道
// wfd, subid
}
}
return true;
}
// 测试
void Debug()
{
_cm.PrintChannel();
}
// push任务, 选择信道
void Run()
{
// 1. 选择一个任务
int taskcode = _tm.Code();
// 2. 负载均衡的选择一个子进程(轮询)
auto &c = _cm.Select();
std::cout << "选择一个子进程:" << c.Name() << std::endl;
// 3. 发送任务到子进程
c.Send(taskcode);
std::cout << "发送了一个任务码:" << taskcode << std::endl;
}
void Stop()
{
// 1.关闭所有子进程, 只需要关闭父进程的wfd即可
_cm.StopSubProcess();
// 2.将所有子进程回收
_cm.WaitSubProcess();
}
~ProcessPool() {};
};
③ Main.cc
工作原理
• 初始化阶段:创建进程池对象,注册任务函数;调用Start()方法:创建管道和子进程
子进程进入Work()循环,等待任务,父进程管理所有Channel。
• 任务分发阶段:主进程通过Run()方法:随机选择一个任务码,轮询选择一个子进程,通过管道发送任务码。
• 任务执行阶段:子进程读取管道中的任务码,调用TaskManager执行对应任务。
• 终止阶段:关闭所有管道写端,等待子进程退出。
#include "ProcessPool.hpp"
int main()
{
// 1. 创建进程池对象
ProcessPool pp(defaultnum);
// 2. 启动进程池
pp.Start();
// pp.Debug(); 测试1
// 3. 自动派发任务
int cnt = 5;
while (cnt--)
{
pp.Run();
sleep(1);
}
// 4. 回收结束进程池
pp.Stop();
return 0;
}
④ Makefile
process_pool:Main.cc
g++ -o $@ $^
.PHONY:clean
clean:
rm -f process_pool
⑤ BUG --- 文件描述符的继承和泄漏
如果我们用整合后的 StopAndWait() 代替 WaitSubProcess() 和 StopAndWait()呢。会发生什么?
void StopAndWait()
{
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
问题根源:
1、文件描述符继承:每次 fork() 时,子进程会复制父进程的文件描述符表,包括之前创建的管道描述符。
2、描述符泄漏:父进程虽然关闭了当前管道的写端(pipefd[1]),但之前子进程继承的冗余描述符未被关闭。子进程的 read() 可能阻塞在其他未关闭的管道描述符上(例如继承了前一个子进程的读端)。
3、waitpid() 阻塞:由于子进程仍有未关闭的描述符,它的 read() 可能未检测到 EOF(写端关闭),导致子进程无法退出。父进程的 waitpid() 因此无限等待。
复现路径
1. 父进程创建子进程1:子进程1继承 pipefd1[0] 和 pipefd1[1]。
2. 父进程创建子进程2:子进程2额外继承 pipefd1[0] 和 pipefd1[1](即使它只需要 pipefd2[0])。
3. 父进程调用 StopAndWait():关闭当前管道的写端,但子进程2仍持有 pipefd1[0] 的副本。子进程2的 read(pipefd1[0]) 阻塞,无法退出。
解决问题
方案1:在执行StopAndWait()时,倒着关闭文件描述符表。
void StopAndWait()
{
// 解决方案1:倒着关闭
for (int i = _channels.size() - 1; i >= 0; i--)
{
_channels[i].Close();
std::cout << "关闭: " << _channels[i].Name() << std::endl;
_channels[i].Wait();
std::cout << "回收: " << _channels[i].Name() << std::endl;
}
}
方案2:在子进程中关闭无关描述符,实现一个父进程指向所有管道
_channels 内容的生成时机:
子进程继承的是创建自己时父进程的 _channels 状态。当创建子进程N时,_channels 包含的是子进程1~N-1的管道写端。
当前管道(子进程N的管道)的写端尚未加入 _channels(父进程在 fork() 后才执行 InsertChannel)。
// 由子进程调用, 关闭无关描述符
void CloseAll()
{
for (auto &channel : _channels)
{
channel.Close();
}
}
class ProcessPool
{
// ...
public:
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2.创建子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// child
// 让子进程关闭继承下来的哥哥进程的w端
_cm.CloseAll(); // 注:此时子进程拿到的是创建该子进程之前的文件描述符的所有w端(写实拷贝)
// 3. 关闭不用的文件描述符
close(pipefd[1]);
Work(pipefd[0]);
close(pipefd[0]); // 完成工作,关闭读端
exit(0);
}
else
{
// father
// 3. 关闭不用的文件描述符
close(pipefd[0]);
_cm.InsertChannel(pipefd[1], subid); // 在父进程处管理管道
// wfd, subid
}
}
return true;
}
};
class ChannelManager
{
// ...
void StopAndWait()
{
// 解决方案2:让一个父进程指向所有管道
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
};
3.命名管道
命名管道(Named Pipe),也称为 FIFO(First In First Out)文件,是一种特殊的文件类型,用于在不相关进程(无父子关系)之间进行通信(特性)。与匿名管道(pipe())不同,命名管道在文件系统中有一个可见的路径名,任何进程只要知道该路径,都可以访问它。
所以说FIFO是通过打开同一个路径下同一个文件(管道文件),看到同一份资源,从而实现通信的。
(1)创建命名管道
① 命令行创建
$ mkfifo fifo # 创建一个名为 fifo 的命名管道
$ ll
total 8
drwxrwxr-x 2 zyt zyt 4096 May 22 16:24 ./
drwxrwxr-x 27 zyt zyt 4096 May 22 16:19 ../
prw-rw-r-- 1 zyt zyt 0 May 22 16:24 fifo|
命名管道(FIFO)是一种 同步通信机制,写入操作(>)会 阻塞,直到另一端有进程打开管道并开始读取。如果没有任何进程在读取管道,写入操作会一直等待。
# 终端1(写入端)
$ echo "hello fifo">fifo
# 终端2(读取端)
$ cat < fifo
hello fifo
② 系统调用函数
#include
#include
int mkfifo(const char *pathname, mode_t mode);
pathname:要创建的 FIFO 文件的路径名;mode:文件权限模式
成功时返回 0,失败时返回 -1 并设置 errno
(2)实例
① 示例:简单实现一个客户端与服务端的通信
server.cc
// server.cc
#include
#include
#include
#include
#include
#include
#include "comm.hpp"
int main()
{
// 创建管道文件
umask(0);
int n = mkfifo(FIFO_FILE, 0666);
if(n < 0)
{
std::cerr << "mkfifo error" << std::endl;
return 1;
}
std::cout << "mkfifo success" << std::endl;
// 打开管道文件
// w方没有执行open时, r方就会在open内部阻塞,
// 直到有人把管道文件打开了, open才会返回
int fd = open(FIFO_FILE, O_RDONLY);
if(fd < 0)
{
std::cerr << "open fifo error" << std::endl;
return 2;
}
std::cout << "open fifo success" << std::endl;
// read操作
while(true)
{
char buffer[1024];
int number = read(fd, buffer, sizeof(buffer)-1);
if(number > 0) // 成功返回读取的字节数
{
buffer[number] = 0;
std::cout << "client say: " << buffer << std::endl;
}
else if(number == 0)
{
// write端关闭了, read的返回值就是0
std::cout << "client quit ? "<< number << std::endl;
break;
}
else
{
std::cerr << "read error" << std::endl;
break;
}
}
// 删除管道文件
int m = unlink(FIFO_FILE);
if(m == 0)
{
std::cout << "remove fifo success" << std::endl;
}
else
{
std::cout << "remove fifo failed" << std::endl;
}
return 0;
}
client.cc
// client.cc
#include
#include
#include
#include
#include
#include
#include "comm.hpp"
int main()
{
// 打开管道文件
int fd = open(FIFO_FILE, O_WRONLY);
if(fd < 0)
{
std::cerr << "open fifo error" << std::endl;
return 1;
}
std::cout << "open fifo success" << std::endl;
// write操作
std::string message;
int cnt = 1;
pid_t id = getpid();
while(true)
{
std::cout << "Please Enter# ";
std::getline(std::cin, message);
message += ", message number " + std::to_string(cnt++) + ", [" + std::to_string(id) + "]"; // 构建长字符串
write(fd, message.c_str(), message.size()); // message写入到文件
}
close(fd);
return 0;
}
Makefile
.PHONY:all
all : server client
server:server.cc
g++ -o $@ $^ -std=c++11
client:client.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f client server
② 优化:用类封装
// comm.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#define FIFO_FILE "fifo"
#define PATH "."
#define FILENAME "fifo"
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
struct Namedfifo
{
public:
Namedfifo(const std::string path, const std::string name)
: _path(path), _name(name)
{
_fifoname = _path + "/" + _name;
umask(0);
int n = mkfifo(_fifoname.c_str(), 0666);
if (n < 0)
{
//std::cerr << "mkfifo error" << std::endl;
ERR_EXIT("mkfifo");
}
else
{
std::cout << "mkfifo success" << std::endl;
}
}
~Namedfifo()
{
// 删除管道文件
int m = unlink(_fifoname.c_str());
if (m == 0)
{
std::cout << "remove fifo success" << std::endl;
}
else
{
//std::cout << "remove fifo failed" << std::endl;
ERR_EXIT("unlink");
}
}
private:
std::string _path;
std::string _name;
std::string _fifoname;
};
class FileOper
{
public:
FileOper(const std::string &path, const std::string &name)
: _path(path), _name(name), _fd(-1)
{
_fifoname = _path + "/" + _name;
}
void OpenForRead()
{
_fd = open(_fifoname.c_str(), O_RDONLY);
if (_fd < 0)
{
//std::cerr << "open fifo error" << std::endl;
//return;
ERR_EXIT("open");
}
std::cout << "open fifo success" << std::endl;
}
void OpenForWrite()
{
_fd = open(_fifoname.c_str(), O_WRONLY);
if (_fd < 0)
{
// std::cerr << "open fifo error" << std::endl;
// return;
ERR_EXIT("open");
}
std::cout << "open fifo success" << std::endl;
}
void Write()
{
std::string message;
int cnt = 1;
pid_t id = getpid();
while (true)
{
std::cout << "Please Enter# ";
std::getline(std::cin, message);
message += ", message number " + std::to_string(cnt++) + ", [" + std::to_string(id) + "]"; // 构建长字符串
write(_fd, message.c_str(), message.size()); // message写入到文件
}
}
void Read()
{
while (true)
{
char buffer[1024];
int number = read(_fd, buffer, sizeof(buffer) - 1);
if (number > 0) // 成功返回读取的字节数
{
buffer[number] = 0;
std::cout << "client say: " << buffer << std::endl;
}
else if (number == 0)
{
// write端关闭了, read的返回值就是0
std::cout << "client quit ? " << number << std::endl;
break;
}
else
{
// std::cerr << "read error" << std::endl;
// break;
ERR_EXIT("read");
}
}
}
void Close()
{
if (_fd > 0)
close(_fd);
}
~FileOper() {}
private:
std::string _path;
std::string _name;
std::string _fifoname;
int _fd;
};
// server.cc 服务端
#include "comm.hpp"
int main()
{
// 创建管道文件
Namedfifo fifo(PATH, FILENAME);
// 文件操作
FileOper readerfile(".", "fifo");
// 打开管道文件
readerfile.OpenForRead();
// read操作
readerfile.Read();
// 删除管道文件
readerfile.Close();
return 0;
}
// client.cc 客户端
#include
#include
#include
#include
#include
#include
#include "comm.hpp"
int main()
{
// 打开管道文件
FileOper writerfile(PATH, FILENAME);
// write操作
writerfile.OpenForWrite();
writerfile.Write();
writerfile.Close();
return 0;
}
三、system V
System V IPC 是 Unix/Linux 系统中一种经典的进程间通信(IPC)标准,与 POSIX IPC 共同构成传统 UNIX 系统的两大 IPC 通信体系(通信的接口设计,原理,接口具有相似性)。System V IPC 包含以下三种核心机制(共享内存,信号量,消息队列),均在内核中维护全局唯一的标识符和数据结构。
1. system V 共享内存
共享内存允许两个或多个进程共享一个给定的存储区。因为数据不需要在客户进程和服务器进程之间复制,所有这是最快的一种IPC。
1. 映射之后读写直接被对方看到。
2. 不需要进行系统调用获取或写入内容,以指针地址的方式。
(1)原理
① 物理内存共享
内核分配一块物理内存:通过系统调用(如 shmget())在内核中申请一块物理内存区域,称为共享内存段。
• 该区域独立于任何进程,由内核统一管理。
• 生命周期持续到显式释放(shmctl(IPC_RMID))或系统重启。
② 虚拟地址映射
进程通过 shmat() 将共享内存段映射到自己的虚拟地址空间:
• 不同进程的映射地址可以不同(如进程A映射到 0x4000,进程B映射到 0x5000)。
• 通过 页表 将虚拟地址转换为同一物理地址,实现共享。
③ 直接访问
进程通过指针操作共享内存,无需系统调用:
• 写入数据后,其他进程立即可见(无延迟)。
• 性能接近直接访问本地内存。
④ 工作流程
以两个进程共享内存为例:
(2)内核数据结构
共享内存的机制依赖于内核数据结构和物理内存的紧密结合。
① struct shmid_ds(用户可见的元信息)
struct shmid_ds {
struct ipc_perm shm_perm; // 权限控制(uid/gid/mode)
int shm_segsz; // 共享内存大小(字节)
__kernel_time_t shm_atime; // 最后一次attach时间
__kernel_time_t shm_dtime; // 最后一次detach时间
__kernel_time_t shm_ctime; // 创建/修改时间
__kernel_ipc_pid_t shm_cpid;// 创建者PID
__kernel_ipc_pid_t shm_lpid;// 最后一次操作进程PID
unsigned short shm_nattch; // 当前附加进程数(关键!)
// ... 保留字段
};
② struct shmid_kernel(内核实际使用的扩展结构)
shm_file:指向一个特殊的内存文件,该文件直接管理物理页帧。
struct shmid_kernel {
struct shmid_ds u; // 用户可见的shmid_ds
struct file *shm_file; // 关联的虚拟文件(核心!)
unsigned long shm_nattch; // 实际引用计数(与u.shm_nattch同步)
// ... 其他内核专用字段
};
③ struct file 与物理页帧
shm_file->f_mapping:指向共享内存的物理页帧集合(struct address_space)。
物理页:通过 struct page 数组表示,每个页帧对应实际的物理内存。
④ 操作系统如何跟踪共享内存的使用状态?
操作系统通过内核数据结构和引用计数机制精确跟踪共享内存是否被进程使用。struct shmid_kernel 描述字段中:
• shm_nattch是核心计数器,记录有多少进程通过 shmat() 映射了该共享内存段。
• shm_perm.mode:包含标志位 IPC_RMID,标记共享内存是否已被标记为“待销毁”。
(3)标识符和键
• 我们怎么评估共享内存是存在还是不存在?
• 我们怎么确定两个不同的进程拿到的是同一个共享内存?
① 标识符shmid
每个内核中的IPC结构,都用一个非负整数的标识符加以引用(在进程内部唯一标识一个共享内存段)。例如:向一个消息队列发送信息或从一个消息队列取信息,只需要知道其队列标识符。概念与文件描述符类似,但与文件描述符不同,IPC标识符不是小的整数。当一个IPC结构被创建,然后又被删除时,与这种结构相关的标识符连续加1,直至达到一个整数型数的最大正值(INT_MAX),然后又转回0(线性增长策略)。
特点:
• shmid 是内核分配的临时标识符,在系统生命周期内唯一。
• 同一共享内存段在不同进程中的 shmid 可能不同(但指向同一物理内存)。
• 通过 shmget(key, ...) 返回。
② 键key
标识符是IPC对象的内部名。为使多个合作进程能够在同一IPC对象上汇聚,需要提供一个外部命名方案。为此,每个IPC对象都与一个键key相关联,将这个键作为该对象的外部名,key值是由用户层构建并传入的。
无论何时创建IPC结构,都应指定一个键。这个键的数据类型就是基本系统数据类型 key_t ,通常头文件
ftok() :基于文件路径和项目ID计算(依赖文件的 inode)产生一个key。path必须引用一个现有的文件。当产生键时,只使用id参数的低8位。
#include
key_t ftok(const char *path, int id); // 必须保证多进程使用相同的参数
• ftok 创建的键通常是用下列方式构成的:按给定的路径名取得其 stat 结构中的部分 st_dev和 st_ino 字段,然后再将它们与项目ID组合起来(三部分结合生成)。如果两个路径名引用的是两个不同的文件,那么 ftok 通常会为这两个路径名返回不同的键。
• 冲突:【但是,因为 i 节点编号和键通常都存放在长整型中,所以创建键时实际位数可能远超 key_t 的容量,可能会丢失信息。这意味着,对于不同文件的两个路径名,如果使用同一项目ID,那么可能产生相同的键。】
补充知识:key_t key = (st_dev & 0xFF) << 24 | (st_ino & 0xFFFF) << 8 | (proj_id & 0xFF);
用 16 进制打印 key_t 可以直观反映其内部位组合,例如 key=0x1234abcd:
• 高 8 位 0x12 → 设备号 (st_dev) 部分。
• 中间 16 位 0x34ab → i 节点号 (st_ino) 部分。
• 低 8 位 0xcd → 项目 ID (proj_id) 部分。
③ 总结
问题 | 判断依据 |
---|---|
共享内存是否存在? | shmget(key, 0, 0) 是否成功,或 shmctl(shmid, IPC_STAT, &buf) 是否有效。 |
是否同一共享内存? | • 两个进程是否使用相同的 key ,并且能互相读写数据(物理内存相同)。• 即使 shmid 数值不同,只要 key 相同,就是同一共享内存。 |
(4)共享内存接口函数
① shmget()
创建或获取一个共享内存段。若成功返回共享内存的标识码ID(shmid),若出错返回-1.
#include
#include
int shmget(key_t key, size_t size, int shmflg);
参数 | 类型 | 说明 |
---|---|---|
key | key_t | 共享内存的键值,通常由 ftok() 生成,或使用 IPC_PRIVATE 创建私有段。 |
size | size_t | 共享内存段的大小(字节)。若为获取已有段,可设为 0 。 |
shmflg | int | 标志位组合,控制创建/访问权限(见下文)。 |
标志 | 含义 |
---|---|
IPC_CREAT | 若共享内存不存在则创建,若存在打开目标共享文件并返回。 |
IPC_EXCL | 与 IPC_CREAT 联用,若共享内存不存在则创建,若已存在则出错返回。 |
0666 等权限码 | 设置共享内存的读写权限(如 0600 表示仅所有者可读写)。 |
• 使用
IPC_CREAT | IPC_EXCL
保证只要shmget成功返回,就一定是一个全新的共享内存。这样就解决了前面说的创建key值的冲突问题!
#include
#include
#include
#include
int main() {
// 1. 生成键值
key_t key = ftok("/tmp/shmdemo", 'A');
if (key == -1) {
perror("ftok failed");
exit(1);
}
// 2. 创建共享内存
int shmid = shmget(key, 4096, IPC_CREAT | 0666);
if (shmid == -1) {
perror("shmget failed");
exit(1);
}
printf("Shared memory created, shmid=%d
", shmid);
// 3. 使用 shmat 映射内存(略)
// 4. 使用后释放资源shmctl(略)
return 0;
}
验证:共享内存的生命周期持续到显式释放(shmctl(IPC_RMID))或系统重启。
// comm.hpp
#pragma once
#include
#include
#include
#include
#include
#include
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
const int defaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66; // 项目id
class Shm
{
public:
Shm() : _shmid(defaultid), _size(gsize)
{}
// 创建一个全新的IPC
void Create()
{
key_t k = ftok(pathname.c_str(), projid);
if (k < 0)
{
ERR_EXIT("ftok");
}
printf("key: 0x%x
", k); // 16 进制能直观展示 key 的位组合结构
_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL);
if (_shmid < 0)
{
ERR_EXIT("shmget");
}
printf("shmid: %d
", _shmid); // 10进制打印, shmid是长整数
}
~Shm()
{}
private:
int _shmid;
int _size;
};
// server.cc
#include "comm.hpp"
int main()
{
Shm shm;
shm.Create();
return 0;
}
$ ./server
key: 0x66030437
shmid: 1
$ ./server # 第二次执行就出错返回
key: 0x66030437
shmget: File exists
$ ipcs -m # 查看共享内存资源
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x66030437 1 zyt 0 4096 0
进程结束了,如果没有删除共享内存,共享内存就会一直存在。
=>共享内存的资源、生命周期随内核!共享内存的生命周期持续到显式释放或系统重启。
删除共享内存:① ipcrm -m shmid ② 代码删除
注意:虽然前面我们解释key是外部名,shmid是内部名。但在用户层删除、控制共享内存,不能用key值,而要用shmid来管理共享内存!(key给内核用来进行唯一性区分的!)
② shmctl()
控制共享内存段的系统调用,可以查询状态、修改属性或删除共享内存段。
#include
#include
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
常用 cmd
控制命令
命令 | 作用 |
---|---|
IPC_STAT | 获取共享内存状态,保存到 buf 。 |
IPC_SET | 修改共享内存属性(如权限),需通过 buf 传入新值。 |
IPC_RMID | 标记删除共享内存段(实际释放需等待 shm_nattch=0 )。 |
IPC_INFO | 获取系统级共享内存限制信息(Linux 特有)。 |
shmid_ds
struct shmid_ds {
struct ipc_perm shm_perm; // 权限结构
size_t shm_segsz; // 段大小(字节)
time_t shm_atime; // 最后附加时间
time_t shm_dtime; // 最后分离时间
time_t shm_ctime; // 最后修改时间
pid_t shm_cpid; // 创建者PID
pid_t shm_lpid; // 最后操作PID
shmatt_t shm_nattch;// 当前附加计数
...
};
struct ipc_perm {
key_t __key; // IPC对象的关键键值,
// 我们的shmget(key...) 会被设置到共享内存的描述结构体中。
uid_t uid; // 所有者的用户ID
gid_t gid; // 所有者的组ID
uid_t cuid; // 创建者的用户ID
gid_t cgid; // 创建者的组ID
unsigned short mode; // 权限模式(类似文件权限)
unsigned short __seq; // 序列号(内部使用)
};
// 在comm.hpp的class Shm内实现的, 可以看上一段代码
void Destory()
{
if (_shmid == defaultid)
return;
int n = shmctl(_shmid, IPC_RMID, nullptr);
if (n > 0)
{
printf("shmctl delete shm : %d success
", _shmid);
}
else
{
ERR_EXIT("shmctl");
}
}
③ shmat()
将共享内存段挂接(attach)到当前进程的地址空间,使其能够直接访问共享内存。如果shmat成功执行,那么内核将使用与该共享内存段相关的shmid_ds结构中的shm_nattch计数器值加1。
#include
void *shmat(int shmid, const void *shmaddr, int shmflg);
参数 | 类型 | 说明 |
---|---|---|
shmid | int | 共享内存标识符(由 shmget() 返回)。 |
shmaddr | const void* | 指定附加固定地址(通常设为 NULL ,由内核自动选择合适地址)。 |
shmflg | int | 控制附加行为的标志位(如 SHM_RDONLY 表示只读访问)。 |
shmflg
的常用标志 :
标志 | 作用 |
---|---|
0 | 默认读写权限。 |
SHM_RDONLY | 以只读方式附加(需共享内存权限允许)。 |
SHM_REMAP | 强制替换 shmaddr 处的现有映射(Linux 特有,谨慎使用)。 |
返回值
成功:返回共享内存段在进程地址空间中的起始地址(类型为 void*)。
失败:返回 (void*) -1,并设置 errno。
④ shmdt()
shmdt() 将从调用进程的地址空间中分离位于 shmaddr 的共享内存段;该内存段必须之前已经通过 shmat() 附加到进程。分离操作不会删除共享内存段 - 该段会一直存在直到使用 shmctl() 配合 IPC_RMID 命令显式删除。
#include
int shmdt(const void *shmaddr);
• shmaddr: 要分离的共享内存段的地址,必须是之前 shmat() 调用返回的地址
• 成功时返回 0,失败时返回 -1 并设置 errno 来指示错误
(5)共享内存使用全流程
① comm.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
const int defaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66; // 项目id
const int gmode = 0666;
class Shm
{
public:
Shm() : _shmid(defaultid), _size(gsize), _start_mem(nullptr)
{
}
// 创建一个全新的IPC
void Create()
{
key_t k = ftok(pathname.c_str(), projid);
if (k < 0)
{
ERR_EXIT("ftok");
}
printf("key: 0x%x
", k); // 16 进制能直观展示 key 的位组合结构
// 共享内存的生命周期随内核
_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL | gmode);
if (_shmid < 0)
{
ERR_EXIT("shmget");
}
printf("shmid: %d
", _shmid); // 10进制打印, shmid是长整数
}
void Destory()
{
if (_shmid == defaultid)
return;
int n = shmctl(_shmid, IPC_RMID, nullptr);
if (n > 0)
{
printf("shmctl delete shm : %d success
", _shmid);
}
else
{
ERR_EXIT("shmctl");
}
}
void Attach()
{
_start_mem = shmat(_shmid, nullptr, 0);
if((long long)_start_mem < 0)
{
ERR_EXIT("shmat");
}
printf("attach success
");
}
void *VirtualAddr()
{
printf("VirtualAddr : %p
", _start_mem);
return _start_mem;
}
~Shm()
{
}
private:
int _shmid;
int _size;
void *_start_mem;
};
② server.cc
#include "comm.hpp"
int main()
{
Shm shm;
shm.Create();
sleep(3);
shm.Attach();
shm.VirtualAddr();
sleep(3);
shm.Destory();
return 0;
}
(6)基于共享内存实现进程间通信
① 无同步机制的进程间通信
// Shm.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
const int defaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66; // 项目id
const int gmode = 0666;
// 用户类型
#define CREATER "creater"
#define USER "user"
class Shm
{
private:
// 设置帮助函数(有代码块重复)
void CreateHelper(int flag)
{
printf("key: 0x%x
", _key); // 16 进制能直观展示 key 的位组合结构
// 共享内存的生命周期随内核
_shmid = shmget(_key, _size, flag);
if (_shmid < 0)
{
ERR_EXIT("shmget");
}
printf("shmid: %d
", _shmid); // 10进制打印, shmid是长整数
}
// 创建一个全新的IPC
void Create()
{
CreateHelper(IPC_CREAT | IPC_EXCL | gmode);
}
void Attach()
{
_start_mem = shmat(_shmid, nullptr, 0);
if ((long long)_start_mem < 0)
{
ERR_EXIT("shmat");
}
printf("attach success
");
}
void Destory()
{
// if (_shmid == defaultid)
// return;
int n = shmctl(_shmid, IPC_RMID, nullptr);
if (n > 0)
{
printf("shmctl delete shm : %d success
", _shmid);
}
else
{
ERR_EXIT("shmctl");
}
}
public:
Shm(const std::string &pathname, int projid, const std::string &usertype)
: _shmid(defaultid),
_size(gsize),
_start_mem(nullptr),
_usertype(usertype)
{
_key = ftok(pathname.c_str(), projid);
if (_key < 0)
{
ERR_EXIT("ftok");
}
if(_usertype == CREATER)
Create(); // 初始化时直接创建
else if(_usertype == USER)
Get();
Attach(); // 挂接
}
// 获取共享内存
void Get()
{
CreateHelper(IPC_CREAT);
}
int Size()
{
return _size;
}
void *VirtualAddr()
{
printf("VirtualAddr : %p
", _start_mem);
return _start_mem;
}
~Shm()
{
if(_usertype == CREATER)
Destory();
}
private:
int _shmid;
key_t _key;
int _size;
void *_start_mem;
std::string _usertype; // 用户类型
};
// server.cc
#include "shm.hpp"
// 进程间通信(无同步机制)
int main()
{
Shm shm(pathname, projid, CREATER);
char * mem = (char*)shm.VirtualAddr(); // 起始虚拟地址
// 一旦共享内存建立映射, 通信双方直接能拿到虚拟地址,
// 访问该虚拟地址如同访问自己的堆空间一样,实现高效数据共享。
// 读取共享内存地址操作
while (true)
{
printf("%s
", mem);
sleep(1);
}
return 0;
}
// client.cc
#include "shm.hpp"
// 进程间通信
int main()
{
Shm shm(pathname, projid, USER);
char *mem = (char*)shm.VirtualAddr();
// 写入操作,每隔一秒向虚拟地址写入字母(从A开始)
int index = 0;
for (char c = 'A'; c <= 'Z'; index += 2)
{
// 成对的写入, 但server端读取的时候不会按照我们预期的读取成对了的(无保护机制)
mem[index] = c;
sleep(1);
mem[index + 1] = c;
sleep(1);
}
return 0;
}
我们发现读写共享内存,没有使用系统调用。共享区属于用户空间,可以让用户直接使用。这也解释了前面说共享内存是最快的一种IPC。
1. 映射之后读写直接被对方看到。
2. 不需要进行系统调用获取或写入内容,以指针地址的方式。
执行后:
注:client端没有执行时,server端不会等待client执行,而是照样打印命令行,我们就说通信双方没有所谓的“同步机制”,所以共享内存没有保护机制(对共享内存中数据的保护)。这也是IPC快的原因之一!
为了解决没有“保护机制的问题”,我们可以与前面学的管道相结合使用,看看能不能解决?
② 共享内存+管道
Fifo.hpp + Shm.hpp
// Fifo.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#define FIFO_FILE "fifo"
#define PATH "."
#define FILENAME "fifo"
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
struct Namedfifo
{
public:
Namedfifo(const std::string path, const std::string name)
: _path(path), _name(name)
{
_fifoname = _path + "/" + _name;
umask(0);
int n = mkfifo(_fifoname.c_str(), 0666);
if (n < 0)
{
// std::cerr << "mkfifo error" << std::endl;
ERR_EXIT("mkfifo");
}
else
{
std::cout << "mkfifo success" << std::endl;
}
}
~Namedfifo()
{
// 删除管道文件
int m = unlink(_fifoname.c_str());
if (m == 0)
{
std::cout << "remove fifo success" << std::endl;
}
else
{
// std::cout << "remove fifo failed" << std::endl;
ERR_EXIT("unlink");
}
}
private:
std::string _path;
std::string _name;
std::string _fifoname;
};
class FileOper
{
public:
FileOper(const std::string &path, const std::string &name)
: _path(path), _name(name), _fd(-1)
{
_fifoname = _path + "/" + _name;
}
void OpenForRead()
{
_fd = open(_fifoname.c_str(), O_RDONLY);
if (_fd < 0)
{
// std::cerr << "open fifo error" << std::endl;
// return;
ERR_EXIT("open");
}
std::cout << "open fifo success" << std::endl;
}
void OpenForWrite()
{
_fd = open(_fifoname.c_str(), O_WRONLY);
if (_fd < 0)
{
// std::cerr << "open fifo error" << std::endl;
// return;
ERR_EXIT("open");
}
std::cout << "open fifo success" << std::endl;
}
void Wakeup()
{
// 无论写入什么内容,一个字符就行
char c = 'c';
write(_fd, &c, 1);
}
bool Wait()
{
// 我们要求: 一次只能读取一个字符
char c;
int number = read(_fd, &c, 1);
if (c > 0)
return true;
return false;
}
void Close()
{
if (_fd > 0)
close(_fd);
}
~FileOper()
{
// 删除管道文件
int n = unlink(_fifoname.c_str());
}
private:
std::string _path;
std::string _name;
std::string _fifoname;
int _fd;
};
// Shm.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include
// 宏替换, 打印错误信息
// “”是续行符
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while (0)
const int defaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66; // 项目id
const int gmode = 0666;
// 用户类型
#define CREATER "creater"
#define USER "user"
class Shm
{
private:
// 设置帮助函数(有代码块重复)
void CreateHelper(int flag)
{
printf("key: 0x%x
", _key); // 16 进制能直观展示 key 的位组合结构
// 共享内存的生命周期随内核
_shmid = shmget(_key, _size, flag);
if (_shmid < 0)
{
ERR_EXIT("shmget");
}
printf("shmid: %d
", _shmid); // 10进制打印, shmid是长整数
}
// 创建一个全新的IPC
void Create()
{
CreateHelper(IPC_CREAT | IPC_EXCL | gmode);
}
void Attach()
{
_start_mem = shmat(_shmid, nullptr, 0);
if ((long long)_start_mem < 0)
{
ERR_EXIT("shmat");
}
printf("attach success
");
}
// 去关联
void Detach()
{
int n = shmdt(_start_mem);
if(n == 0)
{
printf("shmdt success
");
}
}
void Destory()
{
// if (_shmid == defaultid)
// return;
int n = shmctl(_shmid, IPC_RMID, nullptr);
if (n > 0)
{
printf("shmctl delete shm : %d success
", _shmid);
}
else
{
ERR_EXIT("shmctl");
}
}
public:
Shm(const std::string &pathname, int projid, const std::string &usertype)
: _shmid(defaultid),
_size(gsize),
_start_mem(nullptr),
_usertype(usertype)
{
_key = ftok(pathname.c_str(), projid);
if (_key < 0)
{
ERR_EXIT("ftok");
}
if(_usertype == CREATER)
Create(); // 初始化时直接创建
else if(_usertype == USER)
Get();
Attach(); // 挂接
}
// 获取共享内存
void Get()
{
CreateHelper(IPC_CREAT);
}
int Size()
{
return _size;
}
void *VirtualAddr()
{
printf("VirtualAddr : %p
", _start_mem);
return _start_mem;
}
~Shm()
{
Detach(); // 去关联
if(_usertype == CREATER)
Destory();
}
private:
int _shmid;
key_t _key;
int _size;
void *_start_mem;
std::string _usertype; // 用户类型
};
server.cc
#include "shm.hpp"
#include "Fifo.hpp"
int main()
{
// 在server端先创建共享内存,
// 防止读取阻塞时, client端先以USER身份Get共享内存了(出错)
Shm shm(pathname, projid, CREATER);
// 创建管道
Namedfifo fifo(PATH, FILENAME);
// 文件操作
FileOper readerfile(PATH, FILENAME);
readerfile.OpenForRead();
char *mem = (char *)shm.VirtualAddr(); // 起始虚拟地址
// 读取共享内存地址操作
while (true)
{
// 默认会阻塞, 直到写端写入
if (readerfile.Wait())
{
printf("%s
", mem);
//sleep(1);
}
else
break;
}
readerfile.Close();
return 0;
}
client.cc
#include "shm.hpp"
#include "Fifo.hpp"
// 要想弥补无同步机制的缺陷, 我们可以使用管道
int main()
{
// 文件操作
FileOper writerfile(PATH, FILENAME);
writerfile.OpenForWrite();
Shm shm(pathname, projid, USER);
char *mem = (char*)shm.VirtualAddr();
// 写入操作,每隔一秒向虚拟地址写入字母(从A开始)
int index = 0;
for (char c = 'A'; c <= 'Z'; index += 2)
{
// 向共享内存写入
sleep(1);
mem[index] = c;
mem[index + 1] = c;
c++;
sleep(1);
mem[index + 2] = 0; // 用来保证每次写进去的是字符串
writerfile.Wakeup(); // 唤醒管道, 示意共享内存可以读取了
}
writerfile.Close();
return 0;
}
2. system V消息队列
(1)基本概念
消息队列是一种进程间通信(IPC)机制,它允许不同进程通过发送和接收消息来进行异步通信。消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。
消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法。每个数据块都被认为是有类型的(相当于给消息贴标签),接受者进程接收的数据块可以有不同的类型值。接收进程可以通过类型值实现三种接收模式:
接收模式 | 参数设置 | 行为 | 应用场景 |
---|---|---|---|
先进先出(FIFO) | msgtyp=0 | 读取队列中最早的消息 | 普通队列 |
指定类型 | msgtyp>0 | 读取队列中第一个该类型消息 | 优先处理告警消息(type=1) |
类型阈值 | msgtyp<0 | 读取类型≤ | msgtyp的最小类型消息 |
struct msqid_ds 是 System V 消息队列的核心数据结构,用于描述消息队列的属性和状态。
struct msqid_ds {
struct ipc_perm msg_perm; // 权限控制结构
time_t msg_stime; // 最后一次 msgsnd() 时间
time_t msg_rtime; // 最后一次 msgrcv() 时间
time_t msg_ctime; // 最后一次变更时间
unsigned long __msg_cbytes; // 当前队列字节数(内核使用)
msgqnum_t msg_qnum; // 队列中的消息数量
msglen_t msg_qbytes; // 队列最大允许字节数
pid_t msg_lspid; // 最后一个 msgsnd() 的PID
pid_t msg_lrpid; // 最后一个 msgrcv() 的PID
};
消息队列流程图:
(2)接口函数
① msgget()
#include
#include
#include
int msgget(key_t key, int msgflg);
功能:打开一个现有队列或创建一个新队列。
参数:
key:消息队列的键值(用 ftok() 生成,或使用 IPC_PRIVATE 创建私有队列)。
msgflg:权限标志(如 IPC_CREAT | 0666 表示创建并设置权限)。
返回值:成功:返回消息队列标识符(msqid)。失败:返回 -1,并设置 errno。
② msgctl()
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
功能:控制消息队列(删除、获取/设置属性等)。
参数:
msqid:消息队列标识符。
cmd:控制命令:
IPC_STAT:从msqid_ds结构体获取队列状态(存入 buf)。
IPC_SET:设置队列属性(从 buf 读取)。
IPC_RMID:删除消息队列(buf 可设为 NULL)。
buf:指向 struct msqid_ds 的指针(用于存储或设置队列信息)。
返回值:成功:返回 0。失败:返回 -1,并设置 errno。
• 消息队列的生命周期随内核,所以要删除IPC资源要用 msgctl,或者【ipcrm -p msgid】 删除。查看消息队列用【ipcs- p】。
③ msgsnd()
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
功能:向消息队列发送消息。
参数:
msqid:消息队列标识符(由 msgget() 返回)。
msgp:指向消息结构体的指针(消息格式见下文)。
msgsz:消息正文的大小(不包括 long mtype)。
msgflg:标志位(如 IPC_NOWAIT 表示非阻塞)。
返回值:成功:返回 0。失败:返回 -1,并设置 errno。
// 消息结构体示例:
struct msgbuf {
long mtype; // 消息类型(必须 > 0)
char mtext[100]; // 消息正文(可以是任意数据)
};
④ msgrcv()
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
功能:从消息队列接收消息。
参数:
msqid:消息队列标识符。
msgp:指向接收消息的缓冲区。
msgsz:缓冲区大小。
msgtyp:指定接收哪种类型的消息:
= 0:接收队列中的第一条消息。
> 0:接收类型等于 msgtyp 的第一条消息。
< 0:接收类型 ≤ |msgtyp| 的最小类型的消息。
msgflg:标志位(如 IPC_NOWAIT 非阻塞,MSG_NOERROR 允许截断消息)。
返回值:成功:返回接收到的消息正文的字节数。失败:返回 -1,并设置 errno。
3.systerm V信号量
(1)并发编程,概念铺垫
• 多个执行流(进程),能看到的同一份公共资源:共享资源
• 被保护起来的资源叫做临界资源
• 保护的方式常见:互斥与同步
• 任何时刻,只允许一个执行流访问资源,叫做互斥
• 多个执行流,访问临界资源的时候,具有一定的顺序性,叫做同步
• 系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源
• 在进程中涉及到互斥资源的程序段叫临界区。你写的代码 = 访问临界资源的代码(临界区) + 不访问临界资源的代码(非临界区)
• 所谓的对共享资源进行保护,本质是对访问共享资源的代码进行保护
(2)原子性
• 原子性是指 一个操作(或一系列操作)要么完全执行,要么完全不执行,不会被其他线程/进程打断。原子操作是不可分割的最小执行单元,在执行过程中不会被中断。
在并发编程中,多个线程/进程可能同时访问共享资源,如果操作不是原子的,可能会导致 竞态条件,造成数据不一致。
(3)什么是信号量?
信号量与已经介绍过的IPC机构不同。信号量本质是一个计数器,用于表明临界资源中资源数量的多少,为多个进程提供队共享数据对象的访问。
为了获得共享资源,进程需要执行下列操作。
1. 测试控制该资源的信号量。
2. 若此信号量的值为正,则进程可以使用该资源。在这种情况下,进程会将信号量值减1,表示它使用了一个资源单位。(资源的预定机制)
3. 否则,若此信号量的值为0,则进程进入等待状态,直至信号量大于0。进程被唤醒后,它返回至步骤(1)。
信号量本身也是共享资源,所以对信号量的操作也必须是原子的!为此,信号量通常是在内核中实现的。
当进程不再使用由一个信号量控制的共享资源时,该信号量值加1(V操作)。如果有进程正在休眠等待此信号量,内核会唤醒一个等待进程(按队列顺序或优先级),被唤醒的进程会自动给信号量值减1( P 操作)。
信号量的P/V操作本质:
操作 | 名称 | 行为 |
---|---|---|
P (Proberen) | 等待/获取 | 尝试减少信号量值。若值 ≥ 1,则减 1 并继续;否则进程阻塞(休眠)。 |
V (Verhogen) | 释放/唤醒 | 增加信号量值。若有进程在等待该信号量,则唤醒其中一个,信号量值 保持 0(被唤醒的进程会自动完成其 P 操作)。 |
常用的信号量形式被称为二元信号量,其值只能是 0 或 1。它控制单个资源,其初始值为1。
0:资源被占用,其他进程/线程必须等待。
1:资源可用,可以立即获取。
• 信号量和通信有什么关系?
1. 先访问信号量P,每个进程都得看到同一个信号量,满足进程间通信的前提。
2. 通信的本质是信息传递!不是传递数据才是通信,通知、同步和互斥也是通信!
• 查看信号量【ipcs -s semid】,删除信号量【ipcrm -s】.
(4)信号量接口函数
① semget()
#include
#include
#include
int semget(key_t key, int nsems, int semflg);
功能:创建或获取一个信号量集
参数:
• int nsems 指定信号量集中信号量的数量。
创建新集合时必须指定正确的数量(>0)
获取已有集合时可设为0(内核会忽略)
• int semflg 控制函数行为的标志位
常用组合:
IPC_CREAT | 0666:不存在则创建,并设置权限
IPC_CREAT | IPC_EXCL | 0666:独占创建(若已存在则失败)
0:单纯获取已有信号量集
返回值:成功:返回信号量集标识符(非负整数),失败:返回-1,并设置errno。
② semctl()
#include
#include
int semctl(int semid, int semnum, int cmd, ... /* union semun arg */);
参数说明
semid:信号量集的标识符,通过 semget() 获得。
semnum:指定信号量集中的具体信号量编号,从 0 开始。
cmd:指定对信号量执行的操作命令,常见的命令如下:
IPC_RMID:删除信号量集。
IPC_SET:设置信号量集的访问权限和所有者信息。
IPC_STAT:从 semid_ds 结构获取信号量集的状态信息,存储在 arg.buf 指向的结构中。
SETALL:设置信号量集中所有信号量的值为 arg.array 指向的数组中的值。
SETVAL:设置指定信号量的值,该值由arg.val指定。
...:可变参数,具体取决于 cmd 的值。例如,当 cmd 为 IPC_SET 或 SETVAL 时,需要提供一个联合体 union semun。【注意:这个选项是一个联合,而非指向联合的指针。】
union semun {
int val; // 用于 SETVAL 命令
struct semid_ds *buf; // 用于 IPC_STAT 和 IPC_SET 命令
unsigned short *array; // 用于 GETALL 和 SETALL 命令 【柔性数组(可扩容)】
};
返回值
成功时,返回值取决于 cmd 的类型:
• 如果 cmd 是 GETVAL,返回指定信号量的值。
• 如果 cmd 是 IPC_STAT 或 IPC_SET,返回 0。
• 如果 cmd 是 GETALL 或 SETALL,返回 0。
失败时,返回 -1,并设置 errno。
③ semop()
#include
#include
int semop(int semid, struct sembuf *sops, size_t nsops);
参数:
struct sembuf *sops:指向一个 struct sembuf 类型的数组,每个数组元素定义了一个操作。
size_t nsops:数组 sops 中的元素数量,即要执行的操作数量。
struct sembuf {
unsigned short sem_num; // 信号量编号
short sem_op; // 操作类型
short sem_flg; // 操作标志
};
struct sembuf 是一个结构体,用于定义对信号量的操作,其成员分析如下:
sem_num:指定信号量集中的信号量编号,从 0 开始。
sem_op:指定对信号量的操作类型:
• 正数:释放信号量(V 操作),增加信号量的值。
• 负数:等待信号量(P 操作),减少信号量的值,但只有信号量的值大于等于 -sem_op 时才会成功。
• 0:等待信号量值变为 0。
sem_flg:指定操作标志:
• IPC_NOWAIT:如果操作不能立即完成,则返回错误,而不是阻塞。
• SEM_UNDO:如果进程终止,系统会自动撤销对信号量的修改。
返回值:成功时,返回 0。 失败时,返回 -1,并设置 errno 。
(5)内核信号量集属性
内核为每一个信号量集合维护者一个semid_ds结构,用于描述信号量集属性的结构体,它存储了信号量集的各种信息,包括权限、信号量数量、最后操作时间等。
struct semid_ds {
struct ipc_perm sem_perm; // 权限相关信息
time_t sem_otime; // 最后一次调用 semop() 的时间
time_t sem_ctime; // 最后一次调用 semctl() 的时间
unsigned short sem_nsems; // 信号量集中信号量的数量
};
struct ipc_perm {
key_t key; // 信号量集的键值
uid_t uid; // 所有者的有效用户ID
gid_t gid; // 所有者的有效组ID
uid_t cuid; // 创建者的有效用户ID
gid_t cgid; // 创建者的有效组ID
mode_t mode; // 访问权限模式
};
四、内核是如何组织管理IPC资源的?
内核通过 统一权限模型(kern_ipc_perm)+ 分类资源管理(信号量/消息队列/共享内存)+ 命名空间隔离 实现IPC资源的高效组织。核心逻辑集中在ipc/util.c中提供通用服务,具体资源由各自模块实现,系统调用作为用户入口。
(1)核心数据结构物理布局
// 所有IPC资源的容器,实现资源隔离(每个命名空间独立)。
struct ipc_namespace {
struct ipc_ids ids[3]; // 索引0:信号量, 1:消息队列, 2:共享内存
struct shmem_info shm_info;
// ...(资源限制、挂载点等)
};
// 管理同一类IPC资源的ID集合。
struct ipc_ids {
struct kern_ipc_perm *entries; // 资源指针数组
int in_use; // 当前使用数
unsigned short seq; // 序列号(防ID复用冲突)
struct rw_semaphore rwsem; // 读写锁
};
// 所有IPC资源的公共权限控制头(嵌入在具体资源结构中)。
struct kern_ipc_perm {
key_t key; // 资源标识键
uid_t uid; // 所有者UID
gid_t gid; // 所有者GID
mode_t mode; // 权限模式
int id; // 资源ID
// ...(引用计数、锁等)
};
// ============= struct ipc_namespace 内存布局 =============
+---------------------+
| ipc_namespace |
|---------------------|
| *ids[3] | // 数组指针(每个元素指向一个ipc_ids)
| [0] → sem_ids |━━━━━━━┓
| [1] → msg_ids |━━━━━━━┫
| [2] → shm_ids |━━━━━━━┛
| shm_info |
| ... |
+---------------------+
// ============= struct ipc_ids 内部数组 =============
+---------------------+
| ipc_ids (sem_ids) |
|---------------------|
| *entries |━━━━━━━┓ // 动态指针数组(存储kern_ipc_perm*)
| [0] → sem_array1 |━━━━━┓ ┃
| [1] → NULL | ┃ ┃
| [2] → sem_array2 |━━━┓ ┃ ┃
| ... | ┃ ┃ ┃
| in_use=2 | ┃ ┃ ┃
+---------------------+ ┃ ┃ ┃
▼ ▼ ▼
+---------------+
| sem_array1 | // 实际信号量对象
|---------------|
| kern_ipc_perm |━━━━━━━━━━━━━┓
| key=0x1234 | ▼
| id=0 | +-----+------+
| sem_base |━━━━━→ | sem[0..N] | // 信号量数组
+---------------+ +-----------+
// 动态分配的指针数组entries[](通过idr机制管理)
+---+---+---+---+
| 0 | 1 | 2 |... // 数组索引=资源ID
+---+---+---+---+
| * |NULL| * |... // 指针指向具体资源对象
+---+---+---+---+
| |
| +--> struct sem_array
+----------> struct msg_queue
// 信号量值存储结构sem_array.sem_base(连续内存)
+-----+-----+-----+
| sval| sval| ... | // 每个信号量的当前值
+-----+-----+-----+
| pid | pid | ... | // 最后操作进程PID
+-----+-----+-----+
(2)内核源代码关系图示