Help us understand the problem. What is going on with this article?

RabbitMQ

More than 3 years have passed since last update.

http://blog.csdn.net/anzhsoft/article/details/19563091#

package rabbitmq.rumei;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * Created by Torres on 16/7/21.
 * Base class for message producers and consumers. It is mainly responsible for obtaining the
 * connection and the channel and for declaring the queue.
 */
public abstract class BaseConnection {
    /** Channel opened on {@link #connection}; subclasses publish and consume through it. */
    protected Channel           channel;
    /** Connection obtained from {@link #connectionFactory}. */
    protected Connection        connection;
    /** Never assigned by this class; kept because subclasses may reference it. */
    protected AMQP.Queue        queue;
    /** Name of the queue declared by the constructor. */
    protected String            queueName;
    /** Factory the connection is created from; constructed with no extra configuration. */
    protected ConnectionFactory connectionFactory;

    /**
     * Opens a connection and a channel and declares the queue {@code queueName} on that channel.
     *
     * <p>If the channel cannot be created or the queue cannot be declared, the connection that
     * was already opened is closed again before the failure is propagated, so a failed
     * construction does not leave a connection behind.
     *
     * @param queueName name of the queue to declare
     * @throws IOException if connecting, creating the channel or declaring the queue fails
     */
    public BaseConnection(String queueName) throws IOException {
        this.queueName = queueName;
        connectionFactory = new ConnectionFactory(); // create the connection factory
        connection = connectionFactory.newConnection(); // obtain the connection
        try {
            channel = connection.createChannel(); // obtain the channel
            // The three flags are passed as false and no extra arguments are supplied.
            channel.queueDeclare(queueName, false, false, false, null); // declare the queue
        } catch (IOException | RuntimeException e) {
            // Nobody can call close() on a half-built instance, so release the connection here.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeFailure) {
                // Keep the original failure as the primary one.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>The connection is closed even when closing the channel throws.
     *
     * @throws IOException if closing the channel or the connection fails
     */
    public void close() throws IOException {
        try {
            this.channel.close();
        } finally {
            this.connection.close();
        }
    }
}
package rabbitmq.rumei;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Torres on 16/7/21.
 * Consumer that registers itself on the queue when run and prints the "message number" entry of
 * every delivered message.
 */
public class MessageConsumer extends BaseConnection implements Runnable, Consumer {

    /**
     * Creates a consumer for the given queue.
     *
     * @param queueName name of the queue to consume from
     * @throws IOException if the superclass constructor fails
     */
    public MessageConsumer(String queueName) throws IOException {
        super(queueName);
    }

    /**
     * Registers this object as the consumer callback for the queue, with the auto-acknowledge
     * flag set to {@code true}. An {@link IOException} from the registration is printed and not
     * rethrown, because {@link Runnable#run()} cannot declare checked exceptions.
     */
    @Override
    public void run() {
        try {
            this.channel.basicConsume(this.queueName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints a line announcing that the consumer with the given tag has been registered. */
    @Override
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /** Intentionally does nothing. */
    @Override
    public void handleCancelOk(String consumerTag) {

    }

    /** Intentionally does nothing. */
    @Override
    public void handleCancel(String consumerTag) throws IOException {

    }

    /** Intentionally does nothing. */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    /** Intentionally does nothing. */
    @Override
    public void handleRecoverOk(String consumerTag) {

    }

    /**
     * Deserializes the message body into a map and prints its "message number" entry.
     *
     * @param consumerTag tag of the consumer the message was delivered to (unused)
     * @param envelope    delivery envelope (unused)
     * @param properties  message properties (unused)
     * @param body        serialized map, as produced by {@code SerializationUtils.serialize}
     * @throws ClassCastException if the deserialized object is not a {@link Map}
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        // NOTE(review): this is Java-native deserialization of bytes taken from the broker. It is
        // only acceptable while every publisher on this queue is trusted; otherwise switch to a
        // data-only format or install a deserialization filter.
        // Any Map implementation is accepted; only the key lookup below is needed.
        Map<?, ?> map = (Map<?, ?>) SerializationUtils.deserialize(body);
        System.out.println("Message Number " + map.get("message number") + " received");
    }

}
/*
private Consumer consumer = new DefaultConsumer(channel) {
                              @Override
                              public void handleDelivery(String consumerTag, Envelope envelope,
                                                         AMQP.BasicProperties properties, byte[] body)
                                                                                                      throws IOException {
                                  Map map = (HashMap) SerializationUtils.deserialize(body);
                                  System.out.println("Message Number "+map.get("message number")+ " received");
                              }
                          };*/


package rabbitmq.rumei;

import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.io.Serializable;

/**
 * Created by Torres on 16/7/21.
 */
public class MessageProducer extends BaseConnection {

    /**
     * Creates a producer for the given queue by delegating to the superclass constructor.
     *
     * @param queueName name of the queue messages are published to
     * @throws IOException if the superclass constructor fails
     */
    public MessageProducer(String queueName) throws IOException {
        super(queueName);// declares the queue
    }
    // Publish a message; the message object must be a serializable object.
    /**
     * Serializes the given object and publishes the resulting bytes on this producer's channel.
     *
     * <p>The message is published to the exchange named {@code ""} with the queue name as the
     * routing key and {@code null} message properties.
     *
     * @param object the serializable object to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        byte[] payload = SerializationUtils.serialize(object);
        this.channel.basicPublish("", queueName, null, payload);
    }
}```

```java
package rabbitmq.rumei;

import java.io.IOException;
import java.util.HashMap;

/**
 * Created by Torres on 16/7/21.
 * Demo entry point: starts one consumer on the "hello" queue and publishes numbered messages to
 * the same queue.
 */
public class Test {
    /**
     * Starts a consumer thread, publishes messages numbered 0 through 10000 and then closes the
     * producer. The consumer is left open so that it can keep receiving messages.
     *
     * @param args unused
     * @throws IOException if connecting, publishing or closing the producer fails
     */
    public static void main(String[] args) throws IOException {

        MessageProducer messageProducer = new MessageProducer("hello");
        try {
            MessageConsumer messageConsumer = new MessageConsumer("hello");
            // For thread safety a channel instance must not be shared between threads, so a
            // single MessageConsumer instance must not be handed to several threads, e.g.
            //   Thread consumerThread1 = new Thread(messageConsumer);
            //   Thread consumerThread2 = new Thread(messageConsumer);
            Thread consumerThread = new Thread(messageConsumer);
            consumerThread.start(); // the consumer starts waiting for messages to consume
            // Send the messages to RabbitMQ.
            for (int i = 0; i <= 10000; i++) {
                HashMap<String, Integer> messageMap = new HashMap<String, Integer>();
                messageMap.put("message number", i);
                messageProducer.sendMessage(messageMap);
                System.out.println("message number " + i + " is send");
            }
        } finally {
            // Release the producer's channel and connection whether or not sending succeeded.
            messageProducer.close();
        }

    }

}
xubin
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up on information across the technical fields you are interested in
  2. You can read useful information later efficiently
    By "stocking" the articles you like, you can search right away