首页 > 编程语言 > 详细

4 ACE Task 线程管理

时间:2017-01-06 22:41:52      阅读:314      评论:0      收藏:0      [点我收藏+]



技术分享

技术分享

技术分享







message 消息块,

chunli@Linux:~/ace/AceTask$ cat message_block_test.cpp 
#include <ace/ACE.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>

int main() {
    ACE_Message_Block *head = new ACE_Message_Block(8);
    ACE_Message_Block *mblk = head;

    for (;;) {
        ssize_t nbytes = ACE::read_n(ACE_STDIN, mblk->wr_ptr(), mblk->size());
        if (nbytes <= 0)
            break; // Break out at EOF or error.

        // Advance the write pointer to the end of the buffer.
        mblk->wr_ptr(nbytes);
        // Allocate message block and chain it at the end of list.
        mblk->cont(new ACE_Message_Block(8));
        mblk = mblk->cont();
    }
    // Print the contents of the list to the standard output.
    int i = 1;
    for (mblk = head; mblk != 0; mblk = mblk->cont()) {
        ACE_DEBUG((LM_DEBUG, "\n%dth message: ", i++));
        ACE::write_n(ACE_STDOUT, mblk->rd_ptr(), mblk->length());
    }

    head->release(); // This releases all the memory in the chain.
    return 0;
}
chunli@Linux:~/ace/AceTask$ 



chunli@Linux:~/ace/AceTask$ g++ message_block_test.cpp -lACE && ./a.out 
12345678
qwertyuio
asdfgh
11111111111111111111111

1th message: 12345678
2th message: 
qwertyu
3th message: io
asdfg
4th message: h
111111
5th message: 11111111
6th message: 11111111
7th message: chunli@Linux:~/ace/AceTask$






ACE多线程

chunli@Linux:~/ace/AceTask$ cat thread_manager_test1.cpp 
//ACE_Guard提供了自动锁定、释放机制的同步锁,对可见的scope范围内进行自动加锁,当超出scope范围是程序自动卸锁。

#include <ace/Thread_Manager.h>
#include <ace/Log_Msg.h>

ACE_Thread_Mutex nm, iom;
int n = 0;

static ACE_THR_FUNC_RETURN func(void* arg) 
{
	while (n < 30) 
	{
		{
			ACE_Guard<ACE_Thread_Mutex> guard(nm);//保护n
			n++;
		}
		{
			//ACE_Guard<ACE_Thread_Mutex> guard2(iom);//保护io
			ACE_DEBUG((LM_DEBUG, "(%t) %d\n", n));
		}
		ACE_OS::sleep(1);
	}
	return 0;
}

int main() 
{
	//创建3个线程
	ACE_Thread_Manager::instance()->spawn_n(5, func, NULL, THR_SCOPE_SYSTEM | THR_NEW_LWP);
	ACE_Thread_Manager::instance()->wait();
}
chunli@Linux:~/ace/AceTask$ g++ thread_manager_test1.cpp -lACE && ./a.out 
(139716248389376) 2
(139716158551808) 3
(139716239996672) 5
(139716256782080) 4
(139716265174784) 2
(139716158551808) 6
(139716256782080) 7
(139716248389376) 8
(139716239996672) 9
(139716265174784) 10
(139716256782080) 11
(139716158551808) 13
(139716248389376) 12
(139716239996672) 14
(139716265174784) 15
(139716239996672) 16
(139716256782080) 17
(139716248389376) 18
(139716158551808) 19
(139716265174784) 20
(139716256782080) 21
(139716248389376) 22
(139716239996672) 23
(139716158551808) 24
(139716265174784) 25
(139716239996672) 26
(139716256782080) 27
(139716248389376) 28
(139716158551808) 29
(139716265174784) 30
chunli@Linux:~/ace/AceTask$




ACE_Task 多线程管理版 echo_server

chunli@Linux:~/ace/AceTask$ cat echo_server_mt.cpp 
#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Thread_Manager.h>
#include <ace/Log_Msg.h>

