Browse code

[milter] BM-10807 Feat: vertx based io layer for milter instead of hand crafted nio stuff

Thomas Cataldo authored on 08/01/2018 14:57:10
Showing 11 changed files
... ...
@@ -7,6 +7,8 @@
7 7
    </configIni>
8 8
 
9 9
    <launcherArgs>
10
+      <vmArgs>-Dosgi.noShutdown=true -Djava.security.egd=file:/dev/urandom -Duser.timezone=GMT -Djava.awt.headless=true -Dnet.bluemind.property.product=bm-milter
11
+      </vmArgs>
10 12
       <vmArgsMac>-XstartOnFirstThread -Dorg.eclipse.swt.internal.carbon.smallFonts
11 13
       </vmArgsMac>
12 14
    </launcherArgs>
... ...
@@ -31,6 +33,7 @@
31 31
       <plugin id="com.fasterxml.jackson.dataformat.jackson-dataformat-cbor"/>
32 32
       <plugin id="com.fasterxml.jackson.dataformat.jackson-dataformat-smile"/>
33 33
       <plugin id="com.google.guava"/>
34
+      <plugin id="com.hazelcast"/>
34 35
       <plugin id="com.hazelcast.client" fragment="true"/>
35 36
       <plugin id="com.netflix.servo.core"/>
36 37
       <plugin id="com.ning.async-http-client"/>
... ...
@@ -41,6 +44,7 @@
41 41
       <plugin id="io.netty"/>
42 42
       <plugin id="io.vertx.core"/>
43 43
       <plugin id="io.vertx.platform"/>
44
+      <plugin id="javax.validation.api"/>
44 45
       <plugin id="javax.ws.rs"/>
45 46
       <plugin id="javax.xml"/>
46 47
       <plugin id="jcl.over.slf4j"/>
... ...
@@ -74,6 +78,7 @@
74 74
       <plugin id="net.bluemind.lib.servo"/>
75 75
       <plugin id="net.bluemind.lib.vertx"/>
76 76
       <plugin id="net.bluemind.locator.client"/>
77
+      <plugin id="net.bluemind.locator.vertxclient"/>
77 78
       <plugin id="net.bluemind.mailbox.api"/>
78 79
       <plugin id="net.bluemind.mailbox.hook"/>
79 80
       <plugin id="net.bluemind.mailbox.identity.api"/>
... ...
@@ -81,7 +86,6 @@
81 81
       <plugin id="net.bluemind.mailflow.common.api"/>
82 82
       <plugin id="net.bluemind.mailflow.rbe"/>
83 83
       <plugin id="net.bluemind.mailflow.rules"/>
84
-      <plugin id="net.bluemind.mailindex"/>
85 84
       <plugin id="net.bluemind.mailshare.api"/>
86 85
       <plugin id="net.bluemind.milter"/>
87 86
       <plugin id="net.bluemind.milter.action.signature"/>
... ...
@@ -89,9 +93,12 @@
89 89
       <plugin id="net.bluemind.neko.common"/>
90 90
       <plugin id="net.bluemind.node.api"/>
91 91
       <plugin id="net.bluemind.node.client"/>
92
+      <plugin id="net.bluemind.node.shared"/>
92 93
       <plugin id="net.bluemind.osgi.log"/>
94
+      <plugin id="net.bluemind.role.api"/>
93 95
       <plugin id="net.bluemind.server.api"/>
94 96
       <plugin id="net.bluemind.slf4j"/>
97
+      <plugin id="net.bluemind.slf4j.configuration" fragment="true"/>
95 98
       <plugin id="net.bluemind.system.api"/>
96 99
       <plugin id="net.bluemind.system.stateobserver"/>
97 100
       <plugin id="net.bluemind.tag.api"/>
... ...
@@ -120,8 +127,7 @@
120 120
       <plugin id="org.eclipse.osgi.compatibility.state" fragment="true"/>
121 121
       <plugin id="org.eclipse.update.configurator"/>
122 122
       <plugin id="org.elasticsearch"/>
123
-      <plugin id="org.hornetq.all"/>
124
-      <plugin id="org.jboss.logging.jboss-logging"/>
123
+      <plugin id="org.elasticsearch.module.reindex"/>
125 124
       <plugin id="org.jboss.netty"/>
126 125
       <plugin id="slf4j.api"/>
127 126
    </plugins>
... ...
@@ -21,12 +21,14 @@ package net.bluemind.eclipse.common;
21 21
 import java.util.Collections;
