Tuesday, October 14, 2008

Interprocess Communication (IPC), Part 1: Message Queues

Beej's Guide
Advanced Linux Programming

IPC methods:
  • Signal
  • Pipe
  • Named Pipe
  • Message Queue
  • Shared Memory
Message Queue:

/* use ftok(...) to generate a key */
/* key_t ftok(const char *path, int id) */
/* the path must name an existing file; ftok returns (key_t)-1 on failure */
key_t key = ftok("/home/someone/somefile", 'b');

/* create or connect to an existing queue */
/* int msgget(key_t key, int msgflg) */
/* 0666 -> rw-rw-rw- ; msgget returns -1 on failure */
int msqid = msgget(key, 0666 | IPC_CREAT);

/* each message is made up of two parts */
/* mtype is set to any positive number as a message type */
/* everything after mtype is the payload that will be added to the queue */
struct pirate_msgbuf {
    long mtype; /* must be positive */
    char name[30];
    char ship_type;
    int notoriety;
    int cruelty;
    int booty_value;
};

/* pass the message to the queue */
/* int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); */
/* msgsz is the size of the payload only: it does NOT include the long mtype */
struct pirate_msgbuf pmb = {2, "L'Olonais", 'S', 80, 10, 12035};
msgsnd(msqid, &pmb, sizeof(pmb) - sizeof(long), 0);

/* retrieve message from the queue */
struct pirate_msgbuf pmrecv;
/* int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); */
/* msgsz is again the payload capacity; msgtyp 2 selects the first message of type 2 */
msgrcv(msqid, &pmrecv, sizeof(pmrecv) - sizeof(long), 2, 0);

/* destroy a message queue */
msgctl(msqid, IPC_RMID, NULL);


Beej's Guide provides a nice pair of example sources:
kirk.c spock.c
Start kirk and spock; text typed into kirk is sent through the queue and received by spock.
You can also experiment with different mtype values to support multiple kirks and spocks.

Here is a small test program of my own:
I use fork() to create a child process that reads integers from stdin and sends them through the queue,
while the parent process simulates running a simple algorithm on each message it receives from the child.

/* interprocess communication: Message Queue */

#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

/* Number of integers carried by each queue message. An enum constant is a
 * true integer constant expression, so it can size the array below in C as
 * well as in C++ (a `const int` cannot do that in C). */
enum { ARR_SIZE = 8 };


/* One queue message: the leading type field followed by the payload. */
struct mymsgbuf{
    long mtype;      /* message type: a positive integer */
    int a[ARR_SIZE]; /* payload: one record of ARR_SIZE integers */
};

int main(int argc, char* argv[]) {

int N = 0;
if(argc < 2)
N = 100;
else
N = atoi(argv[1]);

printf("number of computations = %d\n", N*N);

key_t key = ftok("ab.", 'm');
int msgq_id; // message queue id

//initializes a new message queue
if((msgq_id = msgget(key, IPC_CREAT|0660)) == -1) {
printf("msgget error\n");
exit(1);
} else {
printf("msg queue id %d created\n", msgq_id);
}

if(fork() == 0) {
printf("this is child process\n");
mymsgbuf qbuf;
qbuf.mtype = 1;

int j = 0;
while(!feof(stdin)) {
for(int i = 0; i < ARR_SIZE; ++i) {
scanf("%d ", &qbuf.a[i]);
}
if((msgsnd(msgq_id, &qbuf, sizeof(mymsgbuf)-sizeof(long), 0)) == -1) {
printf("error in msgsnd\n");
exit(1);
}
++j;
}
// the last message
// use mtype = 2 to notify the parent process
qbuf.mtype = 2;
msgsnd(msgq_id, &qbuf, sizeof(mymsgbuf)-sizeof(long),0);
printf("send %d\n", j);
return 0;
} else { // in parent process
printf("this is parent process\n");
mymsgbuf qbuf;
int j = 0;
while(msgrcv(msgq_id, &qbuf, 200, 0, 0)) {
if(qbuf.mtype == 2) break;

int total = 0;
int t = N;

/* just doing some work */
while(t--) {
for(int i = 0; i < ARR_SIZE; ++i)
for(int k = 0; k < ARR_SIZE; ++k)
total += qbuf.a[i] * qbuf.a[k];
}
++j;
}
wait(); // wait until the child process finish
printf("recv %d\n", j);
msgctl(msgq_id, IPC_RMID, 0); // delete the message queue
printf("msg q removed\n");
}

// printf("time taken %d\n", clock()-startt);
return 0;
}

Running output: time ./msgq 100 < tmp
number of computations = 10000
msg queue id 557057 created
this is child process
this is parent process
send 39036
recv 39036
msg q removed

real 0m1.452s
user 0m1.384s
sys 0m0.064s