static ACE_THR_FUNC_RETURN func(void* arg) {
	ACE_SOCK_Stream* socket = static_cast<ACE_SOCK_Stream*> (arg);

	char buf[512];
	for (;;) {
		memset(buf, 0, sizeof(buf));
		ssize_t n;
		if ((n = socket->recv(buf, sizeof(buf))) <= 0) {
			ACE_ERROR((LM_ERROR, "%p\n", "recv()"));
			break;
		} else {
			if (socket->send(buf, n) == -1) {
				ACE_ERROR((LM_ERROR, "%p\n", "send()"));
				break;
			}
		}
	}
	socket->close();
	delete socket;
	return 0;
}

int main() {
	ACE_INET_Addr server_addr;
	ACE_SOCK_Acceptor acceptor;

	if (server_addr.set(8868) == -1)
		ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "set()"), 1);

	if (acceptor.open(server_addr) == -1)
		ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open()"), 1);

	for (;;) {
		ACE_SOCK_Stream* peer = new ACE_SOCK_Stream;

		if (acceptor.accept(*peer) == -1)
			ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept()"), 1);
		peer->disable(ACE_NONBLOCK); // Ensure blocking <recv>s.

		ACE_Thread_Manager::instance()->spawn(func, peer);
	}

	return acceptor.close() == -1 ? 1 : 0;
}
chunli@Linux:~/ace/AceTask$ 


编译运行:
chunli@Linux:~/ace/AceTask$ g++ echo_server_mt.cpp -lACE && ./a.out 


第一个客户端:
chunli@Linux:~$ nc localhost 8868
qaz
qaz
wsx
wsx

第二个客户端:
chunli@Linux:~$ nc localhost 8868
123
123
eqwe
eqwe




局域网 组播收发实验

chunli@Linux:~/ace/AceTask$ cat chat_room.cpp 
#include <iostream>

#include <ace/SOCK_Dgram_Mcast.h>
#include <ace/Log_Msg.h>
#include <ace/Task.h>

class ChatRoom: public ACE_Task_Base {
public:
    int join(const ACE_INET_Addr& group) {
        _group = group;
        if (sock.join(_group) == -1)
            ACE_ERROR_RETURN((LM_ERROR,"%p\n", "join"), -1);

        return 0;
    }

    virtual int svc() {
        ACE_INET_Addr remoteAddr;
        for (;;) {
            memset(buf, 0, sizeof(buf));
            if (sock.recv(buf, sizeof(buf), remoteAddr) != -1) {
                ACE_DEBUG((LM_DEBUG, "recv msg from %s:%d: %s\n",
                                remoteAddr.get_host_addr(),
                                remoteAddr.get_port_number(), buf));
            } else {
                sock.leave(_group);
                break;
            }
        }
        return 0;
    }

    void sendMsg() {
        char sendBuf[512];
        for (;;) {
            memset(sendBuf, 0, sizeof(sendBuf));
            std::cin.getline(sendBuf, sizeof(sendBuf));

            if (sock.send(sendBuf, strlen(sendBuf)) == -1) {
                ACE_ERROR((LM_ERROR,"%p\n", "send"));
                sock.leave(_group);
                break;
            }
        }
    }

private:
    char buf[512];
    ACE_INET_Addr _group;
    ACE_SOCK_Dgram_Mcast sock;
};

int main() {
    ChatRoom cm;
    ACE_INET_Addr group(8000, "224.0.0.12");
    if (cm.join(group) == -1)
        return 1;

    cm.activate();
    cm.sendMsg();
}

一个运行:
chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out 
recv msg from 11.11.11.6:8000: 1234edxs
中国上海
recv msg from 11.11.11.6:8000: 中国上海


局域网其他机子运行
chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out
1234edxs
recv msg from 11.11.11.6:8000: 1234edxs
recv msg from 11.11.11.6:8000: 中国上海



ACE_Message_Queue 生产者消费者模型

chunli@Linux:~/ace/AceTask$ cat producer.cpp 
#include "ace/Task.h"
#include "ace/Message_Block.h"
const int N = 10;

//The Consumer Task.
class Consumer: public ACE_Task<ACE_MT_SYNCH> {
public:
	int open(void*) 
	{
		ACE_DEBUG((LM_DEBUG, "(%t)Consumer task opened \n"));
		//Activate the Task
		activate(THR_NEW_LWP, 3);//开启3个线程
		return 0;
	}
	//The Service Processing routine
	int svc(void) 
	{
		ACE_Message_Block* mb = 0;
		while(1)
		 {
			mb = 0;
			getq(mb);//Get message from underlying queue
			//ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,
			//rd_ptr()指向可读的数据块地址,
			//wr_ptr()指向可写的数据块地址,
			//默认情况下都执行数据块的首地址
			if (*mb->rd_ptr() < N) 
			{
				ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr()));
			}
			else if (*mb->rd_ptr() == N) 
			{
				ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr()));
				++*mb->rd_ptr(); // *mb->rd_ptr() == N+1
				ungetq(mb);
				break;// 供后续线程查看
			}
			else 
			{ 
				// *mb->rd_ptr() > N
				ungetq(mb);
				break;// 供后续线程查看
			}
		}
		return 0;
	}
	int close(u_long) 
	{
		ACE_DEBUG((LM_DEBUG, "(%t)Consumer closes down \n"));
		return 0;
	}
};

class Producer: public ACE_Task_Base {
public:
	Producer(Consumer* consumer):data_(0), consumer_(consumer) 
	{
			mb_ = new ACE_Message_Block((char*) &data_, sizeof(data_));
	}
	int open(void*) 
	{
		ACE_DEBUG((LM_DEBUG, "(%t)Producer task opened \n"));
		//Activate the Task
		activate(THR_NEW_LWP, 1);
		return 0;
	}
	//The Service Processing routine
	int svc(void) 
	{
		while (data_ <= N) 
		{
			//Send message to consumer
			ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_));
			consumer_->putq(mb_);
			//Go to sleep for a sec.
			ACE_OS::sleep(1);
			++data_;
		}
		return 0;
	}
	int close(u_long) 
	{
		ACE_DEBUG((LM_DEBUG, "(%t)Producer closes down \n"));
		return 0;
	}
private:
	char data_;
	Consumer * consumer_;
	ACE_Message_Block * mb_;
};

int main() {
	Consumer* consumer = new Consumer;	
	consumer->open(0);

	Producer* producer = new Producer(consumer);
	producer->open(0);

	ACE_Thread_Manager::instance()->wait();	//Wait for all the tasks to exit.
}
chunli@Linux:~/ace/AceTask$ g++ producer.cpp -lACE -lpthread && ./a.out 
(140704458946432)Consumer task opened 
(140704458946432)Producer task opened 
(140704408925952)Sending message: 0 to remote task
(140704434104064)Got message: 0 from remote task
(140704408925952)Sending message: 1 to remote task
(140704434104064)Got message: 1 from remote task
(140704408925952)Sending message: 2 to remote task
(140704425711360)Got message: 2 from remote task
(140704408925952)Sending message: 3 to remote task
(140704417318656)Got message: 3 from remote task
(140704408925952)Sending message: 4 to remote task
(140704434104064)Got message: 4 from remote task
(140704408925952)Sending message: 5 to remote task
(140704425711360)Got message: 5 from remote task
(140704408925952)Sending message: 6 to remote task
(140704417318656)Got message: 6 from remote task
(140704408925952)Sending message: 7 to remote task
(140704434104064)Got message: 7 from remote task
(140704408925952)Sending message: 8 to remote task
(140704425711360)Got message: 8 from remote task
(140704408925952)Sending message: 9 to remote task
(140704417318656)Got message: 9 from remote task
(140704408925952)Sending message: 10 to remote task
(140704434104064)Got message: 10 from remote task
(140704434104064)Consumer closes down 
(140704425711360)Consumer closes down 
(140704417318656)Consumer closes down 
(140704408925952)Producer closes down 
chunli@Linux:~/ace/AceTask$



本文出自 “魂斗罗” 博客,请务必保留此出处http://990487026.blog.51cto.com/10133282/1889842

4 ACE Task 线程管理

原文:http://990487026.blog.51cto.com/10133282/1889842

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!