Last active
January 11, 2018 12:17
-
-
Save paramsen/d779e56c034af395051951aabbbb968c to your computer and use it in GitHub Desktop.
Concurrent queue workers (download stuff synchronized in parallel)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Stack; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
// MIT License | |
// | |
// Copyright (c) 2017 Pär Amsen
// | |
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// | |
// The above copyright notice and this permission notice shall be included in all | |
// copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
/**
 * Concurrently downloads a predefined list of urls on a fixed number of worker threads
 * ({@link #WORKERS}). Workers keep taking urls from a shared {@link Stack} until it is depleted.
 *
 * <p>{@code Stack} synchronizes each individual call, but an {@code empty()} check followed by a
 * {@code pop()} is not atomic as a pair, so workers take urls while holding the stack's monitor.
 *
 * <p>{@link Callback#onDone(File)} is invoked at most once per instance, by the last worker to
 * finish, so every {@link Callback#onEach(File)} call has returned before it runs.
 *
 * <p>Does not care about IO errors and/or pause/resume for now.
 *
 * @author Pär Amsen 05/2017
 */
public class ConcurrentRequestSource {
    /** Number of worker threads started by each {@link #download(Callback)} call. */
    public static final int WORKERS = 4;

    /** Guards {@link Callback#onDone(File)} so it fires at most once for this instance. */
    private final AtomicBoolean done;
    private final File imagesDir;
    /** Pending urls; shared by all workers and only popped while holding its monitor. */
    private final Stack<String> urls;
    /** Workers that were started and have not finished yet; the one that brings it to 0 reports completion. */
    private final java.util.concurrent.atomic.AtomicInteger activeWorkers =
            new java.util.concurrent.atomic.AtomicInteger();

    /**
     * @param imagesDir directory handed back to {@link Callback#onDone(File)}
     * @param urls      urls to download; copied into an internal stack, so the caller's list is not modified
     */
    public ConcurrentRequestSource(File imagesDir, List<String> urls) {
        this.imagesDir = imagesDir;
        this.urls = new Stack<>();
        this.urls.addAll(urls);
        done = new AtomicBoolean(false);
    }

    /**
     * Starts the worker threads and returns immediately; results are delivered through the callback.
     *
     * @param callback is a callback that will be called with each work result when available
     */
    public void download(Callback callback) {
        List<Thread> workers = newWorkers(callback);
        // Register every worker before any of them starts, so a fast worker cannot observe
        // a count of zero while its siblings have not been started yet.
        activeWorkers.addAndGet(workers.size());
        for (Thread worker : workers) {
            worker.start();
        }
    }

    /** Creates (but does not start) {@link #WORKERS} named worker threads sharing the same callback. */
    private List<Thread> newWorkers(Callback callback) {
        List<Thread> workers = new ArrayList<>(WORKERS);
        for (int i = 0; i < WORKERS; i++) {
            workers.add(new Thread(newWorker(callback), "ConcurrentRequestSourceWorker-" + i));
        }
        return workers;
    }

    /** Builds the work loop: drain the url stack, then let the last worker standing report completion. */
    private Runnable newWorker(final Callback callback) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        String url;
                        // Check-and-pop must be one atomic step, otherwise two workers can both
                        // see a single remaining url and the slower one gets EmptyStackException.
                        synchronized (urls) {
                            if (urls.empty()) {
                                break;
                            }
                            url = urls.pop();
                        }
                        callback.onEach(doDownload(url));
                    }
                } finally {
                    // Runs even if the callback threw, so a crashed worker cannot prevent
                    // the remaining workers from ever signalling completion.
                    if (activeWorkers.decrementAndGet() == 0 && done.compareAndSet(false, true)) {
                        callback.onDone(imagesDir);
                    }
                    System.out.println(String.format("Thread %s done", Thread.currentThread().getName()));
                }
            }
        };
    }

    /**
     * Download and persist file on disk.
     *
     * <p>Currently a mock: it sleeps briefly, logs the url and returns {@code null}.
     *
     * @return File pointing to downloaded file on disk ({@code null} in this mock implementation)
     */
    private File doDownload(String url) {
        try {
            Thread.sleep(10); //mock request time
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread instead of dropping it.
            Thread.currentThread().interrupt();
        }
        System.out.printf("Downloaded %s on thread: %s%n", url, Thread.currentThread().getName());
        return null; // file pointing to downloaded image on disk
    }

    /**
     * Receives download results. Methods are invoked on the worker threads, and {@code onEach}
     * may be called from several workers at the same time, so implementations must be thread-safe.
     */
    interface Callback {
        /** Called once per url with the downloaded file. */
        void onEach(File image);

        /** Called at most once, after all workers have finished. */
        void onDone(File imagesDir);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.junit.Test; | |
import java.io.File; | |
import java.util.ArrayList; | |
import java.util.List; | |
import static org.junit.Assert.*; | |
/** | |
* @author Pär Amsen 05/2017 | |
*/ | |
public class ConcurrentRequestSourceTest { | |
@Test | |
public void download() throws Exception { | |
List<File> results = new ArrayList<>(); | |
final int[] done = {0}; | |
List<String> urls = new ArrayList<>(); | |
for (int i = 0; i < 2000; i++) { | |
urls.add(String.valueOf(2000 - i - 1)); | |
} | |
new ConcurrentRequestSource(null, urls).download(new ConcurrentRequestSource.Callback() { | |
@Override | |
public void onEach(File image) { | |
results.add(image); | |
} | |
@Override | |
public void onDone(File imagesDir) { | |
done[0]++; | |
} | |
}); | |
while (results.size() < 2000) { | |
Thread.sleep(10); | |
} | |
assertEquals(2000, results.size()); | |
assertEquals(1, done[0]); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment