当一个调用者异步发出一个功能调用时,调用者不能立马得到结果。
实际处理这个调用的部件在完毕后,通过状态、通知和回调来通知调用者。
以 android中AsyncTask类为例,顾名思义异步运行任务,在doInBackground 运行完毕后,onPostExecute 方法将被UI 线程调用,后台的计算结果将通过该方法传递到UI 线程,而且在界面上展示给用户.。在android或者java异步编程中须要注意下面几个知识点:回调,监听者模式,观察者模式。这几点在之后另外几篇文章中会提及。
由于这点定义跟同步编程的定义非常相像,所以非常多人觉得同步编程就等堵塞式编程。对于同步调用来说。非常多时候当前线程还是激活的,仅仅是从逻辑上当前函数没有返回而已。比如。我们在 socket编程中调用Receive函数,假设缓冲区中没有数据,这个函数就会一直等待,直到有数据才返回。而此时。当前线程还会继续处理各种各样的消 息。假设主窗体和调用函数在同一个线程中,除非你在特殊的界面操作函数中调用。事实上主界面还是应该能够刷新。可是在android中,因为主线程(UI线程)的不安全性。特别到4.0版本号后,系统已经不同意在主线程中进行耗时的同步编程。
所以android才出现了AsyncTask类用于异步编程。
从这个定义上来说,非堵塞编程能够说是异步编程的一种,可是异步编程并不等于非堵塞式编程。
那还有另外一方式。你能够叫一个人(监听者,观察者)帮你看着。直接你买票了,再通知你。你能够先去别的事情(而异步这不会这样。异步情况下是当一个进程发
起一个函数(任务)调用的时候。不会等函数返回)。堵塞是就是等排队,非堵塞就是直接走开。
也可通过调用自己定义选择器提供者的 openSelector 方法来创建选择器。通过选择器的 close 方法关闭选择器之前,它一直保持打开状态。
假设某个线程眼下正堵塞在此选择器的某个选择方法中,则中断该线程,如同调用该选择器的 wakeup 方法那样。
全部仍与此选择器关联的未取消键已无效、其通道已注销,而且与此选择器关联的全部其它资源已释放。
假设此选择器已经关闭。则调用此方法无效。
关闭选择器后,除了调用此方法或 wakeup 方法外,以不论什么其它方式继续使用它都将导致抛出 ClosedSelectorException。
不可能为随意的已有套接字创建通道,也不可能指定与套接字通道关联的套接字所使用的 SocketImpl 对象。
可通过调用 isConnectionPending 方法来确定是否正在进行连接操作。
connect 和 finishConnect 方法是相互同步的,假设正在调用当中某个方法的同一时候试图发起读取或写入操作,则在该调用完毕之前该操作被堵塞。
Selector mSelector = null;
ByteBuffer sendBuffer = null;
SocketChannel client = null;
InetSocketAddress isa = null;
SocketEventListener mSocketEventListener = null;
private boolean Connect(String site, int port)
{
if (mSocketEventListener != null)
{
mSocketEventListener.OnSocketPause();
}
boolean ret = false;
try
{
mSelector = Selector.open();
client = SocketChannel.open();
client.socket().setSoTimeout(5000);
isa = new InetSocketAddress(site, port);
boolean isconnect = client.connect(isa);
// 将客户端设定为异步
client.configureBlocking(false);
// 在轮讯对象中注冊此客户端的读取事件(就是当server向此客户端发送数据的时候)
client.register(mSelector, SelectionKey.OP_READ);
long waittimes = 0;
if(!isconnect)
{
while (!client.finishConnect())
{
EngineLog.redLog(TAG, "等待非堵塞连接建立....");
Thread.sleep(50);
if(waittimes < 100)
{
waittimes++;
}
else
{
break;
}
}
}
Thread.sleep(500);
haverepaired();
startListener();
ret = true;
}
catch (Exception e)
{
EngineLog.redLog(TAG + " - Connect error", e != null ? e.toString() : "null");
try
{
Thread.sleep(1000 * 10);
}
catch (Exception e1)
{
EngineLog.redLog(TAG + " - Connect error", e1 != null ? e1.toString() : "null");
}
ret = false;
}
return ret;
}public interface SocketEventListener
{
/**
* Socket正在接收数据
* */
public void OnStreamRecive();
/**
* Socket接收数据完毕
* */
public void OnStreamReciveFinish();
/**
* Socket有新的消息返回
* */
public void OnStreamComing(byte[] aStreamData);
/**
* Socket出现异常
* */
public void OnSocketPause();
/**
* Socket已修复,可用
* */
public void OnSocketAvaliable();
}监听接口的使用:rivate void startListener()
{
if (mReadThread == null || mReadThread.isInterrupted())
{
mReadThread = null;
mReadThread = new Thread()
{
@Override
public void run()
{
while (!this.isInterrupted() && mRunRead)
{
MyLineLog.redLog(TAG,"startListener:" + mSendMsgTime);
try
{
// 假设client连接没有打开就退出循环
if (!client.isOpen())
break;
// 此方法为查询是否有事件发生假设没有就堵塞,有的话返回事件数量
int eventcount = mSelector.select();
// 假设没有事件返回循环
if (eventcount > 0)
{
starttime = CommonClass.getCurrentTime();
// 遍例全部的事件
for (SelectionKey key : mSelector.selectedKeys())
{
// 删除本次事件
mSelector.selectedKeys().remove(key);
// 假设本事件的类型为read时,表示server向本client发送了数据
if (key.isValid() && key.isReadable())
{
if (mSocketEventListener != null)
{
mSocketEventListener.OnStreamRecive();
}
boolean readresult = ReceiveDataBuffer((SocketChannel) key.channel());
if (mSocketEventListener != null)
{
mSocketEventListener.OnStreamReciveFinish();
}
if(readresult)
{
key.interestOps(SelectionKey.OP_READ);
sleep(200);
}
else
{
throw new Exception();
}
}
key = null;
}
mSelector.selectedKeys().clear();
}
}
catch (Exception e)
{
mRunRead = false;
mReadThread = null;
if(e instanceof InterruptedException)
{
MyLineLog.redLog(TAG, "startListener:" + e.toString());
}
else
{
break;
}
}
}
}
};
mReadThread.setName(TAG + " Listener, " + CommonClass.getCurrentTime());
mRunRead = true;
mReadThread.start();
}
}public boolean SendSocketMsg(byte[] aMessage) throws IOException
{
boolean ret = false;
try
{
sendBuffer.clear();
sendBuffer = ByteBuffer.wrap(aMessage);
int sendsize = client.write(sendBuffer);
sendBuffer.flip();
sendBuffer.clear();
mSendMsgTime = CommonClass.getCurrentTime();
MyLineLog.redLog(TAG, "SendSocketMsg:" + mSendMsgTime + ", sendsize:" + sendsize);
ret = true;
}
catch (Exception e)
{
MyLineLog.redLog(TAG, "发送数据失败。");
if (mSocketEventListener != null)
{
mSocketEventListener.OnSocketPause();
}
// crash();
}
return ret;
}private boolean ReceiveDataBuffer(SocketChannel aSocketChannel)
{
// n 有数据的时候返回读取到的字节数。
// 0 没有数据而且没有达到流的末端时返回0。
// -1 当达到流末端的时候返回-1。
boolean ret = false;
ByteArrayBuffer bab = new ByteArrayBuffer(8*1024);
while(true)
{
try
{
ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 1);
readBuffer.clear();
int readsize = aSocketChannel.read(readBuffer);
if(readsize > 0)
{
MyLineLog.redLog(TAG, "aSocketChannel.read=>" + readsize);
byte[] readbytes = readBuffer.array();
bab.append(readbytes, 0, readsize);
readBuffer.clear();
readBuffer.flip();
ret = true;
}
else if(readsize == 0)
{
int buffersize = bab.length();
byte[] readdata = bab.buffer();
int readdataoffset = 0;
boolean parsedata = true;
while(readdataoffset < buffersize && parsedata)
{
byte datatype = readdata[readdataoffset];
if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA)
{
byte[] blockdata = new byte[] { datatype };
ReceiveData(blockdata);
readdataoffset += 1;
blockdata = null;
}
else
{
byte[] blocklength = new byte[4];
System.arraycopy(readdata, readdataoffset + 5, blocklength, 0, 4);
int blocksize = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(blocklength));
blocklength = null;
int blockdatasize = 5 + blocksize + 4;
if(blockdatasize <= buffersize)
{
MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize);
byte[] blockdata = new byte[blockdatasize];
System.arraycopy(readdata, readdataoffset, blockdata, 0, blockdatasize);
long starttime = CommonClass.getCurrentTime();
ReceiveData(blockdata);
long endtime = CommonClass.getCurrentTime();
MyLineLog.redLog(TAG, "解析数据用时:" + (endtime - starttime) + "ms");
readdataoffset += blockdatasize;
blockdata = null;
}
else if(blockdatasize < 10240)
{//小于10k,则属于正常包
MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize + ",小于10k,说明数据不完整。继续获取。");
//将未解析数据存到暂时buffer
int IncompleteSize = buffersize - readdataoffset;
if(IncompleteSize > 0)
{
byte[] Incompletedata = new byte[IncompleteSize];
System.arraycopy(readdata, readdataoffset, Incompletedata, 0, IncompleteSize);
bab.clear();
bab.append(Incompletedata, 0, IncompleteSize);
parsedata = false;
Incompletedata = null;
}
}
else
{//异常包
MyLineLog.yellowLog(TAG, "块数据错误大小:" + blockdatasize);
MyLineLog.redLog(TAG,"blockdatasize error:" + blockdatasize);
ret = true;
break;
}
}
}
if(parsedata)
{
ret = true;
break;
}
}
else if(readsize == -1)
{
ret = false;
break;
}
else
{
ret = true;
break;
}
}
catch (IOException e)
{
MyLineLog.redLog(TAG, "aSocketChannel IOException=>" + e.toString());
ret = false;
break;
}
}
bab.clear();
bab = null;
return ret;
}
private void ReceiveData(byte[] aDataBlock)
{
try
{
MyLineLog.redLog(TAG, "ReceiveData:" + mSendMsgTime);
if (mSendMsgTime != 0)
{
mSendMsgTime = 0;
}
byte[] ret = null;
int offset = 0;
byte datatype = aDataBlock[offset];
offset += 1;
if (datatype != -1)
{
if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT)
{
ret = new byte[] { datatype };
}
else if (datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA)
{
ret = new byte[] { datatype };
}
else if (datatype == PushUtils.PACKAGETYPE_NORMAL || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_HAVEDATA)
{
byte[] databytelength = new byte[4];
System.arraycopy(aDataBlock, offset, databytelength, 0, 4);
offset += 4;
int header = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(databytelength));
databytelength = null;
if (header == PushUtils.PACKAGEHEADER)
{
byte[] datalengthbyte = new byte[4];
System.arraycopy(aDataBlock, offset, datalengthbyte, 0, 4);
offset += 4;
int datalength = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(datalengthbyte));
datalengthbyte = null;
if (datalength > 4)
{
// compressed bit 临时不压缩
byte compressed = aDataBlock[offset];
offset += 1;
if (compressed == 1)
{//解压缩
//跳过头4个字节,此处用于解压缩后的数据大小,临时不须要
offset += 4;
int contentlength = datalength - 1 - 4;
byte[] datacontentbyte = new byte[contentlength];
System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
offset += contentlength;
byte[] compressdata = new byte[contentlength - 4];
System.arraycopy(datacontentbyte, 0, compressdata, 0, contentlength - 4);
long starttime = CommonClass.getCurrentTime();
byte[] decompressdatacontentbyte = CommonClass.decompress(compressdata);
long endtime = CommonClass.getCurrentTime();
MyLineLog.redLog(TAG, "解压缩数据用时:" + (endtime - starttime) + "ms");
int decompressdatacontentbytelength = decompressdatacontentbyte.length;
compressdata = null;
int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);
if (footer == PushUtils.PACKAGEFOOTER)
{
ret = new byte[decompressdatacontentbytelength + 1];
ret[0] = datatype;
System.arraycopy(decompressdatacontentbyte, 0, ret, 1, decompressdatacontentbytelength);
datacontentbyte = null;
decompressdatacontentbyte = null;
}
}
else
{//数据未压缩
int contentlength = datalength - 1;
byte[] datacontentbyte = new byte[contentlength];
System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
offset += contentlength;
int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);
if (footer == PushUtils.PACKAGEFOOTER)
{
ret = new byte[contentlength + 1 - 4];
ret[0] = datatype;
System.arraycopy(datacontentbyte, 0, ret, 1, contentlength - 4);
datacontentbyte = null;
}
}
}
}
}
if (mSocketEventListener != null)
{
mSocketEventListener.OnStreamComing(ret);
}
}
}
catch (Exception e)
{
MyLineLog.redLog(TAG + " - ReceiveData error", e.toString());
}
}public void closeSocket()
{
mRunRead = false;
if (mReadThread != null)
{
if (!mReadThread.isInterrupted())
{
mReadThread.interrupt();
mReadThread = null;
}
}
if (mSelector != null && mSelector.isOpen())
{
try
{
mSelector.close();
}
catch (IOException e)
{
MyLineLog.redLog(TAG + " - closeSocket error", e.toString());
}
mSelector = null;
}
if (client != null)
{
try
{
client.close();
client = null;
}
catch (IOException e)
{
MyLineLog.redLog(TAG + " - closeSocket2 error", e.toString());
}
}
System.gc();
}原文:http://www.cnblogs.com/cxchanpin/p/6806461.html