Created
August 7, 2014 00:46
-
-
Save jerieljan/1ec151f0bd08d382d8cb to your computer and use it in GitHub Desktop.
Long-running, processor-intensive tasks happen, and sometimes requests for the same task get triggered repeatedly, so your system ends up attempting to process the -same- thing multiple times. JobManager is an attempt to resolve that by keeping track of each Task<Result> as a Job, which takes care of assigning future reattempts into existing Jo…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Job | |
* v1.0 | |
* June 18, 2014 | |
* | |
* Copyright 2014, Jeriel Jan del Prado | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.jerieljan.app.jobs; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
/** | |
* A Job wraps around a {@link Task} | |
* Jobs are designed to be pointed by multiple sources that require the result of a single task. | |
* | |
* @see Task | |
* @param <T> the expected result type of the Task. | |
* | |
* @author jerieljan | |
*/ | |
public class Job<T> { | |
public static final int SLEEP_INTERVAL = 250; | |
private static final Logger log = LoggerFactory.getLogger(Job.class); | |
private final Task<T> ongoingJob; | |
private final Object lock = new Object(); | |
private AtomicBoolean isRunning = new AtomicBoolean(false); | |
private AtomicBoolean isFinished = new AtomicBoolean(false); | |
private T result = null; | |
private Exception caughtException = null; | |
/** | |
* Creates a new Job with the given Task. | |
* | |
* Note that it is advised to use | |
* {@link JobManager#newJob(Task)} to | |
* create new Jobs within the context of a JobManager. | |
* | |
* @param taskToPerform the task to perform. | |
*/ | |
public Job(Task<T> taskToPerform) { | |
this.ongoingJob = taskToPerform; | |
} | |
/** | |
* Starts the task within the job. | |
* Contrary to its name, this method is marked private. | |
* Use {@link #get()} to start the inner Task. | |
* | |
* @return the result of the task. | |
* @throws Exception any exceptions encountered in the task. | |
*/ | |
private T start() throws Exception { | |
T result = ongoingJob.call(); | |
synchronized (lock) { | |
isRunning.set(false); | |
isFinished.set(true); | |
} | |
return result; | |
} | |
/** | |
* Gets the Task ID of this Job. | |
* | |
* @return task id | |
*/ | |
public String getTaskIdentifier() { | |
return ongoingJob.getTaskIdentifier(); | |
} | |
/** | |
* Performs the task included in the Job and returns the result. | |
* | |
* Get is designed to work when called by multiple classes simultaneously. Only the first thread will be accommodated to process the task. | |
* Once complete, all classes calling this method will receive the intended output. | |
* | |
* This method will block the currently running thread to those who call it, similar to Future's get() call. | |
* | |
* @return the result of the task. | |
* @throws Exception any exceptions encountered in the task. | |
*/ | |
public T get() throws Exception { | |
//The running state and finished state determines whether the task has been performed or not. | |
//Atomicity is important so that other classes calling this method will be subjected to the loop trap below. | |
if (isRunning.compareAndSet(false, true) && !isFinished.get()) { | |
try { | |
result = start(); | |
} catch (Exception inProcessException) { | |
//When an exception is caught, save its reference first before throwing again. | |
caughtException = inProcessException; | |
throw caughtException; | |
} | |
} | |
//This loop trap keeps other callers at bay while the task is currently being processed. | |
while (isRunning.get() && !isFinished.get()) { | |
//We throw Exceptions anyway, so we don't need to catch this. | |
log.trace("This job is currently being processed by another thread, waiting for result..."); | |
Thread.sleep(SLEEP_INTERVAL); | |
} | |
//Once a result has been resolved, return it (or any exceptions that may have occurred.) | |
if (caughtException != null) { | |
throw caughtException; | |
} | |
return result; | |
} | |
public boolean isFinished() { | |
return isFinished.get(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* JobManager | |
* v1.0 | |
* June 18, 2014 | |
* | |
* Copyright 2014, Jeriel Jan del Prado | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.jerieljan.app.jobs; | |
import java.util.ArrayList; | |
import java.util.List; | |
/** | |
* JobManager maintains a pool of running {@link Job}s. | |
*/ | |
public class JobManager { | |
private final List<Job> ongoingJobs; | |
private final List<Job> jobsToPurge; | |
private final Object lock = new Object(); | |
/** | |
* Starts the Job Manager. | |
*/ | |
public JobManager() { | |
ongoingJobs = new ArrayList<>(); | |
jobsToPurge = new ArrayList<>(); | |
} | |
/** | |
* Starts a new Job. | |
* If the job is already running, it will be reused. | |
* | |
* @param taskToPerform | |
* @param <T> | |
* @return | |
*/ | |
public <T> Job newJob(Task<T> taskToPerform) { | |
Job runningJob = getRunningJob(taskToPerform.getTaskIdentifier()); | |
//If the job is found, reuse it. Else, create it! | |
if (runningJob == null) { | |
runningJob = new Job<>(taskToPerform); | |
ongoingJobs.add(runningJob); | |
} | |
return runningJob; | |
} | |
/** | |
* Returns the running jobs (that are not yet finished) in the job pool. | |
* Note that this operation also expunges finished jobs, in order to clean the job pool. | |
* | |
* @param taskId the ID of the task to retrieve. | |
* | |
* @return | |
*/ | |
private Job getRunningJob(String taskId) { | |
Job target = null; | |
synchronized (lock) { | |
for (Job runningJob : ongoingJobs) { | |
if (runningJob.isFinished()) { | |
jobsToPurge.add(runningJob); | |
} else if (runningJob.getTaskIdentifier().contentEquals(taskId)) { | |
target = runningJob; | |
break; | |
} | |
} | |
ongoingJobs.removeAll(jobsToPurge); | |
jobsToPurge.clear(); | |
} | |
return target; | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Task | |
* v1.0 | |
* June 18, 2014 | |
* | |
* Copyright 2014, Jeriel Jan del Prado | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.jerieljan.app.jobs; | |
import java.util.concurrent.Callable; | |
/** | |
* A Task is a glorified {@link java.util.concurrent.Callable} that has an identifier. | |
* The identifier is used to compare multiple tasks together and to verify if they have the same parameters and tasks. | |
* | |
* @param <T> | |
*/ | |
public abstract class Task<T> implements Callable<T> {

    private final String taskIdentifier;

    /**
     * Creates a task carrying the given identifier.
     *
     * @param taskIdentifier the identifier used to compare this task with others.
     */
    protected Task(String taskIdentifier) {
        this.taskIdentifier = taskIdentifier;
    }

    /**
     * A convenience static call to generate a task identifier, based on the parameters provided.
     * <p>
     * The identifier has the form {@code "methodName: p1, p2, ..."}. Parameters are
     * separated so that different parameter lists do not collapse into the same
     * identifier (for example {@code (1, 23)} versus {@code (12, 3)}). A null
     * parameter is rendered as {@code "null"}.
     *
     * @param methodName the name of the operation the task performs.
     * @param objects    the parameters of the operation; may be empty.
     *
     * @return the generated identifier.
     */
    public static String generateId(String methodName, Object... objects) {
        StringBuilder id = new StringBuilder().append(methodName).append(": ");
        if (objects != null) {
            for (int i = 0; i < objects.length; i++) {
                if (i > 0) {
                    id.append(", ");
                }
                // String.valueOf tolerates null elements, unlike calling toString() directly.
                id.append(String.valueOf(objects[i]));
            }
        }
        return id.toString();
    }

    /**
     * Retrieves the task identifier of this Task.
     * <p>
     * Note that task identifiers do not serve as unique identifiers. Task identifiers are used to
     * compare tasks that perform the same work with the same parameters, similar to how MD5
     * hashes are used to validate file content equality.
     *
     * @return the identifier given at construction time.
     */
    public String getTaskIdentifier() {
        return taskIdentifier;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment