再谈 websocket 论架构设计

Java基础

浏览数:1,823

2018-3-18

导语

本篇文章以websocket的原理和落地为核心,来叙述websocket的使用,以及相关应用场景。

websocket概述

http与websocket

如我们所了解,http连接为一次请求一次响应(request->response),必须为同步调用方式。
而websocket为一次连接以后,会建立tcp连接,后续客户端与服务器交互为全双工方式的交互方式,客户端可以发送消息到服务端,服务端也可将消息发送给客户端。

此图来源于Websocket协议的学习、调研和实现,如有侵权问题,告知后,删除。

根据上图,我们大致可以了解到http与websocket之间的区别和不同。

为什么要使用websocket

那么了解http与websocket之间的不同以后,我们为什么要使用websocket呢? 他的应用场景是什么呢?

我找到了一个比较符合websocket使用场景的描述

“The best fit for WebSocket is in web applications where the client and server need to exchange events at high frequency and with low latency.”
翻译: 在客户端与服务器端交互的web应用中,websocket最适合在高频率低延迟的场景下,进行事件的交换和处理

此段来源于spring websocket的官方文档

了解以上知识后,我举出几个比较常见的场景:

  1. 游戏中的数据传输
  2. 股票K线图数据
  3. 客服系统

根据如上所述,各个系统都来使用websocket不是更好吗?

其实并不是,websocket建立连接之后,后边交互都由tcp协议进行交互,故开发的复杂度会较高。当然websocket通讯,本身要考虑的事情要比HTTP协议的通讯考虑的更多.

所以如果不是有特殊要求(即 应用不是”高频率低延迟”的要求),需要优先考虑HTTP协议是否可以满足。

比如新闻系统,新闻的数据晚上10分钟-30分钟,是可以接受的,那么就可以采用HTTP的方式进行轮询(polling)操作调用REST接口。

当然有时我们建立了websocket通讯,并且希望通过HTTP提供的REST接口推送给某客户端,此时需要考虑REST接口接受数据传送给websocket中,进行广播式的通讯方式。

至此,我已经讲述了三种交互方式的使用场景:

  1. websocket独立使用场景
  2. HTTP独立使用场景
  3. HTTP中转websocket使用场景

相关技术概念

websocket

websocket为一次HTTP握手后,后续通讯为tcp协议的通讯方式。

当然,和HTTP一样,websocket也有一些约定的通讯方式,http通讯方式为http开头的方式,e.g. http://xxx.com/path ,websocket通讯方式则为ws开头的方式,e.g. ws://xxx.com/path

SSL:

  1. HTTP: https://xxx.com/path
  2. WEBSOCKET: wss://xxx.com/path

此图来源于WebSocket 教程,如有侵权问题,告知后,删除。

SockJS

正如我们所知,websocket协议虽然已经被制定,当时还有很多版本的浏览器或浏览器厂商还没有支持的很好。

所以,SockJS,可以理解为是websocket的一个备选方案。

那它如何规定备选方案的呢?

它大概支持这样几个方案:

  1. Websockets
  2. Streaming
  3. Polling

当然,开启并使用SockJS后,它会优先选用websocket协议作为传输协议,如果浏览器不支持websocket协议,则会在其他方案中,选择一个较好的协议进行通讯。

看一下目前浏览器的支持情况:

所以,如果使用SockJS进行通讯,它将在使用上保持一致,底层由它自己去选择相应的协议。

可以认为SockJS是websocket通讯层上的上层协议。

底层对于开发者来说是透明的。

STOMP

STOMP 中文为: 面向消息的简单文本协议

websocket定义了两种传输信息类型: 文本信息 和 二进制信息 ( text and binary ).

类型虽然被确定,但是他们的传输体是没有规定的。

当然你可以自己来写传输体,来规定传输内容。(当然,这样的复杂度是很高的)

所以,需要用一种简单的文本传输类型来规定传输内容,它可以作为通讯中的文本传输协议,即交互中的高级协议来定义交互信息。

STOMP本身可以支持流类型的网络传输协议: websocket协议和tcp协议

它的格式为:

COMMAND
header1:value1
header2:value2

Body^@




SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@



SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

当然STOMP已经应用于很多消息代理中,作为一个传输协议的规定,如:RabbitMQ, ActiveMQ

我们皆可以用STOMP和这类MQ进行消息交互.

除了STOMP相关的代理外,实际上还提供了一个stomp.js,用于浏览器客户端使用STOMP消息协议传输的js库。

让我们很方便的使用stomp.js进行与STOMP协议相关的代理进行交互.

正如我们所知,如果websocket内容传输信息使用STOMP来进行交互,websocket也很好的于消息代理器进行交互(如:RabbitMQ, ActiveMQ)

这样就很好的提供了消息代理的集成方案。

总结,使用STOMP的优点如下:

  1. 不需要自建一套自定义的消息格式
  2. 现有stomp.js客户端(浏览器中使用)可以直接使用
  3. 能路由信息到指定消息地点
  4. 可以直接使用成熟的STOMP代理进行广播 如:RabbitMQ, ActiveMQ

技术落地

后端技术方案选型

websocket服务端选型:spring websocket

支持SockJS,开启SockJS后,可应对不同浏览器的通讯支持
支持STOMP传输协议,可无缝对接STOMP协议下的消息代理器(如:RabbitMQ, ActiveMQ)

前端技术方案选型

前端选型: stomp.js,sockjs.js

后端开启SOMP和SockJS支持后,前对应有对应的js库进行支持.
所以选用此两个库.

总结

上述所用技术,是这样的逻辑:

  1. 开启socktJS:
    如果有浏览器不支持websocket协议,可以在其他两种协议中进行选择,但是对于应用层来讲,使用起来是一样的。
    这是为了支持浏览器不支持websocket协议的一种备选方案
  2. 使用STOMP:
    使用STOMP进行交互,前端可以使用stomp.js类库进行交互,消息一STOMP协议格式进行传输,这样就规定了消息传输格式。
    消息进入后端以后,可以将消息与实现STOMP格式的代理器进行整合。
    这是为了消息统一管理,进行机器扩容时,可进行负载均衡部署
  3. 使用spring websocket:
    使用spring websocket,是因为他提供了STOMP的传输自协议的同时,还提供了StockJS的支持。
    当然,除此之外,spring websocket还提供了权限整合的功能,还有自带天生与spring家族等相关框架进行无缝整合。

应用场景

应用背景

2016年,在公司与同事一起讨论和开发了公司内部的客服系统,由于前端技能的不足,很多通讯方面的问题,无法亲自调试前端来解决问题。

因为公司技术架构体系以前后端分离为主,故前端无法协助后端调试,后端无法协助前端调试

在加上websocket为公司刚启用的协议,了解的人不多,导致前后端调试问题重重。

一年后的今天,我打算将前端重温,自己来调试一下前后端,来发掘一下之前联调的问题.

当然,前端,我只是考虑stomp.js和sockt.js的使用。

代码阶段设计

角色

  1. 客服
  2. 客户

登录用户状态

  1. 上线
  2. 下线

分配策略

  1. 用户登陆后,应该根据用户角色进行分配

关系保存策略

  1. 应该提供关系型保存策略: 考虑内存式策略(可用于测试),redis式策略
    备注:优先应该考虑实现Inmemory策略,用于测试,让关系保存策略与存储平台无关

通讯层设计

  1. 归类topic的广播设计(通讯方式:1-n)
  2. 归类queue的单点设计(通讯方式:1-1)

代码实现

角色

import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.User;
import java.util.Collection;
public enum Role {
  CUSTOMER_SERVICE,
  CUSTOMER;


  public static boolean isCustomer(User user) {
      Collection<GrantedAuthority> authorities = user.getAuthorities();
      SimpleGrantedAuthority customerGrantedAuthority = new SimpleGrantedAuthority("ROLE_" + Role.CUSTOMER.name());
      return authorities.contains(customerGrantedAuthority);
  }

  public static boolean isCustomerService(User user) {
      Collection<GrantedAuthority> authorities = user.getAuthorities();
      SimpleGrantedAuthority customerServiceGrantedAuthority = new SimpleGrantedAuthority("ROLE_" + Role.CUSTOMER_SERVICE.name());
      return authorities.contains(customerServiceGrantedAuthority);
  }
}

代码中User对象,为安全对象,即 spring中org.springframework.security.core.userdetails.User,为UserDetails的实现类。
User对象中,保存了用户授权后的很多基础权限信息,和用户信息。
如下:

public interface UserDetails extends Serializable {
  Collection<? extends GrantedAuthority> getAuthorities();

  String getPassword();

  String getUsername();

  boolean isAccountNonExpired();

  boolean isAccountNonLocked();

  boolean isCredentialsNonExpired();

  boolean isEnabled();
}

方法 #isCustomer 和 #isCustomerService 用来判断用户当前是否是顾客或者是客服。

登录用户状态

public interface StatesManager {

    enum StatesManagerEnum{
        ON_LINE,
        OFF_LINE
    }

    void changeState(User user , StatesManagerEnum statesManagerEnum);

    StatesManagerEnum currentState(User user);

}

设计登录状态时,应存在登录状态管理相关的状态管理器,此管理器只负责更改用户状态和获取用户状态相关操作。
并不涉及其他关联逻辑,这样的代码划分,更有助于面向接口编程的扩展性

分配策略

public interface DistributionUsers {
  void distribution(User user);
}

分配角色接口设计,只关注传入的用户,并不关注此用户是客服或者用户,具体需要如何去做,由具体的分配策略来决定。

关系保存策略

public interface RelationHandler {

  void saveRelation(User customerService,User customer);

  List<User> listCustomers(User customerService);

  void deleteRelation(User customerService,User customer);

  void saveCustomerService(User customerService);

  List<User> listCustomerService();

  User getCustomerService(User customer);

  boolean exist(User user);

  User availableNextCustomerService();

}

关系保存策略,亦是只关注关系保存相关,并不在乎于保存到哪个存储介质中。
实现类由Inmemory还是redis还是mysql,它并不专注。
但是,此处需要注意,对于这种关系保存策略,开发测试时,并不涉及高可用,可将Inmemory先做出来用于测试。
开发功能同时,相关同事再来开发其他介质存储的策略,性能测试以及UAT相关测试时,应切换为此介质存储的策略再进行测试。

用户综合管理

对于不同功能的实现策略,由各个功能自己来实现,在使用上,我们仅仅根据接口编程即可。
所以,要将上述所有功能封装成一个工具类进行使用,这就是所谓的 设计模式: 门面模式

@Component
public class UserManagerFacade {
    @Autowired
    private DistributionUsers distributionUsers;
    @Autowired
    private StatesManager statesManager;
    @Autowired
    private RelationHandler relationHandler;


    public void login(User user) {
        if (roleSemanticsMistiness(user)) {
            throw new SessionAuthenticationException("角色语义不清晰");
        }

        distributionUsers.distribution(user);
        statesManager.changeState(user, StatesManager.StatesManagerEnum.ON_LINE);
    }
    private boolean roleSemanticsMistiness(User user) {
        Collection<GrantedAuthority> authorities = user.getAuthorities();

        SimpleGrantedAuthority customerGrantedAuthority = new SimpleGrantedAuthority("ROLE_"+Role.CUSTOMER.name());
        SimpleGrantedAuthority customerServiceGrantedAuthority = new SimpleGrantedAuthority("ROLE_"+Role.CUSTOMER_SERVICE.name());

        if (authorities.contains(customerGrantedAuthority)
                && authorities.contains(customerServiceGrantedAuthority)) {
            return true;
        }

        return false;
    }

    public void logout(User user){
        statesManager.changeState(user, StatesManager.StatesManagerEnum.OFF_LINE);
    }


    public User getCustomerService(User user){
        return relationHandler.getCustomerService(user);
    }

    public List<User> listCustomers(User user){
        return relationHandler.listCustomers(user);
    }

    public StatesManager.StatesManagerEnum getStates(User user){
        return statesManager.currentState(user);
    }

}

UserManagerFacade 中注入三个相关的功能接口:

@Autowired
private DistributionUsers distributionUsers;
@Autowired
private StatesManager statesManager;
@Autowired
private RelationHandler relationHandler;

可提供:

  1. 登录(#login)
  2. 登出(#logout)
  3. 获取对应客服(#getCustomerService)
  4. 获取对应用户列表(#listCustomers)
  5. 当前用户登录状态(#getStates)

这样的设计,可保证对于用户关系的管理都由UserManagerFacade来决定
其他内部的操作类,对于使用者来说,并不关心,对开发来讲,不同功能的策略都是透明的。

通讯层设计 – 登录,授权

spring websocket虽然并没有要求connect时,必须授权,因为连接以后,会分发给客户端websocket的session id,来区分客户端的不同。
但是对于大多数应用来讲,登录授权以后,进行websocket连接是最合理的,我们可以进行权限的分配,和权限相关的管理。

我模拟例子中,使用的是spring security的Inmemory的相关配置:

public class WebSecurityConfig extends WebSecurityConfigurerAdapter {


  @Override
  protected void configure(AuthenticationManagerBuilder auth) throws Exception {
      auth.inMemoryAuthentication().withUser("admin").password("admin").roles(Role.CUSTOMER_SERVICE.name());
      auth.inMemoryAuthentication().withUser("admin1").password("admin").roles(Role.CUSTOMER_SERVICE.name());


      auth.inMemoryAuthentication().withUser("user").password("user").roles(Role.CUSTOMER.name());
      auth.inMemoryAuthentication().withUser("user1").password("user").roles(Role.CUSTOMER.name());
      auth.inMemoryAuthentication().withUser("user2").password("user").roles(Role.CUSTOMER.name());
      auth.inMemoryAuthentication().withUser("user3").password("user").roles(Role.CUSTOMER.name());
  }

  @Override
  protected void configure(HttpSecurity http) throws Exception {
      http.csrf().disable()
              .formLogin()
              .and()
              .authorizeRequests()
              .anyRequest()
              .authenticated();
  }
}

相对较为简单,创建2个客户,4个普通用户。
当认证管理器认证后,会将认证后的合法认证安全对象user(即 认证后的token)放入STOMP的header中.
此例中,认证管理认证之后,认证的token为org.springframework.security.authentication.UsernamePasswordAuthenticationToken,
此token认证后,将放入websocket的header中。(即 后边会谈到的安全对象 java.security.Principal)

通讯层设计 – websocket配置

@Order(Ordered.HIGHEST_PRECEDENCE + 99)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");

    }
}

此配置中,有几点需进行讲解:
其中端点”portfolio”,用于socktJs进行websocket连接时使用,只用于建立连接。
“/topic”, “/queue”,则为STOMP的语义约束,topic语义为1-n(广播机制),queue语义为1-1(单点机制)
“app”,此为应用级别的映射终点前缀,这样说有些晦涩,一会看一下示例将会清晰很多。

通讯层设计 – 创建连接

用于连接spring websocket的端点为portfolio,它可用于连接,看一下具体实现:

<script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script src="http://cdn.bootcss.com/jquery/3.1.1/jquery.min.js"></script>

var socket = new SockJS("/portfolio");
stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
   showGreeting("登录用户: " + frame.headers["user-name"]);
});

这样便建立了连接。 后续的其他操作就可以通过stompClient句柄进行使用了。

通讯层设计 – spring websocket消息模型

见模型图:

message-flow-simple-broker

 

此图来源spring-websocket官方文档

可以看出对于同一定于目标都为:/topic/broadcast,它的发送渠道为两种:/app/broadcast和/topic/broadcast

如果为/topic/broadcast,直接可将消息体发送给定于目标(/topic/broadcast)。

如果是/app/broadcast,它将消息对应在MessageHandler方法中进行处理,处理后的结果发放到broker channel中,最后再讲消息体发送给目标(/topic/broadcast)

当然,这里边所说的app前缀就是刚才我们在websocket配置中的前缀.

看一个例子:

前端订阅:

stompClient.subscribe('/topic/broadcast', function(greeting){
    showGreeting(greeting.body);
});

后端服务:

@Controller
public class ChatWebSocket extends AbstractWebSocket{
 @MessageMapping("broadcast")
 public String broadcast(@Payload @Validated Message message, Principal principal) {
     return "发送人: " + principal.getName() + " 内容: " + message.toString();
 }
}

@Data
public class Message {
    @NotNull(message = "标题不能为空")
    private String title;
    private String content;
}

前端发送:

function sendBroadcast() {
    stompClient.send("/app/broadcast",{},JSON.stringify({'content':'message content'}));
}

这种发送将消息发送给后端带有@MessageMapping注解的方法,然后组合完数据以后,在推送给订阅/topic/broadcast的前端

function sendBroadcast() {
    stompClient.send("/topic/broadcast",{},JSON.stringify({'content':'message content'}));
}

这种发送直接将消息发送给订阅/topic/broadcast的前端,并不通过注解方法进行流转。

我相信上述这个理解已经解释清楚了spring websocket的消息模型图

通讯层设计 – @MessageMapping

带有这个注解的@Controller下的方法,正是对应websocket中的中转数据的处理方法。
那么这个注解下的方法究竟可以获取哪些数据,其中有什么原理呢?

我大概说一下:
Message,@Payload,@Header,@Headers,MessageHeaders,MessageHeaderAccessor, SimpMessageHeaderAccessor,StompHeaderAccessor
以上这些都是获取消息头,消息体,或整个消息的基本对象模型。

@DestinationVariable
这个注解用于动态监听路径,很想rest中的@PathVariable:

e.g.:

@MessageMapping("/queue/chat/{uid}")
public void chat(@Payload @Validated Message message, @DestinationVariable("uid") String uid, Principal principal) {
    String msg = "发送人: " + principal.getName() + " chat ";
    simpMessagingTemplate.convertAndSendToUser(uid,"/queue/chat",msg);
}

java.security.Principal

这个对象我需要重点说一下。
他则是spring security认证之后,产生的Token对象,即本例中的UsernamePasswordAuthenticationToken.

不难发现UsernamePasswordAuthenticationToken是Principal的一个实现.

可以将Principal直接转成授权后的token,进行操作:

UsernamePasswordAuthenticationToken user = (UsernamePasswordAuthenticationToken) principal;

正如前边设计章节所说,整个用户设计都是对org.springframework.security.core.userdetails.User进行操作,那如何拿到User对象呢。
很简单,如下:

UsernamePasswordAuthenticationToken user = (UsernamePasswordAuthenticationToken) principal;
User user = (User) user.getPrincipal()

通讯层设计 – 1-1 && 1-n

1-n topic:
此方式,上述消息模型章节已经讲过,此处不再赘述

1-1 queue:
客服-用户沟通为1-1用户交互的案例

前端:

stompClient.subscribe('/user/queue/chat',function(greeting){
    showGreeting(greeting.body);
});

后端:

@MessageMapping("/queue/chat/{uid}")
public void chat(@Payload @Validated Message message, @DestinationVariable("uid") String uid, Principal principal) {
    String msg = "发送人: " + principal.getName() + " chat ";
    simpMessagingTemplate.convertAndSendToUser(uid,"/queue/chat",msg);
}

发送端:

function chat(uid) {
    stompClient.send("/app/queue/chat/"+uid,{},JSON.stringify({'title':'hello','content':'message content'}));
}

上述的转化,看上去没有topic那样1-n的广播要流畅,因为代码中采用约定的方式进行开发,当然这是由spring约定的。

约定转化的处理器为UserDestinationMessageHandler。

大概的语义逻辑如下:

“An application can send messages targeting a specific user, and Spring’s STOMP support recognizes destinations prefixed with “/user/“ for this purpose. For example, a client might subscribe to the destination “/user/queue/position-updates”. This destination will be handled by the UserDestinationMessageHandler and transformed into a destination unique to the user session, e.g. “/queue/position-updates-user123”. This provides the convenience of subscribing to a generically named destination while at the same time ensuring no collisions with other users subscribing to the same destination so that each user can receive unique stock position updates.”

大致的意思是说:如果是客户端订阅了/user/queue/position-updates,将由UserDestinationMessageHandler转化为一个基于用户会话的订阅地址,比如/queue/position-updates-user123,然后可以进行通讯。

例子中,我们可以把uid当成用户的会话,因为用户1-1通讯是通过spring security授权的,所以我们可以把会话当做授权后的token.
如登录用户token为: UsernamePasswordAuthenticationToken newToken = new UsernamePasswordAuthenticationToken(“admin”,”user”);
且这个token是合法的,那么/user/queue/chat订阅则为/queue/chat-admin

发送时,如果通过/user/admin/queue/chat,则不通过@MessageMapping直接进行推送。
如果通过/app/queue/chat/admin,则将消息由@MessageMapping注解处理,最终发送给/user/admin/queue/chat终点

追踪代码simpMessagingTemplate.convertAndSendToUser:

@Override
public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers,
    MessagePostProcessor postProcessor) throws MessagingException {

  Assert.notNull(user, "User must not be null");
  user = StringUtils.replace(user, "/", "%2F");
  super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}

