Spring对WebSocket的支持
Spring 4.x引入了新的模块spring-websocket,对WebSocket提供了全面的支持,Spring的WebSocket实现遵循JSR-356(Java WebSocket API),并且添加了一些额外特性。
绝大部分现代浏览器均支持WebSocket,包括IE 10+。对于不支持WebSocket的浏览器,Spring允许基于 SockJS协议作为备选传输方案。
与REST那种大量URL + HTTP方法来区分对象和操作的风格完全不同,WebSocket仅仅使用单个URL。WebSocket更加和传统的MOM类似,它是异步的、 事件驱动的基于消息的架构。
Spring 4 引入了新的模块spring-messaging,抽象出了Message、MessageChannel、MessageHandler等消息架构的基础概念。此模块包含了一些注解,用于将消息映射到方法(类似于Spring MVC把URL映射到方法)。
WebSocket是在TCP之上很薄的一层封装,它仅仅是把比特流转换为消息(文本、二进制)流,解析消息的职责由应用程序负责。我们可以在WebSocket之上提供应用层子协议。
在WebSocket握手阶段,客户端和服务器可以基于Sec-WebSocket-Protocol头来协商子协议。Spring支持STOMP —— 一个简单的消息协议。
Spring提供了可以在很多WebSocket引擎中运行的API,支持的引擎包括Tomcat 7.0.47+, Jetty 9.1+, GlassFish 4.1+, WebLogic 12.1.3+等。
要创建一个WebSocket服务器,可以实现WebSocketHandler接口,或者继承TextWebSocketHandler或者BinaryWebSocketHandler类:
1 2 3 4 5 6 7 8 9 10 11 12 |
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.TextMessage; public class HelloHandler extends TextWebSocketHandler { // 接受消息的回调 public void handleTextMessage(WebSocketSession session, TextMessage message) { // 发送消息 session.sendMessage( new TextMessage( payload ) ); } } |
每个WebSocketHandler, 处理单个URL。在一个WebSocket端口上可以有多个WebSocketHandler。要注册WebSocketHandler,可以:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(helloHander(), "/hello"); } @Bean public WebSocketHandler helloHander() { return new HelloHander(); } } |
也可以使用等价的XML配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket.xsd"> <websocket:handlers> <websocket:mapping path="/hello" handler="helloHander"/> </websocket:handlers> <bean id="helloHander" class="cc.gmem.study.spring.ws.HelloHandler"/> </beans> |
通过HandshakeInterceptor可以对WebSocket最初基于HTTP的握手进行定制,此拦截器暴露beforeHandshake/afterHandshake方法,实现这些方法可以:
- 阻止握手
- 设置在WebSocketSession中可以使用的属性
拦截器的注册方式为:
1 2 3 4 5 6 7 8 9 10 11 12 |
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(new HelloHandler(), "/hello") // 添加拦截器 .addInterceptors(new HttpSessionHandshakeInterceptor()); } } |
等价的XML配置:
1 2 3 4 5 6 |
<websocket:handlers> <websocket:mapping path="/hello" handler="helloHandler"/> <websocket:handshake-interceptors> <bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/> </websocket:handshake-interceptors> </websocket:handlers> |
使用WebSocketHandlerDecorator来装饰WebSocketHandler,可以实现额外的行为。当基于Java-Config / XML来配置时,日志、异常处理这两个装饰器自动添加。
ExceptionWebSocketHandlerDecorator会捕获任何WebSocketHandler抛出的异常,并以1011状态码(服务器错误)关闭WebSocket会话。
WebSocket API可以和Spring MVC一起使用,DispatcherServlet同时负责WebSocket握手和普通HTTP请求的处理。
你也可以独立在其它HTTP服务环境中使用WebSocket API,可以借助WebSocketHttpRequestHandler集成WebSocketHandler到HTTP服务环境中。
每种底层Servlet引擎都暴露了一些配置属性,进行缓冲区大小、超时时间等参数的配置。
当使用Tomcat/WildFly/GlassFish时,你可以使用ServletServerContainerFactoryBean进行引擎配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); // WebSocket消息缓冲区大小,如果客户端发来的消息较大,需要按需调整 // 和libwebsockets配合时,客户端报错error on reading from skt : 104,即因为缓冲区不够大 container.setMaxTextMessageBufferSize(8192); container.setMaxBinaryMessageBufferSize(8192); return container; } } |
当使用Jetty时,你需要提供一个WebSocketServerFactory,并传递给Spring的DefaultHandshakeHandler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(helloHandler(),"/hello") // 设置握手处理器 .setHandshakeHandler(handshakeHandler()); } @Bean public DefaultHandshakeHandler handshakeHandler() { WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER); policy.setInputBufferSize(8192); policy.setIdleTimeout(600000); return new DefaultHandshakeHandler( new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy))); } } |
从 Spring4.1.5开始,WebSocket/SockJS默认仅仅支持同源请求。不同策略下的行为如下:
- 仅仅允许同源请求(默认)。在此模式下,如果启用SockJS,则IFrame的HTTP响应头X-Frame-Options被设置为SAMEORIGIN,JSONP被禁用
- 允许指定列表的源,每个源必须以http或者https开头。在此模式下,如果启用SocketJS,IFrame、JSONP两种传输都被禁用
- 设置为*。在此模式下,所有传输都可以使用
修改配置的代码:
1 2 3 4 |
@Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(helloHandler(), "/hello").setAllowedOrigins("*"); } |
WebSocket不受一些老旧的浏览器支持,并且某些网络代理阻止了WebSocket协议。因此Spring将SockJS作为备选实现,模拟WebSocket API。
要启用SockJS支持,调用:
1 2 3 4 5 6 |
@Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(helloHandler(), "/hello").withSockJS(); } // 等价XML配置 <websocket:sockjs/> |
为了防止代理服务器认为连接已经挂起,SockJS Protocol需要发送心跳消息。Spring提供配置参数 .withSockJS().setHeartbeatTime( )来设置心跳频率,默认值25s。
HTTP流/长轮询这两种传输,要求连接打开时间比使用它的时间更长。在Servlet容器中,这依赖于Servlet 3的异步支持实现 —— 允许请求处理线程退出,之后由其它线程继续向响应中写入数据。
异步请求的问题在于,服务器不知道客户端是否已经断开,只有在后续继续写入响应时,才会抛出异常。不管怎么样,心跳还是能够最终发现断开的。
如果允许跨源请求,SockJS协议依赖CORS来支持跨站HTTP流/长轮询,因此CORS头会被自动添加,除非检测到响应头中指定了对应的CORS头。
配置suppressCors可以禁止自动添加CORS头。
SockJS期望的头包括:
- Access-Control-Allow-Origin,基于Origin请求头初始化
- Access-Control-Allow-Credentials总设置为true
- Access-Control-Request-Headers从对应的请求头初始化
- Access-Control-Allow-Methods传输机制所需要的HTTP方法
- Access-Control-Max-Age设置为31536000(一年)
Spring实现了SockJS的Java客户端,允许你在服务器中使用SockJS,或者在压力测试中模拟大量客户端。
此客户端支持websocket/xhr-streaming/xhr-polling这三种传输。其中:
- WebSocketTransport可以连同下面的实现使用:
- JSR-356的StandardWebSocketClient
- Jetty 9的JettyWebSocketClient
- Spring的任何WebSocketClient实现类
- XhrTransport有两种实现:
- RestTemplateXhrTransport,基于Spring的RestTemplate
- JettyXhrTransport,基于Jetty的HttpClient
客户端代码示例:
1 2 3 4 5 6 |
List<Transport> transports = new ArrayList<>(2); transports.add(new WebSocketTransport(new StandardWebSocketClient())); transports.add(new RestTemplateXhrTransport()); SockJsClient sockJsClient = new SockJsClient(transports); sockJsClient.doHandshake(new HelloHandler(), "ws://gmem.cc:8888/hello"); |
当模拟大量并发客户端时,底层HTTP客户端实现应该配有足够的资源,例如:
1 2 3 |
HttpClient jettyHttpClient = new HttpClient(); jettyHttpClient.setMaxConnectionsPerDestination(1000); jettyHttpClient.setExecutor(new QueuedThreadPool(1000)); |
WebSocket协议定义了两种消息类型:文本和二进制数据,但是消息的内容没有定义。通常情况下,服务器和客户端能够协商使用一种子协议,来定义消息的结构,STOMP是一种常见的选择,其优势在于:
- 浏览器中可以使用stomp.js
- 不需要引入新的消息格式
- 支持基于destination的消息路由
- 能够与支持STOMP的MOM集成
STOMP是一种文本协议,最初设计供Ruby/Python/Perl之类的脚本语言使用,以连接到企业的消息代理。STOMP被设计用来处理常见的消息模式,可以基于任何双向可靠信道 —— 例如TCP、WebSocket ——传输。
尽管STOMP是基于文本的协议,但是它的载荷部分可以是二进制的。
STOMP是一种基于Frame的协议,其Frame设计理念源于HTTP。一个Frame的结构如下:
1 2 3 4 5 |
COMMAND header1:value1 header2:value2 Body^@ |
客户端可以使用SEND或者SUBSCRIBE命令,可以发送、订阅消息。此时需要指定一个destination头。下面是两个示例:
1 2 3 4 5 |
SUBSCRIBE id:sub-1 destination:/topic/price.stock.* ^@ |
1 2 3 4 5 6 |
SEND destination:/queue/trade content-type:application/json content-length:44 {"action":"BUY","ticker":"MMM","shares",44}^@ |
STOMP服务器可以使用MESSAGE来广播消息到所有订阅者:
1 2 3 4 5 6 |
MESSAGE message-id:nxahklf6-1 subscription:sub-1 destination:/topic/price.stock.MMM {"ticker":"MMM","price":129.45}^@ |
当使用Spring的STOMP支持时,Spring的WebSocket应用相对客户端而言是STOMP代理。消息会被路由给@Controller下的消息处理方法或者一个简单内存消息代理处理。
你也可以配置Spring,让其与支持STOMP的消息中间件(例如RabbitMQ、ActiveMQ)一起工作,这样客户端就可以把消息发送消息中间件网络中。Spring负责维护到MOM的TCP连接、把消息中继到MOM、并且把监听到的消息下发给连接到Spring的那些客户端。
利用spring-messaging和spring-websocket模块,Spring能够支持STOMP over WS。
配置示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
@Configuration // 启用基于WebSocket的消息代理(使用某种子协议) @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 在/stomp暴露一个基于WebSocket/SockJS的STOMP端点 registry.addEndpoint("/stomp").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry config) { // 如果destination以/app开头,则消息路由给@Controller下的消息处理方法 config.setApplicationDestinationPrefixes("/app"); // 所有destination均由@Controller下的消息@MessageMapping方法处理 config.setApplicationDestinationPrefixes("/"); // 下面的两种开头的destination广播给所有其它客户端 config.enableSimpleBroker("/topic", "/queue"); } } @Controller @MessageMapping("greeting") public class GreetingController { @Inject private SimpMessagingTemplate template; // 用于发送消息 // 此消息处理方法处理/app/greeting/hello这一目标 @MessageMapping("hello") { public String hello(String greeting) { String msg = "[" + getTimestamp() + ": " + greeting; // 可以向任何地方发送消息 this.template.convertAndSend("/topic/greetings", msg); } } |
等价XML配置:
1 2 3 4 5 6 |
<websocket:message-broker application-destination-prefix="/app"> <websocket:stomp-endpoint path="/stomp"> <websocket:sockjs/> </websocket:stomp-endpoint> <websocket:simple-broker prefix="/topic, /queue"/> </websocket:message-broker> |
消息目的地,默认的路径分隔好符是 / ,客户端发送时,目的地必须以 / 为第一个字符。除非包含多个路径分段,@MessageMapping的路径不需要包含 / 。
如果要使用MOM领域更加通用的点号分隔符,调用:
1 |
registry.setPathMatcher(new AntPathMatcher(".")); |
等价的XML配置为:
1 2 3 4 5 |
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher"> </websocket:message-broker> <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher"> <constructor-arg index="0" value="." /> </bean> |
即使使用点号分隔符,客户端发送的目的地,也要以 / 开头。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
// 可以使用SockJS: var socket = new SockJS("/stomp"); var client = Stomp.over(socket); // 或者直接使用WebSocket var client = Stomp.over( new WebSocket( 'ws://172.21.0.1:9090/signal' ) ); // 心跳设置 client.heartbeat.outgoing = 20000; // 每20秒发送一次心跳给服务器 client.heartbeat.incoming = 0; // 不接受服务器发送来的心跳 // 调试设置 client.debug = function(str) { console.log(str); }; // 连接 client.connect(login, passcode, connectCallback, errorCallback); client.connect(headers, connectCallback, errorCallback); function connectCallback( frame ){ } // 发送消息,目的地、头、体 client.send("/queue/hello", {priority: 9}, "Hello, STOMP"); // 订阅消息 var subscription = client.subscribe("/topic/hello", callback); function callback( message ){ console.log( message.body ); } // 带消息确认设置的订阅:客户端确认 var subscription = client.subscribe("/topic/hello", callback, {ack: 'client'}); function callback( message ){ // 确认 message.ack(); } // 事务支持 var tx = client.begin(); // transaction头必须 client.send("/queue/hello", {transaction: tx.id}, "message in a transaction"); tx.commit(); // 提交事务 tx.abort(); // 撤销事务 |
或者直接使用WebSocket:
1 2 |
var socket = new WebSocket("/stomp"); var client = Stomp.over(socket); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
WebSocketClient webSocketClient = new StandardWebSocketClient(); WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient); stompClient.setMessageConverter(new StringMessageConverter()); stompClient.setTaskScheduler(taskScheduler); // 用于发送心跳 // 创建连接 String url = "ws://127.0.0.1:8080/endpoint"; StompSessionHandler sessionHandler = new StompSessionHandlerImpl(); class StompSessionHandlerImpl extends StompSessionHandlerAdapter { public void afterConnected(StompSession session, StompHeaders connectedHeaders) { // 连接成功后,此回调被调用 } } stompClient.connect(url, sessionHandler); // 发送消息 session.send("/topic/foo", "payload"); // 订阅消息 session.subscribe("/topic/foo", new StompFrameHandler() { public Type getPayloadType(StompHeaders headers) { return String.class; } public void handleFrame(StompHeaders headers, Object payload) { // 处理消息 } }); |
spring-messaging提供了以下抽象:
对象 | 说明 |
Message | 一个带有头、载荷的消息 |
MessageHandler | 处理消息的逻辑单元 |
MessageChannel | 在发送者/接收者之间传输消息的信道的抽象,通道总是单向的 |
SubscribableChannel | 继承MessageChannel,用于传输消息到所有订阅者 |
ExecutorSubscribableChannel | 继承SubscribableChannel,使用异步线程池传输消息 |
你可以在@Controller类的方法上添加@MessageMapping注解,这类方法可以映射某个/某些消息destination。
@MessageMapping对应的URL支持Ant风格的通配符,例如/foo*、/foo/**。路径变量也是支持的,例如/foo/{id}中的id可以通过注解了@DestinationVariable的方法参数访问到。
你可以为@MessageMapping方法注入很多种参数:
参数 | 说明 |
Message | 访问完整的消息 |
@Payload | 访问消息的载荷,消息被基于org.springframework.messaging.converter.MessageConverter转换 |
@Header | 访问消息头 |
@Headers | 访问所有消息头的Map |
@DestinationVariable | 访问路径变量 |
Principal | 在WS握手阶段登陆的用户 |
使用STOMP时,身份验证基于HTTP协议的机制进行。
尽管STOMP协议包含login、passcode头,但是它们通常在STOMP over TCP的情况下使用。Spring默认会忽略这些头,并且假设在HTTP升级到WebSocket之前已经完成身份验证。
如果需要基于STOMP头进行身份验证,可以进行如下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Configuration @EnableWebSocketMessageBroker public class AppConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { String login = accessor.getNativeHeader( "login" ).get( 0 ); Principal user = new PrincipalImpl( login ); accessor.setUser(user); } return message; } }); } } |
注意,不进行任何配置的情况下,你不能为@MessageMapping方法注入Principal对象,执行了上述配置则可以注入。其它备选的身份验证方式包括:
- 子类化DefaultHandshakeHandler,覆盖determineUser方法,这样可以在WebSocket握手阶段确定用户。示例:
123456789101112registry.addEndpoint( "/signal" ).setHandshakeHandler( new DefaultHandshakeHandler() {@Overrideprotected Principal determineUser( ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes ) {Principal principal = request.getPrincipal();if ( principal == null ) {Collection<SimpleGrantedAuthority> authorities = new ArrayList<>();authorities.add( new SimpleGrantedAuthority( AuthoritiesConstants.ANONYMOUS ) );principal = new AnonymousAuthenticationToken( "WebsocketConfiguration", "anonymous", authorities );}return principal;}} ); - 使用基于HTTP的身份验证,Spring会尝试从HttpServletRequest.getUserPrincipal中获得当前用户
默认情况下,Spring认为 /user/开头的目的地属于用户目的地,每个WebSocket会话都有这种目的地的同名副本。
客户端代码示例:
1 2 3 4 5 6 7 8 9 10 11 |
let client = Stomp.over( new WebSocket( 'ws://172.21.0.1:9090/signal' ) ); client.connect( {}, ( frame ) => { start(); } ); function start() { // 客户端订阅用户目的地,需要/user前缀 client.subscribe( '/user/rtsp/preview/sdpanswer', function ( frame ) { console.log( frame.body ); } ); client.send( '/app/rtsp/preview/sdpoffer', {}, '1' ); } |
服务器代码示例:
1 2 3 4 5 6 7 8 9 10 |
@Controller @MessageMapping( "/rtsp/preview" ) public class RtspPreviewController { @MessageMapping( "/sdpoffer" ) // 发送到用户目的地(仅仅发送给当前WebSocket会话对应的客户端),需要指定完整路径,/user前缀不需要 @SendToUser( "/rtsp/preview/sdpanswer" ) public String connect( String payload ) { return payload; } } |
关于@SendToUser需要注意,实际发送到的目的地是/user/{username}/rtsp/preview,Spring按照以下规则确定username:
- 如果当前会话的Principal存在,则取Principal.getName()作为用户名
- 否则,取会话标识符,会话标识符来自消息头中的simpSessionId字段
当允许同一个用户在多个浏览器中登陆时,要注意这个情况,如果Principal存放登陆名,客户端可能接收到不期望的消息。
配置示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@SpringBootApplication // 两个注解都需要: @EnableWebSocket @EnableWebSocketMessageBroker public class VideoSurveillanceApp extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer { public void registerWebSocketHandlers( WebSocketHandlerRegistry registry ) { // 下面的端点使用原始WebSocket registry.addHandler( helloHandler(), "/hello" ); } public void registerStompEndpoints( StompEndpointRegistry registry ) { // 下面的端点使用STOMP registry.addEndpoint( "/signal" ); } } |
Leave a Reply