Showing posts with label interfaces. Show all posts

Saturday, December 5, 2015

Learn java util concurrent part3

This post continues the learning series on java.util.concurrent. You should read the first and second parts too. Today we will learn the remaining ten interfaces in the package java.util.concurrent.

Okay, let's start with ForkJoinPool.ForkJoinWorkerThreadFactory. Programming-wise, you should not have to worry about it, as the class ForkJoinPool takes care of this implementation. Within the class ForkJoinPool, we see that there are two classes


  • DefaultForkJoinWorkerThreadFactory
  • InnocuousForkJoinWorkerThreadFactory

which implement ForkJoinWorkerThreadFactory with default (package-private) access, so they cannot be used directly from your own code. If you know what you want and you know how ForkJoinPool works, go ahead and supply your own factory. For a beginner reading this article, it is sufficient to just use ForkJoinPool.
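For readers who do want to supply their own factory, here is a minimal sketch. The names NamedForkJoinFactoryDemo, NamedWorker and the "demo-worker-" prefix are made up for illustration; the only JDK pieces used are the ForkJoinWorkerThreadFactory interface, the protected ForkJoinWorkerThread constructor and the four-argument ForkJoinPool constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedForkJoinFactoryDemo {

    // ForkJoinWorkerThread has a protected constructor, so a subclass is needed.
    static final class NamedWorker extends ForkJoinWorkerThread {
        NamedWorker(ForkJoinPool pool, String name) {
            super(pool);
            setName(name);
        }
    }

    // Runs one task in a pool built with a custom factory and
    // returns the name of the worker thread that executed it.
    static String workerName() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
                p -> new NamedWorker(p, "demo-worker-" + counter.incrementAndGet());
        ForkJoinPool pool = new ForkJoinPool(1, factory, null, false);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerName());
    }
}
```

The factory is only consulted when the pool decides it needs another worker, so the numbering reflects creation order, not task order.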

ManagedBlocker is an interface for extending managed parallelism for tasks running in ForkJoinPools. There are two methods to implement, block() and isReleasable().

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;

public class QueueManagedBlocker<T> implements ManagedBlocker {

   final BlockingQueue<T> queue;
   volatile T value = null;

   QueueManagedBlocker(BlockingQueue<T> queue) {
      this.queue = queue;
   }

   // Blocks until an element is available, unless one was already fetched.
   @Override
   public boolean block() throws InterruptedException {
      if (value == null)
         value = queue.take();
      return true; // no further blocking is necessary
   }

   // Returns true if blocking is unnecessary because an element is at hand.
   @Override
   public boolean isReleasable() {
      return value != null || (value = queue.poll()) != null;
   }

   public T getValue() {
      return value;
   }

}
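A blocker like the one above is normally handed to the static method ForkJoinPool.managedBlock from inside a task. The following is a minimal sketch of that usage; ManagedBlockerDemo, QueueBlocker and takeInsidePool are illustrative names, and QueueBlocker is just a compact copy of the class above so that the example compiles on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

final class ManagedBlockerDemo {

    // Compact copy of the blocker from the post.
    static final class QueueBlocker<T> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<T> queue;
        private volatile T value;

        QueueBlocker(BlockingQueue<T> queue) { this.queue = queue; }

        @Override public boolean block() throws InterruptedException {
            if (value == null) value = queue.take();
            return true;
        }

        @Override public boolean isReleasable() {
            return value != null || (value = queue.poll()) != null;
        }

        T getValue() { return value; }
    }

    // Takes one element from the queue from inside a ForkJoinPool task.
    static String takeInsidePool(BlockingQueue<String> queue) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            Callable<String> task = () -> {
                QueueBlocker<String> blocker = new QueueBlocker<>(queue);
                // The pool may activate a spare worker while this one is blocked.
                ForkJoinPool.managedBlock(blocker);
                return blocker.getValue();
            };
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("hello");
        System.out.println(takeInsidePool(queue)); // prints "hello"
    }
}
```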

Next, we have the interface Future<V>, which we have seen before in the previous part of this series.

ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Summer(11,22));

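You can obtain the result by calling get() on the future variable above. The interface RejectedExecutionHandler is for error handling: it is invoked for tasks that a ThreadPoolExecutor cannot accept, for example because the executor has been shut down or because its queue and maximum pool size are both saturated.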

// worksQueue is assumed to be a BlockingQueue<Runnable> created elsewhere,
// for example an ArrayBlockingQueue<Runnable>.
RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandlerImpl();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, worksQueue, executionHandler);

public class MyRejectedExecutionHandlerImpl implements RejectedExecutionHandler {

   // Called by the executor when it cannot accept the task r.
   @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      System.out.println(r.toString() + " : I've been rejected !");
   }

}
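To see when a handler actually fires, here is a small self-contained sketch. It is not the author's ExampleThreadPoolExecutor; the class name RejectionDemo and the deliberately tiny pool (one thread, queue capacity one) are assumptions chosen so that rejections happen predictably.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RejectionDemo {

    // Submits `tasks` blocking tasks to a pool with one thread and a queue of
    // capacity one, and returns how many were passed to the rejection handler.
    static int countRejections(int tasks) throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler handler = (r, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the single worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // One task runs, one waits in the queue, the other three are rejected.
        System.out.println(countRejections(5)); // prints 3
    }
}
```

The handler runs on the thread that called execute, which is why the count is already final when the loop ends.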

So you can pass the implementing class to ThreadPoolExecutor, and if a task cannot be executed by the ThreadPoolExecutor, rejectedExecution will be called. Moving on to the next interface, RunnableFuture<V>.

RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));
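Because a RunnableFuture is both a Runnable and a Future, it can be run directly and then asked for its result. A minimal sketch, in which the lambda adder stands in for the Summer class (an assumption, since Summer is not shown in this post):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

final class RunnableFutureDemo {

    static int runAndGet(int a, int b) throws Exception {
        Callable<Integer> adder = () -> a + b;   // stand-in for Summer
        RunnableFuture<Integer> rf = new FutureTask<>(adder);
        rf.run();          // Runnable side: compute on the calling thread
        return rf.get();   // Future side: read the stored result
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGet(22, 33)); // prints 55
    }
}
```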

Here we see a FutureTask initialized with Summer, the Callable class we created in the previous part of this series. The interface RunnableScheduledFuture, which extends the previous interface RunnableFuture, has one additional method to implement, isPeriodic().

RunnableScheduledFuture<Integer> rsf = new Summer1();
System.out.println(rsf.isPeriodic());

In the class Summer1, you decide whether the task is periodic or not. ScheduledExecutorService is a commonly used interface, and you will find plenty of examples if you google it. A typical one is given in the code below.

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// initial delay of 1 second, then repeat every second
scheduler.scheduleAtFixedRate(() -> System.out.println("hihi"), 1, 1, TimeUnit.SECONDS);
Thread.sleep(3000);
scheduler.shutdown();

So we see that the task is executed every second, after an initial one-second delay, until the scheduler is shut down.

ScheduledFuture<Integer> sf = new ScheduledFutureImpl();
sf.isCancelled();

ScheduledFuture<V> is a delayed result-bearing action that can be cancelled. Usually a scheduled future is the result of scheduling a task with a ScheduledExecutorService. If you have a task that should run after a delay, or that may need to be cancelled before it runs, this interface is worth a closer look.
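In practice you rarely implement ScheduledFuture yourself; you get one back from a ScheduledExecutorService. A minimal sketch (ScheduledFutureDemo and the one-hour delay are illustrative choices) that schedules a task, inspects its remaining delay and cancels it before it runs:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledFutureDemo {

    // Schedules a task far in the future, then cancels it before it can run.
    static boolean scheduleThenCancel() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<Integer> task = () -> 42;
            ScheduledFuture<Integer> sf = scheduler.schedule(task, 1, TimeUnit.HOURS);
            System.out.println("remaining delay (minutes): " + sf.getDelay(TimeUnit.MINUTES));
            sf.cancel(false);          // the task has not started, so this succeeds
            return sf.isCancelled();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + scheduleThenCancel());
    }
}
```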

ThreadFactory is another interface which creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.

ThreadFactory tf = Executors.defaultThreadFactory();
tf.newThread(()->System.out.println("ThreadFactory")).start();
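Writing your own factory is straightforward. The sketch below, with the made-up class name NamedThreadFactory, gives every thread a readable name and marks it as a daemon; which properties to set is of course up to the application.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads prefix-1, prefix-2, ... and make them daemon threads.
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory tf = new NamedThreadFactory("worker");
        Thread t = tf.newThread(() -> System.out.println(Thread.currentThread().getName()));
        t.start();
        t.join(); // prints "worker-1"
    }
}
```

Such a factory can be passed to the Executors factory methods that accept a ThreadFactory, for example Executors.newFixedThreadPool(int, ThreadFactory).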

Finally, we take a look at the last interface, TransferQueue. A TransferQueue may be useful, for example, in message-passing applications in which producers sometimes (using method transfer(E)) await receipt of elements by consumers invoking take or poll, while at other times enqueue elements (via method put) without waiting for receipt.

TransferQueue<Integer> tq = new LinkedTransferQueue<Integer>();
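The difference between put and transfer is easiest to see with a consumer thread. This is a small sketch with an illustrative class name, TransferQueueDemo: put returns immediately, while transfer returns only after the consumer has received the element.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

final class TransferQueueDemo {

    // Returns the two values the consumer received, in order.
    static int[] handOff() throws InterruptedException {
        TransferQueue<Integer> tq = new LinkedTransferQueue<>();
        int[] received = new int[2];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = tq.take();
                received[1] = tq.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        tq.put(1);        // enqueue without waiting for a consumer
        tq.transfer(2);   // wait until a consumer has taken the element
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = handOff();
        System.out.println(r[0] + ", " + r[1]); // prints "1, 2"
        // With no consumer waiting, tryTransfer gives up at once.
        System.out.println(new LinkedTransferQueue<Integer>().tryTransfer(3)); // prints "false"
    }
}
```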

That's it for this learning series. Thank you. Oh, and the source code.

https://github.com/jasonwee/videoOnCloud/commit/ce479e5befaf7abe84d3d85930d5196a639e2643

https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/MyRejectedExecutionHandlerImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/ExampleThreadPoolExecutor.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/DemoExecutor.java

https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentInterfaceP2.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/QueueManagedBlocker.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/ScheduledFutureImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Summer1.java


Friday, December 4, 2015

Learn java util concurrent part2

This post continues the learning series on java.util.concurrent. You should read the first part too. Today we will learn ten interfaces in the package java.util.concurrent.

Okay, let's start with a queue interface, BlockingDeque. Some characteristics of this interface include

  • blocking
  • thread safe
  • does not permit null elements
  • may (or may not) be capacity-constrained.

and we can add items to and remove items from this deque. An example is shown below.

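// bd is assumed to be a bounded deque with capacity 3, for example
// BlockingDeque<Integer> bd = new LinkedBlockingDeque<Integer>(3);
bd.add(1);
bd.add(2);
System.out.println("size: " + bd.size());

bd.add(3);
System.out.println("size: " + bd.size());
//bd.add(4); // exception (add throws IllegalStateException if the deque is full)

bd.forEach(s -> System.out.println(s));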

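Try playing around with the different methods of this interface to get a basic understanding of it. Next, we have a similar interface called BlockingQueue. Its characteristics are the same as those of BlockingDeque; the difference is that BlockingDeque extends BlockingQueue and additionally supports inserting and removing elements at both ends, while BlockingQueue only offers the plain queue operations. The JDK has many classes that implement BlockingQueue.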

BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
// raw type: DelayQueue elements must implement Delayed, so this line
// only compiles with an unchecked warning
bq = new DelayQueue();
bq = new LinkedBlockingDeque<Integer>();
bq = new LinkedTransferQueue<Integer>();
bq = new PriorityBlockingQueue<Integer>();
bq = new SynchronousQueue<Integer>();
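To make the relationship between the two interfaces concrete, here is a short sketch with an illustrative class name, DequeVersusQueueDemo. It uses the both-ends methods that only BlockingDeque offers, then views the same object through the BlockingQueue interface.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class DequeVersusQueueDemo {

    static String demo() throws InterruptedException {
        BlockingDeque<Integer> deque = new LinkedBlockingDeque<>();
        deque.putLast(2);    // equivalent to put(2): insert at the tail
        deque.putFirst(1);   // deque only: insert at the head
        deque.putLast(3);

        int tail = deque.takeLast();   // deque only: remove from the tail

        // Every BlockingDeque is also a BlockingQueue.
        BlockingQueue<Integer> queue = deque;
        int head = queue.take();       // queue view: remove from the head

        return head + "," + tail + "," + deque;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints "1,3,[2]"
    }
}
```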

Next we have the Callable interface, which is similar to Runnable with a clear distinction: a Callable returns a value and may throw a checked exception.

ExecutorService executorService = Executors.newFixedThreadPool(1);

Future<Integer> future = executorService.submit(new Summer(11,22));

try {
   Integer total = future.get();
   System.out.println("sum " + total);
} catch (Exception e) {
   e.printStackTrace();
}
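The Summer class itself is not listed in this post (it is in the linked source). As a rough idea of what such a Callable looks like, here is a hypothetical reconstruction named SummerSketch, assuming it simply adds its two constructor arguments:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for Summer: a Callable that adds two ints.
final class SummerSketch implements Callable<Integer> {

    private final int a;
    private final int b;

    SummerSketch(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() {
        return a + b;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> future = executorService.submit(new SummerSketch(11, 22));
            System.out.println("sum " + future.get()); // prints "sum 33"
        } finally {
            executorService.shutdown();
        }
    }
}
```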

As can be read above, Summer is an implementation of the Callable interface, and it is submitted to an executor service to be executed. CompletableFuture.AsynchronousCompletionTask is an interesting interface; its official documentation says: "A marker interface identifying asynchronous tasks produced by async methods. This may be useful for monitoring, debugging, and tracking asynchronous activities."

CompletableFuture<Integer> cf = new CompletableFuture<Integer>();
ForkJoinPool.commonPool().submit(
      // the cast makes the lambda implement both interfaces
      (Runnable & CompletableFuture.AsynchronousCompletionTask) () -> {
   try {
      cf.complete(1);
   } catch (Exception e) {
      cf.completeExceptionally(e);
   }
});

As can be read above, we submit a lambda expression to the ForkJoinPool, and this lambda is cast to the intersection of the interfaces Runnable and CompletableFuture.AsynchronousCompletionTask. Moving on, we have the interface CompletionService. If you have long-running tasks and want to consume their results as they complete, you might want to look into this interface. An example can be read below.

CompletionService<Integer> longRunningCompletionService = new ExecutorCompletionService<Integer>(executorService);

longRunningCompletionService.submit(() -> {System.out.println("done"); return 1;});

try {
   // take() blocks until one of the submitted tasks has completed
   Future<Integer> result = longRunningCompletionService.take();
   System.out.println(result.get());
} catch (Exception e) {
   e.printStackTrace();
}
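The benefit of a CompletionService shows once more than one task is in flight: take() hands back futures in completion order, not submission order. A small sketch under that assumption, with the made-up name CompletionOrderDemo; a latch holds the first task back so that the order is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderDemo {

    static List<String> completionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Submitted first, but blocked until the latch is released.
            cs.submit(() -> { release.await(); return "slow"; });
            cs.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            order.add(cs.take().get());   // only "fast" can be finished here
            release.countDown();          // now let the slow task finish
            order.add(cs.take().get());
            return order;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder()); // prints "[fast, slow]"
    }
}
```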

Keep the longRunningCompletionService object around in your application and the results can be retrieved later, as the tasks complete. Pretty handy. Moving on, we have a new interface, CompletionStage, which debuted in JDK 8. From the CompletionStage javadoc: "A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages."

Example code using CompletionStage follows. Note that ListenableFuture and ListenableFutureCallback come from Spring, and that CompletableCompletionStage and the factory are not JDK types either.

ListenableFuture<String> springListenableFuture = createSpringListenableFuture();

CompletableCompletionStage<Object> completionStage = factory.createCompletionStage();
springListenableFuture.addCallback(new ListenableFutureCallback<String>() {
   @Override
   public void onSuccess(String result) {
      System.out.println("onSuccess called");
      completionStage.complete(result);
   }
   @Override
   public void onFailure(Throwable t) {
      System.out.println("onFailure called");
      completionStage.completeExceptionally(t);
   }
});

completionStage.thenAccept(System.out::println);
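The same idea can be tried with nothing but the JDK, since CompletableFuture is the standard implementation of CompletionStage. The following is a minimal sketch, not a translation of the code above; CompletionStageDemo, run and the "fallback" value are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CompletionStageDemo {

    // Completes the source normally, or exceptionally when input is null,
    // and returns what the dependent stages produce.
    static String run(String input) {
        CompletableFuture<String> source = new CompletableFuture<>();

        // Dependent stages are attached before the source has a value.
        CompletionStage<String> stage = source
                .thenApply(String::toUpperCase)
                .exceptionally(t -> "fallback");

        if (input == null) {
            source.completeExceptionally(new IllegalArgumentException("no input"));
        } else {
            source.complete(input);
        }
        return stage.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(run("hello")); // prints "HELLO"
        System.out.println(run(null));    // prints "fallback"
    }
}
```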

If you do not understand everything up to here, take the code and start working with it. In this concurrent package, we have two maps to use, ConcurrentMap and ConcurrentNavigableMap.

ConcurrentMap<String, String> cm = new ConcurrentHashMap<String, String>();
cm = new ConcurrentSkipListMap<String, String>();

ConcurrentNavigableMap<String, String> cnm = new ConcurrentSkipListMap<String, String>();
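What the two interfaces buy you becomes clearer with a few operations. A minimal sketch, with the illustrative name ConcurrentMapDemo: merge is used as an atomic read-modify-write from two threads, and the navigable map is queried in key order.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ConcurrentMapDemo {

    // Two threads each add 1000 to the same counter; merge is atomic,
    // so no increments are lost.
    static int countWord() throws InterruptedException {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++) {
                counts.merge("word", 1, Integer::sum);
            }
        };
        Thread a = new Thread(increment);
        Thread b = new Thread(increment);
        a.start();
        b.start();
        a.join();
        b.join();
        return counts.get("word");
    }

    // Keys are kept sorted, so range and ordering queries are available.
    static String navigate() {
        ConcurrentNavigableMap<String, Integer> scores = new ConcurrentSkipListMap<>();
        scores.put("carol", 3);
        scores.put("alice", 1);
        scores.put("bob", 2);
        return scores.firstKey() + ","
                + scores.headMap("carol").lastKey() + ","
                + scores.descendingMap().firstKey();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWord()); // prints 2000
        System.out.println(navigate());  // prints "alice,bob,carol"
    }
}
```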

ConcurrentMap provides thread-safety and atomicity guarantees, whilst ConcurrentNavigableMap additionally supports NavigableMap operations, and recursively so for its navigable sub-maps. Then we have the interface Delayed.

Random random = new Random();
int delay = random.nextInt(10000);
Delayed employer = new SalaryDelay("a lot of bs reasons", delay);
System.out.println("bullshit delay this time " + employer.getDelay(TimeUnit.SECONDS));

Delayed is an interface for which you must implement two required methods, getDelay and compareTo. Fundamentally, you have a few objects that are fed to an executor or queue for processing, not immediately but after some delay. As an example that is pretty common in the working world, an employer delays paying salary for any number of reasons.

The last two interfaces are related to each other: ExecutorService is a sub-interface of Executor. For ExecutorService, we have seen an example above; below is one for Executor.
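The SalaryDelay class is in the linked source, not in this post. As a rough illustration of the two required methods, here is a hypothetical implementation, DelayedSalary, used with a DelayQueue, which only releases an element once its delay has expired. The reasons and millisecond values are made up.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayedDemo {

    // Hypothetical Delayed implementation; not the author's SalaryDelay.
    static final class DelayedSalary implements Delayed {
        final String reason;
        private final long dueNanos;

        DelayedSalary(String reason, long delayMillis) {
            this.reason = reason;
            this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; zero or negative means the element has expired.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Orders elements so that the shortest remaining delay comes first.
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the reason of the salary that becomes available first.
    static String firstPaid() throws InterruptedException {
        DelayQueue<DelayedSalary> queue = new DelayQueue<>();
        queue.put(new DelayedSalary("audit", 200));
        queue.put(new DelayedSalary("holiday", 50));
        return queue.take().reason;   // blocks until the shortest delay expires
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstPaid()); // prints "holiday"
    }
}
```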

Executor executor = new ForkJoinPool();
executor = new ScheduledThreadPoolExecutor(1);

BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);
executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, blockingQueue);

We see that many classes implement the Executor interface. Two points about Executor from its documentation:


  • An Executor is an object that executes submitted Runnable tasks.
  • The Executor interface does not strictly require that execution be asynchronous.
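The second point can be shown with the simplest possible Executor, one that runs each task on the calling thread. This is a sketch with an illustrative class name, DirectExecutorDemo:

```java
import java.util.concurrent.Executor;

final class DirectExecutorDemo {

    // Returns true if the task ran on the same thread that called execute.
    static boolean runsOnCallerThread() {
        Executor direct = Runnable::run;   // execute(r) simply calls r.run()
        Thread caller = Thread.currentThread();
        boolean[] same = new boolean[1];
        direct.execute(() -> same[0] = Thread.currentThread() == caller);
        return same[0];
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerThread()); // prints "true"
    }
}
```

An executor that starts a new thread per task would be just as valid: `Executor perTask = r -> new Thread(r).start();`.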


That's it for this article; we will continue with the rest in the next article!

Oh, before that, you can download the full source at the following links:

https://github.com/jasonwee/videoOnCloud/commit/3d291610df89c610215b8ecdbcc59cbb028ba14b
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentInterfaceP1.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/SalaryDelay.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Summer.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/CompletableFuture/LearnCompletionStage.java