package rabbitmq.rumei;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* Created by Torres on 16/7/21.
* 消息生产者消费的基类 主要负责获得channel 连接 队列
*/
public abstract class BaseConnection {
protected Channel channel;
protected Connection connection;
protected AMQP.Queue queue;
protected String queueName;
protected ConnectionFactory connectionFactory;
public BaseConnection(String queueName) throws IOException {
this.queueName = queueName;
connectionFactory = new ConnectionFactory();//新建连接工厂
connection = connectionFactory.newConnection();//获取连接
channel = connection.createChannel();//获取通道
channel.queueDeclare(queueName, false, false, false, null);//定义队列
}
public void close() throws IOException {
this.channel.close();
this.connection.close();
}
}
package rabbitmq.rumei;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by Torres on 16/7/21.
*/
public class MessageConsumer extends BaseConnection implements Runnable, Consumer {
public MessageConsumer(String queueName) throws IOException {
super(queueName);
}
@Override
public void run() {
try {
this.channel.basicConsume(this.queueName, true, this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer " + consumerTag + " registered");
}
@Override
public void handleCancelOk(String consumerTag) {
}
@Override
public void handleCancel(String consumerTag) throws IOException {
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
@Override
public void handleRecoverOk(String consumerTag) {
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
Map map = (HashMap) SerializationUtils.deserialize(body);
System.out.println("Message Number " + map.get("message number") + " received");
}
}
/*
private Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
Map map = (HashMap) SerializationUtils.deserialize(body);
System.out.println("Message Number "+map.get("message number")+ " received");
}
};*/
package rabbitmq.rumei;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.io.Serializable;
/**
* Created by Torres on 16/7/21.
*/
public class MessageProducer extends BaseConnection {
public MessageProducer(String queueName) throws IOException {
super(queueName);//定义队列
}
//发布消息 消息对象为一个可序列化的对象
public void sendMessage(Serializable object) throws IOException {
this.channel.basicPublish("", queueName, null, SerializationUtils.serialize(object));
}
}```
```package rabbitmq.rumei;
import java.io.IOException;
import java.util.HashMap;
/**
* Created by Torres on 16/7/21.
*/
public class Test {
public static void main(String[] args) throws IOException {
MessageProducer messageProducer = new MessageProducer("hello");
//
MessageConsumer messageConsumer = new MessageConsumer("hello");
//为了并发的安全考虑 channel 实例不能在多个线程间共享 所以此处
// Thread threadConumer1 = new Thread(messageConsumer);
// Thread threadConumer2 = new Thread(messageConsumer); 不能共享一个messageConsumer 实例
Thread threadConumer = new Thread(messageConsumer);
threadConumer.start();// 消费者开始等待接收消息消费
//发送信息到 rabbitmq
for (int i = 0; i <= 10000; i++) {
HashMap messagemap = new HashMap();
messagemap.put("message number", i);
messageProducer.sendMessage(messagemap);
System.out.println("message number " + i + " is send");
}
}
}