ACE_Message_Block 消息块
chunli@Linux:~/ace/AceTask$ cat message_block_test.cpp #include <ace/ACE.h> #include <ace/Message_Block.h> #include <ace/Log_Msg.h> int main() { ACE_Message_Block *head = new ACE_Message_Block(8); ACE_Message_Block *mblk = head; for (;;) { ssize_t nbytes = ACE::read_n(ACE_STDIN, mblk->wr_ptr(), mblk->size()); if (nbytes <= 0) break; // Break out at EOF or error. // Advance the write pointer to the end of the buffer. mblk->wr_ptr(nbytes); // Allocate message block and chain it at the end of list. mblk->cont(new ACE_Message_Block(8)); mblk = mblk->cont(); } // Print the contents of the list to the standard output. int i = 1; for (mblk = head; mblk != 0; mblk = mblk->cont()) { ACE_DEBUG((LM_DEBUG, "\n%dth message: ", i++)); ACE::write_n(ACE_STDOUT, mblk->rd_ptr(), mblk->length()); } head->release(); // This releases all the memory in the chain. return 0; } chunli@Linux:~/ace/AceTask$ chunli@Linux:~/ace/AceTask$ g++ message_block_test.cpp -lACE && ./a.out 12345678 qwertyuio asdfgh 11111111111111111111111 1th message: 12345678 2th message: qwertyu 3th message: io asdfg 4th message: h 111111 5th message: 11111111 6th message: 11111111 7th message: chunli@Linux:~/ace/AceTask$
ACE多线程
chunli@Linux:~/ace/AceTask$ cat thread_manager_test1.cpp //ACE_Guard提供了自动锁定、释放机制的同步锁,对可见的scope范围内进行自动加锁,当超出scope范围是程序自动卸锁。 #include <ace/Thread_Manager.h> #include <ace/Log_Msg.h> ACE_Thread_Mutex nm, iom; int n = 0; static ACE_THR_FUNC_RETURN func(void* arg) { while (n < 30) { { ACE_Guard<ACE_Thread_Mutex> guard(nm);//保护n n++; } { //ACE_Guard<ACE_Thread_Mutex> guard2(iom);//保护io ACE_DEBUG((LM_DEBUG, "(%t) %d\n", n)); } ACE_OS::sleep(1); } return 0; } int main() { //创建3个线程 ACE_Thread_Manager::instance()->spawn_n(5, func, NULL, THR_SCOPE_SYSTEM | THR_NEW_LWP); ACE_Thread_Manager::instance()->wait(); } chunli@Linux:~/ace/AceTask$ g++ thread_manager_test1.cpp -lACE && ./a.out (139716248389376) 2 (139716158551808) 3 (139716239996672) 5 (139716256782080) 4 (139716265174784) 2 (139716158551808) 6 (139716256782080) 7 (139716248389376) 8 (139716239996672) 9 (139716265174784) 10 (139716256782080) 11 (139716158551808) 13 (139716248389376) 12 (139716239996672) 14 (139716265174784) 15 (139716239996672) 16 (139716256782080) 17 (139716248389376) 18 (139716158551808) 19 (139716265174784) 20 (139716256782080) 21 (139716248389376) 22 (139716239996672) 23 (139716158551808) 24 (139716265174784) 25 (139716239996672) 26 (139716256782080) 27 (139716248389376) 28 (139716158551808) 29 (139716265174784) 30 chunli@Linux:~/ace/AceTask$
ACE_Task 多线程管理版 echo_server
chunli@Linux:~/ace/AceTask$ cat echo_server_mt.cpp #include <ace/INET_Addr.h> #include <ace/SOCK_Acceptor.h> #include <ace/SOCK_Stream.h> #include <ace/Thread_Manager.h> #include <ace/Log_Msg.h> static ACE_THR_FUNC_RETURN func(void* arg) { ACE_SOCK_Stream* socket = static_cast<ACE_SOCK_Stream*> (arg); char buf[512]; for (;;) { memset(buf, 0, sizeof(buf)); ssize_t n; if ((n = socket->recv(buf, sizeof(buf))) <= 0) { ACE_ERROR((LM_ERROR, "%p\n", "recv()")); break; } else { if (socket->send(buf, n) == -1) { ACE_ERROR((LM_ERROR, "%p\n", "send()")); break; } } } socket->close(); delete socket; return 0; } int main() { ACE_INET_Addr server_addr; ACE_SOCK_Acceptor acceptor; if (server_addr.set(8868) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "set()"), 1); if (acceptor.open(server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open()"), 1); for (;;) { ACE_SOCK_Stream* peer = new ACE_SOCK_Stream; if (acceptor.accept(*peer) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept()"), 1); peer->disable(ACE_NONBLOCK); // Ensure blocking <recv>s. ACE_Thread_Manager::instance()->spawn(func, peer); } return acceptor.close() == -1 ? 1 : 0; } chunli@Linux:~/ace/AceTask$ 编译运行: chunli@Linux:~/ace/AceTask$ g++ echo_server_mt.cpp -lACE && ./a.out 第一个客户端: chunli@Linux:~$ nc localhost 8868 qaz qaz wsx wsx 第二个客户端: chunli@Linux:~$ nc localhost 8868 123 123 eqwe eqwe
局域网 组播收发实验
chunli@Linux:~/ace/AceTask$ cat chat_room.cpp #include <iostream> #include <ace/SOCK_Dgram_Mcast.h> #include <ace/Log_Msg.h> #include <ace/Task.h> class ChatRoom: public ACE_Task_Base { public: int join(const ACE_INET_Addr& group) { _group = group; if (sock.join(_group) == -1) ACE_ERROR_RETURN((LM_ERROR,"%p\n", "join"), -1); return 0; } virtual int svc() { ACE_INET_Addr remoteAddr; for (;;) { memset(buf, 0, sizeof(buf)); if (sock.recv(buf, sizeof(buf), remoteAddr) != -1) { ACE_DEBUG((LM_DEBUG, "recv msg from %s:%d: %s\n", remoteAddr.get_host_addr(), remoteAddr.get_port_number(), buf)); } else { sock.leave(_group); break; } } return 0; } void sendMsg() { char sendBuf[512]; for (;;) { memset(sendBuf, 0, sizeof(sendBuf)); std::cin.getline(sendBuf, sizeof(sendBuf)); if (sock.send(sendBuf, strlen(sendBuf)) == -1) { ACE_ERROR((LM_ERROR,"%p\n", "send")); sock.leave(_group); break; } } } private: char buf[512]; ACE_INET_Addr _group; ACE_SOCK_Dgram_Mcast sock; }; int main() { ChatRoom cm; ACE_INET_Addr group(8000, "224.0.0.12"); if (cm.join(group) == -1) return 1; cm.activate(); cm.sendMsg(); } 一个运行: chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out recv msg from 11.11.11.6:8000: 1234edxs 中国上海 recv msg from 11.11.11.6:8000: 中国上海 局域网其他机子运行 chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out 1234edxs recv msg from 11.11.11.6:8000: 1234edxs recv msg from 11.11.11.6:8000: 中国上海
ACE_Message_Queue 生产者消费者模型
chunli@Linux:~/ace/AceTask$ cat producer.cpp #include "ace/Task.h" #include "ace/Message_Block.h" const int N = 10; //The Consumer Task. class Consumer: public ACE_Task<ACE_MT_SYNCH> { public: int open(void*) { ACE_DEBUG((LM_DEBUG, "(%t)Consumer task opened \n")); //Activate the Task activate(THR_NEW_LWP, 3);//开启3个线程 return 0; } //The Service Processing routine int svc(void) { ACE_Message_Block* mb = 0; while(1) { mb = 0; getq(mb);//Get message from underlying queue //ACE_Message_Block提供了两个指针函数以供程序员进行读写操作, //rd_ptr()指向可读的数据块地址, //wr_ptr()指向可写的数据块地址, //默认情况下都执行数据块的首地址 if (*mb->rd_ptr() < N) { ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr())); } else if (*mb->rd_ptr() == N) { ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr())); ++*mb->rd_ptr(); // *mb->rd_ptr() == N+1 ungetq(mb); break;// 供后续线程查看 } else { // *mb->rd_ptr() > N ungetq(mb); break;// 供后续线程查看 } } return 0; } int close(u_long) { ACE_DEBUG((LM_DEBUG, "(%t)Consumer closes down \n")); return 0; } }; class Producer: public ACE_Task_Base { public: Producer(Consumer* consumer):data_(0), consumer_(consumer) { mb_ = new ACE_Message_Block((char*) &data_, sizeof(data_)); } int open(void*) { ACE_DEBUG((LM_DEBUG, "(%t)Producer task opened \n")); //Activate the Task activate(THR_NEW_LWP, 1); return 0; } //The Service Processing routine int svc(void) { while (data_ <= N) { //Send message to consumer ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_)); consumer_->putq(mb_); //Go to sleep for a sec. ACE_OS::sleep(1); ++data_; } return 0; } int close(u_long) { ACE_DEBUG((LM_DEBUG, "(%t)Producer closes down \n")); return 0; } private: char data_; Consumer * consumer_; ACE_Message_Block * mb_; }; int main() { Consumer* consumer = new Consumer; consumer->open(0); Producer* producer = new Producer(consumer); producer->open(0); ACE_Thread_Manager::instance()->wait(); //Wait for all the tasks to exit. 
} chunli@Linux:~/ace/AceTask$ g++ producer.cpp -lACE -lpthread && ./a.out (140704458946432)Consumer task opened (140704458946432)Producer task opened (140704408925952)Sending message: 0 to remote task (140704434104064)Got message: 0 from remote task (140704408925952)Sending message: 1 to remote task (140704434104064)Got message: 1 from remote task (140704408925952)Sending message: 2 to remote task (140704425711360)Got message: 2 from remote task (140704408925952)Sending message: 3 to remote task (140704417318656)Got message: 3 from remote task (140704408925952)Sending message: 4 to remote task (140704434104064)Got message: 4 from remote task (140704408925952)Sending message: 5 to remote task (140704425711360)Got message: 5 from remote task (140704408925952)Sending message: 6 to remote task (140704417318656)Got message: 6 from remote task (140704408925952)Sending message: 7 to remote task (140704434104064)Got message: 7 from remote task (140704408925952)Sending message: 8 to remote task (140704425711360)Got message: 8 from remote task (140704408925952)Sending message: 9 to remote task (140704417318656)Got message: 9 from remote task (140704408925952)Sending message: 10 to remote task (140704434104064)Got message: 10 from remote task (140704434104064)Consumer closes down (140704425711360)Consumer closes down (140704417318656)Consumer closes down (140704408925952)Producer closes down chunli@Linux:~/ace/AceTask$
本文出自 “魂斗罗” 博客,请务必保留此出处http://990487026.blog.51cto.com/10133282/1889842
原文:http://990487026.blog.51cto.com/10133282/1889842