消息事务
参考: https://github.com/zjpjohn/ReliableMeageSystem
消息记录和业务DB在一个库中,业务DB操作完成后(事务完成后),开始发送消息。需要用到Spring中的事务同步TransactionSynchronizationAdapter
MessageProducer
1 2 3 4 5 6 7 8 9
| public void sendMessage(Map<String, String> data) { Preconditions.checkArgument(data != null && data.size() != 0, "message must not be empty..."); transactionSynchronize(); QMessage message = convertMessage(data); int result = qMessageService.addQMessage(message); if (result != 0) { MessageHolder.set(message.getMessageId()); } }
|
1 2 3 4 5 6 7 8 9
|
private void transactionSynchronize() { MessageTransactionSynchronizationAdapter synchronizationAdapter = new MessageTransactionSynchronizationAdapter(); synchronizationAdapter.setqMessageService(qMessageService); synchronizationAdapter.setTransactionMessageProducer(transactionMessageProducer); TransactionSynchronizationManager.registerSynchronization(synchronizationAdapter); }
|
MessageTransactionSynchronizationAdapter
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void afterCompletion(int status) { try { if (STATUS_COMMITTED == status) { List<String> messageIds = MessageHolder.get(); sendMessageToBroker(messageIds); } else if (STATUS_ROLLED_BACK == status) { log.warn("事务提交失败,数据库回滚后,清空缓存中的消息:{}", MessageHolder.get()); } } finally { MessageHolder.clear(); } }
|
TransactionMessageProducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public void sendMessage(QMessage qMessage) { Session session = null; try { session = connection.createSession(qMessage.getTransaction() != 0, ActiveMQSession.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(qMessage.getDestination()); MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(qMessage.getPersistent() != 0 ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); MapMessage message = session.createMapMessage(); message.setString("messageId", qMessage.getMessageId()); message.setString("data", qMessage.getMessageContent()); message.setString("timeStamp", String.valueOf(qMessage.getTimeStamp())); if (qMessage.getN2() != 0) { if (StringUtils.isNotBlank(qMessage.getBusinessMark())) { message.setString("businessMark", qMessage.getBusinessMark()); } else { throw new RuntimeException("n2 level message require businessMark not empty..."); } } producer.send(message); if (qMessage.getTransaction() != 0) { session.commit(); } messageCallback.onSuccess(qMessage.getMessageId()); } catch (JMSException e) { log.error("send message to broker error:{}", e); messageCallback.onFail(e,qMessage.getMessageId()); } finally { if (session != null) { try { session.close(); } catch (JMSException e) { log.error("close session error:{}", e); } } } }
|