Okay, let's start with ForkJoinPool.ForkJoinWorkerThreadFactory. For programming wise, you should not be worry as the class ForkJoinPool takes care of this implementation. Within class ForkJoinPool, we see that, there are two classes
- DefaultForkJoinWorkerThreadFactory
- InnocuousForkJoinWorkerThreadFactory
which implement ForkJoinWorkerThreadFactory with access modifier to default. So unless you know what you want and you know how ForkJoinPool work, then subclass ForkJoinPool away. For beginner in this article, it is sufficient to just use ForkJoinPool.
ManagedBlocker is an interface for extending managed parallelism for tasks running in ForkJoinPools. There are two methods to be implemented, block() and isReleasable()
1: public class QueueManagedBlocker<T> implements ManagedBlocker {
2:
3: final BlockingQueue<T> queue;
4: volatile T value = null;
5:
6: QueueManagedBlocker(BlockingQueue<T> queue) {
7: this.queue = queue;
8: }
9:
10: @Override
11: public boolean block() throws InterruptedException {
12: if (value == null)
13: value = queue.take();
14: return true;
15: }
16:
17: @Override
18: public boolean isReleasable() {
19: return value != null || (value = queue.poll()) != null;
20: }
21:
22: public T getValue() {
23: return value;
24: }
25:
26: }
Next, we have interface Future<V> which we have see before in the previous learning series.
1: ExecutorService executorService = Executors.newFixedThreadPool(1);
2: Future<Integer> future = executorService.submit(new Summer(11,22));
It's very clear you can obtain the result via future variable above. Interface RejectedExecutionHandler is mostly for error handling.
1: RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandlerImpl();
2: ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, worksQueue, executionHandler);
3:
4: public class MyRejectedExecutionHandlerImpl implements RejectedExecutionHandler {
5:
6: @Override
7: public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
8: System.out.println(r.toString() + " : I've been rejected !");
9: }
10:
11: }
So you can set the implementation classed to ThreadPoolExecutor and if a task cannot be executor by the ThreadPoolExecutor, rejectedExecution will be executed. Moving onto the next interface, RunnableFuture<V> .
1: RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));
so we see an initialization of object FutureTask with a callable class Summer class which we created in the previous learning series. Interface RunnableScheduledFuture which extend the previous interface RunnableFuture has another additional method to implement upon on.
1: RunnableScheduledFuture<Integer> rsf = new Summer1();
2: System.out.println(rsf.isPeriodic());RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));
In the class Summer1, you should determine if the class is periodic or not. ScheduledExecutorService is pretty common if you google this interface and given the code below.
1: ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
2: scheduler.scheduleAtFixedRate(() -> System.out.println("hihi"), 1, 1, TimeUnit.SECONDS);
3: Thread.sleep(3000);
4: scheduler.shutdown();
so we see a thread is executed every second.
1: ScheduledFuture<Integer> sf = new ScheduledFutureImpl();
2: sf.isCancelled();
ScheduledFuture<V> is a delayed result-bearing action that can be cancelled. Usually a scheduled future is the result of scheduling a task with a ScheduledExecutorService. This class is pretty common if you have a future task which get delay for whatever reason or it may get cancel, you want to look further into this class.
ThreadFactory is another interface which creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.
1: ThreadFactory tf = Executors.defaultThreadFactory();
2: tf.newThread(()->System.out.println("ThreadFactory")).start();
In this last series, we take a look at the last interface, TransferQueue. A TransferQueue may be useful for example in message passing applications in which producers sometimes (using method transfer(E)) await receipt of elements by consumers invoking take or poll, while at other times enqueue elements (via method put) without waiting for receipt.
1: TransferQueue<Integer> tq = new LinkedTransferQueue<Integer>();
That's it for this learning series. Thank you. Oh, and the source code.
https://github.com/jasonwee/videoOnCloud/commit/ce479e5befaf7abe84d3d85930d5196a639e2643
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/MyRejectedExecutionHandlerImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/ExampleThreadPoolExecutor.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/DemoExecutor.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentInterfaceP2.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/QueueManagedBlocker.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/ScheduledFutureImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Summer1.java