Friday, December 4, 2015

Learn java util concurrent part2

This series is the next learning series of java.util.concurrent. You should read the first part here too. Today we will learn ten interfaces in package java.util.concurrent.

Okay, let's start on the queue interface, BlockingDeque. Some characteristics of this interface including

  • blocking
  • thread safe
  • does not permit null elements
  • may (or may not) be capacity-constrained.

and we can do adding/removing item to this queue. Example below.

1:  bd.add(1);  
2:  bd.add(2);  
3:  System.out.println("size: " + bd.size());  
4:          
5:  bd.add(3);  
6:  System.out.println("size: " + bd.size());  
7:  //bd.add(4); // exception  
8:          
9:  bd.forEach(s -> System.out.println(s));  

Try play around this class with different methods to get a basic understanding on it. Next, we have a similar queue called BlockingQueue. It's characters same as BlockingDeque, not sure where is the different. But official java has many classes implement this BlockingQueue.

1:  BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);  
2:  bq = new DelayQueue();  
3:  bq = new LinkedBlockingDeque<Integer>();  
4:  bq = new LinkedTransferQueue<Integer>();  
5:  bq = new PriorityBlockingQueue<Integer>();  
6:  bq = new SynchronousQueue<Integer>();  

Next we have Callable interface which is similar to Runnable with a clear distinction. Callable return a value.

1:  ExecutorService executorService = Executors.newFixedThreadPool(1);  
2:    
3:  Future<Integer> future = executorService.submit(new Summer(11,22));  
4:    
5:  try {  
6:     Integer total = future.get();  
7:     System.out.println("sum " + total);  
8:  } catch (Exception e) {  
9:     e.printStackTrace();  
10:  }  
11:    

As can be read above, Summer is a implementation of interface Callable and it is submitted to an executor service to be execute upon on. CompletableFuture.AsynchronousCompletionTask is a interesting interface and its official documentation said "A marker interface identifying asynchronous tasks produced by async methods. This may be useful for monitoring, debugging, and tracking asynchronous activities."

1:  CompletableFuture<Integer> cf = new CompletableFuture<Integer>();  
2:  ForkJoinPool.commonPool().submit(  
3:        (Runnable & CompletableFuture.AsynchronousCompletionTask)()->{  
4:      try {  
5:         cf.complete(1);  
6:      } catch (Exception e) {  
7:         cf.completeExceptionally(e);  
8:      }  
9:   });  

As can be read above, we submit a anonymous function to the ForkJoinPool where this anonymous function cast into the intersection of interface Runnable and CompletableFuture.AsynchronousCompletionTask. Moving on, we have Interface CompletionService. Now if you have a long running service, you might want to look into this interface. Example as can be read below.

1:  CompletionService<Integer> longRunningCompletionService = new ExecutorCompletionService<Integer>(executorService);  
2:    
3:  longRunningCompletionService.submit(() -> {System.out.println("done"); return 1;});  
4:    
5:  try {  
6:     Future<Integer> result = longRunningCompletionService.take();  
7:     System.out.println(result.get());  
8:  } catch (Exception e) {  
9:     // TODO Auto-generated catch block  
10:     e.printStackTrace();  
11:  }  

Persist the object longRunningCompletionService throughout your application and the result can be retrieve in the future. Pretty handy. Moving on, we have a new Interface CompletionStage which debut on jdk8. From CompletionStage javadoc, A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.

Example code using CompletionStage as of following.

1:  ListenableFuture<String> springListenableFuture = createSpringListenableFuture();  
2:    
3:  CompletableCompletionStage<Object> completionStage = factory.createCompletionStage();  
4:  springListenableFuture.addCallback(new ListenableFutureCallback<String>() {  
5:    @Override  
6:    public void onSuccess(String result) {  
7:       System.out.println("onSuccess called");  
8:      completionStage.complete(result);  
9:    }  
10:    @Override  
11:    public void onFailure(Throwable t) {  
12:       System.out.println("onFailure called");  
13:      completionStage.completeExceptionally(t);  
14:    }  
15:  });  
16:    
17:  completionStage.thenAccept(System.out::println);  

Until here, if you don't understand, you should start to take the code and start to work on it. In this concurrent package, we have two Maps to use, that is ConcurrentMap and ConcurrentNavigableMap.

1:  ConcurrentMap<String, String> cm = new ConcurrentHashMap();  
2:  cm = new ConcurrentSkipListMap<String, String>();  
3:    
4:  ConcurrentNavigableMap<String, String> cnm = new ConcurrentSkipListMap<String, String>();  

ConcurrentMap providing thread safety and atomicity guarantees whilst ConcurrentNavigableMap support additional supporting NavigableMap operations, and recursively so for its navigable sub-maps. Then we have interface Delayed.

1:  Random random = new Random();  
2:  int delay = random.nextInt(10000);  
3:  Delayed employer = new SalaryDelay("a lot of bs reasons", delay);  
4:  System.out.println("bullshit delay this time " + employer.getDelay(TimeUnit.SECONDS));  

Delayed is an interface where you should implemented two require methods, getDelay and compareTo. As you can read fundamentally is you have a few object which feed to the executor to process upon but not immmediate, it may get delay for any reasons. As an example, pretty common in the working world, emplayer delay salary for any reasons.

Last two interfaces is related to each other where ExecutorService is a sub interface of Executor. For ExecutorService, we have seen an example above and below is for Executor,

1:  Executor executor = new ForkJoinPool();  
2:  executor = new ScheduledThreadPoolExecutor(1);  
3:    
4:  BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);  
5:  executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, blockingQueue);  

We see there are many classes implemented Executor interface. Executor interface guaranteed


  • An object that executes submitted Runnable tasks. 
  • Executor interface does not strictly require that execution be asynchronous


That's it for this article, we continue the rest in the next article!

Oh before that, you can download the full source at the follow links

https://github.com/jasonwee/videoOnCloud/commit/3d291610df89c610215b8ecdbcc59cbb028ba14b
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentInterfaceP1.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/SalaryDelay.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Summer.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/CompletableFuture/LearnCompletionStage.java

No comments:

Post a Comment