public class MQProvider
extends java.lang.Object
本类的代码经过高度提炼和内聚,不限于MobileIMSDK使用,可用于任意合适场景。
【本类中实现的功能有】:
限定符和类型 | 字段和说明 |
---|---|
protected com.rabbitmq.client.Connection |
_connection
一旦首次连接成功后,本连接对象将不会被重置,它会依靠自身的自动恢复功能,在断线后自动恢底层的连接,应用层不需要额外代码
|
protected com.rabbitmq.client.ConnectionFactory |
_factory
连接工厂类,提取出来的目的是为了重用
|
protected com.rabbitmq.client.Channel |
_pubChannel
生产者的Chnannel对象,连接断开再次恢复时,本变量被重新赋值
|
protected com.rabbitmq.client.Channel |
_workerChannel
消息者的Chnannel对象,连接断开再次恢复时,本变量被重新赋值
|
protected java.util.Observer |
consumerObserver
本类中消费者收到的消息通过此观察者进行回调通知
|
protected java.lang.String |
consumFromQueue
消费者消息中转队列名:本类是此队列的消费者,将从其中读取消息
|
protected java.lang.String |
decodeCharset
收到消息的字符解码格式
|
static java.lang.String |
DEFAULT_DECODE_CHARSET
收到消息的字符解码格式。
|
static java.lang.String |
DEFAULT_ENCODE_CHARSET
发出消息的字符编码格式。
|
protected java.lang.String |
encodeCharset
发出消息的字符编码格式
|
protected java.lang.String |
mqURI
消息队列服务器连接URI,形如:“amqp://admin:123456789@192.168.1.190”
|
protected java.lang.String |
publishToQueue
生产者消息中转队列名:本类是此队列的生产者,会将消息发送至此
|
protected java.util.concurrent.ConcurrentLinkedQueue<java.lang.String[]> |
publishTrayAgainCache
本地生产者用的暂存消息队列:因为当发送消息时,可能连接等原因导致此次消息没有成功发出,
那么暂存至此列表中,以备下次连接恢复时,再次由本类自动完成发送,从而确保消息不丢并确保送达。
|
protected boolean |
publishTrayAgainEnable
是否支持再次发送:true表示本地生产者发送失败时(比如连接MQ服务器不成功等情况)暂时缓存到本地内
存
publishTrayAgainCache 中,等到与MQ中间件的连接成功时,自动再次尝试发送此缓存队列,从而保
证生产者本来要发生送的消息在异常发生的情况下能再次发送直到成功。 |
protected boolean |
retryWorkerRunning
此标识仅用于防止worker失败重试时因TimeTask的异步执行而发生重复执行的可能,仅此而已
|
protected boolean |
startRunning
此标识仅用于防止首次连接失败重试时因TimeTask的异步执行而发生重复执行的可能,仅此而已
|
protected java.lang.String |
TAG
TAG for log
|
protected java.util.Timer |
timerForRetryWorker
本定时的作用是当worker启动或运行过程中出错时,可以自动进行恢复,而不至于丧失功能
|
protected java.util.Timer |
timerForStartAgain
【说明】:RabittMQ的Java客户端的automaticRecovery只在连接成功后才会启动,像
这种首次连接时服务器根本就没开或者本地网络故障等,首次无法成功建立Connection的
,connction返回直接是null,当然就不存在automaticRecovery能力了,所以需要自已
来尝试重新start,一定要注意思路哦,别理解乱了。
|
构造器和说明 |
---|
MQProvider(java.lang.String mqURI,
java.lang.String publishToQueue,
java.lang.String consumFromQueue,
java.lang.String TAG,
boolean publishTrayAgainEnable)
新建一个MQProvider对象(使用默认字符编码)。
|
MQProvider(java.lang.String mqURI,
java.lang.String publishToQueue,
java.lang.String consumFromQueue,
java.lang.String encodeCharset,
java.lang.String decodeCharset,
java.lang.String TAG,
boolean publishTrayAgainEnable)
新建一个MQProvider对象。
|
限定符和类型 | 方法和说明 |
---|---|
protected boolean |
init() |
boolean |
publish(java.lang.String message)
Method to publish a message, will queue messages internally
if the connection is down and resend later.
|
protected boolean |
publish(java.lang.String exchangeName,
java.lang.String routingKey,
java.lang.String message)
Method to publish a message, will queue messages internally
if the connection is down and resend later.
|
void |
start()
调用者必须显示调用本方法才能启动本provider的整个执行策略。
|
protected void |
startPublisher(com.rabbitmq.client.Connection conn) |
protected void |
startWorker(com.rabbitmq.client.Connection conn) |
protected com.rabbitmq.client.Connection |
tryGetConnection()
返回Connection实例,如果connection不存在则新建一个。
|
protected void |
whenConnected(com.rabbitmq.client.Connection conn) |
protected boolean |
work(byte[] contentBody)
处理接收到的消息。
|
public static final java.lang.String DEFAULT_ENCODE_CHARSET
public static final java.lang.String DEFAULT_DECODE_CHARSET
protected com.rabbitmq.client.ConnectionFactory _factory
protected com.rabbitmq.client.Connection _connection
protected com.rabbitmq.client.Channel _pubChannel
protected com.rabbitmq.client.Channel _workerChannel
protected final java.util.Timer timerForStartAgain
protected boolean startRunning
protected final java.util.Timer timerForRetryWorker
protected boolean retryWorkerRunning
protected java.util.concurrent.ConcurrentLinkedQueue<java.lang.String[]> publishTrayAgainCache
protected boolean publishTrayAgainEnable
publishTrayAgainCache
中,等到与MQ中间件的连接成功时,自动再次尝试发送此缓存队列,从而保
证生产者本来要发生送的消息在异常发生的情况下能再次发送直到成功。默认false。
【注意】:不是所有情况下都需要在发送失败的情况下保存在缓存中以备再次尝试,比如MobileIMSDKX的跨服桥接消息的发送: ##### 为了让publish调用时能准确地反应出是否发送成功(而不需要在没发送成功时自动再次发送) ##### ,暂时关闭了此离线再发功能,也许以后的其它逻辑中用的上,但还是暂时取消了相关代码!! ##### 【JS认为】桥接转发如不成功就应该立即由调用者知道,不然有此离线功能的话就会导致为何消息 ##### 报已发出,但就是没有收到的问题,这对调试和用户体验都不利!
protected java.util.Observer consumerObserver
protected java.lang.String encodeCharset
protected java.lang.String decodeCharset
protected java.lang.String mqURI
protected java.lang.String publishToQueue
protected java.lang.String consumFromQueue
protected java.lang.String TAG
public MQProvider(java.lang.String mqURI, java.lang.String publishToQueue, java.lang.String consumFromQueue, java.lang.String TAG, boolean publishTrayAgainEnable)
mqURI
- 消息队列服务器连接URI,形如:“amqp://admin:123456789@192.168.1.190”publishToQueue
- 生产者消息中转队列名:本类是此队列的生产者,会将消息发送至此consumFromQueue
- 消费者消息中转队列名:本类是此队列的消费者,将从其中读取消息publishTrayAgainEnable
- 是否支持再次发送:true表示本地生产者发送失败时(比如连接MQ服务器不成功等情况)暂时缓存到本地内
存 publishTrayAgainCache
中,等到与MQ中间件的连接成功时,自动再次尝试发送此缓存队列。默认false。MQProvider(String, String, String, String, String, String, boolean)
public MQProvider(java.lang.String mqURI, java.lang.String publishToQueue, java.lang.String consumFromQueue, java.lang.String encodeCharset, java.lang.String decodeCharset, java.lang.String TAG, boolean publishTrayAgainEnable)
mqURI
- 消息队列服务器连接URI,形如:“amqp://admin:123456789@192.168.1.190”publishToQueue
- 生产者消息中转队列名:本类是此队列的生产者,会将消息发送至此consumFromQueue
- 消费者消息中转队列名:本类是此队列的消费者,将从其中读取消息encodeCharset
- 发出消息的字符编码格式decodeCharset
- 收到消息的字符解码格式TAG
- 用于log显示时的前缀,仅此而已publishTrayAgainEnable
- 是否支持再次发送:true表示本地生产者发送失败时(比如连接MQ服务器不成功等情况)暂时缓存到本地内
存 publishTrayAgainCache
中,等到与MQ中间件的连接成功时,自动再次尝试发送此缓存队列。默认false。protected boolean init()
protected com.rabbitmq.client.Connection tryGetConnection()
public void start()
protected void whenConnected(com.rabbitmq.client.Connection conn)
protected void startPublisher(com.rabbitmq.client.Connection conn)
public boolean publish(java.lang.String message)
向默认的exchange和队列进行发送。
message
- #IMMQ_QUEUE_APP2WEB
protected boolean publish(java.lang.String exchangeName, java.lang.String routingKey, java.lang.String message)
exchangeName
- routingKey
- message
- protected void startWorker(com.rabbitmq.client.Connection conn)
protected boolean work(byte[] contentBody)
特别注意:本方法一旦返回false则消息将被MQ服务器重新放入队列, 请一定注意false是你需要的,不然消息会重复哦。
contentBody
- 从MQ服务器取到的消息内容byte数组