我已经构建了一个小型Java程序,它使用MQ客户机类与Websphere MQ服务器及其上的队列进行通信.我想要的是以规则的间隔(例如每10秒)查询某个队列的大小,错误深度.
我做了什么,示意图:
qm = new MQQManager()
q = qm.accessQueue()
depth = q.getCurrentDepth()
(做点什么depth
)
q.close()
qm.close()
这就像一个魅力!
出于懒惰,我编写了我的代码,每次都要经历上述序列的所有6个步骤.所以我最后每10秒钟重复这个循环.在大约半小时内,监控服务器的人告诉我,我打开了153个实例.很明显,仅仅close()
在队列和QM上进行操作并不足以在我自己之后进行清理.
我将做一个明显的修复并坚持QM并为程序的生命周期排队.但有人可以告诉我为什么我以前的方法是泄漏资源,如果我选择遵循这条道路,我能做些什么呢?
更新(2017-01-25)
时间和接受
Roger提供了一些漂亮的代码和一些有关性能和未提交消息主题的有用解释.这很酷,但Eugene的回答恰好(并且最低限度)足以解决我的问题,而且他的速度要快一些.我根据Eugene的建议改变close()
了我的小应用程序现在正在做我想要disconnect()
的.对于谁应该获得复选标记我现在有点不知所措了.我向你们两个道歉!
我得到了我想要的东西,我想我已经完成了这个问题,但我应该补充一点,我很欣赏罗杰为这个问题的未来其他读者提供的有用信息的深思熟虑的答案.希望其他人会倾听他的意见而不是跟随我.
性能
说实话,我不会对表现有所了解.服务器比目前所做的要多得多,我的监控客户端(这个问题的主题)运行在通常为空的开发机器上.我希望它会在一两天内达到目的,然后我就把它扔掉.与此同时,如果一个连接每10秒导致一个性能问题,他们应该负载平衡到Raspberry Pi或其他什么.
未提交的消息
我认为这也是一个非问题.发送应用程序在几毫秒内以小批量约1-5条消息排队并立即提交.接收应用程序(我编写的客户端,可能有问题)持续监听队列,单独接收和确认(提交)每条消息.所以我有理由相信,永远不会有超过1条,可能还有5条未经确认的消息.与......的参数相比,这个数字是微不足道的.
我的实际问题,
这涉及到少数几百条消息,每天早上大约在同一时间处理5到20分钟的处理延迟.我怀疑我的接收客户端被一个"挂起"的网络IO操作所阻止,而不是MQS.我想要探索的一个步骤是证明MQS上确实有一个等待消息的队列,并且它没有被接收.我想看看那个队列什么时候开始建立起来,它在那里停留了多长时间以及一旦我的客户再次醒来它会多快消退.
这种连接每天可以看到2,000到10,000条消息,其中的峰值对应于下午和晚上的批处理过程,但延迟只发生在早上,通常实际上只有很小的量.我想查看队列大小的日志,以便更好地了解正在发生的事情.下一步,我可能需要在接收客户端中进行更多的检测.
环境
对于它的价值,服务器正在运行WSMQ 6,因此我的客户端使用阻塞和等待队列获取而不是我更喜欢使用的spiffy异步通知进程.我的接收客户端是在VM中在RHEL下运行的独立C应用程序.
哇!代码效率不高.连接是FAR最昂贵的MQ API调用.此外,获取队列深度毫无意义.它为您提供队列中所有未提交和已提交的消息.即队列深度可能会说队列中有5条消息,但您的程序可能只能检索3条!你会说MQ被破坏了,但实际上,2条消息是未提交的,因此对于消费者来说是不可用的.
如果你真的想每10秒获取并记录一次信息,那么就保持连接状态.
这是一种更好的方法:
boolean working = true; int openOptions = CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING; int depth = 0; MQQueueManager qm = null; MQQueue q = null; try { qm = new MQQueueManager("MQA1"); while (working) { try { q = qm.accessQueue("TEST.Q1", openOptions); depth = q.getCurrentDepth(); // (do something with depth) } catch (MQException e) { System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode); System.out.println(e.getLocalizedMessage() ); working = false; } finally { if (q != null) { q.close(); q = null; } } try { if (working) Thread.sleep(10*1000); // time in milliseconds } catch (InterruptedException ie) {} } } catch (MQException e) { System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode); System.out.println(e.getLocalizedMessage() ); } finally { try { if (q != null) q.close(); } catch (MQException e) { System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode); System.out.println(e.getLocalizedMessage() ); } try { if (qm != null) qm.disconnect(); } catch (MQException e) { System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode); System.out.println(e.getLocalizedMessage() ); } }