很多人使用 Apache Commons Pool。它提供了 ObjectPool、ObjectPoolFactory、PoolableObjectFactory 等接口以及许多实现,并通过 addObject、borrowObject、invalidateObject、returnObject 等方法来添加、借出、作废和归还对象。PoolableObjectFactory 定义对象池中对象的操作行为,并提供各种回调。
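但是 Apache Commons Pool 并不是一个轻量级的对象池,它的很多方法使用了开销较大的 synchronized 关键字来加锁。而 Java 5 引入了用于并发(多线程)编程的 java.util.concurrent 包和 Executor 框架。下面基于 ConcurrentLinkedQueue 和 Executor 框架实现一个简单的轻量级对象池,随后再给出一个使用 Apache Commons Pool 2 的示例作为对比。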
package easypool; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public abstract class ObjectPool<T> { private ConcurrentLinkedQueue<T> pool; private ScheduledExecutorService executorService; /** * Creates the pool. * * @param minIdle minimum number of objects residing in the pool */ public ObjectPool(final int minIdle) { // initialize pool initialize(minIdle); } /** * Creates the pool. * * @param minIdle minimum number of objects residing in the pool * @param maxIdle maximum number of objects residing in the pool * @param validationInterval time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread. * When the number of objects is less than minIdle, missing instances will be created. * When the number of objects is greater than maxIdle, too many instances will be removed. */ public ObjectPool(final int minIdle, final int maxIdle, final long validationInterval) { // initialize pool initialize(minIdle); // check pool conditions in a separate thread executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { int size = pool.size(); if (size < minIdle) { int sizeToBeAdded = minIdle - size; for (int i = 0; i < sizeToBeAdded; i++) { pool.add(createObject()); } } else if (size > maxIdle) { int sizeToBeRemoved = size - maxIdle; for (int i = 0; i < sizeToBeRemoved; i++) { pool.poll(); } } } }, validationInterval, validationInterval, TimeUnit.SECONDS); } /** * Gets the next free object from the pool. If the pool doesn‘t contain any objects, * a new object will be created and given to the caller of this method back. * * @return T borrowed object */ public T borrowObject() { T object; if ((object = pool.poll()) == null) { object = createObject(); } return object; } /** * Returns object back to the pool. 
* * @param object object to be returned */ public void returnObject(T object) { if (object == null) { return; } this.pool.offer(object); } /** * Shutdown this pool. */ public void shutdown() { if (executorService != null) { executorService.shutdown(); } } /** * Creates a new object. * * @return T new object */ protected abstract T createObject(); private void initialize(final int minIdle) { pool = new ConcurrentLinkedQueue<T>(); for (int i = 0; i < minIdle; i++) { pool.add(createObject()); } } }
package easypool; public class ExportingProcess { private String location; private long processNo = 0; public ExportingProcess(String location, long processNo) { this.location = location; this.processNo = processNo; // doing some time expensive calls / tasks // ... // for-loop is just for simulation for (int i = 0; i < Integer.MAX_VALUE; i++) { } System.out.println("Object with process no. " + processNo + " was created"); } public String getLocation() { return location; } public long getProcessNo() { return processNo; } @Override public String toString() { return "ExportingProcess{" + "processNo=" + processNo + ‘}‘; } }
package easypool; public class ExportingTask implements Runnable { private ObjectPool<ExportingProcess> pool; private int threadNo; public ExportingTask(ObjectPool<ExportingProcess> pool, int threadNo) { this.pool = pool; this.threadNo = threadNo; } public void run() { // get an object from the pool ExportingProcess exportingProcess = pool.borrowObject(); System.out.println("Thread " + threadNo + ": Object with process no. " + exportingProcess.getProcessNo() + " was borrowed"); // do something // ... // for-loop is just for simulation for (int i = 0; i < 100000; i++) { } // return ExportingProcess instance back to the pool pool.returnObject(exportingProcess); System.out.println("Thread " + threadNo + ": Object with process no. " + exportingProcess.getProcessNo() + " was returned"); } }
package easypool; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class TestObjectPool { private ObjectPool<ExportingProcess> pool; private AtomicLong processNo = new AtomicLong(0); @Before public void setUp() { // Create a pool of objects of type ExportingProcess. Parameters: // 1) Minimum number of special ExportingProcess instances residing in the pool = 4 // 2) Maximum number of special ExportingProcess instances residing in the pool = 10 // 3) Time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread = 5. // When the number of ExportingProcess instances is less than minIdle, missing instances will be created. // When the number of ExportingProcess instances is greater than maxIdle, too many instances will be removed. // If the validation interval is negative, no periodical checking of minIdle / maxIdle conditions // in a separate thread take place. These boundaries are ignored then. 
pool = new ObjectPool<ExportingProcess>(4, 10, 5) { protected ExportingProcess createObject() { // create a test object which takes some time for creation return new ExportingProcess("/home/temp/", processNo.incrementAndGet()); } }; } @After public void tearDown() { pool.shutdown(); } @Test public void testObjectPool() { ExecutorService executor = Executors.newFixedThreadPool(8); // execute 8 tasks in separate threads executor.execute(new ExportingTask(pool, 1)); executor.execute(new ExportingTask(pool, 2)); executor.execute(new ExportingTask(pool, 3)); executor.execute(new ExportingTask(pool, 4)); executor.execute(new ExportingTask(pool, 5)); executor.execute(new ExportingTask(pool, 6)); executor.execute(new ExportingTask(pool, 7)); executor.execute(new ExportingTask(pool, 8)); executor.shutdown(); try { executor.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }
package test; public class StringFormat { public String format(String str) { return "formated:"+str; } }
package test; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; public class StringFormatFactory extends BasePooledObjectFactory<StringFormat> { @Override public StringFormat create() { System.out.println("create object"); return new StringFormat(); } /** * Use the default PooledObject implementation. */ @Override public PooledObject<StringFormat> wrap(StringFormat buffer) { return new DefaultPooledObject<StringFormat>(buffer); } /** * When an object is returned to the pool, clear the buffer. */ @Override public void passivateObject(PooledObject<StringFormat> pooledObject) { System.out.println("Object been returned to pool"); } // for all other methods, the no-op implementation // in BasePooledObjectFactory will suffice }
package test; import java.io.IOException; import java.io.Reader; import java.util.ArrayList; import java.util.List; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class StringProcessor { private ObjectPool<StringFormat> pool; public StringProcessor(ObjectPool<StringFormat> pool) { this.pool = pool; } /** * Dumps the contents of the {@link Reader} to a String, closing the * {@link Reader} when done. */ public void process(List<String> strList) { for (String str : strList) { Thread thread = new StringProcessThread(pool, str); thread.start(); } //设置等待两秒,等待线程结束 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { GenericObjectPoolConfig conf = new GenericObjectPoolConfig(); conf.setMaxTotal(3); conf.setMaxIdle(2); conf.setMinIdle(2); StringProcessor stringProcessor = new StringProcessor( new GenericObjectPool<StringFormat>(new StringFormatFactory(),conf)); List<String> strList = new ArrayList<String>(); strList.add("123"); strList.add("456"); strList.add("789"); strList.add("111"); strList.add("222"); strList.add("333"); stringProcessor.process(strList); } }
package test;

import org.apache.commons.pool2.ObjectPool;

/**
 * Worker thread that borrows a {@link StringFormat} from the pool, formats one string, prints
 * the result and gives the formatter back. Failures are reported via their stack trace.
 */
public class StringProcessThread extends Thread {

    private ObjectPool<StringFormat> pool;
    private String toProcessStr;

    /**
     * @param pool         pool to borrow the formatter from
     * @param toProcessStr string to format
     */
    public StringProcessThread(ObjectPool<StringFormat> pool, String toProcessStr) {
        this.pool = pool;
        this.toProcessStr = toProcessStr;
    }

    /**
     * Borrows a formatter, prints the formatted string and returns the formatter to the pool.
     */
    @Override
    public void run() {
        // Phase 1: borrow. Nothing to give back if this fails.
        final StringFormat borrowed;
        try {
            borrowed = pool.borrowObject();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // Phase 2: use the formatter, then always attempt to hand it back.
        try {
            System.out.println(borrowed.format(toProcessStr));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            giveBack(borrowed);
        }
    }

    /** Returns the formatter to the pool unless it is {@code null}; failures are only printed. */
    private void giveBack(StringFormat borrowed) {
        if (borrowed == null) {
            return;
        }
        try {
            pool.returnObject(borrowed);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}