Linux系统编程——POSIX IPC

    技术2025-01-19  11

    POSIX IPC

    POSIX IPC中的xxx_open()中的一些意义相同参数的取值: xxx可以是mq(消息队列)、sem(信号量)、shm(共享内存)

    参数oflag:

    O_RDONLY - 可读 O_WRONLY - 可写 O_RDWR - 可读可写 O_CREAT - 队列不存则创建,需要使用mode和attr参数 如果文件存在,mode和attr参数被忽略 O_EXCL - 如果使用O_CREAT,消息队列存在则创建失败 O_NONBLOCK - 非阻塞模式打开

    参数mode:

    S_IRWXU —— 文件拥有者可读可写可执行 S_IRUSR —— 用户可读 S_IWUSR —— 用户可写 S_IXUSR —— 用户可执行 S_IRWXG —— 文件拥有者所在组成员可读可写可执行 S_IRGRP —— 组用户可读 S_IWGRP —— 组用户可写 S_IXGRP —— 组用户可写 S_IRWXO —— 其他用户拥有可读可写可执行权限 S_IROTH —— 其他用户可读 S_IWOTH —— 其他用户可写 S_IXOTH —— 其他用户可执行

    消息队列

    消息队列也具备描述符,消息队列的描述符与文件的描述符类似 消息队列的描述符是一个进程级别的句柄 子进程会继承父进程打开的消息队列描述符

    创建或打开消息队列

    #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); name —— 消息队列名字(消息队列标识) oflag —— 位掩码,mq_open()操作选项,与文件的open()的flag参数差不多: O_RDONLY - 可读 O_WRONLY - 可写 O_RDWR - 可读可写 O_CREAT - 队列不存则创建,需要使用mode和attr参数 如果文件存在,mode和attr参数被忽略 O_EXCL - 如果使用O_CREAT,消息队列存在则创建失败 O_NONBLOCK - 非阻塞模式打开 mode —— 消息队列权限掩码,与文件的权限掩码一致,但执行权限无效 attr —— 消息队列的属性,用于指定消息队列的最大数量和最大大小 struct mq_attr { long mq_flags; /* oflag */ long mq_maxmsg; /* 消息队列消息最大数量 */ long mq_msgsize; /* 每条消息的大小 */ long mq_curmsgs; /* 消息队列中当前存在的消息数量 */ }; RETURN VALUE 成功返回消息队列描述符,失败返回-1

    关闭消息队列

    #include <mqueue.h> int mq_close(mqd_t mqdes); mqdes —— 消息队列描述符 RETURN VALUE 成功返回0, 失败返回-1

    删除消息队列

    删除name标识的消息队列,标记其在所有使用它的进程使用完后销毁该消息队列

    #include <mqueue.h> int mq_unlink(const char *name); name —— 消息队列名字 RETURN VALUE 成功返回0,失败返回-1

    获取消息队列的属性

    #include <mqueue.h> int mq_getattr(mqd_t mqdes, struct mq_attr *attr); mqdes —— 消息队列描述符 attr —— 消息队列属性 RETURN VALUE 成功返回0,失败返回-1

    设置消息队列的属性

    #include <mqueue.h> int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr); mqdes —— 消息队列描述符 newattr —— 用来设置的消息队列属性 oldattr —— 获取消息队列的旧属性 RETURN VALUE 成功返回0,失败返回-1

    发送消息

    #include <time.h> #include <mqueue.h> int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio); //发送消息数量达到attr字段mq_maxmsg进程将阻塞 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout); mqdes —— 消息队列描述符 msg_ptr —— 发送的消息 msg_len —— 消息的大小 msg_prio —— 消息的优先级,posix的消息队列可以设置优先级, 优先级高的消息永远放在队列靠前的位置 mq_timedsend()的行为与mq_send()类似, 但如果队列已满,并且没有为消息队列描述启用O_NONBLOCK标志, 则将abs_timeout指向指定调用将阻塞多长时间的结构 struct timespec { time_t tv_sec; /* seconds */ long tv_nsec; /* nanoseconds */ }; RETURN VALUE 成功返回0, 失败返回-1

    接收消息

    #include <mqueue.h> #include <time.h> ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio); ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout); mqdes —— 消息队列描述符 msg_ptr —— 发送的消息 msg_len —— 消息的大小,必须大于消息队列属性attr中mq_msgsize字段 msg_prio —— 消息的优先级 mq_timedreceive()的行为与mq_receive()类似,但如果队列是空的, 并且没有为消息队列描述启用O_NONBLOCK标志, 则将abs_timeout指向指定调用将阻塞多长时间的结构 RETURN VALUE 成功返回接收到的数据大小,失败返回-1

    代码

    /********************************************************************** * @file: xxx.c * @author: guangjieMVP * @version: v1.00.00 * @date: 2020-xx-xx * @github: https://github.com/guangjieMVP * @brief: ************************************************************************/ #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define handler_err(err) \ do{ perror(err); exit(0); } while(0) char father_buf[] = "I am your father"; int main(int argc, char **argv) { struct mq_attr m_attr; m_attr.mq_maxmsg = 10; m_attr.mq_msgsize = 40; mqd_t mq_fd = mq_open("/mq", O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, &m_attr); if (mq_fd < 0) { handler_err("mq_open"); } struct mq_attr get_attr; int ret = mq_getattr(mq_fd, &get_attr); if (ret < 0) { mq_close(mq_fd); handler_err("mq_getattr"); } printf("maxmsg = %ld, msgsize = %ld\r\n", get_attr.mq_maxmsg, get_attr.mq_msgsize); pid_t pid = fork(); if (pid == 0) { unsigned int msg_prio = 0; char msg_ptr[100]; while(1) { if (mq_receive(mq_fd, msg_ptr, sizeof(msg_ptr), &msg_prio) < 0)//msg_ptr的大小必须大于或等于m_attr.mq_msgsize { handler_err("mq_receive"); } printf("%s\r\n", msg_ptr); } } else { while (1) { if ( mq_send(mq_fd, father_buf, sizeof(father_buf), 1) < 0) { handler_err("mq_send"); } printf("father is running\r\n"); sleep(1); } } mq_close(mq_fd); return 0; }

    信号量

    进程或线程间同步、互斥访问共享资源 信号量属于系统范畴,进程退出也不会自动销毁,需要手动删除释放资源

    使用信号量的程序编译时需要链接pthread库:在编译时加上下面编译选项

    -pthread

    信号量类型

    有名信号量

    拥有名字,不同进程或线程之间可以通过名字访问同一信号量

    无名信号量

    特点:

    轻量化、基于内存(不存在于任何文件系统中) 多用于进程内部线程间的同步互斥 没有名字,存放在内存预先指定位置 可以在进程或线程之间共享: 进程之间共享时,必须位于进程共享内存区域中 线程之间共享,信号量存放于线程共享内存区域中

    有名信号量

    打开or创建信号量

    #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <semaphore.h> sem_t *sem_open(const char *name, int oflag); sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value); name —— 信号量名字 oflag —— 信号量打开选项掩码 mode —— 信号量的权限 value —— 信号量的初始化值 RETURN VALUE 成功返回信号量的句柄,失败返回SEM_FAILED

    关闭信号量

    #include <semaphore.h> int sem_close(sem_t *sem); sem —— 信号量句柄 RETURN VALUE 成功返回0,失败返回-1

    删除信号量

    #include <semaphore.h> int sem_unlink(const char *name); name —— 信号量名字 RETURN VALUE 成功返回0; 失败返回-1

    释放信号量

    #include <semaphore.h> int sem_post(sem_t *sem); sem —— 信号量句柄 RETURN VALUE 成功返回0,失败返回 -1,信号量的值不变

    等待信号量

    #include <semaphore.h> int sem_wait(sem_t *sem); sem —— 信号量句柄 RETURN VALUE 成功返回0,失败返回 -1,信号量的值不变

    获取信号量当前值

    #include <semaphore.h> int sem_getvalue(sem_t *sem, int *sval); sem —— 信号量句柄 sval —— 存放获取的信号量值 RETURN VALUE 成功返回0,失败返回 -1

    代码

    #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <semaphore.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #define handler_err(err) \ do{ perror(err);} while(0) #define SEM_NAME "/name_sem" //信号量的名字 int main(int argc ,char **argv) { sem_t *name_sem = sem_open(SEM_NAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, 0); if (name_sem == SEM_FAILED) { handler_err("sem_open"); exit(0); } pid_t pid = fork(); if (pid == 0) { int ret; while (1) { ret = sem_wait(name_sem); if (ret == 0) { printf("child is running\r\n"); } } } else { char buf[100]; char *ret; while (1) { ret = fgets(buf, sizeof(buf), stdin); if (ret != NULL) { printf("father : %s\r\n", buf); } if (strstr(buf, "post") != NULL) { sem_post(name_sem); //释放信号量 } } } sem_close(name_sem); //关闭信号量 sem_unlink(SEM_NAME); //删除信号量 return 0; }

    无名信号量

    一般多用于进程的线程间通信

    初始化无名信号量

    #include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); sem —— 信号量句柄 pshared —— 无名信号量共享方式: pshared = 0,同一个进程的线程之间共享 pshared != 0, 不同进程之间之间共享,无名信号量必须存在 共享内存区域(POSIX共享内存、system V共享内存、内存映射) value —— 信号量的初始化值 RETURN VALUE 成功返回0; 失败返回-1

    销毁无名信号量

    不存在进程或者线程等待时才能够安全销毁信号量

    #include <semaphore.h> int sem_destroy(sem_t *sem); sem —— 信号量句柄 RETURN VALUE 成功返回0; 失败返回-1

    代码

    #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <semaphore.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/mman.h> #define handler_err(err) \ do{ perror(err); } while(0) int main(int argc ,char **argv) { sem_t *p_sem = (sem_t *)mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); //MAP_ANONYMOUS 匿名映射 if (p_sem != MAP_FAILED){ printf("sem addr : %p\r\n", p_sem); }else{ handler_err("mmap"); exit(0); } if (sem_init(p_sem, 2, 0) < 0) { //初始化无名信号量 handler_err("sem_open"); exit(0); } pid_t pid = fork(); if (pid == 0){ int ret; int sem_val; int cnt = 0; while (1){ ret = sem_wait(p_sem); if (ret == 0){ printf("child is running\r\n"); }else{ perror("sem_wait"); } sem_getvalue(p_sem, &sem_val); printf("sem val : %d\r\n", sem_val); printf("cnt : %d\r\n", cnt++); } }else{ char buf[100]; char *ret; int sem_val; while (1){ ret = fgets(buf, sizeof(buf), stdin); if (ret != NULL){ // printf("father : %s\r\n", buf); } if (strstr(buf, "post") != NULL){ if (sem_post(p_sem) < 0){ //释放信号量 perror("sem_post"); } sem_getvalue(p_sem, &sem_val); printf("sem val : %d\r\n", sem_val); } } } sem_close(p_sem); sem_destroy(p_sem); return 0; }

    共享内存

    共享内存的使用一般要加上互斥,防止多进程写入数据造成数据践踏

    使用步骤:

    1、shm_open()打开一个共享内存对象

    2、ftruncate()设置共享内存对象大小

    3、mmap()使用flags指定MAP_SHARED进行内存映射

    shm_open()、mmap()分别类似system V 共享内存的shmget()、shmat()操作

    创建共享内存对象

    #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> int shm_open(const char *name, int oflag, mode_t mode); name —— 共享内存对象名字 oflag —— 打开方式选项 mode —— shm权限 RETURN VALUE 成功返回非负的文件描述符,失败返回-1

    新创建的共享内存对象长度会被设置为0,需调用**ftruncate()**函数设置长度,新增加的字节的值被初始化为0

    设置共享内存对象的长度

    #include <unistd.h> #include <sys/types.h> int ftruncate(int fd, off_t length); fd —— 描述符(文件描述符、共享内存描述符等) length —— 设置的长度 RETURN VALUE 成功返回0,失败返回-1

    删除共享内存对象

    #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> int shm_unlink(const char *name); name —— 共享内存对象名字 RETURN VALUE 成功返回0,失败返回-1 /********************************************************************** * @file: xxx.c * @author: guangjieMVP * @version: v1.00.00 * @date: 2020-xx-xx * @github: https://github.com/guangjieMVP * @brief: ************************************************************************/ #include <sys/mman.h> #include <sys/stat.h> /* For mode constants */ #include <fcntl.h> /* For O_* constants */ #include <unistd.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <semaphore.h> #include <sys/types.h> #define handler_err(err) \ do{ perror(#err); } while(0) #define SHM_NAME "/shm_test" #define SEM_NAME "/name_sem" int main(int argc ,char **argv) { int shm_fd = shm_open(SHM_NAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG); //打开共享内存 if (shm_fd < 0){ handler_err(shm_open); exit(0); } if (ftruncate(shm_fd, 1024) < 0){ //设置共享内存大小 handler_err(ftruncate); exit(0); } struct stat shmstat; int ret = fstat(shm_fd, &shmstat); //获取共享内存信息 if (ret < 0){ handler_err(fstat); exit(0); } printf("shm size : %ld\r\n", shmstat.st_size); //内存映射 char *p = mmap(NULL, shmstat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (p == MAP_FAILED){ handler_err(mmap); exit(0); } sem_t *name_sem = sem_open(SEM_NAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, 0);//打开信号量 if (name_sem == SEM_FAILED){ handler_err(sem_open); exit(0); } pid_t pid = fork(); if (pid == 0){ while(1){ ret = sem_wait(name_sem); if (ret == 0) { printf("child is running\r\n"); } printf("%s\r\n", p); } }else{ int cnt = 0; // char buf[40]; while(1){ sprintf(p, "Hello, Linux %d\r\n", cnt++); sem_post(name_sem); //释放信号量 sleep(1); } } if ( shm_unlink(SHM_NAME) < 0){ handler_err(shm_unlink); } return 0; }
    Processed: 0.009, SQL: 9