Last active
December 15, 2015 16:49
-
-
Save Karry/5291758 to your computer and use it in GitHub Desktop.
Simple iterator for efective parallel iterating huge collections with known count of readers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* <pre> | |
* Copyright (c) 2013, AVAST Software a.s. | |
* All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* * Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* * Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* * Neither the name of the AVAST Software a.s. nor the | |
* names of its contributors may be used to endorse or promote products | |
* derived from this software without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY | |
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
* </pre> | |
* | |
*/ | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Iterator for parallel access. It don't garant items order. | |
* | |
* @author karry | |
*/ | |
public class BlockParallelIterator<T> implements ParallelIterator<T> { | |
private final T[] data; | |
private final DataBlock[] blocks; | |
//ThreadLocal<DataBlock> threadBlock = new ThreadLocal<DataBlock>(); | |
public BlockParallelIterator(T[] data, int expectedReaders) { | |
if (data == null) | |
throw new NullPointerException("ParallelStack: data is null"); | |
expectedReaders = Math.max(expectedReaders, 1); | |
this.data = data; | |
int blockCount = Math.min(Math.max(data.length, 1), expectedReaders); | |
blocks = new DataBlock[blockCount]; | |
int step = data.length / blockCount; | |
int last = -1; | |
int first = 0; | |
for (int i = 0; i < blockCount; ++i) { | |
first = last + 1; | |
last = (i == blockCount - 1) ? data.length - 1 : first + step - 1; | |
blocks[i] = new DataBlock(first, last); | |
} | |
} | |
@Override | |
public int size() { | |
return data.length; | |
} | |
@Override | |
public void reset() { | |
for (DataBlock block : blocks) { | |
block.reset(); | |
} | |
} | |
@Override | |
public boolean hasNext() { | |
return getBlockWithData() != null; | |
} | |
@Override | |
public T next() { | |
DataBlock block = getBlockWithData(); | |
if (block == null) | |
return null; | |
int next = block.nextIndex(); | |
if (next >= 0) | |
return data[next]; | |
return null; | |
} | |
@Override | |
public void remove() { | |
throw new UnsupportedOperationException("Not supported in parallel iterator."); | |
} | |
DataBlock getBlockWithData() { | |
/* | |
DataBlock block = threadBlock.get(); | |
if (block != null && block.remaining() > 0) | |
return block; | |
*/ | |
long tid = Thread.currentThread().getId(); | |
int blockId = (int) (tid % (long) blocks.length); | |
for (int i = 0; i < blocks.length; ++i) { | |
DataBlock block = blocks[ (blockId + i) % blocks.length]; | |
if (block.remaining() > 0) { | |
//threadBlock.set(block); | |
return block; | |
} | |
} | |
return null; | |
} | |
static class DataBlock { | |
final AtomicInteger position = new AtomicInteger(-1); | |
final int first; | |
final int last; | |
DataBlock(int first, int last) { | |
this.first = first; | |
this.last = last; | |
} | |
private void reset() { | |
position.set(-1); | |
} | |
private int nextIndex() { | |
int i = position.incrementAndGet(); | |
if (i >= ((last - first) + 1)) | |
return -1; | |
return first + i; | |
} | |
public int remaining() { | |
return ((last - first) + 1) - position.get() - 1; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* <pre> | |
* Copyright (c) 2013, AVAST Software a.s. | |
* All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* * Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* * Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* * Neither the name of the AVAST Software a.s. nor the | |
* names of its contributors may be used to endorse or promote products | |
* derived from this software without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY | |
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
* </pre> | |
* | |
*/ | |
import java.util.Iterator; | |
/** | |
* | |
* @author karry | |
*/ | |
public interface ParallelIterator<T> extends Iterator<T> { | |
public int size(); | |
/** | |
* Rewind iterator internal pointer to beginning. | |
*/ | |
public void reset(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* <pre> | |
* Copyright (c) 2013, AVAST Software a.s. | |
* All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* * Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* * Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* * Neither the name of the AVAST Software a.s. nor the | |
* names of its contributors may be used to endorse or promote products | |
* derived from this software without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY | |
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
* </pre> | |
* | |
*/ | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* | |
* @author karry | |
*/ | |
public class ParallelIteratorTester implements Runnable { | |
/** | |
* @param args the command line arguments | |
*/ | |
public static void main(String[] args) throws InterruptedException { | |
final ExecutorService distExecutorThreadPool = Executors.newCachedThreadPool(); | |
for (int readers = 1; readers <= 30; ++readers) { | |
for (int size = 20000000; size <= 20000000; size += 100) { | |
test(readers, size, true, distExecutorThreadPool); | |
//test(readers, size, false, distExecutorThreadPool); | |
} | |
} | |
for (int readers = 1; readers <= 30; ++readers) { | |
for (int size = 20000000; size <= 20000000; size += 100) { | |
test(readers, size, false, distExecutorThreadPool); | |
//test(readers, size, false, distExecutorThreadPool); | |
} | |
} | |
distExecutorThreadPool.shutdown(); | |
} | |
private static void test(int readers, int size, boolean block, ExecutorService distExecutorThreadPool) throws InterruptedException { | |
// init test array | |
AtomicInteger[] arr = new AtomicInteger[size]; | |
for (int i = 0; i < arr.length; ++i) { | |
arr[i] = new AtomicInteger(0); | |
} | |
long start = System.nanoTime(); | |
ParallelIterator<AtomicInteger> it = block | |
? new BlockParallelIterator<AtomicInteger>(arr, readers) | |
: new SimpleParallelIterator<AtomicInteger>(arr); | |
final CountDownLatch distLatch = new CountDownLatch(readers); | |
for (int i = 0; i < readers; ++i) { | |
ParallelIteratorTester worker = new ParallelIteratorTester(it, distLatch); | |
distExecutorThreadPool.submit(worker); | |
} | |
distLatch.await(); | |
long end = System.nanoTime(); | |
int readedOnce = 0; | |
for (int i = 0; i < arr.length; ++i) { | |
if (arr[i].get() == 1) | |
readedOnce++; | |
} | |
if (readedOnce != size) | |
throw new RuntimeException("some value skipped"); | |
System.out.println((block ? "[BLOCK]" : "[NORM ]") + " size " + size + " readers " + readers + " duration " + (end - start) + " ns"); | |
} | |
private final CountDownLatch distLatch; | |
private final ParallelIterator<AtomicInteger> it; | |
private ParallelIteratorTester(ParallelIterator<AtomicInteger> it, CountDownLatch distLatch) { | |
this.it = it; | |
this.distLatch = distLatch; | |
} | |
@Override | |
public void run() { | |
while (it.hasNext()) { | |
AtomicInteger ai = it.next(); | |
if (ai == null) | |
continue; | |
if (ai.incrementAndGet() > 1) | |
throw new RuntimeException("double returned value"); | |
} | |
distLatch.countDown(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* <pre> | |
* Copyright (c) 2013, AVAST Software a.s. | |
* All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* * Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* * Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* * Neither the name of the AVAST Software a.s. nor the | |
* names of its contributors may be used to endorse or promote products | |
* derived from this software without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY | |
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
* </pre> | |
* | |
*/ | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Simple iterator for parallel access. It share one position pointer for all | |
* readers. It garant items order. | |
* | |
* @author karry | |
*/ | |
public class SimpleParallelIterator<T> implements ParallelIterator<T> { | |
private final T[] data; | |
private final AtomicInteger position; | |
public SimpleParallelIterator(T[] data) { | |
if (data == null) | |
throw new NullPointerException("ParallelStack: data is null"); | |
this.data = data; | |
this.position = new AtomicInteger(-1); | |
} | |
@Override | |
public boolean hasNext() { | |
return remaining() > 0; | |
} | |
@Override | |
public T next() { | |
int i = position.incrementAndGet(); | |
if (i >= data.length) | |
return null; | |
return data[i]; | |
} | |
@Override | |
public void remove() { | |
throw new UnsupportedOperationException("Not supported in parallel iterator."); | |
} | |
@Override | |
public int size() { | |
return data.length; | |
} | |
public int remaining() { | |
return data.length - position.get() - 1; | |
} | |
@Override | |
public void reset() { | |
position.set(-1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment