Friday, December 18, 2015

Learn java util concurrent part5

This is the last part of the series on the java.util.concurrent package. If you have not read the earlier parts, you can find part1, part2, part3 and part4 at their respective links. In this part, we will study the remaining 19 classes in the java.util.concurrent package.

ForkJoinTask<V>

  • Abstract base class for tasks that run within a ForkJoinPool.

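    // Summer is a Callable<Integer>; adapt() wraps it in a ForkJoinTask
    ForkJoinTask<Integer> fjt = ForkJoinTask.adapt(new Summer(44, 55));
    fjt.invoke();                        // run the task and wait for it to finish
    Integer sum = fjt.get();             // the result is already available
    System.out.println(sum);
    System.out.println(fjt.isDone());    // true, because invoke() has returned
    fjt.join();                          // returns immediately for a finished task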

Notice that a new Callable instance was adapted into a ForkJoinTask. Execution starts when the task is invoked. You can check whether the task is complete using the isDone method.
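To make the adapt-then-invoke flow concrete, here is a minimal self-contained sketch. The Summer class below is a hypothetical stand-in (it simply adds two numbers) for the one used earlier in this series, and AdaptExample is an illustrative name only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class AdaptExample {

    // Hypothetical stand-in for the Summer class used in this series.
    static class Summer implements Callable<Integer> {
        private final int a;
        private final int b;

        Summer(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public Integer call() {
            return a + b;
        }
    }

    static int sum(int a, int b) {
        // adapt() turns the Callable into a ForkJoinTask
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(new Summer(a, b));
        // the pool's invoke() runs the task and returns its result
        return ForkJoinPool.commonPool().invoke(task);
    }

    public static void main(String[] args) {
        System.out.println(sum(44, 55)); // prints 99
    }
}
```

Here the task is handed to the common pool instead of being invoked directly, which is one of several ways to run it.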

ForkJoinWorkerThread

  • A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

    ForkJoinWorkerThreadFactory customFactory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            // placeholder only: a real factory returns a new worker thread for the pool
            return null;
        }
    };

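As the ForkJoinWorkerThreadFactory javadoc explains, newThread returns a new worker thread operating in the given pool. A factory that returns null, as this stub does, gives the pool no thread to run tasks on.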
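A factory that actually produces threads could look like the sketch below. It delegates creation to ForkJoinPool.defaultForkJoinWorkerThreadFactory and only renames the thread; the "custom-worker-" prefix and the NamedWorkerFactoryExample class are made-up names for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class NamedWorkerFactoryExample {

    static String workerName() {
        AtomicInteger counter = new AtomicInteger();
        // Let the default factory create the thread, then give it our own name.
        ForkJoinWorkerThreadFactory factory = owningPool -> {
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(owningPool);
            worker.setName("custom-worker-" + counter.incrementAndGet());
            return worker;
        };

        ForkJoinPool pool = new ForkJoinPool(2, factory, null, false);
        try {
            // The task reports the name of the worker thread that runs it.
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

The printed name starts with custom-worker-, which shows that the pool obtained its thread from the custom factory.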

FutureTask<V>

  • A cancellable asynchronous computation.
  • A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

    FutureTask<Integer> ft = new FutureTask<Integer>(new Summer(66, 77));
    ft.run();                        // runs the wrapped Callable in the calling thread
    System.out.println(ft.get());    // the result is ready once run() returns

If you have a long-running task, you can wrap it in a FutureTask so that it can be cancelled. Next, we will look at three more queues.
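Cancellation can be sketched as follows. The one-minute sleep stands in for a long-running computation, and CancelExample is an illustrative name, not something from the earlier parts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class CancelExample {

    static boolean cancelLongTask() {
        Callable<Integer> slow = () -> {
            Thread.sleep(60_000); // simulate long-running work
            return 42;
        };
        FutureTask<Integer> task = new FutureTask<>(slow);

        Thread worker = new Thread(task); // FutureTask is a Runnable
        worker.start();

        // true means: interrupt the thread if the task is already running
        task.cancel(true);
        return task.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(cancelLongTask()); // prints true
    }
}
```

After a successful cancel, calling get() on the task throws a CancellationException instead of returning a value.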

LinkedBlockingDeque<E>

  • An optionally-bounded blocking deque based on linked nodes.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.
  • Most operations run in constant time (ignoring time spent blocking). Exceptions include remove, removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove(), and the bulk operations, all of which run in linear time.


LinkedBlockingQueue<E>

  • An optionally-bounded blocking queue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out).
  • The head of the queue is that element that has been on the queue the longest time.
  • The tail of the queue is that element that has been on the queue the shortest time.
  • New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.


LinkedTransferQueue<E>

  • An unbounded TransferQueue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out) with respect to any given producer.
  • The head of the queue is that element that has been on the queue the longest time for some producer.
  • The size method is NOT a constant-time operation.

    LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<Integer>();
    lbd.add(1);
    lbd.add(2);
    lbd.add(3);

    LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();
    lbq.add(4);
    lbq.add(5);
    lbq.add(6);

    LinkedTransferQueue<Integer> ltq = new LinkedTransferQueue<Integer>();
    ltq.add(7);
    ltq.add(8);
    ltq.add(9);

Like the queues we talked about in part4, these queues show their benefits in multithreaded code.
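As a small illustration of the multithreaded case, the sketch below passes numbers from a producer thread to the calling thread through a LinkedBlockingQueue. The capacity of 2 and the HandOffExample class are arbitrary choices for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandOffExample {

    static List<Integer> produceAndConsume(int count) {
        // A small bound forces the producer to wait for the consumer.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a single producer the elements arrive in FIFO order, and neither thread needs explicit locking.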

Phaser

  • A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
  • This implementation restricts the maximum number of parties to 65535.


    Phaser phaser = new Phaser();
    phaser.register();    // register the main thread as a party
    System.out.println("current phase number : " + phaser.getPhase());
    // testPhaser is a helper that is not shown in this post
    testPhaser(phaser, 2000);
    testPhaser(phaser, 4000);
    testPhaser(phaser, 6000);

    phaser.arriveAndDeregister();    // main thread arrives and leaves the phaser
    Thread.sleep(10000);
    System.out.println("current phase number : " + phaser.getPhase());
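Since the testPhaser helper is not shown, the sketch below uses a hypothetical version of it: it registers one more party, sleeps for the given time on its own thread, then arrives and deregisters. This is an assumption about what such a helper might do, not the original implementation, and the sleep times are shortened.

```java
import java.util.concurrent.Phaser;

class PhaserExample {

    // Hypothetical helper: one extra party that arrives after a delay.
    static void testPhaser(Phaser phaser, long sleepMillis) {
        phaser.register(); // register before the thread starts
        new Thread(() -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                phaser.arriveAndDeregister();
            }
        }).start();
    }

    static int[] run() {
        Phaser phaser = new Phaser();
        phaser.register(); // the calling thread is a party too
        int before = phaser.getPhase();

        testPhaser(phaser, 20);
        testPhaser(phaser, 40);
        testPhaser(phaser, 60);

        // Wait until all three helper threads have arrived.
        phaser.arriveAndAwaitAdvance();
        int after = phaser.getPhase();

        phaser.arriveAndDeregister(); // last party leaves
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] phases = run();
        System.out.println("phase before : " + phases[0]); // 0
        System.out.println("phase after  : " + phases[1]); // 1
    }
}
```

Once the last party deregisters, the phaser terminates and getPhase() returns a negative value from then on.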

PriorityBlockingQueue<E>

  • An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
  • This class does not permit null elements.
  • The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.


    PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<Integer>();
    pbq.add(10);
    pbq.add(11);
    pbq.add(12);
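The ordering rule is easier to see when elements are inserted out of order. In this sketch (PriorityOrderExample is an illustrative name), poll() always removes the smallest remaining element under natural ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderExample {

    static List<Integer> drainInPriorityOrder() {
        PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();
        pbq.add(12);
        pbq.add(10);
        pbq.add(11);

        // poll() returns the head (smallest element) or null when empty
        List<Integer> drained = new ArrayList<>();
        Integer next;
        while ((next = pbq.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder()); // prints [10, 11, 12]
    }
}
```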

RecursiveAction

  • A recursive resultless ForkJoinTask.


    long[] array = {1, 3, 2, 5, 4, 9, 5, 7, 8};
    RecursiveAction ar = new SortTask(array);
    ar.invoke();
    for (long value : array) {
        System.out.println("array " + value);
    }

    static class SortTask extends RecursiveAction {

        final long[] array;
        final int lo, hi;

        SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        SortTask(long[] array) {
            this(array, 0, array.length);
        }

        @Override
        protected void compute() {
            if (hi - lo < THRESHOLD)
                sortSequentially(lo, hi);
            else {
                // split the range in two, sort both halves in parallel, then merge
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
                merge(lo, mid, hi);
            }
        }

        // implementation details follow:
        static final int THRESHOLD = 1000;

        void sortSequentially(int lo, int hi) {
            Arrays.sort(array, lo, hi);
        }

        void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++)
                array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
        }

    }

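Invoking ar starts the recursive sort of the array. Note that with only nine elements the range is below THRESHOLD, so this particular run is sorted sequentially without splitting.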

RecursiveTask<V>

  • A recursive result-bearing ForkJoinTask.


    RecursiveTask<Integer> fibo = new Fibonacci(10);
    fibo.invoke();
    System.out.println(fibo.get());

    static class Fibonacci extends RecursiveTask<Integer> {

        final int n;

        Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1)
                return n;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();    // compute fib(n - 1) asynchronously
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();    // compute fib(n - 2) here, then wait for f1
        }
    }

ScheduledThreadPoolExecutor

  • A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically.


    ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(10);
    Future<Integer> total = stpe.submit(new Summer(88, 99));    // runs as soon as a thread is free
    System.out.println(total.get());
    stpe.shutdown();
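The snippet above uses submit, which runs the task right away. To show the scheduling side, here is a minimal sketch that runs an addition after a delay with schedule. The DelayedSumExample class and the 50 ms delay are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayedSumExample {

    static int delayedSum(int a, int b, long delayMillis) {
        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1);
        try {
            Callable<Integer> adder = () -> a + b;
            // schedule() runs the task once, after the given delay
            ScheduledFuture<Integer> future =
                    stpe.schedule(adder, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // waits for the delayed result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            stpe.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedSum(88, 99, 50)); // prints 187
    }
}
```

For periodic work, scheduleAtFixedRate and scheduleWithFixedDelay take a Runnable plus an initial delay and a period.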

Semaphore

  • A counting semaphore.
  • Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource.


    ConnectionLimiter cl = new ConnectionLimiter(3);
    URLConnection conn = cl.acquire(new URL("http://www.google.com"));
    conn = cl.acquire(new URL("http://www.yahoo.com"));
    conn = cl.acquire(new URL("http://www.youtube.com"));
    cl.release(conn);    // only one of the three permits is returned here

    static class ConnectionLimiter {
        private final Semaphore semaphore;

        private ConnectionLimiter(int max) {
            semaphore = new Semaphore(max);
        }

        public URLConnection acquire(URL url) throws IOException, InterruptedException {
            semaphore.acquire();    // blocks while all permits are in use
            try {
                return url.openConnection();
            } catch (IOException e) {
                semaphore.release();    // do not leak the permit if opening fails
                throw e;
            }
        }

        public void release(URLConnection conn) {
            try {
                // clean up the connection here
            } finally {
                semaphore.release();
            }
        }
    }
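The permit counting itself can be observed without any network access. The sketch below (PermitExample is an illustrative name) uses tryAcquire, which returns immediately instead of blocking, on a semaphore with three permits.

```java
import java.util.concurrent.Semaphore;

class PermitExample {

    static boolean[] tryFourThenOneMore() {
        Semaphore semaphore = new Semaphore(3);
        boolean[] results = new boolean[5];

        // Three attempts succeed, the fourth finds no permit left.
        for (int i = 0; i < 4; i++) {
            results[i] = semaphore.tryAcquire();
        }

        // Returning one permit lets the next attempt succeed again.
        semaphore.release();
        results[4] = semaphore.tryAcquire();
        return results;
    }

    public static void main(String[] args) {
        // prints [true, true, true, false, true]
        System.out.println(java.util.Arrays.toString(tryFourThenOneMore()));
    }
}
```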

SynchronousQueue<E>

  • A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
  • This queue does not permit null elements.

    final SynchronousQueue<String> queue = new SynchronousQueue<String>();
    Thread a = new Thread(new QueueProducer(queue));
    a.start();
    Thread b = new Thread(new QueueConsumer(queue));
    b.start();

    Thread.sleep(1000);

    // the producer is still blocked on its second put(); interrupt both threads to finish
    a.interrupt();
    b.interrupt();


    static class QueueProducer implements Runnable {

        private final SynchronousQueue<String> queue;

        public QueueProducer(SynchronousQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            String event = "SYNCHRONOUS_EVENT";
            String anotherEvent = "ANOTHER_EVENT";

            try {
                // put() blocks until a consumer takes the element
                queue.put(event);
                System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event);

                // the consumer takes only once, so this put() blocks until interrupted
                queue.put(anotherEvent);
                System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), anotherEvent);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();    // restore the flag and exit
            }

        }

    }

    static class QueueConsumer implements Runnable {

        private final SynchronousQueue<String> queue;

        public QueueConsumer(SynchronousQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // take() blocks here until a producer offers an element
                String event = queue.take();
                System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();    // restore the flag and exit
            }

        }

    }
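Because a SynchronousQueue holds no elements, a non-blocking offer fails when no consumer is waiting, while put succeeds once a consumer takes. The sketch below shows both cases; RendezvousExample and the "EVENT" string are made up for illustration.

```java
import java.util.concurrent.SynchronousQueue;

class RendezvousExample {

    // offer() does not wait, so without a waiting consumer it returns false.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("EVENT");
    }

    // put() and take() meet: the element goes straight to the consumer.
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("EVENT"); // blocks until the consumer takes it
            consumer.join();    // after join, received[0] is safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff());              // prints EVENT
    }
}
```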

ThreadLocalRandom

  • A random number generator isolated to the current thread.


    ThreadLocalRandom tlr = ThreadLocalRandom.current();
    System.out.println(tlr.nextInt());
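A common use is drawing a number from a range. This small sketch simulates a die roll with nextInt(origin, bound); the DiceExample class is only an illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

class DiceExample {

    static int rollDie() {
        // origin is inclusive and bound is exclusive, so the result is 1 to 6
        return ThreadLocalRandom.current().nextInt(1, 7);
    }

    public static void main(String[] args) {
        System.out.println(rollDie());
    }
}
```

Calling ThreadLocalRandom.current() at the point of use, rather than sharing one instance between threads, keeps each thread on its own generator.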

ThreadPoolExecutor

  • An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.


    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);
    // core size 1, maximum size 1, idle threads kept alive for 1 second
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, blockingQueue);

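The last four classes are the rejection policies of ThreadPoolExecutor. The executor calls its policy when it cannot accept a task, for example because it has been shut down or because its threads and queue are both full.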

ThreadPoolExecutor.AbortPolicy

  • A handler for rejected tasks that throws a RejectedExecutionException.


ThreadPoolExecutor.CallerRunsPolicy

  • A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.


ThreadPoolExecutor.DiscardOldestPolicy

  • A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.


ThreadPoolExecutor.DiscardPolicy

  • A handler for rejected tasks that silently discards the rejected task.


    ThreadPoolExecutor.AbortPolicy ap = new ThreadPoolExecutor.AbortPolicy();
    try {
        // always throws, so "abort" is never printed
        ap.rejectedExecution(() -> System.out.println("abort"), tpe);
    } catch (Exception e) {
        // expected: RejectedExecutionException
    }

    ThreadPoolExecutor.CallerRunsPolicy crp = new ThreadPoolExecutor.CallerRunsPolicy();
    try {
        // runs the task in the calling thread as long as tpe is not shut down
        crp.rejectedExecution(() -> System.out.println("run"), tpe);
    } catch (Exception e) {

    }

    ThreadPoolExecutor.DiscardOldestPolicy dop = new ThreadPoolExecutor.DiscardOldestPolicy();
    try {
        // drops the oldest queued task, if any, then resubmits this one to tpe
        dop.rejectedExecution(() -> System.out.println("discard oldest"), tpe);
    } catch (Exception e) {

    }

    ThreadPoolExecutor.DiscardPolicy dp = new ThreadPoolExecutor.DiscardPolicy();
    try {
        // does nothing, so "discard" is never printed
        dp.rejectedExecution(() -> System.out.println("discard"), tpe);
    } catch (Exception e) {

    }
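The snippet above calls the policies by hand. In normal use the executor invokes its policy itself when it is saturated. The sketch below, with made-up sizes (one thread, a queue of one) and an illustrative SaturationExample class, shows AbortPolicy being triggered by a third task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationExample {

    static boolean rejectsWhenSaturated() {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());

        // Tasks wait on this latch so the pool stays busy.
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            tpe.execute(blocker); // taken by the single worker thread
            tpe.execute(blocker); // waits in the queue
            tpe.execute(blocker); // no thread, no queue slot: rejected
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown(); // let the blocked tasks finish
            tpe.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsWhenSaturated()); // prints true
    }
}
```

Swapping in CallerRunsPolicy would instead run the third task on the submitting thread, which slows the submitter down rather than failing.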

That's it for this long learning series on java.util.concurrent.
