Browse code

[replication] Feat: when the replication goes wild, start introducing delays (metrics are reported when the circuit break triggers)

Thomas Cataldo authored on 10/04/2019 15:53:43
Showing 3 changed files
... ...
@@ -66,6 +66,7 @@ import net.bluemind.backend.cyrus.replication.server.cmd.IAsyncReplicationComman
66 66
 import net.bluemind.backend.cyrus.replication.server.state.ReplicationState;
67 67
 import net.bluemind.backend.cyrus.replication.server.state.StorageApiLink;
68 68
 import net.bluemind.lib.vertx.VertxPlatform;
69
+import net.bluemind.lib.vertx.utils.CircuitBreaker;
69 70
 import net.bluemind.metrics.registry.IdFactory;
70 71
 import net.bluemind.metrics.registry.MetricsRegistry;
71 72
 
... ...
@@ -94,6 +95,8 @@ public class ReplicationSession {
94 95
 
95 96
 	private List<IReplicationObserver> observers;
96 97
 
98
+	private final CircuitBreaker<ReplicationSession> circuitBreaker;
99
+
97 100
 	public ReplicationSession(Vertx vertx, NetSocket client, StorageApiLink storage,
98 101
 			List<IReplicationObserver> observers) {
99 102
 		this.vertx = vertx;
... ...
@@ -106,6 +109,7 @@ public class ReplicationSession {
106 109
 		this.stopFuture = new CompletableFuture<>();
107 110
 		this.observers = observers;
108 111
 		logger.debug("Created for vertx {}", this.vertx);
112
+		this.circuitBreaker = new CircuitBreaker<>(session -> session.client.writeHandlerID());
109 113
 	}
110 114
 
111 115
 	public ReplicationState state() {
... ...
@@ -173,8 +177,12 @@ public class ReplicationSession {
173 177
 
174 178
 	private CompletableFuture<CommandResult> run(IAsyncReplicationCommand cmd, Token t, ReplicationFrame frame) {
175 179
 		try {
176
-			return cmd.doIt(this, t, frame).exceptionally(error -> {
180
+			return cmd.doIt(this, t, frame).thenApply(result -> {
181
+				circuitBreaker.noticeSuccess(this);
182
+				return result;
183
+			}).exceptionally(error -> {
177 184
 				logger.error("NO {}", error.getMessage(), error);
185
+				circuitBreaker.noticeError(this);
178 186
 				return CommandResult.error(error);
179 187
 			});
180 188
 		} catch (Exception e) {
... ...
@@ -194,7 +202,7 @@ public class ReplicationSession {
194 202
 		} else {
195 203
 			logger.info("REPL S: {}", forLog);
196 204
 		}
197
-		return write(res);
205
+		return circuitBreaker.apply(vertx, this, () -> write(res));
198 206
 	}
199 207
 
200 208
 	private CompletableFuture<Void> write(String s) {
... ...
@@ -42,7 +42,7 @@ public class AnnotationStore extends JdbcAbstractStore {
42 42
 				+ AnnotationColumns.COLUMNS.values() + ") ON CONFLICT (mbox, user_id, entry) DO UPDATE SET ("
43 43
 				+ AnnotationColumns.COLUMNS.names() + ") = (" + AnnotationColumns.COLUMNS.values() + ")";
44 44
 		insert(query, qr, Arrays.asList(AnnotationColumns.values(), AnnotationColumns.values()));
45
-		logger.info("annot {} inserted.", qr);
45
+		logger.info("annot {} upserted.", qr);
46 46
 	}
47 47
 
48 48
 	public void delete(MailboxAnnotation qr) throws SQLException {
... ...
@@ -20,6 +20,7 @@ package net.bluemind.lib.vertx.utils;
20 20
 import java.util.concurrent.Callable;
21 21
 import java.util.concurrent.CompletableFuture;
22 22
 import java.util.function.Function;
23
+import java.util.function.Supplier;
23 24
 
24 25
 import org.slf4j.Logger;
25 26
 import org.slf4j.LoggerFactory;
... ...
@@ -80,4 +81,27 @@ public class CircuitBreaker<T> {
80 81
 		return future;
81 82
 	}
82 83
 
84
+	public <R> CompletableFuture<R> apply(Vertx vertx, T partitionable, Supplier<CompletableFuture<R>> to) {
85
+		String partKey = partition.apply(partitionable);
86
+		int count = errorCounts.count(partKey);
87
+		if (count == 0) {
88
+			return to.get();
89
+		} else {
90
+			CompletableFuture<R> future = new CompletableFuture<>();
91
+			long delayMs = Math.min(5, count) * 500;
92
+			registry.counter(idFactory.name("circuitBreakerDelays", "delay", Long.toString(delayMs))).increment();
93
+			logger.warn("[{}] Adding a {}ms delay to error-prone operation, errorCount: {}", partKey, delayMs, count);
94
+			vertx.setTimer(delayMs, tid -> {
95
+				to.get().whenComplete((R res, Throwable ex) -> {
96
+					if (ex != null) {
97
+						future.completeExceptionally(ex);
98
+					} else {
99
+						future.complete(res);
100
+					}
101
+				});
102
+			});
103
+			return future;
104
+		}
105
+	}
106
+
83 107
 }