Browse code

FEATBL-880 Refactoring in SDS Proxy

Enguerran Colson authored on 24/09/2019 09:35:01
Showing 10 changed files
... ...
@@ -29,26 +29,72 @@
29 29
       <plugin id="com.fasterxml.jackson.core.jackson-core"/>
30 30
       <plugin id="com.fasterxml.jackson.core.jackson-databind"/>
31 31
       <plugin id="com.google.guava"/>
32
+      <plugin id="com.hazelcast"/>
32 33
       <plugin id="com.netflix.spectator.api"/>
34
+      <plugin id="com.ning.async-http-client"/>
33 35
       <plugin id="io.netty"/>
34 36
       <plugin id="io.vertx.core"/>
35 37
       <plugin id="io.vertx.platform"/>
38
+      <plugin id="javax.validation.api"/>
39
+      <plugin id="javax.ws.rs"/>
36 40
       <plugin id="javax.xml"/>
37 41
       <plugin id="jcl.over.slf4j"/>
38 42
       <plugin id="jul.to.slf4j"/>
43
+      <plugin id="net.bluemind.addressbook.api"/>
44
+      <plugin id="net.bluemind.authentication.api"/>
39 45
       <plugin id="net.bluemind.aws.s3"/>
40 46
       <plugin id="net.bluemind.aws.s3.utils"/>
47
+      <plugin id="net.bluemind.common.io"/>
48
+      <plugin id="net.bluemind.common.reflect"/>
49
+      <plugin id="net.bluemind.config"/>
50
+      <plugin id="net.bluemind.core.commons"/>
51
+      <plugin id="net.bluemind.core.container.api"/>
52
+      <plugin id="net.bluemind.core.context"/>
53
+      <plugin id="net.bluemind.core.rest"/>
54
+      <plugin id="net.bluemind.core.rest.common"/>
55
+      <plugin id="net.bluemind.core.rest.http"/>
56
+      <plugin id="net.bluemind.core.sessions"/>
57
+      <plugin id="net.bluemind.core.task.api"/>
58
+      <plugin id="net.bluemind.directory.api"/>
59
+      <plugin id="net.bluemind.backend.mail.api"/>
60
+      <plugin id="net.bluemind.backend.mail.replica.api"/>
61
+      <plugin id="net.bluemind.common.io"/>
62
+      <plugin id="net.bluemind.common.reflect"/>
63
+      <plugin id="net.bluemind.config"/>
64
+      <plugin id="net.bluemind.core.commons"/>
65
+      <plugin id="net.bluemind.core.container.api"/>
66
+      <plugin id="net.bluemind.core.context"/>
67
+      <plugin id="net.bluemind.core.rest"/>
68
+      <plugin id="net.bluemind.core.rest.common"/>
69
+      <plugin id="net.bluemind.core.rest.http"/>
70
+      <plugin id="net.bluemind.core.sessions"/>
71
+      <plugin id="net.bluemind.core.task.api"/>
72
+      <plugin id="net.bluemind.directory.api"/>
41 73
       <plugin id="net.bluemind.eclipse.common"/>
74
+      <plugin id="net.bluemind.group.api"/>
75
+      <plugin id="net.bluemind.hornetq.client"/>
42 76
       <plugin id="net.bluemind.lib.jackson"/>
43 77
       <plugin id="net.bluemind.lib.vertx"/>
78
+      <plugin id="net.bluemind.locator.client"/>
79
+      <plugin id="net.bluemind.mailbox.api"/>
80
+      <plugin id="net.bluemind.mailbox.identity.api"/>
44 81
       <plugin id="net.bluemind.metrics.registry"/>
82
+      <plugin id="net.bluemind.network.topology"/>
83
+      <plugin id="net.bluemind.network.topology.consumer"/>
45 84
       <plugin id="net.bluemind.sds.proxy"/>
46 85
       <plugin id="net.bluemind.sds.proxy.launcher"/>
47 86
       <plugin id="net.bluemind.sds.proxy.store.s3"/>
87
+      <plugin id="net.bluemind.server.api"/>
48 88
       <plugin id="net.bluemind.slf4j"/>
49 89
       <plugin id="net.bluemind.slf4j.configuration" fragment="true"/>
90
+      <plugin id="net.bluemind.system.api"/>
50 91
       <plugin id="net.bluemind.systemd.notify"/>
92
+      <plugin id="net.bluemind.tag.api"/>
93
+      <plugin id="net.bluemind.user.api"/>
94
+      <plugin id="net.bluemind.utils"/>
95
+      <plugin id="net.bluemind.vertx.common"/>
51 96
       <plugin id="net.java.dev.jna"/>
97
+      <plugin id="org.apache.commons.lang"/>
52 98
       <plugin id="org.eclipse.core.contenttype"/>
53 99
       <plugin id="org.eclipse.core.jobs"/>
54 100
       <plugin id="org.eclipse.core.runtime"/>
... ...
@@ -59,6 +105,7 @@
59 105
       <plugin id="org.eclipse.equinox.registry"/>
