ACE_Proactor框架是ACE_Reactor框架的升级版,前者异步读写,后者同步读写。
这里主要讲个人对前者的理解:
(1)事件循环:ACE_Proactor模式使用ACE_Proactor::instance ()->proactor_run_event_loop();来监听事件
(2)ACE_Asynch_Connector<A_Service_Handler> connector;主动发起连接
(3)ACE_Asynch_Acceptor<B_Service_Handler> acceptor;被动监听等待连接的到来
(4)A_Service_Handler和B_Service_Handler均是用户自定义的继承自ACE_Service_Handler的类
(5)具体实施发送和接收的分别是A_Service_Handler中的ACE_Asynch_Write_Stream writer_;和B_Service_Handler中的ACE_Asynch_Read_Stream reader_;
具体实现用了两个项目Asynch_Connector和Asynch_Acceptor,一个用来连续发送,一个用来不停的接收
Asynch_Connector项目
connector_main.cpp
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
class Service_Handler : public ACE_Service_Handler
{
public:
Service_Handler()
{
ACE_OS::printf("Service_Handler constructed for connector \n");
}
~Service_Handler ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
ACE_OS::printf("one Service_Handler for connecter destructed");
}
//把当前时间发送到远端
void send_to_accepter(void);
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
//在OPEN函数中完成读写操作
this->handle (h);
//connect
if (this->writer_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Service_Handler open")));
delete this;
return;
}
ACE_OS::printf("connceted\n");
send_to_accepter();
return;
}
//writer_.write()异步写完成后会调用此函数
virtual void handle_write_stream
(const ACE_Asynch_Write_Stream::Result &result)
{
result.message_block ().release();
ACE_OS::sleep(1);
//上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
//handle_write_stream,所以会一直发送
send_to_accepter();
return;
}
private:
//用来向远端发送内容
ACE_Asynch_Write_Stream writer_;
};
//把当前时间发送到远端
void Service_Handler::send_to_accepter(void)
{
time_t now = ACE_OS::gettimeofday().sec();
ACE_Message_Block *mb = new ACE_Message_Block(100);
//获取当前时间的字符串格式
mb->copy( ctime(&now));
//send message to accepter
if (this->writer_.write(*mb,mb->length()) !=0)
{
ACE_OS::printf("Begin write fail in open\n");
delete this;
return;
}
else
{
ACE_OS::printf("sended %s\n",mb->rd_ptr());
}
}
int main(int argc, char *argv[])
{
ACE_INET_Addr addr(3000,ACE_LOCALHOST);
ACE_Asynch_Connector<Service_Handler> connector;
connector.open();
if (connector.connect(addr) == -1)
return -1;
ACE_Proactor::instance ()->proactor_run_event_loop();
return 0;
}accepter_main.cpp
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
class Service_Handler : public ACE_Service_Handler
{
public:
Service_Handler()
{
ACE_OS::printf("Service_Handler constructed for accepter \n");
}
~Service_Handler ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
ACE_OS::printf("one Service_Handler for accepter destructed");
}
//异步读取消息
void reveive_from_connector(void);
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
this->handle (h);
if (this->reader_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Service_Handler open")));
delete this;
return;
}
//异步读取
reveive_from_connector();
return;
}
//reader_.read()异步读完成后会调用此函数
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
mb.release ();
delete this;
return;
}
//输出读取内容
ACE_OS::printf("received:%s\n",mb.rd_ptr());
mb.release();
//继续下一次异步读取
reveive_from_connector();
}
private:
ACE_Asynch_Read_Stream reader_;
char buffer[1024];
};
void Service_Handler::reveive_from_connector(void)
{
ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024);
if (this->reader_.read (*mb, mb->space ()) != 0)
{
ACE_OS::printf("Begin read fail\n");
delete this;
return;
}
}
int main(int argc, char *argv[])
{
int port=3000;
ACE_Asynch_Acceptor<Service_Handler> acceptor;
if (acceptor.open (ACE_INET_Addr (port)) == -1)
return -1;
ACE_Proactor::instance ()->proactor_run_event_loop();
return 0;
}先运行接收程序,再运行发送程序,结果如下
ACE_Proactor一个发送时间一个接受时间,布布扣,bubuko.com
原文:http://blog.csdn.net/calmreason/article/details/21237003