Browse code

Feat: might help with our topic listeners getting dropped when the cluster state is unstable

Thomas Cataldo authored on 09/03/2018 17:50:17
Showing 2 changed files
... ...
@@ -46,6 +46,7 @@ import com.hazelcast.core.MemberAttributeEvent;
46 46
 import com.hazelcast.core.MembershipEvent;
47 47
 import com.hazelcast.core.MembershipListener;
48 48
 import com.hazelcast.core.Message;
49
+import com.hazelcast.core.MessageListener;
49 50
 
50 51
 import net.bluemind.config.BmIni;
51 52
 import net.bluemind.hornetq.client.Consumer;
... ...
@@ -250,7 +251,7 @@ public abstract class ClusterNode {
250 251
 		hzStart.thenAccept(hz -> {
251 252
 			try {
252 253
 				ITopic<String> hzTopic = hz.getReliableTopic(topic);
253
-				String regId = hzTopic.addMessageListener((Message<String> message) -> {
254
+				MessageListener<String> basicListener = (Message<String> message) -> {
254 255
 					JsonObject payload = new JsonObject(message.getMessageObject());
255 256
 					if (filter != null) {
256 257
 						if (filter.test(payload)) {
... ...
@@ -261,7 +262,8 @@ public abstract class ClusterNode {
261 262
 					} else {
262 263
 						handler.handle(new OOPMessage(payload));
263 264
 					}
264
-				});
265
+				};
266
+				String regId = hzTopic.addMessageListener(new TopicListener(basicListener));
265 267
 				consumerRegistrations.put(regId, rc);
266 268
 				cons.complete(new Consumer(() -> {
267 269
 					hz.removeDistributedObjectListener(regId);
268 270
new file mode 100644
... ...
@@ -0,0 +1,34 @@
1
+/* BEGIN LICENSE
2
+  * Copyright © Blue Mind SAS, 2012-2018
3
+  *
4
+  * This file is part of BlueMind. BlueMind is a messaging and collaborative
5
+  * solution.
6
+  *
7
+  * This program is free software; you can redistribute it and/or modify
8
+  * it under the terms of either the GNU Affero General Public License as
9
+  * published by the Free Software Foundation (version 3 of the License).
10
+  *
11
+  * This program is distributed in the hope that it will be useful,
12
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
+  *
15
+  * See LICENSE.txt
16
+  * END LICENSE
17
+  */
18
+package net.bluemind.hornetq.client.impl;
19
+
20
+import com.hazelcast.core.MessageListener;
21
+import com.hazelcast.topic.impl.reliable.ReliableMessageListenerAdapter;
22
+
23
+public class TopicListener extends ReliableMessageListenerAdapter<String> {
24
+
25
+	public TopicListener(MessageListener<String> ml) {
26
+		super(ml);
27
+	}
28
+
29
+	@Override
30
+	public boolean isLossTolerant() {
31
+		return true;
32
+	}
33
+
34
+}