3
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring AMQP & WebSocket & Stomp(SockJS) - RabbitMQを使ったメッセージキューイング

Last updated at Posted at 2017-05-13

前書き

・Springは最近始めたばっかりで、変な記述があるかも(一応動作検証はしてる)
・ググった結果の集合体なので、知らんうちにいろんなところから引用してると思う(主に公式サイト、QiitaとStack Overflow)
・基本、自分用のメモ目的
・コードを記載しているのは、ただの露出行為
・ここはフレームワークのクラスでできるよーとか突っ込み歓迎

目的

Spring経由でRabbitMQを使ったチャットをしたかった。
今回は基本機能の実装のみで、セキュリティ的なことは考慮してない。

※2017/5/14 追記
ユーザ名をサーバ側でPrincipalから設定するように変更

ライブラリ及びバージョン

Spring Boot 1.5.3
Spring AMQP
Spring MVC
Spring WebSocket
RabbitMQ 3.6.9
stomp-websocket 2.3.3
sockjs-client 1.1.2

用意するもの

  1. AmqpConfigクラス
  2. AbstractWebSocketMessageBrokerConfigurerを継承し、WebSocketConfigurerをimplementしたクラス
  3. RestController
  4. HTMLは自分で

概念図

amqp.png

ログイン時にRabbiAdminを使ってQueueをRabbitMQ上に作成してExchangeにバインドする。
Spring WebSocketはStompとRabbitMQ間の送受信を中継する。

実装

AMQPの設定クラス

※2017/5/17 追記

AmqpConfig.java
@Configuration
public class AmqpConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        
        CachingConnectionFactory connectionFactory =  new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    // メッセージ送信時に必要、書くの忘れてた
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // RabbitMQにQueueやExchangeを作成するコマンド送る際に必要
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    // こいつがルーティングしてくれる。
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("direct");
    }
}

WebSocketの設定クラス

※2017/5/14 追記

WebSocketConfig.java
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

    // WebSocketのエンドポイント
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").withSockJS();
    }

    // StompとRabbitMQ間の通信を中継させる
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
        // Restコントローラでリクエストを受け取る
        registry.setApplicationDestinationPrefixes("/app");
        registry.setPathMatcher(new AntPathMatcher("."));
    }

    // これ意味なかった
    /*
    // HTTPセッションチェックをWebSocketにも適用させる
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new TextWebSocketHandler(), "/chat")
            .addInterceptors(new HttpSessionHandshakeInterceptor())
                .withSockJS();
    }
    */
    
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        messageConverters.add(new StringMessageConverter());
        return false;
    }
}

RestControllerクラス

※2017/5/14 追記

ChatRestController.java
@RestController
public class ChatRestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);
    
    @Autowired
    private AmqpAdmin amqpAdmin;
    
    // これも忘れてた
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private DirectExchange direct;
    
    // ログイン時にキューを作る
    @RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String login(@RequestBody String json, Locale locale) {
        ...
        addQueue(new Queue(username));
        ...
    }
    
    public void addQueue(Queue queue) {
        amqpAdmin.declareQueue(queue);
    }

    // 入室する(自分のQueueをExchangeにバインドする)
    @RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
        String responseJson = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            RoomEntry room = mapper.readValue(json, RoomEntry.class);
            amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        return responseJson;
    }
    
    @MessageMapping("/message")
    public void sendMessage(String jsonText, Principal principal) {
        //LOGGER.debug(jsonText);
        ObjectMapper mapper = new ObjectMapper();
        String result = "";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
        try {
            ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
            json.setFrom(principal.getName());
            json.setTime(sdf.format(new Date()));
            result = mapper.writeValueAsString(json);
            rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
        }
        catch(JsonParseException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
    }
}

クライアントJS

※2017/5/14 追記

chat.js
var mq_username = "guest",
    mq_password = "guest",
    mq_vhost    = "/",
    mq_url      = 'chat',
    mq_exchange = '/app/message'
    mq_queue    = '/queue/';
var client;
// 接続成功コールバック
function on_connect() {
    // QueueのListenを開始
    client.subscribe(mq_queue + $("#user").val(), on_message);
    // 接続(入室)ボタンクリックイベント
    $("#btn-room").on("click", function(e) {
        enterRoom();
    });
    // 送信ボタンクリックイベント
    $("#btn-chat").on("click", function(e) {
        sendMessage(); 
    });
}

function on_connect_error() {
    console.log('Connection failed');
}

// メッセージ受信
function on_message(msg) {
    console.log("messge reveived");
    console.log(msg.body);
    var obj = JSON.parse(msg.body);
    $(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}

// メッセージ送信
function sendMessage() {
    console.log("send message");
    var msg = new Object();
    msg.message = $("#message-input").val();
    msg.to = $("#send-room").val();
    client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}

// 入室
function enterRoom() {
    $.ajax({
        url: "api/enterroom",
        type:'POST',
        dataType: 'json',
        contentType: "application/json; charset=UTF-8",
        data: JSON.stringify({room : $("#enter-room").val()}),
        timeout:10000,
        beforeSend: function(xhr) {
            xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
        },
        success: function(data) {
        },
        error: function(XMLHttpRequest, textStatus, errorThrown) {
        }
    });
}

$(function() {
    Stomp.WebSocketClass = SockJS;
    client = Stomp.client(mq_url);
    // サーバに接続
    client.connect(
        mq_username,
        mq_password,
        on_connect,
        on_connect_error,
        mq_vhost
    );
});

サンプルプロジェクト

実行結果

user1(Chrome)

chatchrome.png

user2(Firefox)

chatfirefox.png

3
7
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?