60 106
       <plugin id="org.eclipse.osgi"/>
61 107
       <plugin id="org.eclipse.update.configurator"/>
108
+      <plugin id="org.jboss.netty"/>
62 109
       <plugin id="slf4j.api"/>
63 110
    </plugins>
64 111
 
... ...
@@ -74,7 +74,7 @@ public class S3StoreTests {
74 74
 		ExistRequest er = new ExistRequest();
75 75
 		er.mailbox = "titi";
76 76
 		er.guid = UUID.randomUUID().toString();
77
-		assertFalse(store.exists(er).exist);
77
+		assertFalse(store.exists(er).exists);
78 78
 	}
79 79
 
80 80
 	@Test
... ...
@@ -85,7 +85,7 @@ public class S3StoreTests {
85 85
 		ExistRequest er = new ExistRequest();
86 86
 		er.mailbox = "titi";
87 87
 		er.guid = UUID.randomUUID().toString();
88
-		assertFalse(store.exists(er).exist);
88
+		assertFalse(store.exists(er).exists);
89 89
 
90 90
 		PutRequest pr = new PutRequest();
91 91
 		pr.mailbox = er.mailbox;
... ...
@@ -96,7 +96,7 @@ public class S3StoreTests {
96 96
 		SdsResponse resp = store.upload(pr);
97 97
 		assertNotNull(resp);
98 98
 
99
-		assertTrue(store.exists(er).exist);
99
+		assertTrue(store.exists(er).exists);
100 100
 	}
101 101
 
102 102
 	@Test
... ...
@@ -121,7 +121,7 @@ public class S3StoreTests {
121 121
 		ExistRequest er = new ExistRequest();
122 122
 		er.guid = dr.guid;
123 123
 		er.mailbox = dr.mailbox;
124
-		assertFalse(store.exists(er).exist);
124
+		assertFalse(store.exists(er).exists);
125 125
 
126 126
 	}
127 127
 
... ...
@@ -57,7 +57,7 @@ public class SdsTestStore implements ISdsBackingStore {
57 57
 	public ExistResponse exists(ExistRequest req) {
58 58
 		vertx.eventBus().publish("test.store.exists", req.guid);
59 59
 		ExistResponse er = new ExistResponse();
60
-		er.exist = true;
60
+		er.exists = true;
61 61
 		return er;
62 62
 	}
63 63
 
... ...
@@ -8,7 +8,7 @@
8 8
             impl="net.bluemind.sds.proxy.SdsProxyHttpVerticle$SdsProxyHttpFactory">
9 9
       </verticle>
10 10
       <verticle
11
-            impl="net.bluemind.sds.proxy.events.SdsEventHandlerVerticle$SdsEventFactory">
11
+            impl="net.bluemind.sds.proxy.events.SdsObjectStoreHandlerVerticle$SdsObjectStoreFactory">
12 12
       </verticle>
13 13
       <verticle
14 14
             impl="net.bluemind.sds.proxy.watchdog.WatchdogVerticle$Factory">
... ...
@@ -76,11 +76,11 @@ public class SdsProxyHttpVerticle extends Verticle {
76 76
 			logger.warn("Unknown request to {} {}", req.method(), req.absoluteURI());
77 77
 			req.response().setStatusCode(400).end();
78 78
 		});
79
-		router.head("/sds", req -> doHead(req));
80
-		router.delete("/sds", req -> doDelete(req));
81
-		router.put("/sds", req -> doPut(req));
82
-		router.get("/sds", req -> doGet(req));
83
-		router.post("/configuration", req -> configure(req));
79
+		router.head("/sds", this::exist);
80
+		router.delete("/sds", this::delete);
81
+		router.put("/sds", this::put);
82
+		router.get("/sds", this::get);
83
+		router.post("/configuration", this::configure);
84 84
 
85 85
 		srv.requestHandler(router).listen(8091, result -> {
86 86
 			if (result.succeeded()) {
... ...
@@ -91,24 +91,24 @@ public class SdsProxyHttpVerticle extends Verticle {
91 91
 		});
92 92
 	}
93 93
 
94
-	private void configure(HttpServerRequest req) {
95
-		sendBody(req, SdsAddresses.CONFIG, ConfigureResponse.class, (resp, http) -> http.setStatusCode(200).end());
94
+	private void configure(HttpServerRequest request) {
95
+		sendBody(request, SdsAddresses.CONFIG, ConfigureResponse.class, (resp, http) -> http.setStatusCode(200).end());
96 96
 	}
97 97
 
98
-	private void doHead(HttpServerRequest req) {
99
-		sendBody(req, SdsAddresses.EXIST, ExistResponse.class,
100
-				(resp, http) -> http.setStatusCode(resp.exist ? 200 : 404).end());
98
+	private void exist(HttpServerRequest request) {
99
+		sendBody(request, SdsAddresses.EXIST, ExistResponse.class,
100
+				(resp, http) -> http.setStatusCode(resp.exists ? 200 : 404).end());
101 101
 	}
102 102
 
103
-	private void doDelete(HttpServerRequest req) {
103
+	private void delete(HttpServerRequest req) {
104 104
 		sendBody(req, SdsAddresses.DELETE, SdsResponse.class, (resp, http) -> http.setStatusCode(200).end());
105 105
 	}
106 106
 
107
-	private void doPut(HttpServerRequest req) {
107
+	private void put(HttpServerRequest req) {
108 108
 		sendBody(req, SdsAddresses.PUT, SdsResponse.class, (resp, http) -> http.setStatusCode(200).end());
109 109
 	}
110 110
 
111
-	private void doGet(HttpServerRequest req) {
111
+	private void get(HttpServerRequest req) {
112 112
 		sendBody(req, SdsAddresses.GET, SdsResponse.class, (resp, http) -> http.setStatusCode(200).end());
113 113
 	}
114 114
 
... ...
@@ -116,33 +116,34 @@ public class SdsProxyHttpVerticle extends Verticle {
116 116
 			BiConsumer<T, HttpServerResponse> onSuccess) {
117 117
 		long start = registry.clock().monotonicTime();
118 118
 		HttpServerRequest req = Requests.wrap(httpReq);
119
+
119 120
 		req.bodyHandler(payload -> {
120
-			vertx.eventBus().sendWithTimeout(address, new JsonObject(payload.toString()), 3000,
121
-					(AsyncResult<Message<JsonObject>> res) -> {
122
-						Id timerId = idFactory.name("requestTime")//
123
-								.withTag("method", address)//
124
-								.withTag("status", res.succeeded() ? "OK" : "FAILED");
125
-						registry.timer(timerId).record(registry.clock().monotonicTime() - start, TimeUnit.NANOSECONDS);
126
-
127
-						if (res.succeeded()) {
128
-							String jsonString = res.result().body().encode();
129
-							try {
130
-								T objectResp = JsMapper.get().readValue(jsonString, respClass);
131
-								if (objectResp.succeeded()) {
132
-									Requests.tag(req, "method", address);
133
-									onSuccess.accept(objectResp, req.response());
134
-								} else {
135
-									req.response().setStatusMessage(objectResp.error.message).setStatusCode(500).end();
136
-								}
137
-							} catch (IOException e) {
138
-								logger.error("Error parsing {} response ({})", address, jsonString, e);
139
-								req.response().setStatusCode(500).end();
140
-							}
121
+			JsonObject json = new JsonObject(payload.toString().trim().isEmpty() ? "{}" : payload.toString());
122
+			vertx.eventBus().sendWithTimeout(address, json, 3000, (AsyncResult<Message<JsonObject>> res) -> {
123
+				Id timerId = idFactory.name("requestTime")//
124
+						.withTag("method", address)//
125
+						.withTag("status", res.succeeded() ? "OK" : "FAILED");
126
+				registry.timer(timerId).record(registry.clock().monotonicTime() - start, TimeUnit.NANOSECONDS);
127
+
128
+				if (res.succeeded()) {
129
+					String jsonString = res.result().body().encode();
130
+					try {
131
+						T objectResp = JsMapper.get().readValue(jsonString, respClass);
132
+						if (objectResp.succeeded()) {
133
+							Requests.tag(req, "method", address);
134
+							onSuccess.accept(objectResp, req.response());
141 135
 						} else {
142
-							logger.error("Call over {} failed", address, res.cause());
143
-							req.response().setStatusCode(500).end();
136
+							req.response().setStatusMessage(objectResp.error.message).setStatusCode(500).end();
144 137
 						}
145
-					});
138
+					} catch (IOException e) {
139
+						logger.error("Error parsing {} response ({})", address, jsonString, e);
140
+						req.response().setStatusCode(500).end();
141
+					}
142
+				} else {
143
+					logger.error("Call over {} failed", address, res.cause());
144
+					req.response().setStatusCode(500).end();
145
+				}
146
+			});
146 147
 		});
147 148
 
148 149
 	}
... ...
@@ -19,11 +19,11 @@ package net.bluemind.sds.proxy.dto;
19 19
 
20 20
 public class ExistResponse extends SdsResponse {
21 21
 
22
-	public boolean exist;
22
+	public boolean exists;
23 23
 
24 24
 	public static ExistResponse from(boolean known) {
25 25
 		ExistResponse er = new ExistResponse();
26
-		er.exist = known;
26
+		er.exists = known;
27 27
 		return er;
28 28
 	}
29 29
 
... ...
@@ -32,4 +32,6 @@ public class SdsAddresses {
32 32
 
33 33
 	public static final String DELETE = "sds.delete";
34 34
 
35
+	public static final String VALIDATE = "sds.validate";
36
+
35 37
 }
36 38
deleted file mode 100644
... ...
@@ -1,261 +0,0 @@
1
-/* BEGIN LICENSE
2
- * Copyright © Blue Mind SAS, 2012-2019
3
- *
4
- * This file is part of BlueMind. BlueMind is a messaging and collaborative
5
- * solution.
6
- *
7
- * This program is free software; you can redistribute it and/or modify
8
- * it under the terms of either the GNU Affero General Public License as
9
- * published by the Free Software Foundation (version 3 of the License).
10
- *
11
- * This program is distributed in the hope that it will be useful,
12
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
- *
15
- * See LICENSE.txt
16
- * END LICENSE
17
- */
18
-package net.bluemind.sds.proxy.events;
19
-
20
-import java.io.File;
21
-import java.io.IOException;
22
-import java.nio.file.FileSystems;
23
-import java.nio.file.Files;
24
-import java.nio.file.LinkOption;
25
-import java.nio.file.Path;
26
-import java.nio.file.Paths;
27
-import java.nio.file.StandardCopyOption;
28
-import java.nio.file.StandardOpenOption;
29
-import java.nio.file.attribute.GroupPrincipal;
30
-import java.nio.file.attribute.PosixFileAttributeView;
31
-import java.nio.file.attribute.PosixFilePermissions;
32
-import java.nio.file.attribute.UserPrincipal;
33
-import java.nio.file.attribute.UserPrincipalLookupService;
34
-import java.util.List;
35
-import java.util.Map;
36
-import java.util.Optional;
37
-import java.util.concurrent.atomic.AtomicReference;
38
-import java.util.stream.Collectors;
39
-
40
-import org.slf4j.Logger;
41
-import org.slf4j.LoggerFactory;
42
-import org.vertx.java.core.eventbus.Message;
43
-import org.vertx.java.core.json.JsonObject;
44
-import org.vertx.java.platform.Verticle;
45
-
46
-import net.bluemind.eclipse.common.RunnableExtensionLoader;
47
-import net.bluemind.lib.vertx.IVerticleFactory;
48
-import net.bluemind.sds.proxy.dto.ConfigureResponse;
49
-import net.bluemind.sds.proxy.dto.DeleteRequest;
50
-import net.bluemind.sds.proxy.dto.ExistRequest;
51
-import net.bluemind.sds.proxy.dto.GetRequest;
52
-import net.bluemind.sds.proxy.dto.JsMapper;
53
-import net.bluemind.sds.proxy.dto.PutRequest;
54
-import net.bluemind.sds.proxy.dto.SdsRequest;
55
-import net.bluemind.sds.proxy.dto.SdsResponse;
56
-import net.bluemind.sds.proxy.store.ISdsBackingStore;
57
-import net.bluemind.sds.proxy.store.ISdsBackingStoreFactory;
58
-import net.bluemind.sds.proxy.store.SdsException;
59
-import net.bluemind.sds.proxy.store.dummy.DummyBackingStore;
60
-
61
-public class SdsEventHandlerVerticle extends Verticle {
62
-
63
-	private static final Logger logger = LoggerFactory.getLogger(SdsEventHandlerVerticle.class);
64
-	private static final Path config = new File("/etc/bm-sds-proxy/config.json").toPath();
65
-
66
-	public static class SdsEventFactory implements IVerticleFactory {
67
-
68
-		@Override
69
-		public boolean isWorker() {
70
-			return true;
71
-		}
72
-
73
-		@Override
74
-		public Verticle newInstance() {
75
-			return new SdsEventHandlerVerticle();
76
-		}
77
-
78
-	}
79
-
80
-	private AtomicReference<ISdsBackingStore> sdsStore = new AtomicReference<>();
81
-	private final Map<String, ISdsBackingStoreFactory> factories;
82
-	private JsonObject storeConfig;
83
-
84
-	public SdsEventHandlerVerticle() {
85
-		this.factories = loadStoreFactories();
86
-		this.storeConfig = loadConfig();
87
-		sdsStore.set(loadStore());
88
-	}
89
-
90
-	private JsonObject loadConfig() {
91
-
92
-		if (Files.exists(config)) {
93
-			try {
94
-				return new JsonObject(new String(Files.readAllBytes(config)));
95
-			} catch (IOException e) {
96
-				throw new SdsException(e);
97
-			}
98
-		} else {
99
-			logger.info("Configuration {} is missing, using defaults.", config.toFile().getAbsolutePath());
100
-			return new JsonObject();
101
-		}
102
-	}
103
-
104
-	private ISdsBackingStore loadStore() {
105
-		String storeType = storeConfig.getString("storeType");
106
-		if (storeType == null || storeType.equals("dummy") || !factories.containsKey(storeType)) {
107
-			logger.info("Defaulting to dummy store (requested: {})", storeType);
108
-			return DummyBackingStore.FACTORY.create(vertx, storeConfig);
109
-		} else {
110
-			logger.info("Loading store {}", storeType);
111
-			return factories.get(storeType).create(vertx, storeConfig);
112
-		}
113
-
114
-	}
115
-
116
-	private Map<String, ISdsBackingStoreFactory> loadStoreFactories() {
117
-		RunnableExtensionLoader<ISdsBackingStoreFactory> rel = new RunnableExtensionLoader<>();
118
-		List<ISdsBackingStoreFactory> stores = rel.loadExtensions("net.bluemind.sds.proxy", "store", "store",
119
-				"factory");
120
-		logger.info("Found {} backing store(s)", stores.size());
121
-		return stores.stream().collect(Collectors.toMap(f -> f.name(), f -> f));
122
-	}
123
-
124
-	@Override
125
-	public void start() {
126
-
127
-		UserPrincipalLookupService lookupService = FileSystems.getDefault().getUserPrincipalLookupService();
128
-		UserPrincipal cyrusUser = null;
129
-		GroupPrincipal mailGroup = null;
130
-		try {
131
-			cyrusUser = lookupService.lookupPrincipalByName("cyrus");
132
-			mailGroup = lookupService.lookupPrincipalByGroupName("mail");
133
-			logger.info("Found cyrus user {}, group {}", cyrusUser, mailGroup);
134
-		} catch (IOException e) {
135
-			logger.warn("Error looking up cyrus user: {}", e.getMessage());
136
-		}
137
-		final Optional<UserPrincipal> optCyrusUser = Optional.ofNullable(cyrusUser);
138
-		final Optional<GroupPrincipal> optMailGroup = Optional.ofNullable(mailGroup);
139
-
140
-		registerForJsonSdsRequest(SdsAddresses.EXIST, ExistRequest.class, r -> sdsStore.get().exists(r));
141
-
142
-		registerForJsonSdsRequest(SdsAddresses.DELETE, DeleteRequest.class, r -> sdsStore.get().delete(r));
143
-
144
-		registerForJsonSdsRequest(SdsAddresses.PUT, PutRequest.class, r -> sdsStore.get().upload(r));
145
-
146
-		registerForJsonSdsRequest(SdsAddresses.CONFIG, this::reConfigure);
147
-
148
-		registerForJsonSdsRequest(SdsAddresses.GET, GetRequest.class, get -> {
149
-			String finalPath = get.filename;
150
-			Path tmp = Files.createTempFile("sds", ".eml");
151
-			get.filename = tmp.toFile().getAbsolutePath();
152
-			SdsResponse resp = sdsStore.get().download(get);
153
-			if (resp.succeeded()) {
154
-				optCyrusUser.ifPresent(cyrus -> {
155
-					optMailGroup.ifPresent(mail -> {
156
-						File tempFile = tmp.toFile();
157
-						mkdirAndChown(tempFile, cyrus, mail);
158
-						move(tmp, Paths.get(finalPath));
159
-					});
160
-				});
161
-				if (!optCyrusUser.isPresent() || !optMailGroup.isPresent()) {
162
-					move(tmp, Paths.get(finalPath));
163
-				}
164
-			}
165
-			return resp;
166
-		});
167
-
168
-	}
169
-
170
-	private static void move(Path src, Path dest) {
171
-		try {
172
-			Files.move(src, dest, StandardCopyOption.REPLACE_EXISTING);
173
-		} catch (IOException e) {
174
-			logger.error(e.getMessage(), e);
175
-		}
176
-	}
177
-
178
-	private ConfigureResponse reConfigure(JsonObject req) {
179
-		logger.info("Apply configuration {}", req);
180
-		storeConfig = req;
181
-		sdsStore.set(loadStore());
182
-
183
-		try {
184
-			Files.write(config, req.encode().getBytes(), StandardOpenOption.CREATE,
185
-					StandardOpenOption.TRUNCATE_EXISTING);
186
-		} catch (IOException e) {
187
-			logger.warn("Failed to save configuration to {}", config.toFile().getAbsolutePath());
188
-		}
189
-
190
-		// for unit tests
191
-		vertx.eventBus().publish("sds.events.configuration.updated", true);
192
-
193
-		return new ConfigureResponse();
194
-	}
195
-
196
-	private void mkdirAndChown(File dest, UserPrincipal user, GroupPrincipal g) {
197
-		try {
198
-			createParentDirs(dest, user, g);
199
-			PosixFileAttributeView view = Files.getFileAttributeView(dest.toPath(), PosixFileAttributeView.class,
200
-					LinkOption.NOFOLLOW_LINKS);
201
-			view.setOwner(user);
202
-			view.setGroup(g);
203
-			if (logger.isDebugEnabled()) {
204
-				logger.debug("{} owner set to {} {}", dest.getAbsolutePath(), user, g);
205
-			}
206
-		} catch (IOException e) {
207
-			logger.error(e.getMessage(), e);
208
-		}
209
-
210
-	}
211
-
212
-	private void createParentDirs(File dest, UserPrincipal user, GroupPrincipal g) throws IOException {
213
-		File parentDir = dest.getParentFile();
214
-		if (!parentDir.exists()) {
215
-			Files.createDirectories(parentDir.toPath(),
216
-					PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-x---")));
217
-			while (!parentDir.getAbsolutePath().equals("/var/spool/cyrus/data")) {
218
-				PosixFileAttributeView view = Files.getFileAttributeView(parentDir.toPath(),
219
-						PosixFileAttributeView.class, LinkOption.NOFOLLOW_LINKS);
220
-				view.setOwner(user);
221
-				view.setGroup(g);
222
-				parentDir = parentDir.getParentFile();
223
-			}
224
-		}
225
-	}
226
-
227
-	private static interface UnsafeFunction<T, R> {
228
-		R apply(T param) throws Exception;
229
-	}
230
-
231
-	private <T extends SdsRequest, R extends SdsResponse> void registerForJsonSdsRequest(String address,
232
-			Class<T> reqType, UnsafeFunction<T, R> process) {
233
-		vertx.eventBus().registerHandler(address, (Message<JsonObject> msg) -> {
234
-			String jsonString = msg.body().encode();
235
-			try {
236
-				T sdsReq = JsMapper.get().readValue(jsonString, reqType);
237
-				R sdsResp = process.apply(sdsReq);
238
-				JsonObject jsResp = new JsonObject(JsMapper.get().writeValueAsString(sdsResp));
239
-				msg.reply(jsResp);
240
-			} catch (Exception e) {
241
-				logger.error("{} Error processing payload {}", address, jsonString, e);
242
-				// let the event bus timeout trigger, an http 500 will be returned
243
-			}
244
-		});
245
-	}
246
-
247
-	private <R extends SdsResponse> void registerForJsonSdsRequest(String address,
248
-			UnsafeFunction<JsonObject, R> process) {
249
-		vertx.eventBus().registerHandler(address, (Message<JsonObject> msg) -> {
250
-			try {
251
-				R sdsResp = process.apply(msg.body());
252
-				JsonObject jsResp = new JsonObject(JsMapper.get().writeValueAsString(sdsResp));
253
-				msg.reply(jsResp);
254
-			} catch (Exception e) {
255
-				logger.error("{} Error processing payload {}", address, msg.body(), e);
256
-				// let the event bus timeout trigger, an http 500 will be returned
257
-			}
258
-		});
259
-	}
260
-
261
-}
262 0
new file mode 100644
... ...
@@ -0,0 +1,261 @@
1
+/* BEGIN LICENSE
2
+ * Copyright © Blue Mind SAS, 2012-2019
3
+ *
4
+ * This file is part of BlueMind. BlueMind is a messaging and collaborative
5
+ * solution.
6
+ *
7
+ * This program is free software; you can redistribute it and/or modify
8
+ * it under the terms of either the GNU Affero General Public License as
9
+ * published by the Free Software Foundation (version 3 of the License).
10
+ *
11
+ * This program is distributed in the hope that it will be useful,
12
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
+ *
15
+ * See LICENSE.txt
16
+ * END LICENSE
17
+ */
18
+package net.bluemind.sds.proxy.events;
19
+
20
+import java.io.File;
21
+import java.io.IOException;
22
+import java.nio.file.FileSystems;
23
+import java.nio.file.Files;
24
+import java.nio.file.LinkOption;
25
+import java.nio.file.Path;
26
+import java.nio.file.Paths;
27
+import java.nio.file.StandardCopyOption;
28
+import java.nio.file.StandardOpenOption;
29
+import java.nio.file.attribute.GroupPrincipal;
30
+import java.nio.file.attribute.PosixFileAttributeView;
31
+import java.nio.file.attribute.PosixFilePermissions;
32
+import java.nio.file.attribute.UserPrincipal;
33
+import java.nio.file.attribute.UserPrincipalLookupService;
34
+import java.util.List;
35
+import java.util.Map;
36
+import java.util.Optional;
37
+import java.util.concurrent.atomic.AtomicReference;
38
+import java.util.stream.Collectors;
39
+
40
+import org.slf4j.Logger;
41
+import org.slf4j.LoggerFactory;
42
+import org.vertx.java.core.eventbus.Message;
43
+import org.vertx.java.core.json.JsonObject;
44
+import org.vertx.java.platform.Verticle;
45
+
46
+import net.bluemind.eclipse.common.RunnableExtensionLoader;
47
+import net.bluemind.lib.vertx.IVerticleFactory;
48
+import net.bluemind.sds.proxy.dto.ConfigureResponse;
49
+import net.bluemind.sds.proxy.dto.DeleteRequest;
50
+import net.bluemind.sds.proxy.dto.ExistRequest;
51
+import net.bluemind.sds.proxy.dto.GetRequest;
52
+import net.bluemind.sds.proxy.dto.JsMapper;
53
+import net.bluemind.sds.proxy.dto.PutRequest;
54
+import net.bluemind.sds.proxy.dto.SdsRequest;
55
+import net.bluemind.sds.proxy.dto.SdsResponse;
56
+import net.bluemind.sds.proxy.store.ISdsBackingStore;
57
+import net.bluemind.sds.proxy.store.ISdsBackingStoreFactory;
58
+import net.bluemind.sds.proxy.store.SdsException;
59
+import net.bluemind.sds.proxy.store.dummy.DummyBackingStore;
60
+
61
+public class SdsObjectStoreHandlerVerticle extends Verticle {
62
+
63
+	private static final Logger logger = LoggerFactory.getLogger(SdsObjectStoreHandlerVerticle.class);
64
+	private static final Path config = new File("/etc/bm-sds-proxy/config.json").toPath();
65
+
66
+	public static class SdsObjectStoreFactory implements IVerticleFactory {
67
+
68
+		@Override
69
+		public boolean isWorker() {
70
+			return true;
71
+		}
72
+
73
+		@Override
74
+		public Verticle newInstance() {
75
+			return new SdsObjectStoreHandlerVerticle();
76
+		}
77
+
78
+	}
79
+
80
+	private AtomicReference<ISdsBackingStore> sdsStore = new AtomicReference<>();
81
+	private final Map<String, ISdsBackingStoreFactory> factories;
82
+	private JsonObject storeConfig;
83
+
84
+	public SdsObjectStoreHandlerVerticle() {
85
+		this.factories = loadStoreFactories();
86
+		this.storeConfig = loadConfig();
87
+		sdsStore.set(loadStore());
88
+	}
89
+
90
+	private JsonObject loadConfig() {
91
+
92
+		if (Files.exists(config)) {
93
+			try {
94
+				return new JsonObject(new String(Files.readAllBytes(config)));
95
+			} catch (IOException e) {
96
+				throw new SdsException(e);
97
+			}
98
+		} else {
99
+			logger.info("Configuration {} is missing, using defaults.", config.toFile().getAbsolutePath());
100
+			return new JsonObject();
101
+		}
102
+	}
103
+
104
+	private ISdsBackingStore loadStore() {
105
+		String storeType = storeConfig.getString("storeType");
106
+		if (storeType == null || storeType.equals("dummy") || !factories.containsKey(storeType)) {
107
+			logger.info("Defaulting to dummy store (requested: {})", storeType);
108
+			return DummyBackingStore.FACTORY.create(vertx, storeConfig);
109
+		} else {
110
+			logger.info("Loading store {}", storeType);
111
+			return factories.get(storeType).create(vertx, storeConfig);
112
+		}
113
+
114
+	}
115
+
116
+	private Map<String, ISdsBackingStoreFactory> loadStoreFactories() {
117
+		RunnableExtensionLoader<ISdsBackingStoreFactory> rel = new RunnableExtensionLoader<>();
118
+		List<ISdsBackingStoreFactory> stores = rel.loadExtensions("net.bluemind.sds.proxy", "store", "store",
119
+				"factory");
120
+		logger.info("Found {} backing store(s)", stores.size());
121
+		return stores.stream().collect(Collectors.toMap(f -> f.name(), f -> f));
122
+	}
123
+
124
+	@Override
125
+	public void start() {
126
+
127
+		UserPrincipalLookupService lookupService = FileSystems.getDefault().getUserPrincipalLookupService();
128
+		UserPrincipal cyrusUser = null;
129
+		GroupPrincipal mailGroup = null;
130
+		try {
131
+			cyrusUser = lookupService.lookupPrincipalByName("cyrus");
132
+			mailGroup = lookupService.lookupPrincipalByGroupName("mail");
133
+			logger.info("Found cyrus user {}, group {}", cyrusUser, mailGroup);
134
+		} catch (IOException e) {
135
+			logger.warn("Error looking up cyrus user: {}", e.getMessage());
136
+		}
137
+		final Optional<UserPrincipal> optCyrusUser = Optional.ofNullable(cyrusUser);
138
+		final Optional<GroupPrincipal> optMailGroup = Optional.ofNullable(mailGroup);
139
+
140
+		registerForJsonSdsRequest(SdsAddresses.EXIST, ExistRequest.class, r -> sdsStore.get().exists(r));
141
+
142
+		registerForJsonSdsRequest(SdsAddresses.DELETE, DeleteRequest.class, r -> sdsStore.get().delete(r));
143
+
144
+		registerForJsonSdsRequest(SdsAddresses.PUT, PutRequest.class, r -> sdsStore.get().upload(r));
145
+
146
+		registerForJsonSdsRequest(SdsAddresses.CONFIG, this::reConfigure);
147
+
148
+		registerForJsonSdsRequest(SdsAddresses.GET, GetRequest.class, get -> {
149
+			String finalPath = get.filename;
150
+			Path tmp = Files.createTempFile("sds", ".eml");
151
+			get.filename = tmp.toFile().getAbsolutePath();
152
+			SdsResponse resp = sdsStore.get().download(get);
153
+			if (resp.succeeded()) {
154
+				optCyrusUser.ifPresent(cyrus -> {
155
+					optMailGroup.ifPresent(mail -> {
156
+						File tempFile = tmp.toFile();
157
+						mkdirAndChown(tempFile, cyrus, mail);
158
+						move(tmp, Paths.get(finalPath));
159
+					});
160
+				});
161
+				if (!optCyrusUser.isPresent() || !optMailGroup.isPresent()) {
162
+					move(tmp, Paths.get(finalPath));
163
+				}
164
+			}
165
+			return resp;
166
+		});
167
+
168
+	}
169
+
170
+	private static void move(Path src, Path dest) {
171
+		try {
172
+			Files.move(src, dest, StandardCopyOption.REPLACE_EXISTING);
173
+		} catch (IOException e) {
174
+			logger.error(e.getMessage(), e);
175
+		}
176
+	}
177
+
178
+	private ConfigureResponse reConfigure(JsonObject req) {
179
+		logger.info("Apply configuration {}", req);
180
+		storeConfig = req;
181
+		sdsStore.set(loadStore());
182
+
183
+		try {
184
+			Files.write(config, req.encode().getBytes(), StandardOpenOption.CREATE,
185
+					StandardOpenOption.TRUNCATE_EXISTING);
186
+		} catch (IOException e) {
187
+			logger.warn("Failed to save configuration to {}", config.toFile().getAbsolutePath());
188
+		}
189
+
190
+		// for unit tests
191
+		vertx.eventBus().publish("sds.events.configuration.updated", true);
192
+
193
+		return new ConfigureResponse();
194
+	}
195
+
196
+	private void mkdirAndChown(File dest, UserPrincipal user, GroupPrincipal g) {
197
+		try {
198
+			createParentDirs(dest, user, g);
199
+			PosixFileAttributeView view = Files.getFileAttributeView(dest.toPath(), PosixFileAttributeView.class,
200
+					LinkOption.NOFOLLOW_LINKS);
201
+			view.setOwner(user);
202
+			view.setGroup(g);
203
+			if (logger.isDebugEnabled()) {
204
+				logger.debug("{} owner set to {} {}", dest.getAbsolutePath(), user, g);
205
+			}
206
+		} catch (IOException e) {
207
+			logger.error(e.getMessage(), e);
208
+		}
209
+
210
+	}
211
+
212
+	private void createParentDirs(File dest, UserPrincipal user, GroupPrincipal g) throws IOException {
213
+		File parentDir = dest.getParentFile();
214
+		if (!parentDir.exists()) {
215
+			Files.createDirectories(parentDir.toPath(),
216
+					PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-x---")));
217
+			while (!parentDir.getAbsolutePath().equals("/var/spool/cyrus/data")) {
218
+				PosixFileAttributeView view = Files.getFileAttributeView(parentDir.toPath(),
219
+						PosixFileAttributeView.class, LinkOption.NOFOLLOW_LINKS);
220
+				view.setOwner(user);
221
+				view.setGroup(g);
222
+				parentDir = parentDir.getParentFile();
223
+			}
224
+		}
225
+	}
226
+
227
+	private static interface UnsafeFunction<T, R> {
228
+		R apply(T param) throws Exception;
229
+	}
230
+
231
+	private <T extends SdsRequest, R extends SdsResponse> void registerForJsonSdsRequest(String address,
232
+			Class<T> reqType, UnsafeFunction<T, R> process) {
233
+		vertx.eventBus().registerHandler(address, (Message<JsonObject> msg) -> {
234
+			String jsonString = msg.body().encode();
235
+			try {
236
+				T sdsReq = JsMapper.get().readValue(jsonString, reqType);
237
+				R sdsResp = process.apply(sdsReq);
238
+				JsonObject jsResp = new JsonObject(JsMapper.get().writeValueAsString(sdsResp));
239
+				msg.reply(jsResp);
240
+			} catch (Exception e) {
241
+				logger.error("{} Error processing payload {}", address, jsonString, e);
242
+				// let the event bus timeout trigger, an http 500 will be returned
243
+			}
244
+		});
245
+	}
246
+
247
+	private <R extends SdsResponse> void registerForJsonSdsRequest(String address,
248
+			UnsafeFunction<JsonObject, R> process) {
249
+		vertx.eventBus().registerHandler(address, (Message<JsonObject> msg) -> {
250
+			try {
251
+				R sdsResp = process.apply(msg.body());
252
+				JsonObject jsResp = new JsonObject(JsMapper.get().writeValueAsString(sdsResp));
253
+				msg.reply(jsResp);
254
+			} catch (Exception e) {
255
+				logger.error("{} Error processing payload {}", address, msg.body(), e);
256
+				// let the event bus timeout trigger, an http 500 will be returned
257
+			}
258
+		});
259
+	}
260
+
261
+}
... ...
@@ -55,7 +55,7 @@ public class DummyBackingStore implements ISdsBackingStore {
55 55
 	@Override
56 56
 	public ExistResponse exists(ExistRequest exist) {
57 57
 		ExistResponse resp = new ExistResponse();
58
-		resp.exist = new File("/dummy-sds/", exist.guid).exists();
58
+		resp.exists = new File("/dummy-sds/", exist.guid).exists();
59 59
 		return resp;
60 60
 	}
61 61