网络编程与RabbitMQ消息通信
public class Producer {
private static final String EXCHANGE_name="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//向RabbitMQ发送消息
channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
//关闭和RabbitMQ Server之间的通道
channel.close();
//关闭连接
connection.close();
}
}
public class ConsumerTest {
private static final String EXCHANGE_name="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
private static final String QUEUE_name="rabbit_test_queue";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列名称
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROOTING_KEY);
//读取队列里的数据并进行处理
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
try {
System.out.println(new String(body,"UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
String message = "just for test";
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(message.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socketChannel != null) {
socketChannel.close();
}
}
}
private Selector selector;
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public SocketClass(int port) {
try {
//声明Selector
selector = Selector.open();
//声明ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//声明ServerSocketChannel通信方式为非阻塞
serverSocketChannel.configureBlocking(false);
//ServerSocketChannel端口号码
serverSocketChannel.bind(new InetSocketAddress(port));
//注册到Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("start server port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//一直执行该线程
while (true) {
try {
//阻塞等待,直到声明READ事件的那些通道已经就绪。
selector.select();
//遍历所有已经注册的通道
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey key = selectionKeyIterator.next();
selectionKeyIterator.remove();
if (key.isValid()) {
//如果是ACCEPT事件,代表ServerSocketChannel,此时应该接收请求
if (key.isAcceptable()) {
this.accept(key);
}
//如果是READ事件,代表SocketChannel,此时应该处理请求
if (key.isReadable()) {
this.read(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
//获取服务端通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
try {
//接收客户端请求,并创建通道
SocketChannel socketChannel = serverSocketChannel.accept();
//声明SocketChannel通信方式为非阻塞
socketChannel.configureBlocking(false);
//注册到Selector
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//获取客户端连接通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//先清空缓冲区里的数据
byteBuffer.clear();
//读取通道里的数据并写入到缓冲区
int count = socketChannel.read(byteBuffer);
//如果通道里没有数据,关闭通道并取消SelectionKey,
if (count == -1) {
socketChannel.close();
key.cancel();
return;
}
//将缓冲区由写模式切换到读模式
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
//读取缓冲区里的数据并放入byte数组里
byteBuffer.get(bytes);
System.out.println("has receive request:"+new String(bytes));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
new SocketClass(8080).run();
}