1
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQ

Posted at

package rabbitmq.rumei;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * Created by Torres on 16/7/21.
 * Base class for message producers and consumers: it opens the connection and
 * channel and declares the queue that subclasses work with.
 */
public abstract class BaseConnection {
    protected Channel           channel;
    protected Connection        connection;
    // Never assigned in this class; kept so existing subclasses still compile.
    protected AMQP.Queue        queue;
    protected String            queueName;
    protected ConnectionFactory connectionFactory;

    /**
     * Opens a connection and a channel and declares the given queue.
     *
     * <p>The connection factory is used with its defaults; no host or
     * credentials are configured here.
     *
     * @param queueName name of the queue to declare
     * @throws IOException if the connection, channel or queue cannot be set up;
     *                     a connection that was already opened is closed again
     *                     before the exception propagates
     */
    public BaseConnection(String queueName) throws IOException {
        this.queueName = queueName;
        connectionFactory = new ConnectionFactory(); // create the connection factory
        connection = connectionFactory.newConnection(); // open the connection
        boolean initialized = false;
        try {
            channel = connection.createChannel(); // open the channel
            // Declare the queue: the three flags are all false and no extra arguments are passed.
            channel.queueDeclare(queueName, false, false, false, null);
            initialized = true;
        } finally {
            // Do not leak the open connection when channel or queue setup fails.
            if (!initialized) {
                closeConnectionQuietly();
            }
        }
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>The connection is closed even when closing the channel fails, so a
     * failing channel can no longer leave the connection open.
     *
     * @throws IOException if closing the channel or the connection fails
     */
    public void close() throws IOException {
        try {
            this.channel.close();
        } finally {
            this.connection.close();
        }
    }

    /** Closes the connection during failed construction without masking the original failure. */
    private void closeConnectionQuietly() {
        try {
            connection.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: the exception that aborted construction is the one to report.
        }
    }
}
package rabbitmq.rumei;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Torres on 16/7/21.
 * Consumer that registers itself on the queue declared by {@link BaseConnection}
 * and prints the "message number" entry of every map-shaped message it receives.
 */
public class MessageConsumer extends BaseConnection implements Runnable, Consumer {

    /**
     * @param queueName name of the queue to consume from
     * @throws IOException if the connection, channel or queue cannot be set up
     */
    public MessageConsumer(String queueName) throws IOException {
        super(queueName);
    }

    /**
     * Registers this object as the consumer of the queue. The second argument
     * to {@code basicConsume} is {@code true}; a failure to register is
     * reported on standard error.
     */
    @Override
    public void run() {
        try {
            this.channel.basicConsume(this.queueName, true, this);
        } catch (IOException e) {
            // No logging framework is available in this example; report and return.
            e.printStackTrace();
        }
    }

    /** Prints the consumer tag once the registration has been confirmed. */
    @Override
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /** Intentionally a no-op. */
    @Override
    public void handleCancelOk(String consumerTag) {

    }

    /** Intentionally a no-op. */
    @Override
    public void handleCancel(String consumerTag) throws IOException {

    }

    /** Intentionally a no-op. */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    /** Intentionally a no-op. */
    @Override
    public void handleRecoverOk(String consumerTag) {

    }

    /**
     * Deserializes the message body and prints its "message number" entry.
     *
     * <p>The producer side accepts any {@code Serializable}, so the payload is
     * type-checked instead of being cast blindly to {@code HashMap}; a payload
     * that is not a {@code Map} is reported on standard error and skipped.
     *
     * @param consumerTag tag of the consumer the message was delivered to (unused)
     * @param envelope    delivery metadata (unused)
     * @param properties  message properties (unused)
     * @param body        serialized message payload
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        // NOTE(review): Java-native deserialization is only safe when every
        // publisher on this queue is trusted; prefer a data format such as JSON otherwise.
        Object payload = SerializationUtils.deserialize(body);
        if (!(payload instanceof Map)) {
            String actualType = (payload == null) ? "null" : payload.getClass().getName();
            System.err.println("Ignoring message with unexpected payload type " + actualType);
            return;
        }
        Map<?, ?> map = (Map<?, ?>) payload;
        System.out.println("Message Number " + map.get("message number") + " received");
    }

}
/*
private Consumer consumer = new DefaultConsumer(channel) {
                              @Override
                              public void handleDelivery(String consumerTag, Envelope envelope,
                                                         AMQP.BasicProperties properties, byte[] body)
                                                                                                      throws IOException {
                                  Map map = (HashMap) SerializationUtils.deserialize(body);
                                  System.out.println("Message Number "+map.get("message number")+ " received");
                              }
                          };*/

                                                    
package rabbitmq.rumei;

import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.io.Serializable;

/**
 * Created by Torres on 16/7/21.
 */
public class MessageProducer extends BaseConnection {

    public MessageProducer(String queueName) throws IOException {
        super(queueName);//定义队列
    }
     //发布消息  消息对象为一个可序列化的对象
    public void sendMessage(Serializable object) throws IOException {
        this.channel.basicPublish("", queueName, null, SerializationUtils.serialize(object));
    }
}```

package rabbitmq.rumei;

import java.io.IOException;
import java.util.HashMap;

/**
 * Created by Torres on 16/7/21.
 * Demo entry point: starts one consumer on the "hello" queue and publishes
 * numbered messages to it.
 */
public class Test {
    /**
     * Publishes messages numbered 0 through 10000 to the "hello" queue while a
     * consumer thread receives them.
     *
     * @param args unused
     * @throws IOException if connecting, publishing or closing the producer fails
     */
    public static void main(String[] args) throws IOException {

        MessageProducer messageProducer = new MessageProducer("hello");
        try {
            MessageConsumer messageConsumer = new MessageConsumer("hello");
            // For thread safety a channel instance must not be shared between
            // threads, so one messageConsumer instance must not be handed to
            // several threads (e.g. two Threads built from the same instance).
            Thread consumerThread = new Thread(messageConsumer);
            consumerThread.start(); // the consumer starts waiting for messages
            // Send the messages to RabbitMQ.
            for (int i = 0; i <= 10000; i++) {
                HashMap<String, Integer> messageMap = new HashMap<String, Integer>();
                messageMap.put("message number", i);
                messageProducer.sendMessage(messageMap);
                System.out.println("message number " + i + " is send");
            }
        } finally {
            // Release the producer's channel and connection once publishing is
            // done, or as soon as anything above fails.
            // NOTE(review): the consumer is left open so it can keep receiving;
            // confirm whether the demo should also shut it down.
            messageProducer.close();
        }

    }

}
1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
1
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?