前書き
・Springは最近始めたばっかりで、変な記述があるかも(一応動作検証はしてる)
・ググった結果の集合体なので、知らんうちにいろんなところから引用してると思う(主に公式サイト、QiitaとStack Overflow)
・基本、自分用のメモ目的
・コードを記載しているのは、ただの露出行為
・ここはフレームワークのクラスでできるよーとか突っ込み歓迎
目的
Spring経由でRabbitMQを使ったチャットをしたかった。
今回は基本機能の実装のみで、セキュリティ的なことは考慮してない。
※2017/5/14 追記
ユーザ名をサーバ側でPrincipalから設定するように変更
ライブラリ及びバージョン
Spring Boot 1.5.3
Spring AMQP
Spring MVC
Spring WebSocket
RabbitMQ 3.6.9
stomp-websocket 2.3.3
sockjs-client 1.1.2
用意するもの
- AmqpConfigクラス
- AbstractWebSocketMessageBrokerConfigurerを継承し、WebSocketConfigurerをimplementしたクラス
- RestController
- HTMLは自分で
概念図
ログイン時にRabbiAdminを使ってQueueをRabbitMQ上に作成してExchangeにバインドする。
Spring WebSocketはStompとRabbitMQ間の送受信を中継する。
実装
AMQPの設定クラス
※2017/5/17 追記
@Configuration
public class AmqpConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
// メッセージ送信時に必要、書くの忘れてた
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
// RabbitMQにQueueやExchangeを作成するコマンド送る際に必要
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
// こいつがルーティングしてくれる。
@Bean
public DirectExchange direct() {
return new DirectExchange("direct");
}
}
WebSocketの設定クラス
※2017/5/14 追記
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
// WebSocketのエンドポイント
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat").withSockJS();
}
// StompとRabbitMQ間の通信を中継させる
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
// Restコントローラでリクエストを受け取る
registry.setApplicationDestinationPrefixes("/app");
registry.setPathMatcher(new AntPathMatcher("."));
}
// これ意味なかった
/*
// HTTPセッションチェックをWebSocketにも適用させる
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new TextWebSocketHandler(), "/chat")
.addInterceptors(new HttpSessionHandshakeInterceptor())
.withSockJS();
}
*/
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
messageConverters.add(new StringMessageConverter());
return false;
}
}
RestControllerクラス
※2017/5/14 追記
@RestController
public class ChatRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);
@Autowired
private AmqpAdmin amqpAdmin;
// これも忘れてた
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange direct;
// ログイン時にキューを作る
@RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
public String login(@RequestBody String json, Locale locale) {
...
addQueue(new Queue(username));
...
}
public void addQueue(Queue queue) {
amqpAdmin.declareQueue(queue);
}
// 入室する(自分のQueueをExchangeにバインドする)
@RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
String responseJson = "";
ObjectMapper mapper = new ObjectMapper();
try {
RoomEntry room = mapper.readValue(json, RoomEntry.class);
amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
}
catch(JsonProcessingException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(IOException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
return responseJson;
}
@MessageMapping("/message")
public void sendMessage(String jsonText, Principal principal) {
//LOGGER.debug(jsonText);
ObjectMapper mapper = new ObjectMapper();
String result = "";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
try {
ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
json.setFrom(principal.getName());
json.setTime(sdf.format(new Date()));
result = mapper.writeValueAsString(json);
rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
}
catch(JsonParseException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(JsonProcessingException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(IOException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
}
}
クライアントJS
※2017/5/14 追記
var mq_username = "guest",
mq_password = "guest",
mq_vhost = "/",
mq_url = 'chat',
mq_exchange = '/app/message'
mq_queue = '/queue/';
var client;
// 接続成功コールバック
function on_connect() {
// QueueのListenを開始
client.subscribe(mq_queue + $("#user").val(), on_message);
// 接続(入室)ボタンクリックイベント
$("#btn-room").on("click", function(e) {
enterRoom();
});
// 送信ボタンクリックイベント
$("#btn-chat").on("click", function(e) {
sendMessage();
});
}
function on_connect_error() {
console.log('Connection failed');
}
// メッセージ受信
function on_message(msg) {
console.log("messge reveived");
console.log(msg.body);
var obj = JSON.parse(msg.body);
$(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}
// メッセージ送信
function sendMessage() {
console.log("send message");
var msg = new Object();
msg.message = $("#message-input").val();
msg.to = $("#send-room").val();
client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}
// 入室
function enterRoom() {
$.ajax({
url: "api/enterroom",
type:'POST',
dataType: 'json',
contentType: "application/json; charset=UTF-8",
data: JSON.stringify({room : $("#enter-room").val()}),
timeout:10000,
beforeSend: function(xhr) {
xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
},
success: function(data) {
},
error: function(XMLHttpRequest, textStatus, errorThrown) {
}
});
}
$(function() {
Stomp.WebSocketClass = SockJS;
client = Stomp.client(mq_url);
// サーバに接続
client.connect(
mq_username,
mq_password,
on_connect,
on_connect_error,
mq_vhost
);
});
サンプルプロジェクト