歡迎您光臨本站 註冊首頁

UNIX中的多線程數據共享與線程同步

←手機掃碼閱讀     火星人 @ 2014-03-12 , reply:0
  作者:楊海平 姚洪利 本文選自:中國計算機報 2001年12月18日



在UNIX中,一個進程讓另外實體進行某項事務而採取的操作為fork的一個子進程,子進程只是將父進程的數據區拷貝一份到自己的數據區。在符合POSIX標準的UNIX操作系統下,同一個進程的線程之間共享進程指令、大多數數據(線程私有數據除外)、信號處理方式、進程運行環境等。由於線程共享進程的全局變數,因此可以採用用戶自己編寫的消息隊列來實現數據的共享。

建立多任務模型,並用線程來實現

符合POSIX標準的UNIX操作系統提供了線程的控制函數,如:線程的創建和終止、線程之間的互斥、線程之間的同步等。利用這些系統函數可以成功地模擬消息隊列,來實現線程間數據共享和同步,以完成多任務的實時性。為成功地描述線程間數據共享和同步,以下列任務模型為例。

首先建立消息隊列屬性數據結構
#define MAXQUEUE 30
typedef struct mq_attrib {
char name[20];
pthread_mutex_t mutex_buff;
pthread_mutex_t mutex_cond;
pthread cond_t cond;
int maxElements;
int elementLength;
int curElementNum;
caddr_t buff;
}mq_attrib,mq_attribstruct,mq_attrib_t;
mq_attrib_t msqueue[MAXQUEUE];


數據結構定義了消息隊列的名字name,最大消息個數maxElements,單個消息長度elementLength,當前消息個數curElementNum,存放消息的緩衝區buff,保護緩衝區鎖mutex_buff,線程同步條件變數cond,保護線程同步條件變數鎖mutex_cond。

消息隊列的創建
依據此數據結構進行消息隊列的創建,函數為msqueue_create(參數解釋:name消息隊列名,maxnum消息的最大個數,length單個消息的長度)。

int msqueue_create( name, maxnum, length )
char name;
int maxnum,length;
{
int i;
for ( i=0; i
if ( msqueue[i]==NULL )break;
//如果消息隊列全部被分配,返回錯
if ( i==MAXQUEUE ) return MQERROR;
msqueue[i]=malloc(sizeof(mq_attribstruct));
sprintf( msqueue[i]->name, "%s", name);
msqueue[i]->maxElements = maxnum;
msqueue[i]->elementLength = length;
msqueue[i]->curElementNum = 0;
msqueue[i]->buff=malloc(maxnum?length);
//對保護鎖進行初始化
pthread_mutex_init(&&msqueue[i]
->mutex_buff, NULL);
pthread_mutex_init(&&msqueue[i]
->mutex_cond, NULL);
//對線程同步條件變數初始化
pthread_cond_init(&&msqueue[i]->cond, NULL);
return i;
}


應用消息隊列進行消息的發送和接收
發送消息到消息隊列:

消息隊列的發送和接收是在不同的線程中進行的。首先介紹發送消息到消息隊列的函數:

int msqueue_send ( id, buff, length )
int id, length;
caddr_t buff;
{
int pos;
//消息隊列id錯,返回錯
if ( id<0 || id>= MAXQUEU ) return MQERROR;
//消息長度與創建時的長度不符,返回錯
if ( length != msqueue[id]->elementLength ) return MQERROR;
//消息隊列滿,不能發送
if ( msqueue[id]->curElementNum >= msqueue[id]->maxElements )
return MQERROR;
//在對消息隊列緩衝區操作前,鎖住緩衝區,以免其他線程操作
pthread_mutex_lock ( &&msqueue[id]->mutex_buff );
pos = msqueue[id]->curElementNum * msqueue[id]->elementLength;
bcopy ( buff, &&msqueue[id]->buff[pos], msqueue[id]->elementLength );
msqueue[id]->curElementNum ++;
pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );
//如果插入消息前,消息隊列是空的,插入消息后,消息隊列為非空,則通知等待從消息隊列取消息的線程,條件滿足,可以取出消息進行處理
if ( msqueue[id]->curElementNum == 1 ) {
pthread_mutex_lock ( &&msqueue[id]->mutex_cond );
pthread_cond_broadcast ( &&msqueue[id]->cond );
pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );
}
return length;
}


