Flux转Mono next()
import java.util.LinkedHashMap; import java.util.Map; import java.util.NoSuchElementException; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class TestFindResult { private static final Map<String, String> templates; private static final int sleep = 1000; static { templates = new LinkedHashMap<>(); templates.put("aDB", "a"); templates.put("bDB", "b"); templates.put("cDB", "c"); } public Mono<String> findResult(Function<String, Mono<String>> query) { return Flux.fromIterable(templates.values()) .flatMap(query) .next() .onErrorResume(NoSuchElementException.class, e -> Mono.empty()) .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new); } public static void main(String[] args) { TestFindResult test = new TestFindResult(); Function<String, Mono<String>> query = (value) -> { try { Thread.sleep(sleep); // mock DB query } catch (InterruptedException e) { e.printStackTrace(); } log.info( "Thread id:{}, Thread name:{}, value:{}, used ms:{}", Thread.currentThread().getId(), Thread.currentThread().getName(), value, sleep); return Mono.just(value); }; System.out.println(test.findResult(query).subscribe()); } }
import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class TestFindMongo { private static final Map<String, String> templates; private static final int sleep = 1000; static { templates = new LinkedHashMap<>(); templates.put("aDB", "a"); templates.put("bDB", "b"); templates.put("cDB", "c"); } public Mono<String> findMongo() { StopWatch stopWatch = StopWatch.createStarted(); return Flux.fromIterable(templates.entrySet()) .filterWhen( template -> { String key = template.getKey(); String value = template.getValue(); try { Thread.sleep(sleep); // mock DB query } catch (InterruptedException e) { e.printStackTrace(); } log.info( "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}", Thread.currentThread().getId(), Thread.currentThread().getName(), key, value, sleep); return Mono.just(value.equals("b")); }) .next() .doOnSuccess(templateEntry -> log.info("Match {} ", templateEntry.getKey())) .map(Entry::getValue) .onErrorResume(NoSuchElementException.class, e -> Mono.empty()) .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new) .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime())); } public static void main(String[] args) { TestFindMongo test = new TestFindMongo(); System.out.println(test.findMongo().subscribe()); } }
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

/**
 * Signals, as an HTTP 400 (Bad Request), that a query matched more than one upstream and therefore
 * cannot be answered in a single response.
 */
public class MultipleUpstreamException extends ResponseStatusException {

  /** Reason phrase sent with the 400 response. */
  private static final String MULTIPLE_UPSTREAM_MATCH_ERR =
      "Your query contains properties matching multiple upstreams. "
          + "Data for multiple upstreams can‘t be returned in one query. "
          + "Please either specify upstream by providing publisherSystem "
          + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
          + "and region or request deal properties matching only one upstream";

  /** Creates the exception with the standard reason and no cause. */
  MultipleUpstreamException() {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR);
  }

  /**
   * This constructor has syntax adapted to the Mono API, so it can be passed as a constructor
   * reference to {@code onErrorMap}.
   *
   * @param indexOutOfBoundsException emitted on {@link Flux#single()} when the Flux has more than
   *     one element; retained as the cause so the original stack trace is not lost
   * @see reactor.core.publisher.Mono#onErrorMap(Class, java.util.function.Function)
   */
  MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR, indexOutOfBoundsException);
  }
}
相关推荐
flx 2020-08-31
flx 2019-08-25
89304896 2020-01-21
86334996 2020-01-16
LinuxStory 2013-05-31
wordjoke 2014-07-11
coffeecream 2010-07-03
comeonxueRong 2016-12-20
wangkeIDC 2011-05-01
renkai 2011-02-21
linuxprobe0 2010-12-25
牛初九 2008-10-08
ThinkInLinux 2019-09-29
86214251 2018-07-20
YoungForever 2016-02-23
明鱼 2014-02-23
Linux学堂 2011-05-05
fhzh0 2015-08-24