Browse code

BM-11536 Fix: cleanup ScheduledJobStore

David Phan authored on 10/01/2018 07:50:21
Showing 3 changed files
... ...
@@ -90,7 +90,7 @@ public class ScheduledJobStoreTests {
90 90
 
91 91
 	@After
92 92
 	public void after() throws Exception {
93
-		// JdbcTestHelper.getInstance().afterTest();
93
+		JdbcTestHelper.getInstance().afterTest();
94 94
 	}
95 95
 
96 96
 	@Test
... ...
@@ -24,14 +24,31 @@ import java.sql.ResultSet;
24 24
 import java.sql.SQLException;
25 25
 import java.sql.Timestamp;
26 26
 import java.sql.Types;
27
+import java.text.ParseException;
28
+import java.util.Calendar;
27 29
 import java.util.Date;
30
+import java.util.List;
31
+import java.util.Map;
32
+
33
+import org.quartz.CronExpression;
34
+import org.slf4j.Logger;
35
+import org.slf4j.LoggerFactory;
28 36
 
29 37
 import net.bluemind.core.jdbc.Columns;
38
+import net.bluemind.scheduledjob.api.Job;
39
+import net.bluemind.scheduledjob.api.JobDomainStatus;
30 40
 import net.bluemind.scheduledjob.api.JobExecution;
31 41
 import net.bluemind.scheduledjob.api.JobExitStatus;
42
+import net.bluemind.scheduledjob.api.JobPlanification;
43
+import net.bluemind.scheduledjob.api.JobRec;
44
+import net.bluemind.scheduledjob.api.LogEntry;
45
+import net.bluemind.scheduledjob.api.LogLevel;
46
+import net.bluemind.scheduledjob.api.PlanKind;
32 47
 
33 48
 public class JobExecutionColumn {
34 49
 
50
+	private static final Logger logger = LoggerFactory.getLogger(JobExecutionColumn.class);
51
+
35 52
 	public static final Columns cols = Columns.create() //
36 53
 			.col("exec_group") //
37 54
 			.col("domain_name")//
... ...
@@ -66,17 +83,32 @@ public class JobExecutionColumn {
66 66
 
67 67
 	}
68 68
 
69
-	public static ScheduledJobStore.EntityPopulator<JobExecution> populator() {
69
+	public static ScheduledJobStore.Creator<JobExecution> jobExecutionCreator() {
70
+		return new ScheduledJobStore.Creator<JobExecution>() {
71
+
72
+			@Override
73
+			public JobExecution create(ResultSet con) throws SQLException {
74
+				return new JobExecution();
75
+			}
76
+
77
+		};
78
+	}
79
+
80
+	public static ScheduledJobStore.EntityPopulator<JobExecution> jobExecutionPopulator() {
70 81
 		return new ScheduledJobStore.EntityPopulator<JobExecution>() {
71 82
 
72 83
 			@Override
73 84
 			public int populate(ResultSet rs, int index, JobExecution value) throws SQLException {
74 85
 
75 86
 				value.id = rs.getInt(index++);
76
-				// value.domain = rs.getWTF(index++);
87
+				value.execGroup = rs.getString(index++);
88
+				value.domainName = rs.getString(index++);
77 89
 				value.jobId = rs.getString(index++);
78 90
 				value.startDate = new Date(rs.getTimestamp(index++).getTime());
79
-				value.endDate = new Date(rs.getTimestamp(index++).getTime());
91
+				Timestamp endDate = rs.getTimestamp(index++);
92
+				if (endDate != null) {
93
+					value.endDate = new Date(endDate.getTime());
94
+				}
80 95
 				value.status = JobExitStatus.valueOf(rs.getString(index++));
81 96
 
82 97
 				return index;
... ...
@@ -85,4 +117,160 @@ public class JobExecutionColumn {
85 85
 		};
86 86
 	}
87 87
 
88
+	public static ScheduledJobStore.Creator<Job> jobCreator() {
89
+		return new ScheduledJobStore.Creator<Job>() {
90
+
91
+			@Override
92
+			public Job create(ResultSet con) throws SQLException {
93
+				return new Job();
94
+			}
95
+
96
+		};
97
+	}
98
+
99
+	public static ScheduledJobStore.EntityPopulator<Job> jobPopulator() {
100
+		return new ScheduledJobStore.EntityPopulator<Job>() {
101
+
102
+			@Override
103
+			public int populate(ResultSet rs, int index, Job value) throws SQLException {
104
+
105
+				value.id = rs.getString(index++);
106
+				value.sendReport = rs.getBoolean(index++);
107
+				String recipients = rs.getString(index++);
108
+				if (recipients != null && !recipients.isEmpty()) {
109
+					value.recipients = recipients;
110
+				}
111
+
112
+				return index;
113
+			}
114
+
115
+		};
116
+	}
117
+
118
+	public static ScheduledJobStore.EntityPopulator<Job> jobStatusAndPlansPopulator(Map<String, Job> idIndex) {
119
+		return new ScheduledJobStore.EntityPopulator<Job>() {
120
+
121
+			@Override
122
+			public int populate(ResultSet rs, int index, Job value) throws SQLException {
123
+
124
+				String domainName = rs.getString(index++);
125
+				String jid = rs.getString(index++);
126
+
127
+				value = idIndex.get(jid);
128
+
129
+				List<JobDomainStatus> domainStatus = value.domainStatus;
130
+				List<JobPlanification> domainPlanification = value.domainPlanification;
131
+				JobPlanification jp = new JobPlanification();
132
+				jp.kind = PlanKind.valueOf(rs.getString(index++));
133
+				jp.lastRun = rs.getTimestamp(index++);
134
+				jp.domain = domainName;
135
+				String cs = rs.getString(index++);
136
+				if (jp.kind == PlanKind.SCHEDULED) {
137
+					JobRec rec = new JobRec();
138
+					rec.cronString = cs;
139
+					jp.rec = rec;
140
+					if (cs != null) {
141
+						try {
142
+							CronExpression ce = new CronExpression(cs);
143
+							Date nextRun = null;
144
+							if (jp.lastRun != null) {
145
+								logger.debug("lastRun: " + jp.lastRun);
146
+								nextRun = ce.getNextValidTimeAfter(jp.lastRun);
147
+							} else {
148
+								Calendar cal = Calendar.getInstance();
149
+								cal.add(Calendar.MINUTE, -1);
150
+								nextRun = ce.getNextValidTimeAfter(cal.getTime());
151
+							}
152
+							jp.nextRun = nextRun;
153
+						} catch (ParseException pe) {
154
+							logger.error("Invalid cron string: '" + cs + "' (" + pe.getMessage() + ")");
155
+						}
156
+					}
157
+				}
158
+				domainPlanification.add(jp);
159
+
160
+				String statusString = rs.getString(index++);
161
+				if (statusString != null) {
162
+					JobDomainStatus ds = new JobDomainStatus();
163
+					ds.domain = domainName;
164
+					ds.status = JobExitStatus.valueOf(statusString);
165
+					domainStatus.add(ds);
166
+				} else {
167
+					logger.warn("No recorded execution in database for " + value.id + "@" + domainName);
168
+				}
169
+				value.sendReport = rs.getBoolean(index++);
170
+				value.recipients = rs.getString(index++);
171
+
172
+				return index;
173
+			}
174
+
175
+		};
176
+	}
177
+
178
+	public static ScheduledJobStore.Creator<LogEntry> logEntryCreator() {
179
+		return new ScheduledJobStore.Creator<LogEntry>() {
180
+
181
+			@Override
182
+			public LogEntry create(ResultSet con) throws SQLException {
183
+				return new LogEntry();
184
+			}
185
+
186
+		};
187
+	}
188
+
189
+	public static ScheduledJobStore.StatementValues<LogEntry> logEntryValues(int jobExececutionId) {
190
+
191
+		return (conn, statement, index, currentRow, le) -> {
192
+
193
+			statement.setInt(index++, jobExececutionId);
194
+			statement.setString(index++, le.severity.name());
195
+			statement.setTimestamp(index++, new Timestamp(le.timestamp));
196
+			statement.setString(index++, le.locale);
197
+			statement.setString(index++, le.content);
198
+
199
+			return index;
200
+
201
+		};
202
+
203
+	}
204
+
205
+	public static ScheduledJobStore.EntityPopulator<LogEntry> logEntryPopulator() {
206
+		return new ScheduledJobStore.EntityPopulator<LogEntry>() {
207
+
208
+			@Override
209
+			public int populate(ResultSet rs, int index, LogEntry value) throws SQLException {
210
+
211
+				value.severity = LogLevel.valueOf(rs.getString(index++));
212
+				value.timestamp = rs.getTimestamp(index++).getTime();
213
+				value.locale = rs.getString(index++);
214
+				value.content = rs.getString(index++);
215
+
216
+				return index;
217
+			}
218
+
219
+		};
220
+	}
221
+
222
+	public static ScheduledJobStore.StatementValues<JobPlanification> planValues(Job job) {
223
+
224
+		return (conn, statement, index, currentRow, jp) -> {
225
+
226
+			statement.setString(index++, jp.kind.name());
227
+
228
+			if (jp.kind == PlanKind.SCHEDULED) {
229
+				statement.setString(index++, jp.rec.cronString);
230
+			} else {
231
+				statement.setNull(index++, Types.VARCHAR);
232
+			}
233
+
234
+			statement.setBoolean(index++, job.sendReport);
235
+			statement.setString(index++, job.recipients);
236
+			statement.setString(index++, job.id);
237
+			statement.setString(index++, jp.domain);
238
+
239
+			return index;
240
+
241
+		};
242
+	}
243
+
88 244
 }
... ...
@@ -18,18 +18,11 @@
18 18
  */
19 19
 package net.bluemind.scheduledjob.persistance;
20 20
 
21
-import java.sql.Connection;
22
-import java.sql.PreparedStatement;
23 21
 import java.sql.ResultSet;
24 22
 import java.sql.SQLException;
25
-import java.sql.Statement;
26 23
 import java.sql.Timestamp;
27
-import java.sql.Types;
28
-import java.text.ParseException;
29 24
 import java.util.ArrayList;
30
-import java.util.Calendar;
31 25
 import java.util.Collection;
32
-import java.util.Date;
33 26
 import java.util.HashMap;
34 27
 import java.util.LinkedHashSet;
35 28
 import java.util.List;
... ...
@@ -38,7 +31,6 @@ import java.util.Set;
38 38
 
39 39
 import javax.sql.DataSource;
40 40
 
41
-import org.quartz.CronExpression;
42 41
 import org.slf4j.Logger;
43 42
 import org.slf4j.LoggerFactory;
44 43
 
... ...
@@ -46,22 +38,23 @@ import net.bluemind.core.api.ListResult;
46 46
 import net.bluemind.core.api.fault.ServerFault;
47 47
 import net.bluemind.core.context.SecurityContext;
48 48
 import net.bluemind.core.jdbc.JdbcAbstractStore;
49
-import net.bluemind.core.jdbc.JdbcHelper;
50 49
 import net.bluemind.scheduledjob.api.Job;
51
-import net.bluemind.scheduledjob.api.JobDomainStatus;
52 50
 import net.bluemind.scheduledjob.api.JobExecution;
53 51
 import net.bluemind.scheduledjob.api.JobExecutionQuery;
54 52
 import net.bluemind.scheduledjob.api.JobExitStatus;
55 53
 import net.bluemind.scheduledjob.api.JobPlanification;
56 54
 import net.bluemind.scheduledjob.api.JobQuery;
57
-import net.bluemind.scheduledjob.api.JobRec;
58 55
 import net.bluemind.scheduledjob.api.LogEntry;
59
-import net.bluemind.scheduledjob.api.LogLevel;
60
-import net.bluemind.scheduledjob.api.PlanKind;
61 56
 
62 57
 public class ScheduledJobStore extends JdbcAbstractStore {
63 58
 
64 59
 	private static final Logger logger = LoggerFactory.getLogger(ScheduledJobStore.class);
60
+	private static final Creator<Integer> INTEGER_CREATOR = new Creator<Integer>() {
61
+		@Override
62
+		public Integer create(ResultSet con) throws SQLException {
63
+			return con.getInt(1);
64
+		}
65
+	};
65 66
 
66 67
 	public ScheduledJobStore(DataSource dataSource) {
67 68
 		super(dataSource);
... ...
@@ -81,52 +74,26 @@ public class ScheduledJobStore extends JdbcAbstractStore {
81 81
 		logger.debug("recording execution for {}", je);
82 82
 
83 83
 		return inTransaction(() -> {
84
-			Connection con = null;
85
-			PreparedStatement ps = null;
86
-
87
-			try {
88
-				con = getConnection();
89
-				ps = con.prepareStatement("INSERT INTO t_job_execution"
90
-						+ " (exec_group, domain_name, job_id, exec_start, exec_end, status)"
91
-						+ " VALUES (?, ?, ?, ?, ?, ?::t_job_exit_status)");
92
-
93
-				int idx = 1;
94
-				ps.setString(idx++, je.execGroup);
95
-				ps.setString(idx++, je.domainName);
96
-				ps.setString(idx++, je.jobId);
97
-				if (je.startDate != null) {
98
-					ps.setTimestamp(idx++, new Timestamp(je.startDate.getTime()));
99
-				} else {
100
-					ps.setNull(idx++, Types.TIMESTAMP);
101
-				}
84
+			String query = "insert into t_job_execution (" + JobExecutionColumn.cols.names() + ") values ("
85
+					+ JobExecutionColumn.cols.values() + ")";
102 86
 
103
-				if (je.endDate != null) {
104
-					ps.setTimestamp(idx++, new Timestamp(je.endDate.getTime()));
105
-				} else {
106
-					ps.setNull(idx++, Types.TIMESTAMP);
107
-				}
108
-				ps.setString(idx++, je.status.name());
109
-				ps.executeUpdate();
110
-				ps.close();
111
-				ps = null;
112
-
113
-				int id = lastInsertId(con);
114
-				je.id = id;
115
-
116
-				idx = 1;
117
-				ps = con.prepareStatement("UPDATE t_job_plan SET last_run = ? WHERE domain_name = ? AND job_id = ?");
118
-				ps.setInt(idx++, id);
119
-				ps.setString(idx++, je.domainName);
120
-				ps.setString(idx++, je.jobId);
121
-				ps.executeUpdate();
122
-
123
-				logger.debug("run plan for {}@{} updated with last_run set to id {} (start: {})", je.jobId,
124
-						je.domainName, id, je.startDate);
125
-
126
-				return je;
127
-			} finally {
128
-				JdbcHelper.cleanup(con, null, ps);
87
+			Timestamp startDate = null;
88
+			if (je.startDate != null) {
89
+				startDate = new Timestamp(je.startDate.getTime());
90
+			}
91
+
92
+			Timestamp endDate = null;
93
+			if (je.endDate != null) {
94
+				endDate = new Timestamp(je.endDate.getTime());
129 95
 			}
96
+
97
+			je.id = insertWithSerial(query,
98
+					new Object[] { je.execGroup, je.domainName, je.jobId, startDate, endDate, je.status.name() });
99
+
100
+			String updateQuery = "update t_job_plan set last_run = ? where domain_name = ? and job_id = ?";
101
+			update(updateQuery, new Object[] { je.id, je.domainName, je.jobId });
102
+
103
+			return je;
130 104
 		});
131 105
 
132 106
 	}
... ...
@@ -137,27 +104,8 @@ public class ScheduledJobStore extends JdbcAbstractStore {
137 137
 	 */
138 138
 	public void delete(int jobExecutionId) throws ServerFault {
139 139
 		inTransaction(() -> {
140
-			StringBuilder q2 = new StringBuilder();
141
-			q2.append(" UPDATE t_job_plan SET last_run=NULL WHERE last_run=").append(jobExecutionId);
142
-
143
-			StringBuilder q = new StringBuilder();
144
-			q.append(" DELETE FROM t_job_execution WHERE id=").append(jobExecutionId);
145
-
146
-			String upd = q2.toString();
147
-			String query = q.toString();
148
-			Connection con = null;
149
-			Statement ps = null;
150
-
151
-			try {
152
-				con = getConnection();
153
-				ps = con.createStatement();
154
-				ps.executeUpdate(upd);
155
-
156
-				int cnt = ps.executeUpdate(query);
157
-				logger.debug("{} execution(s) deleted.", cnt);
158
-			} finally {
159
-				JdbcHelper.cleanup(con, null, ps);
160
-			}
140
+			update("update t_job_plan set last_run = null where last_run = ?", new Object[] { jobExecutionId });
141
+			delete("delete from t_job_execution where id = ?", new Object[] { jobExecutionId });
161 142
 			return null;
162 143
 		});
163 144
 	}
... ...
@@ -165,104 +113,31 @@ public class ScheduledJobStore extends JdbcAbstractStore {
165 165
 	/**
166 166
 	 * @param jobId
167 167
 	 * @return
168
-	 * @throws ServerFault
168
+	 * @throws SQLException
169 169
 	 */
170
-	public Job getJobFromId(String jobId) throws ServerFault {
171
-		Job j = null;
172
-
173
-		StringBuilder q = new StringBuilder();
174
-		q.append(" SELECT");
175
-		q.append(" kind,");
176
-		q.append(" send_report,");
177
-		q.append(" report_recipients");
178
-		q.append(" FROM t_job_plan");
179
-		q.append(" WHERE job_id = ?");
180
-
181
-		Connection con = null;
182
-		PreparedStatement ps = null;
183
-		ResultSet rs = null;
184
-
185
-		try {
186
-			con = getConnection();
187
-			ps = con.prepareStatement(q.toString());
188
-			ps.setString(1, jobId);
189
-
190
-			rs = ps.executeQuery();
191
-			if (rs.next()) {
192
-				j = new Job();
193
-				j.id = jobId;
194
-				j.sendReport = rs.getBoolean(2);
195
-				String recipients = rs.getString(3);
196
-				if (recipients != null && !recipients.isEmpty()) {
197
-					j.recipients = recipients;
198
-				}
199
-
200
-				// FIXME: load status here ?
201
-			}
202
-		} catch (Exception t) {
203
-			throw new ServerFault(t);
204
-		} finally {
205
-			JdbcHelper.cleanup(con, rs, ps);
206
-		}
207
-
208
-		return j;
170
+	public Job getJobFromId(String jobId) throws SQLException {
171
+		String query = "select job_id, send_report, report_recipients from t_job_plan where job_id = ?";
172
+		return unique(query, JobExecutionColumn.jobCreator(), JobExecutionColumn.jobPopulator(),
173
+				new Object[] { jobId });
209 174
 	}
210 175
 
211 176
 	/**
212
-	 * @param con
213
-	 * @param domainName
177
+	 * @param d
214 178
 	 * @param jid
215
-	 * @throws ServerFault
216 179
 	 */
217
-	private void ensureDefaultPlan(Connection con, String domainName, String jid) throws ServerFault {
218
-		PreparedStatement ps = null;
219
-		ResultSet rs = null;
220
-
221
-		String query = "SELECT 1 FROM t_job_plan WHERE domain_name=? AND job_id=?";
222
-
223
-		try {
224
-			ps = con.prepareStatement(query);
225
-			ps.setString(1, domainName);
226
-			ps.setString(2, jid);
227
-			rs = ps.executeQuery();
228
-			if (!rs.next()) {
229
-				rs.close();
230
-				rs = null;
231
-				ps.close();
232
-				ps = null;
180
+	public void ensureDefaultPlan(String domainName, String jid) {
181
+		inTransaction(() -> {
182
+			String query = "SELECT 1 FROM t_job_plan WHERE domain_name=? AND job_id=?";
183
+			Integer id = unique(query, INTEGER_CREATOR, new ArrayList<EntityPopulator<Integer>>(0),
184
+					new Object[] { domainName, jid });
233 185
 
186
+			if (id == null) {
234 187
 				query = "INSERT INTO t_job_plan (domain_name, job_id) VALUES (?, ?)";
235
-				ps = con.prepareStatement(query);
236
-				ps.setString(1, domainName);
237
-				ps.setString(2, jid);
238
-				ps.executeUpdate();
239
-
240
-				logger.debug("ensureDefaultPlan success !");
188
+				insert(query, new Object[] { domainName, jid });
241 189
 			}
242
-		} catch (Exception t) {
243
-			logger.error("ensureDefaultPlan fail !", t);
244
-			throw new ServerFault(t);
245
-		} finally {
246
-			JdbcHelper.cleanup(null, rs, ps);
247
-		}
248
-
249
-	}
190
+			return null;
191
+		});
250 192
 
251
-	/**
252
-	 * @param d
253
-	 * @param jid
254
-	 * @throws ServerFault
255
-	 */
256
-	public void ensureDefaultPlan(String domainName, String jid) throws ServerFault {
257
-		Connection con = null;
258
-		try {
259
-			con = getConnection();
260
-			ensureDefaultPlan(con, domainName, jid);
261
-		} catch (Exception t) {
262
-			throw new ServerFault(t);
263
-		} finally {
264
-			JdbcHelper.cleanup(con, null, null);
265
-		}
266 193
 	}
267 194
 
268 195
 	/**
... ...
@@ -271,55 +146,20 @@ public class ScheduledJobStore extends JdbcAbstractStore {
271 271
 	 * @throws ServerFault
272 272
 	 */
273 273
 	public ListResult<JobExecution> findExecutions(JobExecutionQuery jeq) throws ServerFault {
274
-		Connection con = null;
275
-		PreparedStatement ps = null;
276
-		ResultSet rs = null;
277
-		try {
278
-			con = getConnection();
279
-
280
-			int total = 0;
281
-
282
-			if (jeq.size != -1) {
283
-				ps = con.prepareStatement(composeQuery(jeq, true));
284
-				if (jeq.id <= 0 && jeq.jobId != null) {
285
-					ps.setString(1, jeq.jobId);
286
-				}
287
-				rs = ps.executeQuery();
288
-				rs.next();
289
-				total = rs.getInt(1);
290
-				rs.close();
291
-				rs = null;
292
-				ps.close();
293
-				ps = null;
294
-			}
295
-
296
-			ListResult<JobExecution> ret = new ListResult<JobExecution>();
297
-			ps = con.prepareStatement(composeQuery(jeq, false));
274
+		return inTransaction(() -> {
298 275
 
276
+			Object[] params = new Object[] {};
299 277
 			if (jeq.id <= 0 && jeq.jobId != null) {
300
-				ps.setString(1, jeq.jobId);
278
+				params = new Object[] { jeq.jobId };
301 279
 			}
302 280
 
303
-			List<JobExecution> jobs = new ArrayList<JobExecution>();
304
-
305
-			rs = ps.executeQuery();
306
-			while (rs.next()) {
307
-				JobExecution je = new JobExecution();
308
-
309
-				int idx = 1;
310
-				je.id = rs.getInt(idx++);
311
-				je.execGroup = rs.getString(idx++);
312
-				je.domainName = rs.getString(idx++);
313
-				je.jobId = rs.getString(idx++);
314
-				je.startDate = new Date(rs.getTimestamp(idx++).getTime());
315
-				Timestamp endDate = rs.getTimestamp(idx++);
316
-				if (endDate != null) {
317
-					je.endDate = new Date(endDate.getTime());
318
-				}
319
-				je.status = JobExitStatus.valueOf(rs.getString(idx++));
281
+			Integer total = unique(composeQuery(jeq, true), INTEGER_CREATOR, new ArrayList<EntityPopulator<Integer>>(0),
282
+					params);
320 283
 
321
-				jobs.add(je);
322
-			}
284
+			List<JobExecution> jobs = select(composeQuery(jeq, false), JobExecutionColumn.jobExecutionCreator(),
285
+					JobExecutionColumn.jobExecutionPopulator(), params);
286
+
287
+			ListResult<JobExecution> ret = new ListResult<JobExecution>();
323 288
 
324 289
 			ret.values = jobs;
325 290
 
... ...
@@ -329,14 +169,8 @@ public class ScheduledJobStore extends JdbcAbstractStore {
329 329
 				ret.total = jobs.size();
330 330
 			}
331 331
 
332
-			logger.debug("fetched {} job executions.", ret.total);
333 332
 			return ret;
334
-		} catch (Exception t) {
335
-			logger.error(t.getMessage(), t);
336
-			throw new ServerFault(t);
337
-		} finally {
338
-			JdbcHelper.cleanup(con, rs, ps);
339
-		}
333
+		});
340 334
 	}
341 335
 
342 336
 	/**
... ...
@@ -406,24 +240,22 @@ public class ScheduledJobStore extends JdbcAbstractStore {
406 406
 	 * @param context
407 407
 	 * @param jq
408 408
 	 * @param ret
409
-	 * @throws ServerFault
410 409
 	 */
411
-	public void loadStatusesAndPlans(SecurityContext context, JobQuery jq, Collection<Job> ret) throws ServerFault {
410
+	public void loadStatusesAndPlans(SecurityContext context, JobQuery jq, Collection<Job> ret) {
412 411
 		if (ret == null || ret.isEmpty()) {
413 412
 			return;
414 413
 		}
415 414
 
416 415
 		StringBuilder q = new StringBuilder();
417 416
 		q.append(" SELECT");
418
-		q.append(" jp.domain_name,"); // 1
419
-		q.append(" jp.job_id,"); // 2
420
-		q.append(" jp.kind,"); // 3
421
-		q.append(" jp.cron,"); // 4
422
-		q.append(" je.exec_start,"); // 5
423
-		q.append(" je.exec_end,"); // 6
424
-		q.append(" je.status,"); // 7
425
-		q.append(" jp.send_report,"); // 8
426
-		q.append(" jp.report_recipients"); // 9
417
+		q.append(" jp.domain_name,");
418
+		q.append(" jp.job_id,");
419
+		q.append(" jp.kind,");
420
+		q.append(" je.exec_start,");
421
+		q.append(" jp.cron,");
422
+		q.append(" je.status,");
423
+		q.append(" jp.send_report,");
424
+		q.append(" jp.report_recipients");
427 425
 		q.append(" FROM t_job_plan jp");
428 426
 
429 427
 		if (jq == null || jq.statuses == null) {
... ...
@@ -472,87 +304,13 @@ public class ScheduledJobStore extends JdbcAbstractStore {
472 472
 
473 473
 		q.append("ORDER BY jp.job_id, jp.domain_name");
474 474
 
475
-		String query = q.toString();
476
-		Connection con = null;
477
-		PreparedStatement ps = null;
478
-		ResultSet rs = null;
479
-		try {
480
-			con = getConnection();
481
-			ps = con.prepareStatement(query);
482
-			int idx = 1;
483
-			for (Job j : ret) {
484
-				ps.setString(idx++, j.id);
485
-			}
486
-			rs = ps.executeQuery();
487
-
488
-			String lastJid = null;
489
-			int cnt = 0;
490
-			while (rs.next()) {
491
-				cnt++;
492
-				String domainName = rs.getString(1);
493
-				String jid = rs.getString(2);
494
-				if (!jid.equals(lastJid)) {
495
-					if (lastJid != null) {
496
-						idIndex.remove(lastJid);
497
-					}
498
-					lastJid = jid;
499
-				}
500
-
501
-				Job current = idIndex.get(jid);
502
-				List<JobDomainStatus> domainStatus = current.domainStatus;
503
-				List<JobPlanification> domainPlanification = current.domainPlanification;
504
-				JobPlanification jp = new JobPlanification();
505
-				jp.kind = PlanKind.valueOf(rs.getString(3));
506
-				jp.lastRun = rs.getTimestamp(5);
507
-				jp.domain = domainName;
508
-				if (jp.kind == PlanKind.SCHEDULED) {
509
-					JobRec rec = new JobRec();
510
-					String cs = rs.getString(4);
511
-					rec.cronString = cs;
512
-					jp.rec = rec;
513
-					if (cs != null) {
514
-						try {
515
-							CronExpression ce = new CronExpression(cs);
516
-							Date nextRun = null;
517
-							if (jp.lastRun != null) {
518
-								logger.debug("lastRun: " + jp.lastRun);
519
-								nextRun = ce.getNextValidTimeAfter(jp.lastRun);
520
-							} else {
521
-								Calendar cal = Calendar.getInstance();
522
-								cal.add(Calendar.MINUTE, -1);
523
-								nextRun = ce.getNextValidTimeAfter(cal.getTime());
524
-							}
525
-							jp.nextRun = nextRun;
526
-						} catch (ParseException pe) {
527
-							logger.error("Invalid cron string: '" + cs + "' (" + pe.getMessage() + ")");
528
-						}
529
-					}
530
-				}
531
-				domainPlanification.add(jp);
532
-
533
-				String statusString = rs.getString(7);
534
-				if (statusString != null) {
535
-					JobDomainStatus ds = new JobDomainStatus();
536
-					ds.domain = domainName;
537
-					ds.status = JobExitStatus.valueOf(statusString);
538
-					domainStatus.add(ds);
539
-				} else {
540
-					logger.warn("No recorded execution in database for " + current.id + "@" + domainName);
541
-				}
542
-				current.sendReport = rs.getBoolean(8);
543
-				current.recipients = rs.getString(9);
544
-			}
545
-			if (lastJid != null) {
546
-				idIndex.remove(lastJid);
547
-			}
475
+		inTransaction(() -> {
476
+			select(q.toString(), JobExecutionColumn.jobCreator(),
477
+					JobExecutionColumn.jobStatusAndPlansPopulator(idIndex),
478
+					ret.stream().map(j -> j.id).toArray(String[]::new));
479
+			return null;
480
+		});
548 481
 
549
-			logger.debug("Loaded {} jobs from database.", cnt);
550
-		} catch (Exception t) {
551
-			logger.error(t.getMessage(), t);
552
-			throw new ServerFault(t);
553
-		} finally {
554
-			JdbcHelper.cleanup(con, rs, ps);
555
-		}
556 482
 	}
557 483
 
558 484
 	/**
... ...
@@ -567,40 +325,8 @@ public class ScheduledJobStore extends JdbcAbstractStore {
567 567
 
568 568
 		inTransaction(() -> {
569 569
 			logger.debug("saving {} entries...", entries.size());
570
-
571
-			StringBuilder q = new StringBuilder();
572
-			q.append(" INSERT INTO t_job_log_entry");
573
-			q.append(" (execution_id, severity, stamp, locale, content)");
574
-			q.append(" VALUES (?, ?::t_entry_log_level, ?, ?, ?)");
575
-
576
-			String query = q.toString();
577
-			Connection con = null;
578
-			PreparedStatement ps = null;
579
-			try {
580
-				con = getConnection();
581
-				ps = con.prepareStatement(query);
582
-
583
-				for (LogEntry le : entries) {
584
-					int idx = 1;
585
-					ps.setInt(idx++, jobExececutionId);
586
-					ps.setString(idx++, le.severity.name());
587
-					ps.setTimestamp(idx++, new Timestamp(le.timestamp));
588
-					ps.setString(idx++, le.locale);
589
-					ps.setString(idx++, le.content);
590
-					ps.addBatch();
591
-				}
592
-				ps.executeBatch();
593
-			} catch (SQLException e) {
594
-				logger.warn("Cannot save Job log entry", e);
595
-				SQLException nextException = e.getNextException();
596
-				if (null != nextException) {
597
-					logger.warn("Next Exception: Cannot save Job log entry", nextException);
598
-					throw new ServerFault(nextException);
599
-				}
600
-				throw new ServerFault(e);
601
-			} finally {
602
-				JdbcHelper.cleanup(con, null, ps);
603
-			}
570
+			String query = "insert into t_job_log_entry (execution_id, severity, stamp, locale, content) values (?, ?::t_entry_log_level, ?, ?, ?)";
571
+			batchInsert(query, entries, JobExecutionColumn.logEntryValues(jobExececutionId));
604 572
 			return null;
605 573
 		});
606 574
 	}
... ...
@@ -609,52 +335,35 @@ public class ScheduledJobStore extends JdbcAbstractStore {
609 609
 	 * @param context
610 610
 	 * @param execId
611 611
 	 * @return
612
-	 * @throws ServerFault
613 612
 	 */
614
-	public Set<LogEntry> fetchLogEntries(SecurityContext context, int execId) throws ServerFault {
615
-
616
-		StringBuilder q = new StringBuilder();
617
-		q.append(" SELECT");
618
-		q.append(" severity, stamp, locale, content");
619
-		q.append(" FROM t_job_log_entry");
613
+	public Set<LogEntry> fetchLogEntries(SecurityContext context, int execId) {
614
+		return inTransaction(() -> {
620 615
 
621
-		if (!context.isDomainGlobal()) {
622
-			// ensure we can't read log from another domain using an id
623
-			q.append(" INNER JOIN t_job_execution ON t_job_execution.id=execution_id");
624
-			q.append(" WHERE t_job_execution.domain_name='").append(context.getContainerUid()).append("'");
625
-			q.append(" AND execution_id=").append(execId);
626
-		} else {
627
-			q.append(" WHERE execution_id=").append(execId);
628
-		}
616
+			StringBuilder q = new StringBuilder();
617
+			q.append(" SELECT");
618
+			q.append(" severity, stamp, locale, content");
619
+			q.append(" FROM t_job_log_entry");
620
+
621
+			Object[] params;
622
+			if (!context.isDomainGlobal()) {
623
+				// ensure we can't read log from another domain using an id
624
+				q.append(" INNER JOIN t_job_execution ON t_job_execution.id=execution_id");
625
+				q.append(" WHERE t_job_execution.domain_name = ?");
626
+				q.append(" AND execution_id = ?");
627
+				params = new Object[] { context.getContainerUid(), execId };
628
+			} else {
629
+				q.append(" WHERE execution_id = ?");
630
+				params = new Object[] { execId };
629 631
 
630
-		// FIXIT-9: limit 20000 to prevent hprof when we saved loads of crap
631
-		q.append(" ORDER BY stamp ASC LIMIT 20000");
632
-		String query = q.toString();
633
-
634
-		Connection con = null;
635
-		PreparedStatement ps = null;
636
-		ResultSet rs = null;
637
-		try {
638
-			con = getConnection();
639
-			ps = con.prepareStatement(query);
640
-			rs = ps.executeQuery();
641
-
642
-			Set<LogEntry> ret = new LinkedHashSet<LogEntry>();
643
-			while (rs.next()) {
644
-				LogEntry le = new LogEntry();
645
-				le.severity = LogLevel.valueOf(rs.getString(1));
646
-				le.timestamp = rs.getTimestamp(2).getTime();
647
-				le.locale = rs.getString(3);
648
-				le.content = rs.getString(4);
649
-				ret.add(le);
650 632
 			}
651 633
 
652
-			return ret;
653
-		} catch (Exception t) {
654
-			throw new ServerFault(t);
655
-		} finally {
656
-			JdbcHelper.cleanup(con, rs, ps);
657
-		}
634
+			// FIXIT-9: limit 20000 to prevent hprof when we saved loads of crap
635
+			q.append(" ORDER BY stamp ASC LIMIT 20000");
636
+			String query = q.toString();
637
+
638
+			return new LinkedHashSet<LogEntry>(select(query, JobExecutionColumn.logEntryCreator(),
639
+					JobExecutionColumn.logEntryPopulator(), params));
640
+		});
658 641
 	}
659 642
 
660 643
 	/**
... ...
@@ -665,66 +374,23 @@ public class ScheduledJobStore extends JdbcAbstractStore {
665 665
 		List<JobPlanification> plans = job.domainPlanification;
666 666
 
667 667
 		inTransaction(() -> {
668
-			StringBuilder q = new StringBuilder();
669
-			q.append(" UPDATE t_job_plan SET");
670
-			q.append(" kind = ?::t_job_plan_kind,");
671
-			q.append(" cron = ?,");
672
-			q.append(" send_report = ?,");
673
-			q.append(" report_recipients = ? ");
674
-			q.append(" WHERE 1>0");
675
-			q.append(" AND job_id = ?");
676
-			q.append(" AND domain_name = ?");
677
-
678
-			String query = q.toString();
679
-
680
-			Connection con = null;
681
-			PreparedStatement ps = null;
682
-			try {
683
-				con = getConnection();
684
-
685
-				for (JobPlanification jp : plans) {
686
-					ensureDefaultPlan(con, jp.domain, job.id);
687
-
688
-				}
689
-
690
-				ps = con.prepareStatement(query);
691
-				for (JobPlanification jp : plans) {
692
-					updatePlan(ps, job, jp);
693
-				}
694
-				int[] updated = ps.executeBatch();
695
-				logger.debug("{} plan rows updated.", updated.length);
696
-			} finally {
697
-				JdbcHelper.cleanup(con, null, ps);
668
+			for (JobPlanification jp : plans) {
669
+				ensureDefaultPlan(jp.domain, job.id);
698 670
 			}
671
+
672
+			String query = "UPDATE t_job_plan SET" //
673
+					+ " kind = ?::t_job_plan_kind,"//
674
+					+ " cron = ?,"//
675
+					+ " send_report = ?,"//
676
+					+ " report_recipients = ? "//
677
+					+ " WHERE 1>0"//
678
+					+ " AND job_id = ?"//
679
+					+ " AND domain_name = ?";
680
+			batchInsert(query, plans, JobExecutionColumn.planValues(job));
681
+
682
+			logger.debug("{} plan rows updated.", plans.size());
699 683
 			return null;
700 684
 		});
701 685
 	}
702 686
 
703
-	/**
704
-	 * @param ps
705
-	 * @param job
706
-	 * @param jp
707
-	 * @throws ServerFault
708
-	 * @throws SQLException
709
-	 */
710
-	private void updatePlan(PreparedStatement ps, Job job, JobPlanification jp) throws ServerFault, SQLException {
711
-
712
-		logger.debug("updating plan for {}@{}", job.id, jp.domain);
713
-
714
-		int idx = 1;
715
-		ps.setString(idx++, jp.kind.name());
716
-
717
-		if (jp.kind == PlanKind.SCHEDULED) {
718
-			ps.setString(idx++, jp.rec.cronString);
719
-		} else {
720
-			ps.setNull(idx++, Types.VARCHAR);
721
-		}
722
-
723
-		ps.setBoolean(idx++, job.sendReport);
724
-		ps.setString(idx++, job.recipients);
725
-		ps.setString(idx++, job.id);
726
-		ps.setString(idx++, jp.domain);
727
-		ps.addBatch();
728
-	}
729
-
730 687
 }