從消息隊列中接收消息:

消息隊列的接收函數 msqueue_receive,其參數:id為消息隊列數組的索引號,buff為消息內容,length為消息長度。

int msqueue_receive ( id, buff, length )
int id, length;
caddr_t buff;
{
caddr_t temp;
int pos;
if(id<0||id>=MAXQUEUE)return MQERROR;
if(length != msqueue[id]->elementLength)
return MQERROR;
//如果消息隊列為空,則等待,直到消息隊列為非空條件滿足
if ( msqueue[id]->curElementNum == 0){
pthread_mutex_lock ( &&msqueue[id]->mutex_cond );
pthread_cond_wait ( &&msqueue[id]->cond, &&msqueue[id]->mutex_cond );
pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );
}
//取消息前,鎖住消息隊列緩衝區,以免其他線程存放或取消息
pthread_mutex_lock ( &&msqueue[id]->mutex_buff );
//為符合消息隊列FIFO特性,取出消息后,進行消息隊列的調整
temp =
malloc((msqueue[id]->curElementNum-1)
msqueue[id]-elementLength );
bcopy ( &&msqueue[id]->buff[0], buff, msqueue[id]->elementLength );
msqueue[id]->curElementNum --;
bcopy ( &&msqueue[id]->buff[msqueue[id]->elementLength], temp,
msqueue[id]->elementLength
msqueue[id]->curElementNum);
bcopy ( temp, &&msqueue[id]->buff[0],
msqueue[id]->elementLength
msqueue[id]->curElementNum);
free ( temp );
//解除緩衝區鎖
pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );
return length;
}


多任務模型的實現

在討論完消息隊列的創建、刪除、發送和接收后,下面講述消息隊列在線程中的應用以實現多任務線程間的數據共享。

首先在main主函數中創建消息隊列和線程:

//定義全局變數
Int msqueue_record, msqueue_process;
Void main()
{
pthread_t pthreadID1;
//創建消息隊列,用於線程間通信
msqueue_record = msqueue_create ( 「record」, 200, 200);
msqueue_process = msqueue_create ( 「process」, 200, 200);
//創建數據採集線程
pthread_create ( &&pthreadID1, NULL, receiveData, NULL);
//創建數據處理線程
pthread_create ( &&pthreadID2, NULL, process, NULL);
//創建數據記錄線程
pthread_create ( &&pthreadID1, NULL, record, NULL);
//等待進程結束
wait_thread_end( );
}


數據採集線程:

void receiveData( )
{
int count;
unsigned char buff[200];
for(;;) {
//從數據口採集數據,並將數據放置於buff中
//wait_data_from_data_port( buff )
//將數據寫入消息隊列msqueue_record中
msqueue_send ( msqueue_record, buff, 200 );
//將數據寫入消息隊列msqueue_process中
msqueue_send ( msqueue_process, buff, 200 );
}
}


記錄線程函數:

void record ( )
{
int num, count;
unsigned char buffer[200];
for ( ;; ) {
count = msqueue_receive ( msg_record, &&buffer, 200 );
if ( count < 0) {
perror ( "msgrcv in record");
continue;
}
//將取到的消息進行記錄處理
//record_message_to_lib();
}
}


數據處理線程函數:

int process( )
{
int count;
unsigned char buffer[200];
for ( ;; ) {
count = msqueue_receive ( msg_process, &&buffer, 200 );
if ( count < 0) {
perror ( "msgrcv in record");
continue;
}
//將取到的消息進行處理
//process_message_data()
}
}


在實現多任務系統時,作者曾經做過以下三種實現方法的比較:進程間通信採用IPC機制,線程間通信採用進程通信方式IPC,線程間通信採用基於作者開發的消息隊列。結果表明:利用用戶下的數據區進行線程間通信的速度最快,效率最高,而IPC方式慢。(作者聯繫方式:yhl9842@163.net)

(責任編輯 吳北)


[火星人 ] UNIX中的多線程數據共享與線程同步已經有472次圍觀

http://coctec.com/docs/program/show-post-72126.html