Skip to content

Instantly share code, notes, and snippets.

@Karry
Last active December 15, 2015 16:49
Show Gist options
  • Save Karry/5291758 to your computer and use it in GitHub Desktop.
Save Karry/5291758 to your computer and use it in GitHub Desktop.
Simple iterator for efective parallel iterating huge collections with known count of readers.
/**
* <pre>
* Copyright (c) 2013, AVAST Software a.s.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the AVAST Software a.s. nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* </pre>
*
*/
import java.util.concurrent.atomic.AtomicInteger;
/**
* Iterator for parallel access. It don't garant items order.
*
* @author karry
*/
public class BlockParallelIterator<T> implements ParallelIterator<T> {
private final T[] data;
private final DataBlock[] blocks;
//ThreadLocal<DataBlock> threadBlock = new ThreadLocal<DataBlock>();
public BlockParallelIterator(T[] data, int expectedReaders) {
if (data == null)
throw new NullPointerException("ParallelStack: data is null");
expectedReaders = Math.max(expectedReaders, 1);
this.data = data;
int blockCount = Math.min(Math.max(data.length, 1), expectedReaders);
blocks = new DataBlock[blockCount];
int step = data.length / blockCount;
int last = -1;
int first = 0;
for (int i = 0; i < blockCount; ++i) {
first = last + 1;
last = (i == blockCount - 1) ? data.length - 1 : first + step - 1;
blocks[i] = new DataBlock(first, last);
}
}
@Override
public int size() {
return data.length;
}
@Override
public void reset() {
for (DataBlock block : blocks) {
block.reset();
}
}
@Override
public boolean hasNext() {
return getBlockWithData() != null;
}
@Override
public T next() {
DataBlock block = getBlockWithData();
if (block == null)
return null;
int next = block.nextIndex();
if (next >= 0)
return data[next];
return null;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not supported in parallel iterator.");
}
DataBlock getBlockWithData() {
/*
DataBlock block = threadBlock.get();
if (block != null && block.remaining() > 0)
return block;
*/
long tid = Thread.currentThread().getId();
int blockId = (int) (tid % (long) blocks.length);
for (int i = 0; i < blocks.length; ++i) {
DataBlock block = blocks[ (blockId + i) % blocks.length];
if (block.remaining() > 0) {
//threadBlock.set(block);
return block;
}
}
return null;
}
static class DataBlock {
final AtomicInteger position = new AtomicInteger(-1);
final int first;
final int last;
DataBlock(int first, int last) {
this.first = first;
this.last = last;
}
private void reset() {
position.set(-1);
}
private int nextIndex() {
int i = position.incrementAndGet();
if (i >= ((last - first) + 1))
return -1;
return first + i;
}
public int remaining() {
return ((last - first) + 1) - position.get() - 1;
}
}
}
/**
* <pre>
* Copyright (c) 2013, AVAST Software a.s.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the AVAST Software a.s. nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* </pre>
*
*/
import java.util.Iterator;
/**
*
* @author karry
*/
public interface ParallelIterator<T> extends Iterator<T> {
public int size();
/**
* Rewind iterator internal pointer to beginning.
*/
public void reset();
}
/**
* <pre>
* Copyright (c) 2013, AVAST Software a.s.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the AVAST Software a.s. nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* </pre>
*
*/
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author karry
*/
public class ParallelIteratorTester implements Runnable {
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws InterruptedException {
final ExecutorService distExecutorThreadPool = Executors.newCachedThreadPool();
for (int readers = 1; readers <= 30; ++readers) {
for (int size = 20000000; size <= 20000000; size += 100) {
test(readers, size, true, distExecutorThreadPool);
//test(readers, size, false, distExecutorThreadPool);
}
}
for (int readers = 1; readers <= 30; ++readers) {
for (int size = 20000000; size <= 20000000; size += 100) {
test(readers, size, false, distExecutorThreadPool);
//test(readers, size, false, distExecutorThreadPool);
}
}
distExecutorThreadPool.shutdown();
}
private static void test(int readers, int size, boolean block, ExecutorService distExecutorThreadPool) throws InterruptedException {
// init test array
AtomicInteger[] arr = new AtomicInteger[size];
for (int i = 0; i < arr.length; ++i) {
arr[i] = new AtomicInteger(0);
}
long start = System.nanoTime();
ParallelIterator<AtomicInteger> it = block
? new BlockParallelIterator<AtomicInteger>(arr, readers)
: new SimpleParallelIterator<AtomicInteger>(arr);
final CountDownLatch distLatch = new CountDownLatch(readers);
for (int i = 0; i < readers; ++i) {
ParallelIteratorTester worker = new ParallelIteratorTester(it, distLatch);
distExecutorThreadPool.submit(worker);
}
distLatch.await();
long end = System.nanoTime();
int readedOnce = 0;
for (int i = 0; i < arr.length; ++i) {
if (arr[i].get() == 1)
readedOnce++;
}
if (readedOnce != size)
throw new RuntimeException("some value skipped");
System.out.println((block ? "[BLOCK]" : "[NORM ]") + " size " + size + " readers " + readers + " duration " + (end - start) + " ns");
}
private final CountDownLatch distLatch;
private final ParallelIterator<AtomicInteger> it;
private ParallelIteratorTester(ParallelIterator<AtomicInteger> it, CountDownLatch distLatch) {
this.it = it;
this.distLatch = distLatch;
}
@Override
public void run() {
while (it.hasNext()) {
AtomicInteger ai = it.next();
if (ai == null)
continue;
if (ai.incrementAndGet() > 1)
throw new RuntimeException("double returned value");
}
distLatch.countDown();
}
}
/**
* <pre>
* Copyright (c) 2013, AVAST Software a.s.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the AVAST Software a.s. nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* </pre>
*
*/
import java.util.concurrent.atomic.AtomicInteger;
/**
* Simple iterator for parallel access. It share one position pointer for all
* readers. It garant items order.
*
* @author karry
*/
public class SimpleParallelIterator<T> implements ParallelIterator<T> {
private final T[] data;
private final AtomicInteger position;
public SimpleParallelIterator(T[] data) {
if (data == null)
throw new NullPointerException("ParallelStack: data is null");
this.data = data;
this.position = new AtomicInteger(-1);
}
@Override
public boolean hasNext() {
return remaining() > 0;
}
@Override
public T next() {
int i = position.incrementAndGet();
if (i >= data.length)
return null;
return data[i];
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not supported in parallel iterator.");
}
@Override
public int size() {
return data.length;
}
public int remaining() {
return data.length - position.get() - 1;
}
@Override
public void reset() {
position.set(-1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment