Skip to content

Commit 6073b44

Browse files
Merge pull request #207 from wttech/script-scheduler-job-consumer
Script scheduler as job consumer
2 parents 9dd6ef6 + 426c84d commit 6073b44

File tree

4 files changed

+142
-75
lines changed

4 files changed

+142
-75
lines changed

core/src/main/java/dev/vml/es/acm/core/script/AutomaticScriptScheduler.java renamed to core/src/main/java/dev/vml/es/acm/core/script/ScriptScheduler.java

Lines changed: 132 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package dev.vml.es.acm.core.script;
22

3-
import dev.vml.es.acm.core.AcmConstants;
43
import dev.vml.es.acm.core.code.*;
54
import dev.vml.es.acm.core.code.schedule.BootSchedule;
65
import dev.vml.es.acm.core.code.schedule.CronSchedule;
@@ -15,19 +14,27 @@
1514
import dev.vml.es.acm.core.repo.Repo;
1615
import dev.vml.es.acm.core.util.ChecksumUtils;
1716
import dev.vml.es.acm.core.util.ResolverUtils;
17+
import java.util.Arrays;
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
import java.util.Date;
21+
import java.util.HashMap;
1822
import java.util.List;
1923
import java.util.Map;
2024
import java.util.concurrent.ConcurrentHashMap;
21-
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.stream.Stream;
26+
2227
import org.apache.commons.lang3.StringUtils;
2328
import org.apache.sling.api.resource.LoginException;
2429
import org.apache.sling.api.resource.ResourceResolver;
2530
import org.apache.sling.api.resource.ResourceResolverFactory;
2631
import org.apache.sling.api.resource.observation.ResourceChange;
2732
import org.apache.sling.api.resource.observation.ResourceChangeListener;
28-
import org.apache.sling.commons.scheduler.Job;
29-
import org.apache.sling.commons.scheduler.ScheduleOptions;
30-
import org.apache.sling.commons.scheduler.Scheduler;
33+
import org.apache.sling.event.jobs.Job;
34+
import org.apache.sling.event.jobs.JobBuilder;
35+
import org.apache.sling.event.jobs.JobManager;
36+
import org.apache.sling.event.jobs.ScheduledJobInfo;
37+
import org.apache.sling.event.jobs.consumer.JobConsumer;
3138
import org.osgi.service.component.annotations.*;
3239
import org.osgi.service.metatype.annotations.AttributeDefinition;
3340
import org.osgi.service.metatype.annotations.Designate;
@@ -36,26 +43,47 @@
3643
import org.slf4j.LoggerFactory;
3744

3845
@Component(
39-
service = {ResourceChangeListener.class, EventListener.class},
46+
service = {ResourceChangeListener.class, EventListener.class, JobConsumer.class},
4047
immediate = true,
4148
property = {
4249
ResourceChangeListener.PATHS + "=glob:" + ScriptRepository.ROOT + "/automatic/**/*.groovy",
4350
ResourceChangeListener.CHANGES + "=ADDED",
4451
ResourceChangeListener.CHANGES + "=CHANGED",
45-
ResourceChangeListener.CHANGES + "=REMOVED"
52+
ResourceChangeListener.CHANGES + "=REMOVED",
53+
JobConsumer.PROPERTY_TOPICS + "=" + ScriptScheduler.JOB_TOPIC
4654
})
47-
@Designate(ocd = AutomaticScriptScheduler.Config.class)
48-
public class AutomaticScriptScheduler implements ResourceChangeListener, EventListener {
55+
@Designate(ocd = ScriptScheduler.Config.class)
56+
public class ScriptScheduler implements ResourceChangeListener, EventListener, JobConsumer {
57+
58+
private static final Logger LOG = LoggerFactory.getLogger(ScriptScheduler.class);
59+
60+
public static final String JOB_TOPIC = "dev/vml/es/acm/ScriptScheduler";
61+
62+
public static final String JOB_PROP_TYPE = "type";
4963

50-
private static final Logger LOG = LoggerFactory.getLogger(AutomaticScriptScheduler.class);
64+
public static final String JOB_PROP_SCRIPT_PATH = "scriptPath";
5165

52-
private static final String BOOT_JOB_NAME = AcmConstants.CODE + "-boot";
66+
public enum JobType {
67+
BOOT,
68+
CRON;
69+
70+
public static JobType of(String value) {
71+
return Arrays.stream(values())
72+
.filter(v -> StringUtils.equalsIgnoreCase(v.name(), value))
73+
.findFirst()
74+
.orElseThrow(
75+
() -> new IllegalArgumentException("Script scheduler job type is unsupported: " + value));
76+
}
77+
}
5378

5479
@ObjectClassDefinition(
55-
name = "AEM Content Manager - Automatic Script Scheduler",
80+
name = "AEM Content Manager - Script Scheduler",
5681
description = "Schedules automatic scripts on instance up and script changes")
5782
public @interface Config {
5883

84+
@AttributeDefinition(name = "Boot Delay", description = "Time in milliseconds to wait before booting scripts")
85+
long bootDelay() default 1000 * 10; // 10 seconds
86+
5987
@AttributeDefinition(
6088
name = "User Impersonation ID",
6189
description =
@@ -81,9 +109,7 @@ public class AutomaticScriptScheduler implements ResourceChangeListener, EventLi
81109

82110
private Boolean instanceReady;
83111

84-
private final Map<String, String> booted = new ConcurrentHashMap<>();
85-
86-
private final List<String> scheduled = new CopyOnWriteArrayList<>();
112+
private final Map<String, String> bootedScripts = new ConcurrentHashMap<>();
87113

88114
@Reference
89115
private ResourceResolverFactory resourceResolverFactory;
@@ -95,7 +121,7 @@ public class AutomaticScriptScheduler implements ResourceChangeListener, EventLi
95121
private Executor executor;
96122

97123
@Reference
98-
private Scheduler scheduler;
124+
private JobManager jobManager;
99125

100126
@Reference
101127
private HealthChecker healthChecker;
@@ -123,7 +149,7 @@ protected void modify(Config config) {
123149
protected void deactivate() {
124150
unscheduleBoot();
125151
unscheduleScripts();
126-
booted.clear();
152+
bootedScripts.clear();
127153
instanceReady = null;
128154
}
129155

@@ -137,8 +163,19 @@ public void onChange(List<ResourceChange> changes) {
137163
@Override
138164
public void onEvent(Event event) {
139165
EventType eventType = EventType.of(event.getName()).orElse(null);
140-
if (eventType == EventType.SCRIPT_SCHEDULER_BOOT) {
141-
bootOnDemand();
166+
if (eventType == null) {
167+
return;
168+
}
169+
switch (eventType) {
170+
case SCRIPT_SCHEDULER_BOOT:
171+
bootOnDemand();
172+
break;
173+
case EXECUTOR_RESET:
174+
reset();
175+
break;
176+
default:
177+
// ignore else
178+
break;
142179
}
143180
}
144181

@@ -149,7 +186,6 @@ public void bootOnDemand() {
149186
LOG.info("Automatic scripts booting on demand - job scheduled");
150187
}
151188

152-
// TODO on AEMaaCS scheduler refuses to schedule job during activate
153189
private void bootWhenInstanceUp() {
154190
LOG.info("Automatic scripts booting on instance up - job scheduling");
155191
unscheduleBoot();
@@ -164,19 +200,35 @@ private void bootWhenScriptsChanged() {
164200
LOG.info("Automatic scripts booting on script changes - job scheduled");
165201
}
166202

203+
@SuppressWarnings("unchecked")
167204
private void unscheduleBoot() {
168-
scheduler.unschedule(BOOT_JOB_NAME);
205+
Collection<ScheduledJobInfo> jobInfos = jobManager.getScheduledJobs(
206+
JOB_TOPIC, -1, Collections.singletonMap(JOB_PROP_TYPE, JobType.BOOT.name()));
207+
for (ScheduledJobInfo jobInfo : jobInfos) {
208+
jobInfo.unschedule();
209+
}
169210
}
170211

171212
private void scheduleBoot() {
172-
scheduler.schedule(bootJob(), configureScheduleOptions(BOOT_JOB_NAME, scheduler.NOW()));
213+
JobBuilder jobBuilder = jobManager.createJob(JOB_TOPIC);
214+
jobBuilder.properties(Collections.singletonMap(JOB_PROP_TYPE, JobType.BOOT.name()));
215+
JobBuilder.ScheduleBuilder scheduleBuilder = jobBuilder.schedule();
216+
scheduleBuilder.at(new Date(System.currentTimeMillis() + config.bootDelay()));
217+
scheduleBuilder.add();
173218
}
174219

175-
private ScheduleOptions configureScheduleOptions(String name, ScheduleOptions options) {
176-
options.name(name);
177-
options.canRunConcurrently(false);
178-
options.onSingleInstanceOnly(true);
179-
return options;
220+
@Override
221+
public JobResult process(Job job) {
222+
switch (JobType.of(job.getProperty(JOB_PROP_TYPE, String.class))) {
223+
case BOOT:
224+
bootJob();
225+
break;
226+
case CRON:
227+
String scriptPath = job.getProperty(JOB_PROP_SCRIPT_PATH, String.class);
228+
cronJob(scriptPath);
229+
break;
230+
}
231+
return JobResult.OK;
180232
}
181233

182234
private ScheduleResult determineSchedule(Script script, ResourceResolver resourceResolver) {
@@ -186,18 +238,16 @@ private ScheduleResult determineSchedule(Script script, ResourceResolver resourc
186238
}
187239
}
188240

189-
private Job bootJob() {
190-
return context -> {
191-
LOG.info("Automatic scripts booting - job started");
192-
unscheduleScripts();
193-
if (awaitInstanceHealthy(
194-
"Automatic scripts queueing and scheduling",
195-
config.healthCheckRetryCountBoot(),
196-
config.healthCheckRetryInterval())) {
197-
queueAndScheduleScripts();
198-
}
199-
LOG.info("Automatic scripts booting - job finished");
200-
};
241+
private void bootJob() {
242+
LOG.info("Automatic scripts booting - job started");
243+
unscheduleScripts();
244+
if (awaitInstanceHealthy(
245+
"Automatic scripts queueing and scheduling",
246+
config.healthCheckRetryCountBoot(),
247+
config.healthCheckRetryInterval())) {
248+
queueAndScheduleScripts();
249+
}
250+
LOG.info("Automatic scripts booting - job finished");
201251
}
202252

203253
private boolean checkInstanceReady() {
@@ -216,15 +266,13 @@ private boolean checkInstanceReady() {
216266
return instanceReady;
217267
}
218268

269+
@SuppressWarnings("unchecked")
219270
private void unscheduleScripts() {
220-
for (String scriptPath : scheduled) {
221-
try {
222-
scheduler.unschedule(scriptPath);
223-
} catch (Exception e) {
224-
LOG.error("Cron schedule script '{}' cannot be unscheduled!", scriptPath, e);
225-
}
271+
Collection<ScheduledJobInfo> jobInfos = jobManager.getScheduledJobs(
272+
JOB_TOPIC, -1, Collections.singletonMap(JOB_PROP_TYPE, JobType.CRON.name()));
273+
for (ScheduledJobInfo jobInfo : jobInfos) {
274+
jobInfo.unschedule();
226275
}
227-
scheduled.clear();
228276
}
229277

230278
private void queueAndScheduleScripts() {
@@ -257,11 +305,11 @@ private void queueAndScheduleScripts() {
257305

258306
private void queueBootScript(Script script, ResourceResolver resourceResolver) {
259307
String checksum = ChecksumUtils.calculate(script.getContent());
260-
String previousChecksum = booted.get(script.getId());
308+
String previousChecksum = bootedScripts.get(script.getId());
261309
if (previousChecksum == null || !StringUtils.equals(previousChecksum, checksum)) {
262310
if (checkScript(script, resourceResolver)) {
263311
queueScript(script);
264-
booted.put(script.getId(), checksum);
312+
bootedScripts.put(script.getId(), checksum);
265313
LOG.info("Boot script '{}' queued", script.getId());
266314
} else {
267315
LOG.info("Boot script '{}' not eligible for queueing!", script.getId());
@@ -271,10 +319,14 @@ private void queueBootScript(Script script, ResourceResolver resourceResolver) {
271319

272320
private void scheduleCronScript(Script script, CronSchedule schedule) {
273321
if (StringUtils.isNotBlank(schedule.getExpression())) {
274-
scheduler.schedule(
275-
cronJob(script.getPath()),
276-
configureScheduleOptions(script.getPath(), scheduler.EXPR(schedule.getExpression())));
277-
scheduled.add(script.getPath());
322+
JobBuilder jobBuilder = jobManager.createJob(JOB_TOPIC);
323+
Map<String, Object> properties = new HashMap<>();
324+
properties.put(JOB_PROP_TYPE, JobType.CRON.name());
325+
properties.put(JOB_PROP_SCRIPT_PATH, script.getPath());
326+
jobBuilder.properties(properties);
327+
JobBuilder.ScheduleBuilder scheduleBuilder = jobBuilder.schedule();
328+
scheduleBuilder.cron(schedule.getExpression());
329+
scheduleBuilder.add();
278330
LOG.info(
279331
"Cron schedule script '{}' scheduled with expression '{}'",
280332
script.getId(),
@@ -284,31 +336,29 @@ private void scheduleCronScript(Script script, CronSchedule schedule) {
284336
}
285337
}
286338

287-
private Job cronJob(String scriptPath) {
288-
return context -> {
289-
LOG.info("Cron schedule script '{}' - job started", scriptPath);
290-
if (awaitInstanceHealthy(
291-
String.format("Cron schedule script '%s' queueing", scriptPath),
292-
config.healthCheckRetryCountCron(),
293-
config.healthCheckRetryInterval())) {
294-
try (ResourceResolver resourceResolver = ResolverUtils.contentResolver(resourceResolverFactory, null)) {
295-
ScriptRepository scriptRepository = new ScriptRepository(resourceResolver);
296-
Script script = scriptRepository.read(scriptPath).orElse(null);
297-
if (script == null) {
298-
LOG.error("Cron schedule script '{}' not found in repository!", scriptPath);
339+
private void cronJob(String scriptPath) {
340+
LOG.info("Cron schedule script '{}' - job started", scriptPath);
341+
if (awaitInstanceHealthy(
342+
String.format("Cron schedule script '%s' queueing", scriptPath),
343+
config.healthCheckRetryCountCron(),
344+
config.healthCheckRetryInterval())) {
345+
try (ResourceResolver resourceResolver = ResolverUtils.contentResolver(resourceResolverFactory, null)) {
346+
ScriptRepository scriptRepository = new ScriptRepository(resourceResolver);
347+
Script script = scriptRepository.read(scriptPath).orElse(null);
348+
if (script == null) {
349+
LOG.error("Cron schedule script '{}' not found in repository!", scriptPath);
350+
} else {
351+
if (checkScript(script, resourceResolver)) {
352+
queueScript(script);
299353
} else {
300-
if (checkScript(script, resourceResolver)) {
301-
queueScript(script);
302-
} else {
303-
LOG.info("Cron schedule script '{}' not eligible for queueing!", scriptPath);
304-
}
354+
LOG.info("Cron schedule script '{}' not eligible for queueing!", scriptPath);
305355
}
306-
} catch (LoginException e) {
307-
LOG.error("Cannot access repository while queueing cron schedule script '{}'!", scriptPath, e);
308356
}
357+
} catch (LoginException e) {
358+
LOG.error("Cannot access repository while queueing cron schedule script '{}'!", scriptPath, e);
309359
}
310-
LOG.info("Cron schedule script '{}' - job finished", scriptPath);
311-
};
360+
}
361+
LOG.info("Cron schedule script '{}' - job finished", scriptPath);
312362
}
313363

314364
private boolean checkScript(Script script, ResourceResolver resourceResolver) {
@@ -387,4 +437,13 @@ private boolean awaitInstanceHealthy(String operation, long retryMaxCount, long
387437
}
388438
return true;
389439
}
440+
441+
public void reset() {
442+
findJobs().forEach(job -> jobManager.removeJobById(job.getId()));
443+
}
444+
445+
@SuppressWarnings("unchecked")
446+
private Stream<Job> findJobs() {
447+
return jobManager.findJobs(JobManager.QueryType.ALL, JOB_TOPIC, -1, Collections.emptyMap()).stream();
448+
}
390449
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
queue.name="AEM\ Content\ Manager\ Script\ Scheduler"
2+
queue.topics=["dev/vml/es/acm/ScriptScheduler"]
3+
queue.priority="NORM"
4+
queue.type="ORDERED"
5+
queue.maxparallel="1"
6+
queue.retries="0"
7+
queue.retrydelay="2000"
8+
queue.keepJobs="false"

ui.frontend/src/components/ScriptAutomaticList.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ const ScriptAutomaticList: React.FC = () => {
6565
<ScriptsDeleteButton selectedKeys={selectedIds(selectedKeys)} onDelete={handleLoadScripts} />
6666
<ScriptsSyncButton selectedKeys={selectedIds(selectedKeys)} onSync={handleLoadScripts} />
6767
</Toggle>
68-
<Button variant="negative" isDisabled={appState.instanceSettings.type === InstanceType.CLOUD_CONTAINER} onPress={() => window.open(instanceOsgiServiceConfigUrl(InstanceOsgiServicePid.AUTOMATIC_SCRIPT_SCHEDULER), '_blank')}>
68+
<Button variant="negative" isDisabled={appState.instanceSettings.type === InstanceType.CLOUD_CONTAINER} onPress={() => window.open(instanceOsgiServiceConfigUrl(InstanceOsgiServicePid.SCRIPT_SCHEDULER), '_blank')}>
6969
<Settings />
7070
<Text>Configure</Text>
7171
</Button>

ui.frontend/src/utils/api.types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ export enum InstanceType {
351351
export const instancePrefix = isProduction() ? '' : 'http://localhost:5502';
352352

353353
export enum InstanceOsgiServicePid {
354-
AUTOMATIC_SCRIPT_SCHEDULER = 'dev.vml.es.acm.core.script.AutomaticScriptScheduler',
354+
SCRIPT_SCHEDULER = 'dev.vml.es.acm.core.script.ScriptScheduler',
355355

356356
CODE_EXECUTOR = 'dev.vml.es.acm.core.code.Executor',
357357
CODE_EXECUTION_QUEUE = 'dev.vml.es.acm.core.code.ExecutionQueue',

0 commit comments

Comments
 (0)