说明最后的路径依然是/user/admin/queue/chat终点.

通讯层设计 – @SubscribeMapping

@SubscribeMapping注解可以完成订阅即返回的功能。
这个很像HTTP的request-response,但不同的是HTTP的请求和响应是同步的,每次请求必须得到响应。
而@SubscribeMapping则是异步的。意思是说:当订阅时,直到回应可响应时在进行处理。

通讯层设计 – 异常处理

@MessageMapping是支持jsr 303校验的,它支持@Validated注解,可抛出错误异常,如下:

@MessageMapping("broadcast")
public String broadcast(@Payload @Validated Message message, Principal principal) {
    return "发送人: " + principal.getName() + " 内容: " + message.toString();
}

那异常如何处理呢

@MessageExceptionHandler,它可以进行消息层的异常处理

@MessageExceptionHandler
@SendToUser(value = "/queue/error",broadcast = false)
public String handleException(MethodArgumentNotValidException methodArgumentNotValidException) {
    BindingResult bindingResult = methodArgumentNotValidException.getBindingResult();
    if (!bindingResult.hasErrors()) {
        return "未知错误";
    }
    List<FieldError> allErrors = bindingResult.getFieldErrors();
    return "jsr 303 错误: " + allErrors.iterator().next().getDefaultMessage();
}

其中@SendToUser,是指只将消息发送给当前用户,当然,当前用户需要订阅/user/queue/error地址。
注解中broadcast,则表明消息不进行多会话的传播(有可能一个用户登录3个浏览器,有三个会话),如果此broadcast=false,则只传给当前会话,不进行其他会话传播

总结

本文从websocket的原理和协议,以及内容相关协议等不同维度进行了详细介绍。

最终以一个应用场景为例,从项目的结构设计,以及代码策略设计,设计模式等不同方面展示了websocket的通讯功能在项目中的使用。

如何实现某一功能其实并不重要,重要的是得了解理论,深入理论之后,再进行开发。

http://lrwinx.github.io/2017/07/09/%E5%86%8D%E8%B0%88websocket-%E8%AE%BA%E6%9E%B6%E6%9E%84%E8%AE%BE%E8%AE%A1/