22 22
 import java.util.LinkedList;
23 23
 import java.util.List;
24
+import java.util.Objects;
24 25
 import java.util.stream.Collectors;
25 26
 
26 27
 import org.eclipse.core.runtime.CoreException;
27 28
 import org.eclipse.core.runtime.IConfigurationElement;
28 29
 import org.eclipse.core.runtime.IExtension;
29 30
 import org.eclipse.core.runtime.IExtensionPoint;
31
+import org.eclipse.core.runtime.IExtensionRegistry;
30 32
 import org.eclipse.core.runtime.Platform;
31 33
 import org.slf4j.Logger;
32 34
 import org.slf4j.LoggerFactory;
... ...
@@ -103,7 +105,9 @@ public class RunnableExtensionLoader<T> {
103 103
 	 */
104 104
 	public List<T> loadExtensions(String pluginId, String pointName, String element, String attribute) {
105 105
 		List<T> factories = new LinkedList<T>();
106
-		IExtensionPoint point = Platform.getExtensionRegistry().getExtensionPoint(pluginId, pointName);
106
+		IExtensionRegistry registry = Platform.getExtensionRegistry();
107
+		Objects.requireNonNull(registry, "OSGi registry is null");
108
+		IExtensionPoint point = registry.getExtensionPoint(pluginId, pointName);
107 109
 		if (point == null) {
108 110
 			logger.error("point " + pluginId + "." + pointName + " [" + element + " " + attribute + "=XXX] not found.");
109 111
 			return factories;
... ...
@@ -15,6 +15,8 @@ import org.apache.log4j.Category;
15 15
 public class JilterPacket {
16 16
 	private static Category log = Category.getInstance(JilterPacket.class.getName());
17 17
 
18
+	public static final int MAX_PACKET_SIZE = 60 * 1024 * 1024;
19
+
18 20
 	private static final int STATE_COLLECTING_LENGTH = 0;
19 21
 	private static final int STATE_COLLECTING_COMMAND = 1;
20 22
 	private static final int STATE_COLLECTING_DATA = 2;
... ...
@@ -47,6 +49,10 @@ public class JilterPacket {
47 47
 				}
48 48
 
49 49
 				if (this.currentLengthLength == 4) {
50
+					if (currentLength > MAX_PACKET_SIZE || currentLength < 0) {
51
+						throw new IOException("Invalid packet length (" + currentLength + ")");
52
+					}
53
+
50 54
 					currentState = STATE_COLLECTING_COMMAND;
51 55
 					--this.currentLength; // Minus one for the command byte
52 56
 					log.debug("Collected length is " + this.currentLength);
... ...
@@ -8,7 +8,6 @@ Require-Bundle: org.eclipse.core.runtime,
8 8
  net.bluemind.jilter;bundle-version="1.0.0",
9 9
  net.bluemind.slf4j;bundle-version="1.0.0",
10 10
  com.google.guava;bundle-version="19.0.0",
11
- net.bluemind.lib.servo;bundle-version="1.0.0",
12 11
  net.bluemind.mime4j.common;bundle-version="1.0.0",
13 12
  net.bluemind.mime4j.common;bundle-version="1.0.0",
14 13
  net.bluemind.eclipse.common;bundle-version="1.0.0",
... ...
@@ -3,14 +3,13 @@
3 3
 <plugin>
4 4
    <extension-point id="net.bluemind.milter.milterfactory" name="milterfactory" schema="schema/net.bluemind.milter.milterfactory.exsd"/>
5 5
    <extension-point id="net.bluemind.milter.actionfactory" name="actionfactory" schema="schema/net.bluemind.milter.actionfactory.exsd"/>
6
-   <extension-point id="net.bluemind.milter.rulefactory" name="rule_factory" schema="schema/net.bluemind.milter.rulefactory.exsd"/>
7 6
    <extension
8 7
          id="milter"
9 8
          name="Blue Mind milter"
10 9
          point="org.eclipse.core.runtime.applications">
11 10
       <application
12 11
             cardinality="singleton-global"
13
-            thread="main"
12
+            thread="any"
14 13
             visible="true">
15 14
          <run
16 15
                class="net.bluemind.milter.MilterApplication">
... ...
@@ -41,12 +40,18 @@
41 41
             impl="net.bluemind.milter.impl.DomainAliasCacheFactory">
42 42
       </verticle>
43 43
    </extension>
44
-      <extension
44
+   <extension
45 45
          point="net.bluemind.lib.vertx.verticles">
46 46
       <verticle
47 47
             impl="net.bluemind.milter.impl.RuleAssignmentCacheFactory">
48 48
       </verticle>
49 49
    </extension>
50
+   <extension
51
+         point="net.bluemind.lib.vertx.verticles">
52
+      <verticle
53
+            impl="net.bluemind.milter.impl.MilterMainVerticle$Factory">
54
+      </verticle>
55
+   </extension>
50 56
       <extension
51 57
             point="net.bluemind.system.state">
52 58
          <state-listener
... ...
@@ -1,12 +1,6 @@
1 1
 package net.bluemind.milter;
2 2
 
3
-import java.io.IOException;
4
-import java.net.InetSocketAddress;
5
-import java.nio.channels.ServerSocketChannel;
6
-import java.nio.channels.SocketChannel;
7 3
 import java.util.concurrent.CompletableFuture;
8
-import java.util.concurrent.ExecutorService;
9
-import java.util.concurrent.Executors;
10 4
 
11 5
 import org.eclipse.equinox.app.IApplication;
12 6
 import org.eclipse.equinox.app.IApplicationContext;
... ...
@@ -15,45 +9,21 @@ import org.slf4j.LoggerFactory;
15 15
 
16 16
 import net.bluemind.hornetq.client.MQ;
17 17
 import net.bluemind.lib.vertx.VertxPlatform;
18
-import net.bluemind.milter.impl.MilterHandler;
19
-import net.bluemind.milter.impl.MilterRunnable;
20 18
 
21 19
 public class MilterApplication implements IApplication {
22 20
 
23 21
 	private static final Logger logger = LoggerFactory.getLogger(MilterApplication.class);
24 22
 
25
-	private ServerSocketChannel channel;
26
-	private boolean stopped;
27
-
28
-	private ExecutorService tp;
29
-
30 23
 	@Override
31 24
 	public Object start(IApplicationContext context) throws Exception {
25
+		logger.info("Starting MILTER...");
32 26
 		launchVerticles().whenComplete((v, ex) -> {
33
-			if (null != ex) {
27
+			if (ex != null) {
34 28
 				logger.warn("Cannot spawn verticles", ex);
29
+			} else {
30
+				logger.info("Startup complete.");
35 31
 			}
36 32
 
37
-			this.tp = Executors.newCachedThreadPool();
38
-
39
-			try {
40
-				channel = ServerSocketChannel.open();
41
-				channel.configureBlocking(true);
42
-				InetSocketAddress isa = new InetSocketAddress("0.0.0.0", 2500);
43
-				channel.socket().bind(isa);
44
-
45
-				this.stopped = false;
46
-				logger.info("Starting milter server on port 2500...");
47
-				MilterHandler.init();
48
-				while (!stopped) {
49
-					SocketChannel con = channel.accept();
50
-					MilterRunnable mr = new MilterRunnable(con);
51
-					logger.info("submitting MilterRunnable");
52
-					tp.execute(mr);
53
-				}
54
-			} catch (Exception e) {
55
-				logger.warn("Cannot initiate server socket", e);
56
-			}
57 33
 		});
58 34
 
59 35
 		return IApplication.EXIT_OK;
... ...
@@ -79,13 +49,6 @@ public class MilterApplication implements IApplication {
79 79
 
80 80
 	@Override
81 81
 	public void stop() {
82
-		stopped = true;
83
-		logger.info("Shutting down thread pool...");
84
-		tp.shutdown();
85
-		try {
86
-			channel.close();
87
-		} catch (IOException e) {
88
-			logger.error(e.getMessage(), e);
89
-		}
82
+		logger.info("Shutting down.");
90 83
 	}
91 84
 }
... ...
@@ -28,7 +28,6 @@ import com.sendmail.jilter.JilterStatus;
28 28
 import net.bluemind.core.container.model.ItemValue;
29 29
 import net.bluemind.core.rest.IServiceProvider;
30 30
 import net.bluemind.domain.api.Domain;
31
-import net.bluemind.lib.servo.ServoRegistry;
32 31
 import net.bluemind.mailflow.api.ExecutionMode;
33 32
 import net.bluemind.mailflow.api.MailRuleActionAssignment;
34 33
 import net.bluemind.mailflow.common.api.SendingAs;
... ...
@@ -47,11 +46,7 @@ public class MilterHandler implements JilterHandler {
47 47
 
48 48
 	private static final Logger logger = LoggerFactory.getLogger(MilterHandler.class);
49 49
 
50
-	private static final SmtpdStats stats;
51
-
52 50
 	static {
53
-		stats = new SmtpdStats();
54
-		ServoRegistry.setupMonitoring("milter", stats);
55 51
 		logger.info("JMX stats registered.");
56 52
 	}
57 53
 
58 54
new file mode 100644
... ...
@@ -0,0 +1,65 @@
0
+/* BEGIN LICENSE
1
+  * Copyright © Blue Mind SAS, 2012-2018
2
+  *
3
+  * This file is part of BlueMind. BlueMind is a messaging and collaborative
4
+  * solution.
5
+  *
6
+  * This program is free software; you can redistribute it and/or modify
7
+  * it under the terms of either the GNU Affero General Public License as
8
+  * published by the Free Software Foundation (version 3 of the License).
9
+  *
10
+  * This program is distributed in the hope that it will be useful,
11
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13
+  *
14
+  * See LICENSE.txt
15
+  * END LICENSE
16
+  */
17
+package net.bluemind.milter.impl;
18
+
19
+import org.slf4j.Logger;
20
+import org.slf4j.LoggerFactory;
21
+import org.vertx.java.core.Future;
22
+import org.vertx.java.core.net.NetServer;
23
+import org.vertx.java.platform.Verticle;
24
+
25
+import net.bluemind.lib.vertx.IVerticleFactory;
26
+
27
+public class MilterMainVerticle extends Verticle {
28
+
29
+	private static final Logger logger = LoggerFactory.getLogger(MilterMainVerticle.class);
30
+
31
+	public static class Factory implements IVerticleFactory {
32
+
33
+		@Override
34
+		public boolean isWorker() {
35
+			return false;
36
+		}
37
+
38
+		@Override
39
+		public Verticle newInstance() {
40
+			return new MilterMainVerticle();
41
+		}
42
+
43
+	}
44
+
45
+	public void start(Future<Void> start) {
46
+		NetServer srv = vertx.createNetServer();
47
+		srv.setUsePooledBuffers(true).setTCPNoDelay(true).setTCPKeepAlive(true);
48
+
49
+		srv.connectHandler(socket -> {
50
+			MilterSession session = new MilterSession(vertx, socket);
51
+			session.start();
52
+		});
53
+		srv.listen(2500, ar -> {
54
+			if (ar.succeeded()) {
55
+				logger.info("Milter verticle listening.");
56
+				start.setResult(null);
57
+			} else {
58
+				start.setFailure(ar.cause());
59
+			}
60
+		});
61
+
62
+	}
63
+
64
+}
0 65
deleted file mode 100644
... ...
@@ -1,52 +0,0 @@
1
-package net.bluemind.milter.impl;
2
-
3
-import java.io.IOException;
4
-import java.nio.ByteBuffer;
5
-import java.nio.channels.SocketChannel;
6
-
7
-import org.slf4j.Logger;
8
-import org.slf4j.LoggerFactory;
9
-
10
-import com.sendmail.jilter.JilterProcessor;
11
-
12
-public class MilterRunnable implements Runnable {
13
-
14
-	private static final Logger logger = LoggerFactory.getLogger(MilterRunnable.class);
15
-
16
-	private SocketChannel con;
17
-	private JilterProcessor jp;
18
-
19
-	public MilterRunnable(SocketChannel con) {
20
-		this.con = con;
21
-		try {
22
-			con.configureBlocking(true);
23
-		} catch (IOException e) {
24
-			logger.error(e.getMessage(), e);
25
-		}
26
-		MilterHandler handler = new MilterHandler(MLRegistry.getFactories());
27
-		this.jp = new JilterProcessor(handler);
28
-	}
29
-
30
-	@Override
31
-	public void run() {
32
-		ByteBuffer dataBuffer = ByteBuffer.allocateDirect(4096);
33
-		try {
34
-			while (jp.process(con, (ByteBuffer) dataBuffer.flip())) {
35
-				dataBuffer.compact();
36
-				if (con.read(dataBuffer) == -1) {
37
-					logger.info("socket reports EOF, exiting read loop");
38
-					break;
39
-				}
40
-			}
41
-		} catch (IOException e) {
42
-			logger.error(e.getMessage(), e);
43
-		} finally {
44
-			try {
45
-				con.close();
46
-			} catch (IOException e) {
47
-				logger.error(e.getMessage(), e);
48
-			}
49
-			con = null;
50
-		}
51
-	}
52
-}
53 1
new file mode 100644
... ...
@@ -0,0 +1,100 @@
0
+/* BEGIN LICENSE
1
+  * Copyright © Blue Mind SAS, 2012-2018
2
+  *
3
+  * This file is part of BlueMind. BlueMind is a messaging and collaborative
4
+  * solution.
5
+  *
6
+  * This program is free software; you can redistribute it and/or modify
7
+  * it under the terms of either the GNU Affero General Public License as
8
+  * published by the Free Software Foundation (version 3 of the License).
9
+  *
10
+  * This program is distributed in the hope that it will be useful,
11
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13
+  *
14
+  * See LICENSE.txt
15
+  * END LICENSE
16
+  */
17
+package net.bluemind.milter.impl;
18
+
19
+import java.io.IOException;
20
+import java.nio.ByteBuffer;
21
+import java.nio.channels.WritableByteChannel;
22
+
23
+import org.slf4j.Logger;
24
+import org.slf4j.LoggerFactory;
25
+import org.vertx.java.core.Context;
26
+import org.vertx.java.core.Vertx;
27
+import org.vertx.java.core.buffer.Buffer;
28
+import org.vertx.java.core.net.NetSocket;
29
+
30
+import com.sendmail.jilter.JilterProcessor;
31
+
32
+import io.netty.buffer.ByteBuf;
33
+import io.netty.buffer.Unpooled;
34
+
35
+public class MilterSession {
36
+
37
+	private static final Logger logger = LoggerFactory.getLogger(MilterSession.class);
38
+	private final NetSocket socket;
39
+	private final JilterProcessor jp;
40
+	private final Vertx vertx;
41
+
42
+	public MilterSession(Vertx vertx, NetSocket socket) {
43
+		this.socket = socket;
44
+		this.vertx = vertx;
45
+		MilterHandler handler = new MilterHandler(MLRegistry.getFactories());
46
+		this.jp = new JilterProcessor(handler);
47
+	}
48
+
49
+	public void start() {
50
+		Context ctx = vertx.currentContext();
51
+		WritableByteChannel sink = new WritableByteChannel() {
52
+
53
+			@Override
54
+			public boolean isOpen() {
55
+				return true;
56
+			}
57
+
58
+			@Override
59
+			public void close() throws IOException {
60
+				socket.close();
61
+			}
62
+
63
+			@Override
64
+			public int write(ByteBuffer src) throws IOException {
65
+				ByteBuf netty = Unpooled.wrappedBuffer(src);
66
+				int ret = netty.readableBytes();
67
+				ctx.runOnContext(v -> {
68
+					socket.write(new Buffer(netty));
69
+				});
70
+				return ret;
71
+			}
72
+		};
73
+
74
+		socket.dataHandler(buf -> {
75
+			ByteBuf nettyBuffer = buf.getByteBuf();
76
+			logger.debug("Process {}", nettyBuffer);
77
+			ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
78
+
79
+			try {
80
+				boolean ret = jp.process(sink, nioBuffer);
81
+				logger.debug("processed: {}", ret);
82
+			} catch (IOException e) {
83
+				logger.error(e.getMessage(), e);
84
+				socket.close();
85
+			}
86
+
87
+		});
88
+		socket.closeHandler(v -> {
89
+			logger.info("{} closed.", socket.writeHandlerID());
90
+			stop();
91
+		});
92
+		logger.info("Session started for {}", socket.writeHandlerID());
93
+	}
94
+
95
+	public void stop() {
96
+		logger.info("{} stopped.", socket.writeHandlerID());
97
+	}
98
+
99
+}
0 100
deleted file mode 100644
... ...
@@ -1,21 +0,0 @@
1
-package net.bluemind.milter.impl;
2
-
3
-import java.util.concurrent.atomic.AtomicLong;
4
-
5
-import com.netflix.servo.annotations.DataSourceType;
6
-import com.netflix.servo.annotations.Monitor;
7
-
8
-public final class SmtpdStats {
9
-
10
-	@Monitor(name = "accepted", type = DataSourceType.COUNTER)
11
-	private AtomicLong accepted;
12
-
13
-	public SmtpdStats() {
14
-		this.accepted = new AtomicLong(0);
15
-	}
16
-
17
-	public void newAccept() {
18
-		accepted.incrementAndGet();
19
-	}
20
-
21
-}