与Function接口类型,接收一个参数,返回一个对象。不同的是,其返回的是ListenableFuture
public class AsyncFuntionSample implements AsyncFunction<Long,String> {private ConcurrentMap<Long,String> map = Maps.newConcurrentMap();private ListeningExecutorService listeningExecutorService;@Overridepublic ListenableFuture<String> apply(final Long input) throws Exception {if(map.containsKey(input)) {SettableFuture<String> listenableFuture = SettableFuture.create();listenableFuture.set(map.get(input));return listenableFuture;}else{return listeningExecutorService.submit(new Callable<String>(){@Overridepublic String call() throws Exception {String retrieved = service.get(input);map.putIfAbsent (input,retrieved);return retrieved;}});}}}
完整示例
import bbejeck.guava.common.SearchingTestBase;
import bbejeck.guava.common.support.model.Person;
import com.google.common.base.Function;
import com.google.common.util.concurrent.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;public class AsyncFunctionTest extends SearchingTestBase {private int numberTasks;private CountDownLatch startSignal;private CountDownLatch doneSignal;private ListeningExecutorService executorService;@Beforepublic void setUp() throws Exception {numberTasks = 5;startSignal = new CountDownLatch(1);doneSignal = new CountDownLatch(numberTasks);executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());}@Afterpublic void tearDown() {executorService.shutdownNow();}@Testpublic void testAsyncTransform() throws Exception {AsyncFunction<List<String>, List<Person>> queryFunction = new AsyncFunction<List<String>, List<Person>>() {@Overridepublic ListenableFuture<List<Person>> apply(final List<String> ids) {return dbService.getPersonsByIdAsync(ids);}};ListenableFuture<List<String>> indexSearch = luceneSearcher.searchAsync("firstName:martin");ListenableFuture<List<Person>> results = Futures.transform(indexSearch, queryFunction, executorService);List<Person> persons = results.get(1, TimeUnit.SECONDS);assertThat(persons.size(), is(74));for (Person person : persons) {assertThat(person.firstName, is("Martin"));}}@Testpublic void testTransformSearch() throws Exception {Function<List<String>, List<Person>> transformSearchResults = new Function<List<String>, List<Person>>() {@Overridepublic List<Person> apply(List<String> ids) {return dbService.getPersonsById(ids);}};ListenableFuture<List<String>> indexSearch = luceneSearcher.searchAsync("firstName:martin");ListenableFuture<List<Person>> transformedResults = Futures.transform(indexSearch, transformSearchResults, executorService);List<Person> persons = transformedResults.get(1, TimeUnit.SECONDS);int expectedSize = 74;assertThat(persons.size(), is(expectedSize));for (Person person : persons) {assertThat(person.firstName, is("Martin"));}}}