Skip to content

Instantly share code, notes, and snippets.

@ptupitsyn
Created June 10, 2025 06:21
Show Gist options
  • Save ptupitsyn/18a38db4dd1702558ebe273261f54233 to your computer and use it in GitHub Desktop.
Save ptupitsyn/18a38db4dd1702558ebe273261f54233 to your computer and use it in GitHub Desktop.
Custom Transactional Apache Ignite queue
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
public class MyQueue {
private static final String HEADER_KEY = "QUEUE_HEADER";
private final IgniteCache<Object, Object> cache;
private final Ignite ignite;
public MyQueue(Ignite ignite, String name) {
this.ignite = ignite;
CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>("my-queue")
.setAtomicityMode(TRANSACTIONAL);
cache = ignite.createCache(cacheCfg);
cache.put(HEADER_KEY, new IgniteBiTuple<>(0L, 0L));
}
public boolean offer(Object item) {
try (Transaction tx = txStart()) {
IgniteBiTuple<Long, Long> header = (IgniteBiTuple<Long, Long>) cache.get(HEADER_KEY);
long tail = header.get2();
cache.put(tail, item);
cache.put(HEADER_KEY, new IgniteBiTuple<>(header.get1(), tail + 1));
tx.commit();
return true;
}
}
public Object poll() {
try (Transaction tx = txStart()) {
IgniteBiTuple<Long, Long> header = (IgniteBiTuple<Long, Long>) cache.get(HEADER_KEY);
long head = header.get1();
Long tail = header.get2();
if (head >= tail) {
return null;
}
Object item = cache.getAndRemove(head);
cache.put(HEADER_KEY, new IgniteBiTuple<>(head + 1, tail));
tx.commit();
return item;
}
}
private Transaction txStart() {
return ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment