springboot(六)——springboot与webflux结合初探

spring-cloud-gateway 的ReactorHttpHandlerAdapter

这几天看了看spring-cloud-gateway的请求处理流程,因为之前一直用的springboot1.x和spring4,一开始对spring-cloud-gateway的处理流程有点懵逼,找不到入口,后来跟了代码,在网上找了点资料,发现spring-cloud-gateway的入口在ReactorHttpHandlerAdapter的apply方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

private static final Log logger = HttpLogging.forLogName(ReactorHttpHandlerAdapter.class);


private final HttpHandler httpHandler;
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "HttpHandler must not be null");
this.httpHandler = httpHandler;
}
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}

return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}

}

该方法的作用就是把接收到的HttpServerRequest或者最终需要返回的HttpServerResponse,包装转换为ReactorServerHttpRequest和ReactorServerHttpResponse。

spring-webflux

当然,这篇文章的主要内容不是谈论spring-cloud-gateway了,因为之前一直用的spring4,所以对spring5当中的反应式编程范式和webflux不太了解,所以先写个demo了解一下
第一步:引入相关pom,测试的相关pom根据自己的需要引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>


<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

第二步:创建一个HandlerFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestFunction implements HandlerFunction<ServerResponse> {

@Override
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
return ServerResponse.ok().body(
Mono.just(parse(serverRequest, "args1") + parse(serverRequest, "args2"))
, Integer.class);
}

private int parse(final ServerRequest request, final String param) {
return Integer.parseInt(request.queryParam(param).orElse("0"));
}
}

第三步:注入一个RouterFunction

1
2
3
4
5
6
7
8
@Configuration
public class TestRouteFunction {

@Bean
public RouterFunction<ServerResponse> routerFunction() {
return RouterFunctions.route(RequestPredicates.GET("/add"), new TestFunction());
}
}

第四步:在webflux中,也可以使用之前的java注解的编程方式,我们也创建一个controller

1
2
3
4
5
6
7
8
9
@RestController
@RequestMapping("/api/test")
public class HelloController {

@RequestMapping("/hello")
public Mono<String> hello() {
return Mono.just("hello world");
}
}

第五步:创建启动类

1
2
3
4
5
6
7
@SpringBootApplication
public class Spring5DemoApplication {

public static void main(String[] args) {
SpringApplication.run(Spring5DemoApplication.class, args);
}
}

第六步:启动项目,访问如下两个接口都可以

1
2
http://localhost:8080/api/test/hello
http://localhost:8080/add?args1=2&args2=3

和spring-boot结合

通过上面的例子,我们看到基本的两个类:HandlerFunction和RouterFunction,同时webflux有如下特性:

  1. 异步非阻塞
  2. 响应式(reactive)函数编程,纯lambda表达式
  3. 不仅仅是在Servlet容器中tomcat/jetty中运行,同时支持NIO的Netty和Undertow中,实际项目中,我们往往与spring-boot项目结合,我们跟进代码可以看看spring-boot是在什么时候创建的server
    一、SpringApplication
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public ConfigurableApplicationContext run(String... args) {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ConfigurableApplicationContext context = null;
    Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
    configureHeadlessProperty();
    SpringApplicationRunListeners listeners = getRunListeners(args);
    listeners.starting();
    try {
    ApplicationArguments applicationArguments = new DefaultApplicationArguments(
    args);
    ConfigurableEnvironment environment = prepareEnvironment(listeners,
    applicationArguments);
    configureIgnoreBeanInfo(environment);
    Banner printedBanner = printBanner(environment);
    context = createApplicationContext();
    exceptionReporters = getSpringFactoriesInstances(
    SpringBootExceptionReporter.class,
    new Class[] { ConfigurableApplicationContext.class }, context);
    prepareContext(context, environment, listeners, applicationArguments,
    printedBanner);
    refreshContext(context);
    afterRefresh(context, applicationArguments);
    stopWatch.stop();
    if (this.logStartupInfo) {
    new StartupInfoLogger(this.mainApplicationClass)
    .logStarted(getApplicationLog(), stopWatch);
    }
    listeners.started(context);
    callRunners(context, applicationArguments);
    }
    catch (Throwable ex) {
    handleRunFailure(context, ex, exceptionReporters, listeners);
    throw new IllegalStateException(ex);
    }

    try {
    listeners.running(context);
    }
    catch (Throwable ex) {
    handleRunFailure(context, ex, exceptionReporters, null);
    throw new IllegalStateException(ex);
    }
    return context;
    }

我们只分析入口,其它代码暂时不管,找到refreshContext(context);这一行进去

二、ReactiveWebServerApplicationContext的refresh()

1
2
3
4
5
6
7
8
9
10
@Override
public final void refresh() throws BeansException, IllegalStateException {
try {
super.refresh();
}
catch (RuntimeException ex) {
stopAndReleaseReactiveWebServer();
throw ex;
}
}

三、ReactiveWebServerApplicationContext的onRefresh()

1
2
3
4
5
6
7
8
9
10
11
@Override
protected void onRefresh() {
super.onRefresh();
try {
createWebServer();
}
catch (Throwable ex) {
throw new ApplicationContextException("Unable to start reactive web server",
ex);
}
}

四、看到这里我们就找到入口方法了:createWebServer(),跟进去,找到NettyReactiveWebServerFactory中创建webserver

1
2
3
4
5
6
7
@Override
public WebServer getWebServer(HttpHandler httpHandler) {
HttpServer httpServer = createHttpServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(
httpHandler);
return new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout);
}

看到ReactorHttpHandlerAdapter这个类想必特别亲切,在开篇说过是spring-cloud-gateway的入口,createHttpServer方法的细节暂时没有去学习了,后续有时间去深入了解下

结语

spring5的相关新特性也是在学习中,这一篇文章算是和springboot结合的入门吧,后续有时间再深入学习

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×