网站首页 文章专栏 soul-web是如何处理使用webflux处理请求的
SpringWebflux是SpringFramework5.0添加的新功能,WebFlux本身追随当下最火的Reactive Programming而诞生的框架
WebFlux是一个典型非阻塞异步的框架,它的核心是基于Reactor的相关API实现的。相对于传统的web框架来说,它可以运行在诸如Netty,Undertow及支持Servlet3.1的容器上,因此它的运行环境的可选择行要比传统web框架多的多。
根据官方的说法,webflux主要在如下两方面体现出独有的优势:
1)非阻塞式
其实在servlet3.1提供了非阻塞的API,WebFlux提供了一种比其更完美的解决方案。使用非阻塞的方式可以利用较小的线程或硬件资源来处理并发进而提高其可伸缩性
2) 函数式编程端点
老生常谈的编程方式了,Spring5必须让你使用java8,那么函数式编程就是java8重要的特点之一,而WebFlux支持函数式编程来定义路由端点处理请求。
WebHandler
WebHandler定义了Web请求必要一些处理行为,WebFlux已经脱离了Servlet API,那么使用WebFlux时遇到会话机制怎么办,想要对请求过滤处理怎么办或者想要处理静态资源怎么办等等,那么WebHandler就是做这个事情的。其实在HttpHandler的基本实现类通过适配器模式及装饰模式也间接的实现了WebHandler接口
WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动,可以让我们在不扩充硬件资源的前提下,提升系统的吞吐量和伸缩性。
看到这里,你是不是以为 WebFlux 能够使程序运行的更快呢?量化一点,比如说我使用 WebFlux 以后,一个接口的请求响应时间是不是就缩短了呢?
抱歉了,答案是否定的!以下是官方原话:
Reactive and non-blocking generally do not make applications run faster.
翻译一下:
WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。
那么问题来了,他的应用场景是什么呢?
Spring WebFlux 是一个异步非阻塞式的 Web 框架,所以,它特别适合应用在 IO 密集型的服务中,比如微服务网关这样的应用中。
IO 密集型包括:磁盘IO密集型, 网络IO密集型,微服务网关就属于网络 IO 密集型,使用异步非阻塞式编程模型,能够显著地提升网关对下游服务转发的吞吐量。
Spring MVC 的前端控制器是 DispatcherServlet, 而 WebFlux 是 DispatcherHandler,它实现了 WebHandler 接口:
public interface WebHandler { Mono<Void> handle(ServerWebExchange var1); }
DispatcherHandler类中处理请求的 handle 方法:
①:ServerWebExchange 对象中放置每一次 HTTP 请求响应信息,包括参数等;
②:判断整个接口映射 mappings 集合是否为空,空则创建一个 Not Found 的错误;
③:根据具体的请求地址获取对应的 handlerMapping;
④:调用具体业务方法,也就是我们定义的接口方法;
⑤:处理返回的结果;
soul中通过自定义一个SoulWebHandler,实现WebHandler接口,去处理请求
public final class SoulWebHandler implements WebHandler { private List<SoulPlugin> plugins; private Scheduler scheduler; public SoulWebHandler(final List<SoulPlugin> plugins) { this.plugins = plugins; String schedulerType = System.getProperty("soul.scheduler.type", "fixed"); if (Objects.equals(schedulerType, "fixed")) { int threads = Integer.parseInt(System.getProperty( "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16))); scheduler = Schedulers.newParallel("soul-work-threads", threads); } else { scheduler = Schedulers.elastic(); } } @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); } ... }
并通过自动装配,加载为bean
@Bean("webHandler") public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) { List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList); final List<SoulPlugin> soulPlugins = pluginList.stream() .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList()); soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName())); return new SoulWebHandler(soulPlugins); } @Bean("dispatcherHandler") public DispatcherHandler dispatcherHandler() { return new DispatcherHandler(); }
当请求过来时,先去处理过滤器(soul中定义了一系列过滤器先处理请求),在走到自定义的soulWebHandler中的handler方法,且在 exchange中有具体请求的 request 以及 response信息
接下来通过责任链模式,走到DefaultSoulPluginChain,再去根据初始化的插件链,去循环具体的执行每一个插件的 excute 方法,(这里的插件是有顺序的)
@Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { SoulPlugin plugin = plugins.get(this.index++); Boolean skip = plugin.skip(exchange); if (skip) { return this.execute(exchange); } else { return plugin.execute(exchange, this); } } else { return Mono.empty(); } }); }
其他具体执行的流程在其他文章中有过说明,具体见:
版权声明:本文由星尘阁原创出品,转载请注明出处!