Help us understand the problem. What is going on with this article?

Spring AMQP & WebSocket & Stomp(SockJS) - RabbitMQを使ったメッセージキューイング

More than 1 year has passed since last update.

前書き

・Springは最近始めたばっかりで、変な記述があるかも(一応動作検証はしてる)
・ググった結果の集合体なので、知らんうちにいろんなところから引用してると思う(主に公式サイト、QiitaとStack Overflow)
・基本、自分用のメモ目的
・コードを記載しているのは、ただの露出行為
・ここはフレームワークのクラスでできるよーとか突っ込み歓迎

目的

Spring経由でRabbitMQを使ったチャットをしたかった。
今回は基本機能の実装のみで、セキュリティ的なことは考慮してない。

※2017/5/14 追記
ユーザ名をサーバ側でPrincipalから設定するように変更

ライブラリ及びバージョン

Spring Boot 1.5.3
Spring AMQP
Spring MVC
Spring WebSocket
RabbitMQ 3.6.9
stomp-websocket 2.3.3
sockjs-client 1.1.2

用意するもの

  1. AmqpConfigクラス
  2. AbstractWebSocketMessageBrokerConfigurerを継承し、WebSocketConfigurerをimplementしたクラス
  3. RestController
  4. HTMLは自分で

概念図

amqp.png

ログイン時にRabbiAdminを使ってQueueをRabbitMQ上に作成してExchangeにバインドする。
Spring WebSocketはStompとRabbitMQ間の送受信を中継する。

実装

AMQPの設定クラス

※2017/5/17 追記

AmqpConfig.java
@Configuration
public class AmqpConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);

    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory =  new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    // メッセージ送信時に必要、書くの忘れてた
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // RabbitMQにQueueやExchangeを作成するコマンド送る際に必要
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    // こいつがルーティングしてくれる。
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("direct");
    }
}

WebSocketの設定クラス

※2017/5/14 追記

WebSocketConfig.java
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

    // WebSocketのエンドポイント
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").withSockJS();
    }

    // StompとRabbitMQ間の通信を中継させる
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
        // Restコントローラでリクエストを受け取る
        registry.setApplicationDestinationPrefixes("/app");
        registry.setPathMatcher(new AntPathMatcher("."));
    }

    // これ意味なかった
    /*
    // HTTPセッションチェックをWebSocketにも適用させる
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new TextWebSocketHandler(), "/chat")
            .addInterceptors(new HttpSessionHandshakeInterceptor())
                .withSockJS();
    }
    */

    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        messageConverters.add(new StringMessageConverter());
        return false;
    }
}

RestControllerクラス

※2017/5/14 追記

ChatRestController.java
@RestController
public class ChatRestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);

    @Autowired
    private AmqpAdmin amqpAdmin;

    // これも忘れてた
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange direct;

    // ログイン時にキューを作る
    @RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String login(@RequestBody String json, Locale locale) {
        ...
        addQueue(new Queue(username));
        ...
    }

    public void addQueue(Queue queue) {
        amqpAdmin.declareQueue(queue);
    }

    // 入室する(自分のQueueをExchangeにバインドする)
    @RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
        String responseJson = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            RoomEntry room = mapper.readValue(json, RoomEntry.class);
            amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        return responseJson;
    }

    @MessageMapping("/message")
    public void sendMessage(String jsonText, Principal principal) {
        //LOGGER.debug(jsonText);
        ObjectMapper mapper = new ObjectMapper();
        String result = "";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
        try {
            ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
            json.setFrom(principal.getName());
            json.setTime(sdf.format(new Date()));
            result = mapper.writeValueAsString(json);
            rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
        }
        catch(JsonParseException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
    }
}

クライアントJS

※2017/5/14 追記

chat.js
var mq_username = "guest",
    mq_password = "guest",
    mq_vhost    = "/",
    mq_url      = 'chat',
    mq_exchange = '/app/message'
    mq_queue    = '/queue/';
var client;
// 接続成功コールバック
function on_connect() {
    // QueueのListenを開始
    client.subscribe(mq_queue + $("#user").val(), on_message);
    // 接続(入室)ボタンクリックイベント
    $("#btn-room").on("click", function(e) {
        enterRoom();
    });
    // 送信ボタンクリックイベント
    $("#btn-chat").on("click", function(e) {
        sendMessage(); 
    });
}

function on_connect_error() {
    console.log('Connection failed');
}

// メッセージ受信
function on_message(msg) {
    console.log("messge reveived");
    console.log(msg.body);
    var obj = JSON.parse(msg.body);
    $(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}

// メッセージ送信
function sendMessage() {
    console.log("send message");
    var msg = new Object();
    msg.message = $("#message-input").val();
    msg.to = $("#send-room").val();
    client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}

// 入室
function enterRoom() {
    $.ajax({
        url: "api/enterroom",
        type:'POST',
        dataType: 'json',
        contentType: "application/json; charset=UTF-8",
        data: JSON.stringify({room : $("#enter-room").val()}),
        timeout:10000,
        beforeSend: function(xhr) {
            xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
        },
        success: function(data) {
        },
        error: function(XMLHttpRequest, textStatus, errorThrown) {
        }
    });
}

$(function() {
    Stomp.WebSocketClass = SockJS;
    client = Stomp.client(mq_url);
    // サーバに接続
    client.connect(
        mq_username,
        mq_password,
        on_connect,
        on_connect_error,
        mq_vhost
    );
});

サンプルプロジェクト

https://github.com/yossypandamaster/sample-spring-amqp

実行結果

user1(Chrome)

chatchrome.png

user2(Firefox)

chatfirefox.png

pandamaster
自分が書いたコードを露出するのが好きな変態です。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした