Browse code

[node] BM-13610 Fix: implement a websocket retry policy

Thomas Cataldo authored on 08/10/2018 09:47:25
Showing 12 changed files
... ...
@@ -57,6 +57,7 @@
57 57
       <plugin id="net.bluemind.cli.index"/>
58 58
       <plugin id="net.bluemind.cli.launcher"/>
59 59
       <plugin id="net.bluemind.cli.metrics"/>
60
+      <plugin id="net.bluemind.cli.node"/>
60 61
       <plugin id="net.bluemind.cli.utils"/>
61 62
       <plugin id="net.bluemind.common.io"/>
62 63
       <plugin id="net.bluemind.common.reflect"/>
63 64
new file mode 100644
... ...
@@ -0,0 +1,7 @@
0
+<?xml version="1.0" encoding="UTF-8"?>
1
+<classpath>
2
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
3
+	<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
4
+	<classpathentry kind="src" path="src"/>
5
+	<classpathentry kind="output" path="bin"/>
6
+</classpath>
0 7
new file mode 100644
... ...
@@ -0,0 +1,28 @@
0
+<?xml version="1.0" encoding="UTF-8"?>
1
+<projectDescription>
2
+	<name>net.bluemind.cli.node</name>
3
+	<comment></comment>
4
+	<projects>
5
+	</projects>
6
+	<buildSpec>
7
+		<buildCommand>
8
+			<name>org.eclipse.jdt.core.javabuilder</name>
9
+			<arguments>
10
+			</arguments>
11
+		</buildCommand>
12
+		<buildCommand>
13
+			<name>org.eclipse.pde.ManifestBuilder</name>
14
+			<arguments>
15
+			</arguments>
16
+		</buildCommand>
17
+		<buildCommand>
18
+			<name>org.eclipse.pde.SchemaBuilder</name>
19
+			<arguments>
20
+			</arguments>
21
+		</buildCommand>
22
+	</buildSpec>
23
+	<natures>
24
+		<nature>org.eclipse.pde.PluginNature</nature>
25
+		<nature>org.eclipse.jdt.core.javanature</nature>
26
+	</natures>
27
+</projectDescription>
0 28
new file mode 100644
... ...
@@ -0,0 +1,15 @@
0
+Manifest-Version: 1.0
1
+Bundle-ManifestVersion: 2
2
+Bundle-Name: net.bluemind.cli.node
3
+Bundle-SymbolicName: net.bluemind.cli.node;singleton:=true
4
+Bundle-Version: 3.1.0.qualifier
5
+Bundle-Activator: net.bluemind.cli.node.Activator
6
+Bundle-Vendor: bluemind.net
7
+Require-Bundle: org.eclipse.core.runtime,
8
+ net.bluemind.cli.cmd.api,
9
+ net.bluemind.server.api,
10
+ net.bluemind.core.container.api;bundle-version="3.1.0",
11
+ net.bluemind.config
12
+Bundle-RequiredExecutionEnvironment: JavaSE-1.8
13
+Automatic-Module-Name: net.bluemind.cli.node
14
+Bundle-ActivationPolicy: lazy
0 15
new file mode 100644
... ...
@@ -0,0 +1,5 @@
0
+source.. = src/
1
+output.. = bin/
2
+bin.includes = META-INF/,\
3
+               .,\
4
+               plugin.xml
0 5
new file mode 100644
... ...
@@ -0,0 +1,12 @@
0
+<?xml version="1.0" encoding="UTF-8"?>
1
+<?eclipse version="3.4"?>
2
+<plugin>
3
+   <extension
4
+         point="net.bluemind.cli.cmd.api.cmdlet">
5
+      <registration
6
+            impl="net.bluemind.cli.node.StatusCommand$Reg"
7
+            priority="500">
8
+      </registration>
9
+   </extension>
10
+
11
+</plugin>
0 12
new file mode 100644
... ...
@@ -0,0 +1,10 @@
0
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1
+  <modelVersion>4.0.0</modelVersion>
2
+  <parent>
3
+    <groupId>net.bluemind</groupId>
4
+    <version>3.1.0-SNAPSHOT</version>
5
+    <artifactId>net.bluemind.cli.plugins</artifactId>
6
+  </parent>
7
+  <artifactId>net.bluemind.cli.node</artifactId>
8
+  <packaging>eclipse-plugin</packaging>
9
+</project>
0 10
new file mode 100644
... ...
@@ -0,0 +1,94 @@
0
+/* BEGIN LICENSE
1
+  * Copyright © Blue Mind SAS, 2012-2018
2
+  *
3
+  * This file is part of BlueMind. BlueMind is a messaging and collaborative
4
+  * solution.
5
+  *
6
+  * This program is free software; you can redistribute it and/or modify
7
+  * it under the terms of either the GNU Affero General Public License as
8
+  * published by the Free Software Foundation (version 3 of the License).
9
+  *
10
+  * This program is distributed in the hope that it will be useful,
11
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13
+  *
14
+  * See LICENSE.txt
15
+  * END LICENSE
16
+  */
17
+package net.bluemind.cli.node;
18
+
19
+import java.util.List;
20
+import java.util.concurrent.CompletionService;
21
+import java.util.concurrent.ExecutorCompletionService;
22
+import java.util.concurrent.ExecutorService;
23
+import java.util.concurrent.Executors;
24
+import java.util.stream.Collectors;
25
+import java.util.stream.Stream;
26
+
27
+import io.airlift.airline.Option;
28
+import net.bluemind.cli.cmd.api.CliContext;
29
+import net.bluemind.cli.cmd.api.ICmdLet;
30
+import net.bluemind.config.InstallationId;
31
+import net.bluemind.core.container.model.ItemValue;
32
+import net.bluemind.server.api.IServer;
33
+import net.bluemind.server.api.Server;
34
+
35
+public abstract class AbstractNodeOperation implements ICmdLet, Runnable {
36
+
37
+	@Override
38
+	public final Runnable forContext(CliContext ctx) {
39
+		this.ctx = ctx;
40
+		return this;
41
+	}
42
+
43
+	protected CliContext ctx;
44
+
45
+	@Option(name = "--tag", description = "select servers tagged X")
46
+	public String tag;
47
+
48
+	@Option(name = "--uid", description = "select server with given uid")
49
+	public String uid;
50
+
51
+	@Option(name = "--addr", description = "select server with given address")
52
+	public String address;
53
+
54
+	@Option(name = "--workers", description = "run with X workers")
55
+	public int workers = 1;
56
+
57
+	@Override
58
+	public final void run() {
59
+		IServer serversApi = ctx.adminApi().instance(IServer.class, InstallationId.getIdentifier());
60
+		List<ItemValue<Server>> allServers = serversApi.allComplete();
61
+		Stream<ItemValue<Server>> stream = allServers.stream();
62
+		if (tag != null) {
63
+			stream = stream.filter(iv -> iv.value.tags.contains(tag));
64
+		}
65
+		if (uid != null) {
66
+			stream = stream.filter(iv -> uid.equals(iv.uid));
67
+		}
68
+		if (address != null) {
69
+			stream = stream.filter(iv -> address.equals(iv.value.address()));
70
+		}
71
+		List<ItemValue<Server>> serversList = stream.collect(Collectors.toList());
72
+		// create an executor & completion service backed by 'workers' threads
73
+		ExecutorService pool = Executors.newFixedThreadPool(workers);
74
+		CompletionService<Void> opsWatcher = new ExecutorCompletionService<>(pool);
75
+
76
+		for (ItemValue<Server> srv : serversList) {
77
+			opsWatcher.submit(() -> {
78
+				synchronousServerOperation(serversApi, srv);
79
+				return null;
80
+			});
81
+		}
82
+		serversList.forEach(de -> {
83
+			try {
84
+				opsWatcher.take().get();
85
+			} catch (Exception e) {
86
+				e.printStackTrace();
87
+			}
88
+		});
89
+	}
90
+
91
+	protected abstract void synchronousServerOperation(IServer serversApi, ItemValue<Server> srv);
92
+
93
+}
0 94
new file mode 100644
... ...
@@ -0,0 +1,30 @@
0
+package net.bluemind.cli.node;
1
+
2
+import org.osgi.framework.BundleActivator;
3
+import org.osgi.framework.BundleContext;
4
+
5
+public class Activator implements BundleActivator {
6
+
7
+	private static BundleContext context;
8
+
9
+	static BundleContext getContext() {
10
+		return context;
11
+	}
12
+
13
+	/*
14
+	 * (non-Javadoc)
15
+	 * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
16
+	 */
17
+	public void start(BundleContext bundleContext) throws Exception {
18
+		Activator.context = bundleContext;
19
+	}
20
+
21
+	/*
22
+	 * (non-Javadoc)
23
+	 * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
24
+	 */
25
+	public void stop(BundleContext bundleContext) throws Exception {
26
+		Activator.context = null;
27
+	}
28
+
29
+}
0 30
new file mode 100644
... ...
@@ -0,0 +1,76 @@
0
+/* BEGIN LICENSE
1
+  * Copyright © Blue Mind SAS, 2012-2018
2
+  *
3
+  * This file is part of BlueMind. BlueMind is a messaging and collaborative
4
+  * solution.
5
+  *
6
+  * This program is free software; you can redistribute it and/or modify
7
+  * it under the terms of either the GNU Affero General Public License as
8
+  * published by the Free Software Foundation (version 3 of the License).
9
+  *
10
+  * This program is distributed in the hope that it will be useful,
11
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13
+  *
14
+  * See LICENSE.txt
15
+  * END LICENSE
16
+  */
17
+package net.bluemind.cli.node;
18
+
19
+import java.util.Optional;
20
+
21
+import io.airlift.airline.Command;
22
+import net.bluemind.cli.cmd.api.ICmdLet;
23
+import net.bluemind.cli.cmd.api.ICmdLetRegistration;
24
+import net.bluemind.core.api.fault.ServerFault;
25
+import net.bluemind.core.container.model.ItemValue;
26
+import net.bluemind.server.api.IServer;
27
+import net.bluemind.server.api.Server;
28
+
29
+/**
30
+ * This is the default command on node related stuff to ensure we don't run a
31
+ * destructive op by default
32
+ *
33
+ */
34
+@Command(name = "status", description = "Show node(s) availability")
35
+public class StatusCommand extends AbstractNodeOperation {
36
+
37
+	public static class Reg implements ICmdLetRegistration {
38
+
39
+		@Override
40
+		public Optional<String> group() {
41
+			return Optional.of("node");
42
+		}
43
+
44
+		@Override
45
+		public Class<? extends ICmdLet> commandClass() {
46
+			return StatusCommand.class;
47
+		}
48
+
49
+	}
50
+
51
+	@Override
52
+	protected void synchronousServerOperation(IServer serversApi, ItemValue<Server> srv) {
53
+		try {
54
+			byte[] content = serversApi.readFile(srv.uid, "/etc/bm/bm.ini");
55
+			if (content != null && content.length > 0) {
56
+				reportSuccess(srv);
57
+			} else {
58
+				reportFailure(srv);
59
+			}
60
+		} catch (ServerFault sf) {
61
+			reportFailure(srv);
62
+		}
63
+	}
64
+
65
+	private void reportFailure(ItemValue<Server> srv) {
66
+		System.out.println(ctx.ansi().a("Server " + srv.value.address() + " (" + srv.uid + ") ").fgBrightRed()
67
+				.a("FAILED").reset());
68
+	}
69
+
70
+	private void reportSuccess(ItemValue<Server> srv) {
71
+		System.out.println(
72
+				ctx.ansi().a("Server " + srv.value.address() + " (" + srv.uid + ") ").fgBrightGreen().a("OK").reset());
73
+	}
74
+
75
+}
... ...
@@ -17,6 +17,7 @@
17 17
 		<module>net.bluemind.cli.directory.common</module>
18 18
                 <module>net.bluemind.cli.adm</module>
19 19
 		<module>net.bluemind.cli.index</module>
20
+		<module>net.bluemind.cli.node</module>
20 21
 
21 22
 	</modules>
22 23
 
... ...
@@ -18,11 +18,17 @@
18 18
  */
19 19
 package net.bluemind.node.client.impl;
20 20
 
21
+import java.util.Arrays;
21 22
 import java.util.Map;
23
+import java.util.Timer;
24
+import java.util.TimerTask;
22 25
 import java.util.concurrent.CompletableFuture;
23 26
 import java.util.concurrent.ConcurrentHashMap;
27
+import java.util.concurrent.ExecutionException;
24 28
 import java.util.concurrent.TimeUnit;
29
+import java.util.concurrent.TimeoutException;
25 30
 import java.util.concurrent.atomic.AtomicLong;
31
+import java.util.concurrent.atomic.AtomicReference;
26 32
 
27 33
 import org.slf4j.Logger;
28 34
 import org.slf4j.LoggerFactory;
... ...
@@ -32,68 +38,113 @@ import com.ning.http.client.ws.WebSocket;
32 32
 import com.ning.http.client.ws.WebSocketTextListener;
33 33
 import com.ning.http.client.ws.WebSocketUpgradeHandler;
34 34
 
35
-import net.bluemind.core.api.fault.ServerFault;
36 35
 import net.bluemind.node.api.ProcessHandler;
37 36
 
38 37
 public class WebsocketLink {
39 38
 
39
+	private static class NodeTextListener implements WebSocketTextListener {
40
+
41
+		private final AtomicReference<WebSocket> wsRef;
42
+		private final WebsocketLink link;
43
+		private final CompletableFuture<Void> firstConnect;
44
+
45
+		public NodeTextListener(AtomicReference<WebSocket> socketQueue, CompletableFuture<Void> firstConnect,
46
+				WebsocketLink link) {
47
+			this.wsRef = socketQueue;
48
+			this.link = link;
49
+			this.firstConnect = firstConnect;
50
+		}
51
+
52
+		@Override
53
+		public void onOpen(WebSocket ws) {
54
+			logger.debug("websocket opened: {}", ws);
55
+			wsRef.set(ws);
56
+			if (!firstConnect.isDone()) {
57
+				firstConnect.complete(null);
58
+			}
59
+		}
60
+
61
+		@Override
62
+		public void onError(Throwable t) {
63
+			logger.error("websocket error: {}", t.getMessage());
64
+			retryLater();
65
+		}
66
+
67
+		@Override
68
+		public void onClose(WebSocket websocket) {
69
+			logger.info("ws closed {}", websocket);
70
+			retryLater();
71
+		}
72
+
73
+		private void retryLater() {
74
+			logger.info("Queue retry in 1sec...");
75
+			new Timer("ws-retry-" + System.nanoTime(), true).schedule(new TimerTask() {
76
+
77
+				@Override
78
+				public void run() {
79
+					link.retry();
80
+				}
81
+			}, 1000L);
82
+		}
83
+
84
+		@Override
85
+		public void onMessage(String message) {
86
+			link.onMessage(message);
87
+		}
88
+
89
+	}
90
+
91
+	private static class NodeSocketHandler extends WebSocketUpgradeHandler {
92
+
93
+		public NodeSocketHandler(NodeTextListener listener) {
94
+			super(Arrays.asList(listener));
95
+		}
96
+
97
+	}
98
+
40 99
 	private static final Logger logger = LoggerFactory.getLogger(WebsocketLink.class);
41 100
 	private static final AtomicLong wsIdGen = new AtomicLong();
42 101
 
43
-	private final CompletableFuture<WebSocket> webSocket;
102
+	private final AtomicReference<WebSocket> webSocket;
103
+	private final CompletableFuture<Void> firstConnect;
44 104
 	private final Map<Long, ProcessHandler> execHandlers;
105
+	private final NodeTextListener msgListener;
106
+	private final NodeSocketHandler upgradeHandler;
107
+	private final String wsUrl;
108
+	private final HostPortClient cli;
45 109
 
46 110
 	public WebsocketLink(HostPortClient cli) {
47
-		this.webSocket = new CompletableFuture<>();
48 111
 		this.execHandlers = new ConcurrentHashMap<>();
112
+		this.webSocket = new AtomicReference<>();
113
+		this.firstConnect = new CompletableFuture<>();
114
+		this.wsUrl = (cli.isSSL() ? "wss" : "ws") + "://" + cli.getHost() + ":" + cli.getPort() + "/ws";
115
+		this.cli = cli;
116
+		this.msgListener = new NodeTextListener(webSocket, firstConnect, this);
117
+		this.upgradeHandler = new NodeSocketHandler(msgListener);
118
+
119
+		retry();
120
+		cli.setWebsocketLink(this);
121
+	}
49 122
 
50
-		WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler.Builder()
51
-				.addWebSocketListener(new WebSocketTextListener() {
52
-
53
-					@Override
54
-					public void onOpen(WebSocket ws) {
55
-						logger.debug("websocket opened: {}", ws);
56
-						webSocket.complete(ws);
57
-					}
58
-
59
-					@Override
60
-					public void onError(Throwable t) {
61
-						logger.error("websocket error: {}", t.getMessage());
62
-						webSocket.completeExceptionally(t);
63
-					}
64
-
65
-					@Override
66
-					public void onClose(WebSocket websocket) {
67
-						logger.info("ws closed {}", websocket);
68
-					}
69
-
70
-					@Override
71
-					public void onMessage(String message) {
72
-						logger.debug("onMessage: {}", message);
73
-						JsonObject msg = new JsonObject(message);
74
-						long rid = msg.getLong("ws-rid", 0L);
75
-						ProcessHandler ph = execHandlers.get(rid);
76
-						if (ph != null) {
77
-							handleWebSocketFrame(rid, ph, msg);
78
-						}
79
-					}
80
-				}).build();
81
-		String wsUrl = (cli.isSSL() ? "wss" : "ws") + "://" + cli.getHost() + ":" + cli.getPort() + "/ws";
82
-		logger.debug("wsUrl: {}", wsUrl);
83
-		try {
84
-			cli.getClient().prepareGet(wsUrl).execute(upgradeHandler);
85
-		} catch (Exception e) {
86
-			logger.warn("Node websocket is not available {}", e.getMessage(), e);
87
-			webSocket.completeExceptionally(e);
123
+	public void retry() {
124
+		cli.getClient().prepareGet(wsUrl).execute(upgradeHandler);
125
+	}
126
+
127
+	private void onMessage(String message) {
128
+		logger.debug("onMessage: {}", message);
129
+		JsonObject msg = new JsonObject(message);
130
+		long rid = msg.getLong("ws-rid", 0L);
131
+		ProcessHandler ph = execHandlers.get(rid);
132
+		if (ph != null) {
133
+			handleWebSocketFrame(rid, ph, msg);
88 134
 		}
89
-		cli.setWebsocketLink(this);
90 135
 	}
91 136
 
92 137
 	public void waitAvailable(long time, TimeUnit unit) {
93 138
 		try {
94
-			webSocket.get(time, unit);
95
-		} catch (Exception e) {
96
-			throw new ServerFault(e);
139
+			firstConnect.get(time, unit);
140
+		} catch (InterruptedException | ExecutionException | TimeoutException e) {
141
+			logger.error(e.getMessage(), e);
97 142
 		}
98 143
 	}
99 144
 
... ...
@@ -121,23 +172,21 @@ public class WebsocketLink {
121 121
 	}
122 122
 
123 123
 	public void startWsAction(JsonObject wsReq, ProcessHandler ph) {
124
-		webSocket.whenComplete((ws, t) -> {
125
-			if (t != null) {
126
-				// reject directly as there is a socket problem
124
+		WebSocket ws = webSocket.get();
125
+		if (ws == null) {
126
+			logger.error("Error command as websocket is missing");
127
+			ph.completed(1);
128
+		} else {
129
+			if (!ws.isOpen()) {
130
+				logger.error("Rejecting command as websocket is closed.");
127 131
 				ph.completed(1);
128 132
 			} else {
129
-				if (!ws.isOpen()) {
130
-					logger.error("Rejecting command as websocket is closed.");
131
-					ph.completed(1);
132
-				} else {
133
-
134
-					long rid = wsIdGen.incrementAndGet();
135
-					wsReq.putNumber("ws-rid", rid);
136
-					execHandlers.put(rid, ph);
137
-					ws.sendMessage(wsReq.encode());
138
-				}
133
+				long rid = wsIdGen.incrementAndGet();
134
+				wsReq.putNumber("ws-rid", rid);
135
+				execHandlers.put(rid, ph);
136
+				ws.sendMessage(wsReq.encode());
139 137
 			}
140
-		});
138
+		}
141 139
 
142 140
 	}
143 141