Question
I understand that this question duplicates the question "using rabbitmq to send a message not string but struct". When I do this using the first way described there, I get the following stack trace:
java.io.EOFException
    at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
    at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
    at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
    at com.mdnaRabbit.worker.App.main(App.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
I've checked and I'm sure that the message is converted to bytes correctly in the sender class, but the consumer can't receive it.
Here is my producer class:
package com.mdnaRabbit.newt;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        int i = 0;
        do {
            Data message = getMessage();
            byte[] byteMessage = message.getBytes();
            //System.out.println(byteMessage);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
            System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
            i++;
        } while (i < 15);
        channel.close();
        connection.close();
    }

    private static Data getMessage() {
        Data data = new Data();
        data.setHeader("header");
        data.setDomainId("abc.com");
        data.setReceiver("me");
        data.setSender("he");
        data.setBody("body");
        return data;
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
Here is my consumer class:
package com.mdnaRabbit.worker;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private static int i = 0;

    public static void main(String[] argv)
            throws IOException,
            InterruptedException {
        ExecutorService threader = Executors.newFixedThreadPool(20);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection(threader);
        final Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(20);
        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        try {
            while (true) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    Data message = Data.fromBytes(delivery.getBody());
                    //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());
                    System.out.println(" [" + (i++) + "] Received" + message.getBody());
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }
}
Here is my Data class:
package com.mdnaRabbit.worker.data;

import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Data implements Serializable {

    public String header;
    public String body;
    public String domainId;
    public String sender;
    public String receiver;

    public void setHeader(String head) {
        this.header = head;
    }

    public String getHeader() {
        return header;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public String getBody() {
        return body;
    }

    public void setDomainId(String domainId) {
        this.domainId = domainId;
    }

    public String getDomainId() {
        return domainId;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getSender() {
        return sender;
    }

    public String getReceiver() {
        return receiver;
    }

    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }

    public byte[] getBytes() {
        byte[] bytes;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.flush();
            oos.reset();
            bytes = baos.toByteArray();
            oos.close();
            baos.close();
        } catch (IOException e) {
            bytes = new byte[] {};
            Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
        }
        return bytes;
    }

    public static Data fromBytes(byte[] body) {
        Data obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(body);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = (Data) ois.readObject();
            ois.close();
            bis.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}
It seems that the consumer always receives the messages, because when I don't try to convert the body into an object and just write System.out.println(delivery.getBody()), it shows bytes.
Recommended answer
It looks like the byte array you receive is empty. This happens because of this:
} catch(IOException e){
bytes = new byte[] {};
}
When an exception is thrown, the code doesn't warn you that something is broken and just sends an empty array instead. You should at least log the error.
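To illustrate the point, here is a minimal sketch, not the asker's code. The names `EmptyPayloadDemo`, `failsWithEof`, and `toBytes` are hypothetical and introduced only for this example. It shows that deserializing an empty byte array fails with `java.io.EOFException` while reading the stream header, which matches the trace in the question, and it shows one possible way to make serialization fail loudly instead of returning an empty array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class EmptyPayloadDemo {

    // Returns true if deserializing the given bytes fails with EOFException.
    static boolean failsWithEof(byte[] body) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
            ois.readObject();
            return false;
        } catch (EOFException e) {
            // An empty array has no serialization stream header to read.
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    // Hypothetical alternative to Data.getBytes(): rethrow instead of
    // silently returning an empty array, so the failure is visible at the sender.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException("unable to serialize " + value.getClass().getName(), e);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(failsWithEof(new byte[0]));       // true
        System.out.println(failsWithEof(toBytes("body")));   // false
    }
}
```

Note that printing a `byte[]` with `System.out.println` shows only the array's type and hash code, even when the array is empty, so checking `delivery.getBody().length` on the consumer side is a more reliable way to see whether any payload arrived.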
The exception is probably thrown because you are trying to serialize a class that is not serializable. To make a class serializable, you have to add "implements Serializable" to its declaration:
public class Data implements Serializable {
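The difference can be seen in a small self-contained sketch. `SerializableDemo`, `PlainMessage`, and `SerializableMessage` are hypothetical stand-ins for the `Data` class, assumed only for illustration: writing an object whose class does not implement `Serializable` throws `java.io.NotSerializableException` (a subclass of `IOException`), while the marked class round-trips.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableDemo {

    // Hypothetical message type that lacks the marker interface.
    static class PlainMessage {
        String body = "body";
    }

    // Same shape, but marked as serializable.
    static class SerializableMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        String body = "body";
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        return baos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            serialize(new PlainMessage());
        } catch (NotSerializableException e) {
            // This is the kind of IOException an empty catch block would hide.
            System.out.println("not serializable: " + e.getMessage());
        }
        SerializableMessage copy =
                (SerializableMessage) deserialize(serialize(new SerializableMessage()));
        System.out.println(copy.body);
    }
}
```

Every non-transient field of the class has to be serializable as well; `String` fields such as those in `Data` already are.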