cpp-doc-消息队列

概念

消息队列,用于从一个进程向另一个进程发送数据。但仅把数据发送到一个“队列”中,而不指定由哪个进程来接受。

system v消息队列

消息队列,独立与发送消息的进程和接收消息的进程。信号、管道、命名管道都不独立与发送和接收进程。

消息队列,有最大长度限制:MSGMNB;消息队列中的单条消息,也有最大长度限制:MSGMAX

相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 获取或创建一个消息队列
// @key:某个消息队列的名称
// @msgflag:可使用IPC_CREAT
// 返回值:成功,返回正整数,即“消息队列标识符”; 失败,返回-1
int msgget(key_t key, int msgflg);

// 发送一个消息,即把消息添加到消息队列中
// @msgid:消息队列标识符
// @msgp:消息指针
// 消息的类型需要自己定义。但要求其第一个结构成员为long int
// struct my_msg_st {
// long int msg_type; /* 消息的类型,取>0, 接收消息时可使用该值 */
// /*other info */
// }
// @msgsz:消息的长度(不包含第一个成员msg_type)
// @msgflg:消息标识
// 如果包含: IPC_NOWAIT, 则消息队列满时,不发送该消息,而立即返回-1
// 如果不包含:IPC_NOWAIT,则消息队列满时,挂起本进程,直到消息队列有空间可用。
// 返回值:成功,返回0;失败,返回-1
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

// 从消息队列中接收一条消息。
// @msgid:消息队列标识符
// @msgp:用于接收消息的缓存
// @msgsz:是msgp指向的消息长度,这个长度不含保护消息类型的那个long int长整型
// @msgtype:它可以实现接收优先级的简单形式
// =0:返回队列第一条消息
// >0:返回队列第一条类型等于msgtype的消息
// <0:返回队列第一条类型小于等于msgtype绝对值的消息
// =IPC_NOWAIT:队列没有可读消息不等待,返回ENOMSG错误
// =MSG_NOERROR:消息大小超过msgsz时被截断
// >0且=MSC_EXCEPT:接收类型不等于msgtype的第一条消息
// @msgflg:
// 如果包含 IPC_NOWAIT, 则当消息队列中没有指定类型的消息时,立即返回-1
// 如果不包含:IPC_NOWAIT,则当消息队列中没有指定类型的消息时,挂起本进程,直到收到指定类型的消息
// 返回值:成功,返回接收到的消息的长度(不包含第一个成员msg_type);失败,返回-1
ssize_t msgrcv (int msqid, void *msgp, size_t msgsz, long msgtype, int msgflg);


// 消息队列的控制函数
// @msgid:由msgget函数返回的消息队列标识码
// @cmd:是将要采取的动作(有三个可取值)
// IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值
// IPC_SET:在进程有足够权限的前提下,把消息队列的当前关联值设置为msgid_ds数据结构中给出的值
// IPC_RMID:删除消息队列
// @buf:动作所需要传递的参数
// 返回值:成功, 返回 0;失败,返回-1
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

2个进程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MSG_SIZE 80

struct my_msg_st {
long int msg_type;
char msg[MSG_SIZE];
};

int main(void)
{
int msgid;
int ret;
struct my_msg_st msg;

msgid = msgget((key_t)1235, 0666|IPC_CREAT);
if (msgid == -1) {
printf("msgget failed!\n");
exit(1);
}

msg.msg_type = 1;
strcpy(msg.msg, "main1 say Hello world1 !");
ret = msgsnd(msgid, &msg, MSG_SIZE, 0);
if (ret == -1) {
printf("msgsnd failed!\n");
exit(1);
}

sleep(5);

strcpy(msg.msg, "main1 say Hello world2 !");
ret = msgsnd(msgid, &msg, MSG_SIZE, 0);
if (ret == -1) {
printf("msgsnd failed!\n");
exit(1);
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

#define MSG_SIZE 80

struct my_msg_st {
long int msg_type;
char msg[MSG_SIZE];
};

int main(void)
{
int msgid;
int ret;
struct my_msg_st msg;

msgid = msgget((key_t)1235, 0666|IPC_CREAT);
if (msgid == -1) {
printf("msgget failed!\n");
exit(1);
}

msg.msg_type = 0;
ret = msgrcv(msgid, &msg, MSG_SIZE, 0, 0);
if (ret == -1) {
printf("msgrcv failed!\n");
exit(1);
}

printf("main2 received: %s\n", msg.msg);

ret = msgctl(msgid, IPC_RMID, 0);
if (ret == -1) {
printf("msgctl(IPC_RMID) failed!\n");
exit(1);
}

return 0;
}

posix 消息队列

相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 用来创建和访问一个消息队列
// @name:某个消息队列的名字
// 必须以/开头,并且后续不能有其他的/
// 长度不能超过NAME_MAX
// @oflag:与open函数类似,可以是
// O_RDONLY
// O_WRONLY
// O_RDWR
// O_CREAT
// O_EXCL
// O_NONBLOCK
// @mode:如果oflag是O_creat,需要设置mode
// return:成功返回消息队列文件描述符;失败返回-1
mqd_t mq_open(const char* name, int oflag);
mqd_t mq_open(const char* name, int oflag, mode_t mode, struct mq_attr* attr);

// 关闭消息队列
// @mqdes:消息队列描述符
// return:成功返回0;失败返回-1
mqd_t mq_close(mqd_t mqdes);

// 删除消息队列
// 删除连接数,连接数为0时才是正在的删除
// @name:消息队列名称
// return:成功返回0;失败返回-1
mqd_t mq_unlink(const char* name);

// 获取/设置消息队列属性
// return:成功返回0;失败返回-1
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr* attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr* newatr, struct mq_attr* oldattr);

// 发送消息
// @mqdes:消息队列描述符
// @msg_ptr:指向消息的指针
// @msg_len:消息长度
// @msg_prio:消息优先级
// return:成功返回0;失败返回-1
mqd_t mq_send(mqd_t mqdes, const char* msg_ptr, size_t msg_len, unsigned msg_prio);

// 接收消息
// 注意:返回指定消息队列中最高优先级的最早消息
// @mqdes:消息队列描述符
// @msg_ptr:返回可接收到的消息
// @msg_len:消息长度
// 长度需要指定为每条消息长度的最大值
// @msg_prio:返回接收到的消息的优先级
// return:成功返回接收到的消息字节数;失败返回-1
ssize_t mq_receive(mqd_t mqdes, char* msg_ptr, size_t msg_len, unsigned* msg_prio);



// 建立或者删除消息到达通知事件
// 注意:
// 任何时刻只能有一个进程可以被注册为接收某个给定队列的通知
// 当有一个消息到达某个先前为空的队列,而且已由一个进程被注册为接收该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出
// 当通知被发送它的注册进程时,其注册被撤销。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在消息队列读出消息之前而不是之后。
// 通知方式:
// 产生一个信号
// 创建一个线程执行一个值得的函数
// @mqdes:消息队列描述符
// @notification:
// 非空表示当消息到达且消息队列先前为空,那么将得到通知
// NULL表示撤销已注册的通知
// return:成功返回0;失败返回-1
mqd_t mq_notify(mqd_mqdes, const struct sigevent* notification);