ACE_Proactor框架是ACE_Reactor框架的升级版,前者异步读写,后者同步读写。
这里主要讲个人对前者的理解:
(1)事件循环:ACE_Proactor模式使用ACE_Proactor::instance ()->proactor_run_event_loop();来监听事件
(2)ACE_Asynch_Connector<A_Service_Handler> connector;主动发起连接
(3)ACE_Asynch_Acceptor<B_Service_Handler> acceptor;被动监听等待连接的到来
(4)A_Service_Handler和B_Service_Handler均是用户自定义的继承自ACE_Service_Handler的类
(5)具体实施发送和接收的分别是A_Service_Handler中的ACE_Asynch_Write_Stream writer_;和B_Service_Handler中的ACE_Asynch_Read_Stream reader_;
具体实现用了两个项目Asynch_Connector和Asynch_Acceptor,一个用来连续发送,一个用来不停的接收
Asynch_Connector项目
connector_main.cpp
#include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h" class Service_Handler : public ACE_Service_Handler { public: Service_Handler() { ACE_OS::printf("Service_Handler constructed for connector \n"); } ~Service_Handler () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); ACE_OS::printf("one Service_Handler for connecter destructed"); } //把当前时间发送到远端 void send_to_accepter(void); virtual void open (ACE_HANDLE h, ACE_Message_Block&) { //在OPEN函数中完成读写操作 this->handle (h); //connect if (this->writer_.open (*this) != 0 ) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Service_Handler open"))); delete this; return; } ACE_OS::printf("connceted\n"); send_to_accepter(); return; } //writer_.write()异步写完成后会调用此函数 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { result.message_block ().release(); ACE_OS::sleep(1); //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用 //handle_write_stream,所以会一直发送 send_to_accepter(); return; } private: //用来向远端发送内容 ACE_Asynch_Write_Stream writer_; }; //把当前时间发送到远端 void Service_Handler::send_to_accepter(void) { time_t now = ACE_OS::gettimeofday().sec(); ACE_Message_Block *mb = new ACE_Message_Block(100); //获取当前时间的字符串格式 mb->copy( ctime(&now)); //send message to accepter if (this->writer_.write(*mb,mb->length()) !=0) { ACE_OS::printf("Begin write fail in open\n"); delete this; return; } else { ACE_OS::printf("sended %s\n",mb->rd_ptr()); } } int main(int argc, char *argv[]) { ACE_INET_Addr addr(3000,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> connector; connector.open(); if (connector.connect(addr) == -1) return -1; ACE_Proactor::instance ()->proactor_run_event_loop(); return 0; }
accepter_main.cpp
#include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Acceptor.h" class Service_Handler : public ACE_Service_Handler { public: Service_Handler() { ACE_OS::printf("Service_Handler constructed for accepter \n"); } ~Service_Handler () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); ACE_OS::printf("one Service_Handler for accepter destructed"); } //异步读取消息 void reveive_from_connector(void); virtual void open (ACE_HANDLE h, ACE_Message_Block&) { this->handle (h); if (this->reader_.open (*this) != 0 ) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Service_Handler open"))); delete this; return; } //异步读取 reveive_from_connector(); return; } //reader_.read()异步读完成后会调用此函数 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { ACE_Message_Block &mb = result.message_block (); if (!result.success () || result.bytes_transferred () == 0) { mb.release (); delete this; return; } //输出读取内容 ACE_OS::printf("received:%s\n",mb.rd_ptr()); mb.release(); //继续下一次异步读取 reveive_from_connector(); } private: ACE_Asynch_Read_Stream reader_; char buffer[1024]; }; void Service_Handler::reveive_from_connector(void) { ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024); if (this->reader_.read (*mb, mb->space ()) != 0) { ACE_OS::printf("Begin read fail\n"); delete this; return; } } int main(int argc, char *argv[]) { int port=3000; ACE_Asynch_Acceptor<Service_Handler> acceptor; if (acceptor.open (ACE_INET_Addr (port)) == -1) return -1; ACE_Proactor::instance ()->proactor_run_event_loop(); return 0; }
先运行接收程序,再运行发送程序,结果如下
ACE_Proactor一个发送时间一个接受时间,布布扣,bubuko.com
原文:http://blog.csdn.net/calmreason/article/details/21237003