• 注册
  • 查看作者
  • Rocketmq消息过滤器使用消息丢失问题

    Linux rockemq 4。7 .1

    火箭MQ-弹簧-启动-启动器2。1 .1

    生产者代码

    公共类FilterProducerTag {

    公共静态void main(String[]args)引发MQClientException、UnsupportedEncodingException、RemotingException、InterruptedException、MQBrokerException {

    default mqproductor=new default mqproductor('请_ rename _ unique _ group _ name’);

    制片人。SetNamesrVaddr(' 192。168 .42 .131:9876’);

    制片人。start();

    string[]StringGarr={ ' TagA ',' TagB ',' TagC ' };

    for(int I=0;i6;i ) {

    消息消息=新消息();

    消息。settopic(' TopicTestFilter ');

    //消息。settags(string gar[I % string gar。length]);

    消息。settags(' TagA ');

    消息。SetKeys(' KEY ' I);

    消息。setbody(' Hello RockemQ ' I).getBytes(RemotingHelper .DEFAULT _ CHARSET));

    //设置一些属性。

    发送结果发送结果=生产者。发送(消息);

    系统。出去。println('==============================================');

    系统。出去。println(SendResult);

    System.out.println(消息);

    系统。出去。println('==============================================');

    {}

    制片人。关机();

    {}

    {}

    消费者代码

    如下

    公共类FilterConsumerTag {

    公共静态void main(String[]参数)引发MQClientException {

    DefaultMQPushConsumer=new DefaultMQPushConsumer('请_ rename _ unique _ group _ name _ 4’);

    消费者。SetNamesrVaddr(' 192。168 .42 .131:9876’);

    消费者。从哪里消费.CONSUME _ FROM _ FIRST _ OFFSET);

    //只有子页面消息具有属性a,而且a=0和a=3

    //消费者。订阅(' TopicTestFilter ',消息选择器。QL(' 0到2之间的一个);

    消费者。subscribe(' TopicTestFilter ',' TagA | | TagB ');

    消费者。注册message listener(new message listener ComPany(){

    @覆盖

    公共消费当前系统状态消费消息(列表消息正文消息正文列表,消费当前上下文上下文){

    for(MessageExt MessageExt : messageExtList){

    系统。出去。println('===============================================================');

    系统。出去。println(messageExt);

    系统。出去。println(新字符串(messageext。getbody()));

    系统。出去。println('===============================================================');

    {}

    返回消费当前系统状态消费_成功

    {}

    });

    消费者。start();

    系统。出去。println(' consumer ');

    {}

    {}

    问题

    我在消费者方只收到生产者的四条消息,按道理我应该收到6 条,全部

    日志

    生产者日志

    火箭Qlog :警告找不到记录器的附加程序(io。net ty。util。内部。InternalthreadLocalMap).

    火箭日志:警告请正确初始化日志系统。

    ============================

    发送结果[发送状态=发送_确定,MSGid=64774 C6 b 275 C18 B4 AAC 25 a 56 EFC 60000,偏移量MSGid=c0a 82 a 8300002 a9 f 000000000 cf 5c 0,消息队列=消息队列[topctestfilter,broker name=localhost。本地域,排队标识=0],排队偏移量=31]

    消息{ TopicTestFilter ',flag=0,properties={KEYS=KEY0,UNIQ _ KEY=64774 C6 b 275 C18 B4 AAC 25 a 56 EFC 60000,TAGS=TagA},body=[72,101,108,108,111,32,82,111,99,107,101,116,77,81,32,48],transactionId='null'}

    ============================

    ============================

    SendResult [sendStatus=SEND_OK,MSGid=64774 C6 b 275 C18 B4 AAC 25 a 56 efee 0001,offsetMSGid=c0a 82 a 8300002 a9 f 0000000 cf 68 f,消息队列=消息队列[topctestfilter,broker name=localhost。本地域,排队标识=1],排队偏移量=30]

    消息{ TopicTestFilter ',flag=0,properties={KEYS=KEY1,UNIQ _ KEY=64774 C6 b 275 C18 B4 AAC 25 a 56 efee 0001,TAGS=TagA},body=[72,101,108,108,111,32,82,111,99,107,101,116,77,81,32,49],transactionId='null'}

    ===ytet-伊甸园字幕组=-翻译

    ===ytet-伊甸园字幕组=-翻译

    发送结果[发送状态=发送_确定,msgid=644 C6 b275 c18b 4 AAC 25 a 56 eff 80002,偏移量msgid=c0a 82 a 8300002 a 9f 0000000 cf 75e,消息队列=消息队列[主题=topctestfilter,代理名称=localhost。本地域,排队标识=2],排队偏移量=23]

    消息{主题=' topictestfilter ',标志=0,属性={KEYS=KEY2,unique _ KEY=644 C6 b275 c18b 4 AAC 25 a 56 eff 80002,TAGS=TagA},正文=[72,101,108,111,32,82,111,99,107,101,116,77,81,32,50],transactionId='null'}

    ===ytet-伊甸园字幕组=-翻译

    ===ytet-伊甸园字幕组=-翻译

    发送结果[发送状态=发送_确定,msgid=644 C6 b275 c18b 4 AAC 25 a 56 effb 0003,offsetmsgid=c0a 82 a 8300002 a 9f 0000000000 cf 82d,消息队列=消息队列[主题=topctestfilter,代理名称=localhost。本地域,排队标识=3],排队偏移量=22]

    消息{主题=' topictestfilter ',标志=0,属性={KEYS=KEY3,unique _ KEY=64774 c 6 b275 c 18 B4 AAC 25 a 56 effb 0003,TAGS=TagA},正文=[72,101,108,111,32,82,111,99,107,101,116,77,81,32,51],transactionId='null'}

    ===ytet-伊甸园字幕组=-翻译

    ===ytet-伊甸园字幕组=-翻译

    SendResult [sendStatus=SEND_OK,msgid=644 c6b 275 C18 b 4 AAC 25 a 56 effd 0004,offsetmsgid=c0a 82 a 8300002 a 9f 000000000 cf 8fc,message queue=message queue[topctestfilter,broker name=localhost。本地域,排队标识=0],排队偏移量=32]

    消息{主题=' topictestfilter ',标志=0,属性={KEYS=KEY4,unique _ KEY=64774 c 6 b275 c 18 B4 AAC 25 a 56 effd 0004,TAGS=TagA},正文=[72,101,108,111,32,82,111,99,107,101,116,77,81,32,52],transactionId='null'}

    ===ytet-伊甸园字幕组=-翻译

    ===ytet-伊甸园字幕组=-翻译

    SendResult [sendStatus=SEND_OK,msgid=644 C6 b275 c18b 4 AAC 25 a 56 f 0020005,offsetmsgid=c0a 82 a 8300002 a 9f 0000000 cf 9cb,message queue=message queue[topctestfilter,broker name=localhost。本地域,排队标识=1],排队偏移量=31]

    消息{主题=' topictestfilter ',标志=0,属性=& gt

    KEYS=KEY5,UNIQ _ KEY=64774 c 6b 275 C18 B4 AAC 25 a 56 f 0020005,TAGS=TagA},body=[72,101,108,108,111,32,82,111,99,107,101,116,77,81,32,53],transactionId='null'}

    ============================

    1:00:47.034[净周期客户端选择器_ 1]INFO Rockemqrmemoting-closechannel 3360关闭到远程地址的连接[192。168 .44444443361

    1:00:47.036[净周期客户端选择器_ 1]INFO Rockemqrmemoting-closechannel 3360关闭到远程地址的连接[192。168 .42]结果为真:33363636466

    进程已完成,退出代码为0

    消费者日志

    =====================================

    =====================================

    message ext[broker name=localhost。本地域,queueId=0,storeSize=207,queueOffset=31,sysFlag=0,bornTimestamp=1602997246918,bornHost=/192。168 .42 .133606700,storeTimestamp=1602997246973,store host=/192。168 .42 .1313:10911

    你好火箭MQ 0

    message ext[broker name=localhost。本地域,queueId=0,storeSize=207,queueOffset=32,sysFlag=0,bornTimestamp=1602997246973,bornHost=/192。168 .42 .133606700,storeTimestamp=1602997247002,store host=/192。168 .42 .13133:10911

    =====================================

    =====================================

    =====================================

    你好RocketMQ 4

    =====================================

    message ext[broker name=localhost。本地域,queueId=1,storeSize=207,queueOffset=31,sysFlag=0,bornTimestamp=1602997246978,bornHost=/192。168 .42 .133606700,storeTimestamp=1602997247011,store host=/192。168 .42 .13133:10911

    message ext[broker name=localhost。本地域,queueId=1,storeSize=207,queueOffset=30,sysFlag=0,bornTimestamp=1602997246958,bornHost=/192。168 .42 .133606700,storeTimestamp=1602997246992,store host=/192。168 .42 .1313:10911

    你好RocketMQ 5

    =====================================

    你好火箭MQ 1

    =====================================

  • 0
  • 0
  • 0
  • 2
  • 请登录之后再进行评论

    登录
  • 单栏布局 侧栏位置: