Created
February 3, 2011 14:41
-
-
Save zubairov/809545 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*******************************************************************************
 * Copyright (c) 2010 SOPERA GmbH
 * All rights reserved.
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *******************************************************************************/
package org.sopera.talend.provider; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import javax.xml.ws.Provider; | |
/** | |
* Implementation of {@link Provider} that will queue incoming requests | |
* to be processed within one thread | |
* | |
* @author zubairov | |
*/ | |
public class QueuedProvider<T> implements Provider<T> { | |
private static final int MAX_QUEUE_SIZE = 1000; | |
private static final int WAIT_TIMEOUT_SECONDS = 120; | |
@SuppressWarnings("rawtypes") | |
private static final BlockingQueue<QueuedRequestImpl> queue = new LinkedBlockingQueue<QueuedRequestImpl>(MAX_QUEUE_SIZE); | |
/** | |
* {@inheritDoc} | |
*/ | |
public T invoke(T message) { | |
QueuedRequestImpl<T> context = new QueuedRequestImpl<T>(message); | |
boolean inserted = queue.offer(context); | |
if (!inserted) { | |
try { | |
context.release(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
throw new RuntimeException("Can't queue request, queue size of " + MAX_QUEUE_SIZE + " is exceeded"); | |
} else { | |
try { | |
context.waitForRelease(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
if (context.getError() != null) { | |
throw new RuntimeException("Exception when processing request", context.getError()); | |
} | |
return context.getResponse(); | |
} | |
/** | |
* Blocking method to obtain the next message from the queue | |
* | |
* @return | |
* @throws InterruptedException | |
*/ | |
@SuppressWarnings("unchecked") | |
public QueuedRequest<T> next() throws InterruptedException { | |
return queue.take(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment