Showing posts with label java. Show all posts
Showing posts with label java. Show all posts

Monday, June 21, 2021

Continuous Integration with GitHub Action

This is a contribution to the SUSE Cloud Native Foundations Scholarship Program which I received and some of the peers want a CI online demo. So here I will share on my knowledge and experience on continuous integration using a public repository and public runner. I will start with a sample project creation, GitHub project setup and end with GitHub action setup and runner. 


What is CI? 

In software engineering, continuous integration (CI) is the practice of merging  all developers' working copies to a shared mainline several times a day.[1]


So essentially, it is a routine where every developer has to go through after they made code changes. Example, syntax check, linting, code compiling, multiple tests, package building, perhaps also support different runtime versions and/or operating systems.

There are many CI software available, please choose the one which match your requirements. I particularly like Jenkins a lot but Jenkins would require you to setup the runner on your machine. Since the objective of this blog is all public, hence, I picked GitHub action.

Let's begin with initial project setup. Reference if you want to know more.

https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html

https://docs.github.com/en/actions/guides/building-and-testing-java-with-maven


```

$ export JAVA_HOME=/usr/lib/jvm/jdk-11.0.5/

$ mvn --version

Apache Maven 3.6.3

Maven home: /usr/share/maven

Java version: 11.0.5, vendor: Oracle Corporation, runtime: /usr/lib/jvm/jdk-11.0.5

Default locale: en_US, platform encoding: UTF-8

OS name: "linux", version: "5.10.0-7-amd64", arch: "amd64", family: "unix"

$ mvn archetype:generate -DgroupId=ch.weetech.app -DartifactId=demo_ci -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

[INFO] Scanning for projects...

[INFO] 

[INFO] ------------------< org.apache.maven:standalone-pom >-------------------

[INFO] Building Maven Stub Project (No POM) 1

[INFO] --------------------------------[ pom ]---------------------------------

...

...

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time:  5.555 s

[INFO] Finished at: 2021-06-20T14:53:50+08:00

[INFO] ------------------------------------------------------------------------

$ cd demo_ci

$ tree .

.

├── pom.xml

└── src

    ├── main

    │   └── java

    │       └── ch

    │           └── weetech

    │               └── app

    │                   └── App.java

    └── test

        └── java

            └── ch

                └── weetech

                    └── app

                        └── AppTest.java


11 directories, 3 files

$ mvn package


[INFO] Scanning for projects...

[INFO] 

[INFO] -----------------------< ch.weetech.app:demo_ci >-----------------------

[INFO] Building demo_ci 1.0-SNAPSHOT

[INFO] --------------------------------[ jar ]---------------------------------

[INFO] 

[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ demo_ci ---

[INFO] Using 'UTF-8' encoding to copy filtered resources.

...

...

...

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time:  2.221 s

[INFO] Finished at: 2021-06-20T14:56:41+08:00

[INFO] ------------------------------------------------------------------------

$ java -cp target/demo_ci-1.0-SNAPSHOT.jar ch.weetech.app.App

Hello World!

```

Once we have the sample project setup, test and compile code locally okay. Then it is time that we initialize git repository locally and push to GitHub. But before  we do of the following, you need to create a new repository on your GitHub.


```

$ mvn clean

[INFO] Scanning for projects...

[INFO] 

[INFO] -----------------------< ch.weetech.app:demo_ci >-----------------------

[INFO] Building demo_ci 1.0-SNAPSHOT

[INFO] --------------------------------[ jar ]---------------------------------

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time:  2.079 s

[INFO] Finished at: 2021-06-20T15:24:46+08:00

[INFO] ------------------------------------------------------------------------

$ git init 

hint: Using 'master' as the name for the initial branch. This default branch name

hint: is subject to change. To configure the initial branch name to use in all

hint: of your new repositories, which will suppress this warning, call:

hint: 

hint: git config --global init.defaultBranch <name>

hint: 

hint: Names commonly chosen instead of 'master' are 'main', 'trunk' and

hint: 'development'. The just-created branch can be renamed via this command:

hint: 

hint: git branch -m <name>

$ git status -m master

$ git status .

On branch master


No commits yet


Untracked files:

  (use "git add <file>..." to include in what will be committed)

pom.xml

src/


nothing added to commit but untracked files present (use "git add" to track)

$ git add pom.xml src/

$ git commit -m "initial"

[master (root-commit) 2cd8641] initial

 3 files changed, 108 insertions(+)

 create mode 100644 pom.xml

 create mode 100644 src/main/java/ch/weetech/app/App.java

 create mode 100644 src/test/java/ch/weetech/app/AppTest.java

$ git remote add origin https://github.com/jasonwee/demo_ci.git

$ git branch -M main

$ git push -u origin main

Enumerating objects: 16, done.

Counting objects: 100% (16/16), done.

Delta compression using up to 16 threads

Compressing objects: 100% (6/6), done.

Writing objects: 100% (16/16), 1.76 KiB | 899.00 KiB/s, done.

Total 16 (delta 0), reused 0 (delta 0), pack-reused 0

To https://github.com/jasonwee/demo_ci.git

 * [new branch]      main -> main

Branch 'main' set up to track remote branch 'main' from 'origin'.

$ git status .

On branch main

Your branch is up to date with 'origin/main'.


nothing to commit, working tree clean


```

Now that you pushed everything to GitHub, next is to setup GitHub Actions.

1. Locate Actions from the project tab.

2. Picked 'Java with Maven' as this best match the current requirement.

3. Check action configurations are okay and commit it. 



4. once you commit, the action will kickstart the first run.


Congratulation! This is the first CI that you setup. The rest, as you may have already guessed will be the same as every developer will do, i.e. making code changes.

```

$ vim README.md

$ cat README.md

a contribution to https://www.udacity.com/scholarships/suse-cloud-native-foundations-scholarship

$ git status .

On branch main

Your branch is up to date with 'origin/main'.


Untracked files:

  (use "git add <file>..." to include in what will be committed)

README.md


nothing added to commit but untracked files present (use "git add" to track)

$ git add README.md 

$ git commit -m "added readme" README.md 

[main a966f09] added readme

 1 file changed, 1 insertion(+)

 create mode 100644 README.md

$ git push

Enumerating objects: 4, done.

Counting objects: 100% (4/4), done.

Delta compression using up to 16 threads

Compressing objects: 100% (3/3), done.

Writing objects: 100% (3/3), 410 bytes | 410.00 KiB/s, done.

Total 3 (delta 0), reused 0 (delta 0), pack-reused 0

To https://github.com/jasonwee/demo_ci.git

   a273126..a966f09  main -> main

```





That's it, in case you figure where can I find this repository to begin mine? https://github.com/jasonwee/demo_ci 


Wednesday, January 30, 2019

Java Roadmap


* JEP 389: Foreign Linker API (Incubator)
* JEP 396: Strongly Encapsulate JDK Internals by Default
* JEP 393: Foreign-Memory Access API (Third Incubator
* JEP 390: Warnings for Value-based Classes
* Add InvocationHandler::invokeDefault Method for Proxy's Default Method Support
* JEP 380: Unix domain sockets
* Day Period Support Added to java.time Formats
* Add Stream.toList() Method
* JEP 338: Vector API (Incubator)
* Improved CompileCommand Flag
* JEP 376: ZGC Concurrent Stack Processing
* Concurrently Uncommit Memory in G1
* New jdk.ObjectAllocationSample Event Enabled by Default
* JEP 387: Elastic Metaspace
* Signed JAR Support for RSASSA-PSS and EdDSA
* SUN, SunRsaSign, and SunEC Providers Supports SHA-3 Based Signature Algorithms
* jarsigner Preserves POSIX File Permission and symlink Attributes
* Added -trustcacerts and -keystore Options to keytool -printcert and -printcrl Commands
* SunPKCS11 Provider Supports SHA-3 Related Algorithms
* Improve Certificate Chain Handling
* Improve Encoding of TLS Application-Layer Protocol Negotiation (ALPN) Values
* TLS Support for the EdDSA Signature Algorithm
* JEP 397: Sealed Classes (Second Preview)
* JEP 395: Records
* JEP 394: Pattern Matching for instanceof
* JEP 392: Packaging Tool
* Removal of java.awt.PeerFixer
* Removal of Experimental Features AOT and Graal JIT
* Deprecated Tracing Flags Are Obsolete and Must Be Replaced With Unified Logging Equivalents
* Removed Root Certificates with 1024-bit Keys
* Removal of Legacy Elliptic Curves
* Terminally Deprecated ThreadGroup stop, destroy, isDestroyed, setDaemon and isDaemon
* Parts of the Signal-Chaining API Are Deprecated
* Deprecated the java.security.cert APIs That Represent DNs as Principal or String Objects
* Line Terminator Definition Changed in java.io.LineNumberReader
* Enhanced Support of Proxy Class
* Module::getPackages Returns the Set of Package Names in This Module
* Support Supplementary Characters in String Case Insensitive Operations
* Proxy Classes Are Not Open for Reflective Access
* The Default HttpClient Implementation Returns Cancelable Futures
* HttpPrincipal::getName Returned Incorrect Name
* HttpClient.newHttpClient and HttpClient.Builder.build Might Throw UncheckedIOException
* NullPointerException Not Thrown When First Argument to Path.of or Paths.get Is null
* Incomplete Support for Unix Domain Sockets in Windows 2019 Server
* US/Pacific-New Zone Name Removed as Part of tzdata2020b
* Argument Index of Zero or Unrepresentable by int Throws IllegalFormatException.
* GZIPOutputStream Sets the GZIP OS Header Field to the Correct Default Value
* Refine ZipOutputStream.putNextEntry() to Recalculate ZipEntry's Compressed Size
* java.util.logging.LogRecord Updated to Support Long Thread IDs
* TreeMap.computeIfAbsent Mishandles Existing Entries Whose Values Are null
* Support for CLDR Version 38
* Added Property to Control LDAP Authentication Mechanisms Allowed to Authenticate Over Clear Connections
* LDAP Channel Binding Support for Java GSS/Kerberos
* Make JVMTI Table Concurrent
* IncompatibleClassChangeError Exceptions Are Thrown For Failing 'final' Checks When Defining a Class
* Object Monitors No Longer Keep Strong References to Their Associated Object
* Added 3 SSL Corporation Root CA Certificates
* Added Entrust Root Certification Authority - G4 certificate
* Upgraded the Default PKCS12 Encryption and MAC Algorithms
* Disable TLS 1.0 and 1.1
* C-Style Array Declarations Are Not Allowed in Record Components
* Annotation Interfaces May Not Be Declared As Local Interfaces
* DocLint Support Moved to jdk.javadoc Module
* Eliminating Duplication in Simple Documentation Comments
* Viewing API Documentation on Small Devices
* API Documentation Links to Platform Documentation
* Improvements for JavaDoc Search

jdk15

* Unicode support to 13.0
* Hidden Classes
* Added Support for SO_INCOMING_NAPI_ID Support
* Specialized Implementations of TreeMap Methods
* Added Ability to Configure Third Port for Remote JMX
* New Option Added to jstatd for Specifying RMI Connector Port Number
* New Option Added to jcmd for Writing a gzipped Heap Dump
* Text Blocks
* New Options Added to jhsdb for debugd Mode
* Oracle JDK Installer for Windows Provides Executables (javac, etc) in a Path Reachable From Any Command Prompt
* Added Revocation Checking to jarsigner
* Tools Warn If Weak Algorithms Are Used Before Restricting Them
* SunJCE Provider Supports SHA-3 Based Hmac Algorithms
* New System Properties to Configure the TLS Signature Schemes
* Support for certificate_authorities Extension
* Support for canonicalize in krb5.conf
* Removal of Terminally Deprecated Solaris-specific SO_FLOW_SLA Socket Option
* Removal of RMI Static Stub Compiler (rmic)
* Removal of Deprecated Constant RMIConnectorServer.CREDENTIAL_TYPES
* Removal of Nashorn JavaScript Engine
* Obsolete -XXUseAdaptiveGCBoundary
* Removal of Comodo Root CA Certificate
* Removal of DocuSign Root CA Certificate
* Retired the Deprecated SSLSession.getPeerCertificateChain() Method Implementation
* Removal of com.sun.net.ssl.internal.ssl.Provider Name
* Deprecated RMI Activation for Removal
* Deprecated NSWindowStyleMaskTexturedBackground
* Deprecated -XXForceNUMA Option
* Disabled Biased-locking and Deprecated Biased-locking Flags
* Disabled Native SunEC Implementation by Default
* Added forRemoval=true to Previously Deprecated ContentSigner APIs
* Workaround for Windows GDI API's memory restrictions
* java.awt.Robot.delay() Method Completes With Interrupt Status Set When Interrupted
* Improved Serialization Handling
* Optimized Empty Substring Handling
* LookupdefineClass Links the Class
* DatagramSocketdisconnect Allows an Implementation to Throw UncheckedIOException
* java.net.HttpClient Does Not Override Protocols Specified in SSLContext Default Parameters
* Filtering and Ordering of Addresses Returned by Alternative Hosts File Name Service Provider
* DatagramPacket.getPort() Returns 0 When the Port Is Not Set
* Modified the MS950 charset Encoder's Conversion Table
* Support Monetary Grouping Separator in DecimalFormat/DecimalFormatSymbols
* localizedBy() Overrides Localized Values With Default Values
* ValueRange.of(long, long, long) Does Not Throw IAE on Invalid Inputs
* Performance Improvement for InflaterOutputStream.write
* Case Insensitive Matching Doesn't Work Correctly for Some Character Classes
* Better Listing of Arrays
* Support for CLDR version 37
* Localized Time Zone Name Inconsistency Between English and Other Locales
* [macos] Support for Notarizing jpackage app-image and dmg
* Flags Controlling C1 Inlining Have New Names
* Improved Ergonomics for G1 Heap Region Size
* ZGC A Scalable Low-Latency Garbage Collector (Production)
* Disabling large pages on Windows
* Disabling NUMA Interleaving on Windows
* Field Layout Computation Changed
* Enable ShowCodeDetailsInExceptionMessages by default
* Signature and SignatureSpi Get Parameter Methods May Return null When Unsupported
* SunPKCS11 Initialization With NSS When External FIPS Modules Are in Security Modules Database
* Default SSLEngine Should Create in Server Role
* Pattern Matching for instanceof (Second Preview)
* Standard Doclet Index Files Compression

jdk14
* JDK Flight Recorder event streaming provides an API for the continuous consumption of JFR data from both in-process and out-of-process applications.
* The planned improvement to NullPointerExceptions pertains to improving the usability of the exceptions generated by the JVM by describing exactly which variable was null.
* Non-volatile mapped byte buffers would add new JDK-specific file mapping modes that allow the FileChannel API to be used to create MappedByteBuffer instances that refer to non-volatile memory (NVM).
* Enhance the language with pattern matching for the instanceof operator. This would be a preview feature in JDK 14.
* Switch expressions simplify coding by extending switch so that it can be used as either a statement or an expression.
* NUMA-aware memory allocation for the G1 garbage collector, intended to improve G1 performance on large machines.
* Removal of the Concurrent Mark Sweep (CMS) garbage collector, which previously was deprecated and slated for removal. Successors to CMS have arisen including ZGC and Shenandoah.
* Porting of ZGC to MacOS. It has been supported only on Linux thus far.
* Removal of the pack200 and unpack200 tools and the Pack200 API in the java.util.jar package.
* Records https://openjdk.java.net/jeps/359
* Deprecating the combination of the Parallel Scavenge and Serial Old garbage collection algorithms.
* Porting of the ZGC (Z Garbage Collector) to Windows.
* Foreign-memory access API, with the introduction of an API for Java programs to safely and efficiently access foreign memory outside of the Java heap.
* Deprecation of the Solaris/Sparc, Solaris/x64, and Linux/Sparc ports, with the intent to remove them in a future release.



jdk13
* text block
* a reimplementation of the legacy socket API
* switch expressions
* enhancements to the ZGC (Z Garbage Collector)
* extending application class-data sharing (AppCDS) to enable dynamic archiving of classes at the end of application execution.


jdk 12
* switch expressions
https://openjdk.java.net/projects/jdk/12/


jdk 11
* lts
* dynamic class file constants
* converged binaries, oracle jdk & open jdk
* opensource flight recorder
* opensource mission control
* browser plugin removed
* java web start removed
* javafx removed from jdk and replace as a lib
javafx.* [8-10]
javafx.css [9-10]
javafx.css.converter [9-10]
javafx.fxml [9-10]
javafx.scene [9-10]
javafx.util [9-10]
* epsilon garbage collector
* improve aarch64 intrinsics
* low overhead heap profiling
* http client
   The Standard HTTP Client has been moved from jdk.incubator.http to java.net.http:
    java.net.http.HttpClient
java.net.http.HttpClient$Builder
java.net.http.HttpClient$Redirect
java.net.http.HttpClient$Version
java.net.http.HttpConnectTimeoutException
java.net.http.HttpHeaders
java.net.http.HttpRequest
java.net.http.HttpRequest$BodyPublisher
java.net.http.HttpRequest$BodyPublishers
java.net.http.HttpRequest$Builder
java.net.http.HttpResponse
java.net.http.HttpResponse$BodyHandler
java.net.http.HttpResponse$BodyHandlers
java.net.http.HttpResponse$BodySubscriber
java.net.http.HttpResponse$BodySubscribers
java.net.http.HttpResponse$PushPromiseHandler
java.net.http.HttpResponse$ResponseInfo
java.net.http.HttpTimeoutException
java.net.http.WebSocket
java.net.http.WebSocket$Builder
java.net.http.WebSocket$Listener
java.net.http.WebSocketHandshakeException
* extend local-variable syntax
* unicode 10 support
* launch single file source code
* shebang
* transport layer security tls 1.3
* zgc
* deprecate nashorn javascript engine
* key agreement with curve25519 and curve448
   JEP 324: Key Agreement with Curve25519 and Curve448 comes with a few classes,
java.security.interfaces.XECKey
java.security.interfaces.XECPrivateKey
java.security.interfaces.XECPublicKey
java.security.spec.NamedParameterSpec
java.security.spec.XECPrivateKeySpec
java.security.spec.XECPublicKeySpec
* chacha20 and poly1305 cryptographic algorithms
* optional.isEmpty()
* character.toString(int)
* String, isBlank(), lines(), repeat(int), strip(), stripLeading(), stripTrailing()
* predicate not
* java ee and corba module are dropped
javax.activation [6-10]
javax.activity [5-10]
javax.annotation [6-10]
javax.jnlp [5-10]
javax.jws [6-10]
javax.rmi.CORBA [3-10]
javax.security.auth.Policy [4-10]
javax.transaction [3-10]
javax.xml.bind [6-10]
javax.xml.soap [6-10]
javax.xml.ws [6-10]
jdk.management.cmm.SystemResourcePressureMXBean [8-10]
jdk.management.resource [8-10]
jdk.packager.services.singleton [only 10]
jdk.packager.services.userjvmoptions [9-10]
org.omg.CORBA [2-10]
*

jdk 10
* local variable type inference
* parallel full gc for g1
* application class data sharing
* experimental java based jit compiler (graal)
* root certificates
* consolidate jdk forests into single repo
* heap allocation on alternative devices (intel)
* remove javah tool
* garbage collector interface (red hat)
* thread local handshakes
* list, set, map.copyOf(collection)
* collectors, toUnmodifiableList, toUnmodifiableMap, toUnmodifiableSet
* Optional.orElseThrow()
* jvm now more docker container aware
*


jdk 9

* Java Platform Module System
* Java flow API

jdk 8
* lts
* lambda




Saturday, July 16, 2016

Initial learning into apache cassandra paxos

Recently I have been reading into apache lightweight transaction in cassandra 2.0 and interested into how it implemented in code level. From end user perspective, when you manupulating data either insert and update with if not exists, then internally, paxos operation willl be used.

An example of lightweight transaction.

 INSERT INTO USERS (login, email, name, login_count) values ('jbellis', 'jbellis@datastax.com', 'Jonathan Ellis', 1) IF NOT EXISTS  
   
 UPDATE users SET reset_token = null, password = ‘newpassword’ WHERE login = ‘jbellis’ IF reset_token = ‘some-generated-reset-token’  

Essentially the paxos how operation concerntrated in class StorageProxy.We read that from the code documentation,

There are three phases to Paxos:
1. Prepare: the coordinator generates a ballot (timeUUID in our case) and asks replicas to (a) promise
   not to accept updates from older ballots and (b) tell us about the most recent update it has already
   accepted.
2. Accept: if a majority of replicas reply, the coordinator asks replicas to accept the value of the
   highest proposal ballot it heard about, or a new value if no in-progress proposals were reported.
3. Commit (Learn): if a majority of replicas acknowledge the accept request, we can commit the new
   value.

So it involve a few operation before an insert and update can be perform and this is not something you want to replace in bulk operation call with. We see that in class StorageService, several paxos verbs are registered. You can find them mostly in the paxos packages. Following are some useful paxos classes.


Interesting in the class PaxosState, noticed the following
locks - an array of length 1024
call system keyspace class to load and save paxos state.

When you check into cassandra using cqlsh, you will find the following.

 cqlsh:jw_schema1> use system;  
 cqlsh:system> desc tables;  
   
 available_ranges     peers        paxos      range_xfers  
 batches          compaction_history batchlog    local     
 "IndexInfo"        sstable_activity  size_estimates hints     
 views_builds_in_progress peer_events     built_views    
   
 cqlsh:system> select * from paxos;  
   
  row_key | cf_id | in_progress_ballot | most_recent_commit | most_recent_commit_at | most_recent_commit_version | proposal | proposal_ballot | proposal_version  
 ---------+-------+--------------------+--------------------+-----------------------+----------------------------+----------+-----------------+------------------  
   
 (0 rows)  

You can also read the unit test for paxos in cassandra 2.0.17 as can be read here.

This compare and set (cas) operation looks interesting if you want to ensure the value only be created if it not exists, a nifty feature found in apache cassandra 2.0 onward. Feel free to explore further!

Sunday, June 5, 2016

java collection framework

Once I was asked by a company what is the data structure in java and i was not prepare at all, but as usual, why bother remembered every details when we can google and start to read. It turn out that the answer they are seeking is the java collection framework and the next question comes, like what are the characteristics of the collections.

Well, to be really honest, who go remember every details when we can read the javadoc? Anyway, recently I found this chart circulating in the facebook which remind me of the questions asked. So I thought this is helpful and we should not memorized every fine details but the essence point is you know where to get the material and willing to share.

So here goes!

































This is a short article and I hope you find this useful in your daily coding reference than use it to answer some funny questions. haha!





Friday, December 18, 2015

Learn java util concurrent part5

This is the last series of learning into java util concurrent package. If you have not read the series before, you can find part1, part2, part3 and part4 at the respectively links. In this series, we will study remaining 19 classes in java.util.concurrent package.

ForkJoinTask<V>

  • Abstract base class for tasks that run within a ForkJoinPool.

1:        ForkJoinTask<Integer> fjt = ForkJoinTask.adapt(new Summer(44,55));  
2:        fjt.invoke();  
3:        Integer sum = fjt.get();  
4:        System.out.println(sum);  
5:        System.out.println(fjt.isDone());  
6:        fjt.join();  

Noticed that a new callable class was adapted into ForkJoinTask. The execution is commenced with invokeking the task. You can check if the task is complete using isDone method.

ForkJoinWorkerThread

  • A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

1:        ForkJoinWorkerThreadFactory customFactory = new ForkJoinWorkerThreadFactory() {  
2:           @Override  
3:           public ForkJoinWorkerThread newThread(ForkJoinPool pool) {  
4:              return null;  
5:           }  
6:        };  

As explained by ForkJoinWorkerThread javadoc, ForkJoinWorkerThreadFactory return a thread from the pool.

FutureTask<V>

  • A cancellable asynchronous computation.
  • A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

1:        FutureTask<Integer> ft = new FutureTask<Integer>(new Summer(66,77));  
2:        ft.run();  
3:        System.out.println(ft.get());  

If you have a long running task, you can use FutureTask so the task can be cancel. Next, we will go into another three queues.

LinkedBlockingDeque<E>

  • An optionally-bounded blocking deque based on linked nodes.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.
  • Most operations run in constant time (ignoring time spent blocking). Exceptions include remove, removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove(), and the bulk operations, all of which run in linear time.


LinkedBlockingQueue<E>

  • An optionally-bounded blocking queue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out).
  • The head of the queue is that element that has been on the queue the longest time.
  • The tail of the queue is that element that has been on the queue the shortest time.
  • New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.


LinkedTransferQueue<E>

  • An unbounded TransferQueue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out) with respect to any given producer.
  • The head of the queue is that element that has been on the queue the longest time for some producer.
  • the size method is NOT a constant-time operation.

1:        LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<Integer>();  
2:        lbd.add(1);  
3:        lbd.add(2);  
4:        lbd.add(3);  
5:    
6:        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();  
7:        lbq.add(4);  
8:        lbq.add(5);  
9:        lbq.add(6);  
10:          
11:        LinkedTransferQueue<Integer> ltq = new LinkedTransferQueue<Integer>();  
12:        ltq.add(7);  
13:        ltq.add(8);  
14:        ltq.add(9);  

Like the queues we talked about in part4, these queues will shown its benefits under multithreaded codes.

Phaser

  • A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
  • This implementation restricts the maximum number of parties to 65535.


1:        Phaser phaser = new Phaser();  
2:        phaser.register();  
3:        System.out.println("current phase number : " + phaser.getPhase());  
4:        testPhaser(phaser, 2000);  
5:        testPhaser(phaser, 4000);  
6:        testPhaser(phaser, 6000);  
7:          
8:        phaser.arriveAndDeregister();  
9:        Thread.sleep(10000);  
10:        System.out.println("current phase number : " + phaser.getPhase());  

PriorityBlockingQueue<E>

  • An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
  • This class does not permit null elements.
  • The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order


1:        PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<Integer>();  
2:        pbq.add(10);  
3:        pbq.add(11);  
4:        pbq.add(12);  

RecursiveAction

  • A recursive resultless ForkJoinTask.


1:        long[] array = {1,3,2,5,4,9,5,7,8};  
2:        RecursiveAction ar = new SortTask(array);  
3:        ar.invoke();  
4:        System.out.println("array " + array[0]);  
5:        System.out.println("array " + array[1]);  
6:        System.out.println("array " + array[2]);  
7:        System.out.println("array " + array[3]);  
8:        System.out.println("array " + array[4]);  
9:        System.out.println("array " + array[5]);  
10:        System.out.println("array " + array[6]);  
11:        System.out.println("array " + array[7]);  
12:        System.out.println("array " + array[8]);  
13:    
14:     static class SortTask extends RecursiveAction {  
15:          
16:        final long[] array;  
17:        final int lo, hi;  
18:          
19:        SortTask(long[] array, int lo, int hi) {  
20:           this.array = array;  
21:           this.lo = lo;  
22:           this.hi = hi;  
23:        }  
24:          
25:        SortTask(long[] array) {  
26:           this(array, 0, array.length);  
27:        }  
28:    
29:        @Override  
30:        protected void compute() {  
31:           if (hi - lo < THRESHOLD)  
32:              sortSequentially(lo,hi);  
33:           else {  
34:              int mid = (lo + hi) >>> 1;  
35:              invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));  
36:              merge(lo, mid, hi);  
37:           }  
38:        }  
39:          
40:        // implementation details follow:  
41:        static final int THRESHOLD = 1000;  
42:          
43:        void sortSequentially(int lo, int hi) {  
44:           Arrays.sort(array, lo, hi);  
45:        }  
46:          
47:        void merge(int lo, int mid, int hi) {  
48:           long[] buf = Arrays.copyOfRange(array, lo, mid);  
49:           for (int i = 0, j = lo, k = mid; i < buf.length; j++)  
50:              array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];  
51:        }  
52:           
53:     }  

recursive sorting to the array by invoke commence to the object ar.

RecursiveTask<V>

  • A recursive result-bearing ForkJoinTask.


1:        RecursiveTask<Integer> fibo = new Fibonacci(10);  
2:        fibo.invoke();  
3:        System.out.println(fibo.get());  
4:          
5:     static class Fibonacci extends RecursiveTask<Integer> {  
6:          
7:        final int n;  
8:    
9:        Fibonacci(int n) {  
10:           this.n = n;  
11:        }  
12:    
13:        protected Integer compute() {  
14:           if (n <= 1)  
15:              return n;  
16:           Fibonacci f1 = new Fibonacci(n - 1);  
17:           f1.fork();  
18:           Fibonacci f2 = new Fibonacci(n - 2);  
19:           return f2.compute() + f1.join();  
20:        }  
21:     }  

ScheduledThreadPoolExecutor

  • A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically.


1:        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(10);  
2:        Future<Integer> total = stpe.submit(new Summer(88,99));  
3:        System.out.println(total.get());  
4:        stpe.shutdown();  

Semaphore

  • A counting semaphore.
  • Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource.


1:        ConnectionLimiter cl = new ConnectionLimiter(3);  
2:        URLConnection conn = cl.acquire(new URL("http://www.google.com"));  
3:        conn = cl.acquire(new URL("http://www.yahoo.com"));  
4:        conn = cl.acquire(new URL("http://www.youtube.com"));  
5:        cl.release(conn);  
6:          
7:     static class ConnectionLimiter {  
8:        private final Semaphore semaphore;  
9:          
10:        private ConnectionLimiter(int max) {  
11:           semaphore = new Semaphore(max);  
12:        }  
13:          
14:        public URLConnection acquire(URL url) throws IOException, InterruptedException {  
15:           semaphore.acquire();  
16:           return url.openConnection();  
17:        }  
18:          
19:        public void release(URLConnection conn) {  
20:           try {  
21:              // blahblah  
22:           } finally {  
23:              semaphore.release();  
24:           }  
25:        }  
26:     }  

SynchronousQueue<E>

  • A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
  • This queue does not permit null elements.

1:        final SynchronousQueue<String> queue = new SynchronousQueue<String>();  
2:        Thread a = new Thread(new QueueProducer(queue));  
3:        a.start();  
4:        Thread b = new Thread(new QueueConsumer(queue));  
5:        b.start();  
6:          
7:        Thread.sleep(1000);  
8:          
9:        a.interrupt();  
10:        b.interrupt();  
11:          
12:          
13:     static class QueueProducer implements Runnable {  
14:          
15:        private SynchronousQueue<String> queue;  
16:    
17:        public QueueProducer(SynchronousQueue<String> queue) {  
18:           this.queue = queue;  
19:        }  
20:    
21:        @Override  
22:        public void run() {  
23:           String event = "SYNCHRONOUS_EVENT";  
24:           String another_event = "ANOTHER_EVENT";  
25:             
26:           try {  
27:              queue.put(event);  
28:              System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event);  
29:                
30:              queue.put(another_event);  
31:              System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), another_event);  
32:           } catch (InterruptedException e) {  
33:           }  
34:             
35:        }  
36:          
37:     }  
38:       
39:     static class QueueConsumer implements Runnable {  
40:          
41:        private SynchronousQueue<String> queue;  
42:    
43:        public QueueConsumer(SynchronousQueue<String> queue) {  
44:           this.queue = queue;  
45:        }  
46:    
47:        @Override  
48:        public void run() {  
49:           try {  
50:              String event = queue.take();  
51:              // thread will block here  
52:              System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);  
53:           } catch (InterruptedException e) {  
54:           }  
55:             
56:        }  
57:          
58:     }  

ThreadLocalRandom

  • A random number generator isolated to the current thread.


1:        ThreadLocalRandom tlr = ThreadLocalRandom.current();  
2:        System.out.println(tlr.nextInt());  

ThreadPoolExecutor

  • An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.


1:        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);  
2:        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, blockingQueue);  

The last four classes are policies of the previous ThreadPoolExecutor which execute under specific condition.

ThreadPoolExecutor.AbortPolicy

  • A handler for rejected tasks that throws a RejectedExecutionException.


ThreadPoolExecutor.CallerRunsPolicy

  • A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.


ThreadPoolExecutor.DiscardOldestPolicy

  • A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.


ThreadPoolExecutor.DiscardPolicy

  • A handler for rejected tasks that silently discards the rejected task.


1:        ThreadPoolExecutor.AbortPolicy ap = new ThreadPoolExecutor.AbortPolicy();  
2:        try {  
3:        ap.rejectedExecution(() -> System.out.println("abort"), tpe);  
4:        } catch (Exception e) {  
5:             
6:        }  
7:          
8:        ThreadPoolExecutor.CallerRunsPolicy crp = new ThreadPoolExecutor.CallerRunsPolicy();  
9:        try {  
10:        crp.rejectedExecution(() -> System.out.println("run"), tpe);  
11:        } catch (Exception e) {  
12:             
13:        }  
14:          
15:        ThreadPoolExecutor.DiscardOldestPolicy dop = new ThreadPoolExecutor.DiscardOldestPolicy();  
16:        try {  
17:        dop.rejectedExecution(() -> System.out.println("abort"), tpe);  
18:        } catch (Exception e) {  
19:             
20:        }  
21:          
22:        ThreadPoolExecutor.DiscardPolicy dp = new ThreadPoolExecutor.DiscardPolicy();  
23:        try {  
24:        dp.rejectedExecution(() -> System.out.println("discard"), tpe);  
25:        } catch (Exception e) {  
26:             
27:        }  
28:    

That's it for these long learning series of java util concurrent.

Sunday, December 6, 2015

Learn java util concurrent part4

This is yet another series of learning into java util concurrent package. If you have not read the series before, you can find part1, part2 and part3 at the respectively links. In this series, we will study the classes in java.util.concurrent package.

AbstractExecutorService
Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package.

1:        AbstractExecutorService aes = null;  
2:          
3:        aes = new ForkJoinPool();  
4:        System.out.println(aes.isShutdown());  
5:        Future<Integer> total = aes.submit(new Summer(33, 44));  
6:        System.out.println(total.get());  
7:        aes.shutdown();  
8:          
9:          
10:        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);  
11:        aes = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, blockingQueue);  
12:        System.out.println(aes.isShutdown());  
13:        total = aes.submit(new Summer(33, 44));  
14:        System.out.println(total.get());  
15:        aes.shutdown();  

In the example above, we see that two concrete implementation of AbstractExecutorService, ForkJoinPool and ThreadPoolExecutor both invoking method submit from abstract class  AbstractExecutorService.

ArrayBlockingQueue
A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

1:        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(5);  
2:        abq.add(1);  
3:        abq.offer(2);  
4:        System.out.println(abq.size());  

A simple queue implementation, just like any other collections in java collection framwork, you can add, remove, or drain the collection.

CompletableFuture
A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

1:        CompletableFuture<Integer> cf = new CompletableFuture<Integer>();  
2:        System.out.println(cf.isCancelled());  

ConcurrentHashMap
A hash table supporting full concurrency of retrievals and high expected concurrency for updates.
However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.
this class does not allow null to be used as a key or value.

1:        ConcurrentHashMap<String,Integer> chm = new ConcurrentHashMap<String,Integer>();  
2:        chm.put("one", 1);  
3:        chm.put("two", 2);  
4:        chm.put("six", 6);  

ConcurrentHashMap.KeySetView<K,V>
A view of a ConcurrentHashMap as a Set of keys, in which additions may optionally be enabled by mapping to a common value.

1:        ConcurrentHashMap.KeySetView<String, Integer> keys = chm.keySet(10);  
2:        System.out.println(keys.isEmpty());  
3:        System.out.println(keys.toString());  
4:        keys.add("ten");  
5:          
6:        keys.forEach((s) -> System.out.println(s));  
7:        System.out.println(chm.toString());  
8:          
9:        ConcurrentHashMap.KeySetView<String, Boolean> keys1 = chm.newKeySet();  
10:        System.out.println(keys1.isEmpty());  
11:        System.out.println(keys1.toString());  
12:        keys1.add("four");  
13:          
14:        keys1.forEach((s) -> System.out.println(s));  
15:        System.out.println(chm.toString());  

The above give two examples of usage of ConcurrentHashMap.KeySetView. The first one notice that changes to the keys affect the original concurrentHashMap chm whilst the second does not. So read the javadoc and pick the implementation that suit your requirements.

Now, we will take a look at two for the concurrent linked queues.

ConcurrentLinkedDeque<E>

  • An unbounded concurrent deque based on linked nodes.
  • Concurrent insertion, removal, and access operations execute safely across multiple threads. 
  • this class does not permit the use of null elements.
  • the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

ConcurrentLinkedQueue<E>

  • An unbounded thread-safe queue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out).
  • The head of the queue is that element that has been on the queue the longest time. 
  • The tail of the queue is that element that has been on the queue the shortest time. 
  • this class does not permit the use of null elements.


  insert                              element  
  always                              oldest  
  at tail                             at head  
  and                                 and retrieve  
  youngest                            here  
    +------------------------------------+  
    |                                    |  
    |                                    |  
    +------------------------------------+  
   tail                               head  

1:    
2:        ConcurrentLinkedDeque<Integer> cldq = new ConcurrentLinkedDeque<Integer>();   
3:        cldq.add(1);  
4:        cldq.add(2);  
5:        cldq.add(3);  
6:          
7:    
8:        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();  
9:        clq.add(4);  
10:        clq.add(5);  
11:        clq.add(6);  

With the examples above, the apparent benefits is not actually express as the only thread is the main adding element to the queues serially. Like the javadoc mentioned, you should really use these queues on multithreaded situation.

ConcurrentSkipListMap<K,V>

  • A scalable concurrent ConcurrentNavigableMap implementation.
  • The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.
  • providing expected average log(n) time cost for the containsKey, get, put and remove operations and their variants. 
  • Insertion, removal, update, and access operations safely execute concurrently by multiple threads.
  • Ascending key ordered views and their iterators are faster than descending ones.
  • the size method is not a constant-time operation.

ConcurrentSkipListSet<E>

  • A scalable concurrent NavigableSet implementation based on a ConcurrentSkipListMap.
  • The elements of the set are kept sorted according to their natural ordering, or by a Comparator provided at set creation time, depending on which constructor is used.
  • expected average log(n) time cost for the contains, add, and remove operations and their variants.
  • Insertion, removal, and access operations safely execute concurrently by multiple threads.
  • Ascending ordered views and their iterators are faster than descending ones.
  • the size method is not a constant-time operation. 

1:        ConcurrentSkipListMap<String,Integer> cslm = new ConcurrentSkipListMap<String, Integer>();  
2:        cslm.put("one", 1);  
3:        cslm.put("two", 2);  
4:        cslm.put("six", 6);  
5:          
6:        ConcurrentSkipListSet<Integer> csls = new ConcurrentSkipListSet<Integer>();  
7:        csls.add(1);  
8:        csls.add(1);  
9:        System.out.println("set size " + csls.size());  

With the examples above, the apparent benefits is not actually express as the only thread is the main adding element to the collections serially. Like the javadoc mentioned, you should really use these collections on multithreaded situation.

CopyOnWriteArrayList<E>

  • A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
  • This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads.
  • All elements are permitted, including null.



CopyOnWriteArraySet<E>

  • A Set that uses an internal CopyOnWriteArrayList for all of its operations.
  • It is best suited for applications in which set sizes generally stay small, read-only operations vastly outnumber mutative operations, and you need to prevent interference among threads during traversal.
  • It is thread-safe.
  • Mutative operations (add, set, remove, etc.) are expensive since they usually entail copying the entire underlying array.


1:        CopyOnWriteArrayList<Integer> cowal = new CopyOnWriteArrayList<Integer>();  
2:        cowal.add(1);  
3:        cowal.add(2);  
4:        cowal.add(3);  
5:          
6:        CopyOnWriteArraySet<Integer> cowas = new CopyOnWriteArraySet<Integer>();  
7:        cowas.add(1);  
8:        cowas.add(1);  
9:        System.out.println("set size " + cowas.size());  

Just like the four collections above, these beneifts best shown on multithreaded applications.

CountDownLatch

  • A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  • A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset


1:        int N = 10;  
2:        CountDownLatch startSignal = new CountDownLatch(1);  
3:        CountDownLatch doneSignal = new CountDownLatch(N);  
4:          
5:        for (int i = 0; i < N; ++i) // create and start threads  
6:           new Thread(new Worker(startSignal, doneSignal)).start();  
7:    
8:        doSomethingElse();     // don't let run yet  
9:        startSignal.countDown();  // let all threads proceed  
10:        doSomethingElse();  
11:        doneSignal.await();    // wait for all to finish  
12:    
13:    
14:     private static void doSomethingElse() throws InterruptedException {  
15:          Thread.sleep(3000);  
16:        System.out.println(Thread.currentThread().getName() + " doing something else");  
17:    
18:     }  
19:    
20:    
21:     static class Worker implements Runnable {  
22:          private final CountDownLatch startSignal;  
23:          private final CountDownLatch doneSignal;  
24:          Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
25:           this.startSignal = startSignal;  
26:           this.doneSignal = doneSignal;  
27:          }  
28:          public void run() {  
29:           try {  
30:            startSignal.await();  
31:            doWork();  
32:            doneSignal.countDown();  
33:           } catch (InterruptedException ex) {} // return;  
34:          }  
35:    
36:          void doWork() { System.out.println(Thread.currentThread().getName() + " doing work"); try {  
37:           Thread.sleep(200);  
38:      }  

we see that ten worker threads were started but it was in waiting state in the run method. Until the startSignal started to count down, then only all the workers thread started. In the individual worker threads, we will see doneSignal is counting down one by one for 10 tens for each worker thread respectively. In the main thread, doneSignal is in waiting state before all the worker thread done all the signals.

CountedCompleter<T>

  • A ForkJoinTask with a completion action performed when triggered and there are no remaining pending actions.
  • Sample Usages.
  • Parallel recursive decomposition.
  • Searching. 
  • Recording subtasks. 
  • Completion Traversals. 
  • Triggers.
1:        // CountedCompleter<T>  
2:        Integer[] numbers = {1,2,3,4,5};  
3:        // null ?  
4:        MapReducer<Integer> numbersReducer = new MapReducer<Integer>(null, numbers, new MyMapper(), new MyReducer(), 1, 10);  
5:        Integer result = numbersReducer.getRawResult();  
6:        System.out.println(result);  

CyclicBarrier

  • A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
  • CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
  • The barrier is called cyclic because it can be re-used after the waiting threads are released. 
  • The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time). 

1:        // CyclicBarrier  
2:        float[][] matrix = {{1,2}, {2,3}};  
3:        new Solver(matrix);  
4:    
5:  public class Solver {  
6:    
7:     final int N;  
8:     final float[][] data;  
9:     final CyclicBarrier barrier;  
10:    
11:     class Worker implements Runnable {  
12:        int myRow;  
13:        boolean done;  
14:    
15:        Worker(int row) {  
16:           myRow = row;  
17:        }  
18:    
19:        public void run() {  
20:           while (!done()) {  
21:              processRow(myRow);  
22:    
23:              try {  
24:                 barrier.await();  
25:              } catch (InterruptedException ex) {  
26:                 return;  
27:              } catch (BrokenBarrierException ex) {  
28:                 return;  
29:              }  
30:           }  
31:        }  
32:          
33:        public boolean done() {  
34:           return done;  
35:        }  
36:          
37:        private void processRow(int row) {  
38:           System.out.println(Thread.currentThread().getName() + " processing row " + row );  
39:           done = true;  
40:        }  
41:     }  
42:    
43:     public Solver(float[][] matrix) {  
44:        data = matrix;  
45:        N = matrix.length;  
46:        Runnable barrierAction = new Runnable() {   
47:           public void run() {   
48:              //mergeRows(...);   
49:              System.out.println("merging row");  
50:           }  
51:        };  
52:        barrier = new CyclicBarrier(N, barrierAction);  
53:    
54:        List<Thread> threads = new ArrayList<Thread>(N);  
55:        for (int i = 0; i < N; i++) {  
56:         Thread thread = new Thread(new Worker(i));  
57:         threads.add(thread);  
58:         thread.start();  
59:        }  
60:    
61:        // wait until done  
62:        for (Thread thread : threads)  
63:           try {  
64:              thread.join();  
65:           } catch (InterruptedException e) {  
66:              e.printStackTrace();  
67:           }  
68:       }  
69:  }  

In the example above, we see the main class initialized a new solver object passing a two dimentional floating matrix for process. In the solver class, we see that a cyclicbarrier is initialized with a runnable barrier action. Depending on the matrix length, the length shall be used to initialize the workers threads. The solver thread waits until all the workers thread done.

In the worker thread, I simplified the processRow to just printout and set done to true, you can of cause process data[myRow] to make the sample code near to the real world problem. Noticed that barrier in each individual worker thread is call method await. IN this example, if two workers are done process the row and barrier await is executed, then the final barrierAction object will run the final merging row.


DelayQueue<E extends Delayed>

  • An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
  • The head of the queue is that Delayed element whose delay expired furthest in the past.
  • If no delay has expired there is no head and poll will return null.
  • Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
  • the size method returns the count of both expired and unexpired elements.
  • This queue does not permit null elements.

1:        DelayQueue<SalaryDelay> delayQueue = new DelayQueue<SalaryDelay>();  
2:        delayQueue.add(new SalaryDelay("August", 1));  
3:        delayQueue.add(new SalaryDelay("September", 2));  
4:        delayQueue.add(new SalaryDelay("October", 3));  
5:          
6:        System.out.println(delayQueue.size());  
7:        System.out.println(delayQueue.poll());  

Like this queue before, you add the class that implement delayed and be place in this delayqueue.

Exchanger<V>

  • A synchronization point at which threads can pair and swap elements within pairs.

1:        Exchanger<?> exchanger = new Exchanger<>();  
2:        ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "keychain");  
3:        ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "chocalate");  
4:          
5:        new Thread(exchangerRunnable1).start();  
6:        new Thread(exchangerRunnable2).start();  
7:          
8:  public class ExchangerRunnable implements Runnable {  
9:    
10:     Exchanger exchanger = null;  
11:     Object object = null;  
12:    
13:     public ExchangerRunnable(Exchanger exchanger, Object object) {  
14:        this.exchanger = exchanger;  
15:        this.object = object;  
16:     }  
17:    
18:     public void run() {  
19:        try {  
20:           Object previous = this.object;  
21:    
22:           this.object = this.exchanger.exchange(this.object);  
23:    
24:           System.out.println(Thread.currentThread().getName() + " exchanged "  
25:                 + previous + " for " + this.object);  
26:        } catch (InterruptedException e) {  
27:           e.printStackTrace();  
28:        }  
29:     }  
30:    
31:  }  

With the code above, there are two thread that exchange a string object to each other.

ExecutorCompletionService<V>

  • A CompletionService that uses a supplied Executor to execute tasks.

1:        ExecutorService executorService = Executors.newFixedThreadPool(1);  
2:        CompletionService<Integer> longRunningCompletionService = new ExecutorCompletionService<Integer>(executorService);  
3:        longRunningCompletionService.submit(() -> {System.out.println("done"); return 1;});  
4:        longRunningCompletionService.take();  
5:        executorService.shutdown();  

Moving onto our last 2 classes in this lesson.

Executors
- Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package.

1:        Executors.newCachedThreadPool();  
2:        Executors.defaultThreadFactory();  
3:        Executors.newFixedThreadPool(10);  
4:        Executors.newScheduledThreadPool(1);  
5:        Executors.newSingleThreadExecutor();  
6:        Executors.privilegedThreadFactory();  
7:        Executors.newWorkStealingPool();  

Just try different pools implementation in java to get some idea the specific pools.

ForkJoinPool

  • An ExecutorService for running ForkJoinTasks.
  • This implementation restricts the maximum number of running threads to 32767. 
1:        ForkJoinPool fjPool = new ForkJoinPool();  
2:        Future<Integer> sum = fjPool.submit(new Summer(11, 89));  
3:        System.out.println(sum.get());  
4:        fjPool.shutdown();  

A trivial example of using ForkJoinPool to submit a runnable task which return a result.

That's it for this long but brief lesson. This article end with the source code where you can get from the links below.

https://github.com/jasonwee/videoOnCloud/commit/0e7549249490ef5f9e528e101354440b7ad5cbdb
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/ExchangerRunnable.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentClassP3.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/MapReducer.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/MyMapper.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/MyReducer.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Solver.java

Saturday, December 5, 2015

Learn java util concurrent part3

This series is the next learning series of java.util.concurrent. You should read the first part and second part too. Today we will learn remaining ten interfaces in package java.util.concurrent.

Okay, let's start with ForkJoinPool.ForkJoinWorkerThreadFactory. For programming wise, you should not be worry as the class ForkJoinPool takes care of this implementation. Within class ForkJoinPool, we see that, there are two classes


  • DefaultForkJoinWorkerThreadFactory
  • InnocuousForkJoinWorkerThreadFactory

which implement ForkJoinWorkerThreadFactory with access modifier to default. So unless you know what you want and you know how ForkJoinPool work, then subclass ForkJoinPool away. For beginner in this article, it is sufficient to just use ForkJoinPool.

ManagedBlocker is an interface for extending managed parallelism for tasks running in ForkJoinPools. There are two methods to be implemented, block() and isReleasable()

1:  public class QueueManagedBlocker<T> implements ManagedBlocker {  
2:       
3:     final BlockingQueue<T> queue;  
4:     volatile T value = null;  
5:       
6:     QueueManagedBlocker(BlockingQueue<T> queue) {  
7:        this.queue = queue;  
8:     }  
9:    
10:     @Override  
11:     public boolean block() throws InterruptedException {  
12:        if (value == null)  
13:           value = queue.take();  
14:        return true;  
15:     }  
16:    
17:     @Override  
18:     public boolean isReleasable() {  
19:        return value != null || (value = queue.poll()) != null;  
20:     }  
21:       
22:     public T getValue() {  
23:        return value;  
24:     }  
25:    
26:  }  

Next, we have interface Future<V> which we have see before in the previous learning series.

1:  ExecutorService executorService = Executors.newFixedThreadPool(1);  
2:  Future<Integer> future = executorService.submit(new Summer(11,22));  

It's very clear you can obtain the result via future variable above. Interface RejectedExecutionHandler is mostly for error handling.

1:  RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandlerImpl();  
2:  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, worksQueue, executionHandler);  
3:    
4:  public class MyRejectedExecutionHandlerImpl implements RejectedExecutionHandler {  
5:    
6:     @Override  
7:     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
8:        System.out.println(r.toString() + " : I've been rejected !");  
9:     }  
10:    
11:  }  

So you can set the implementation classed to ThreadPoolExecutor and if a task cannot be executor by the ThreadPoolExecutor, rejectedExecution will be executed. Moving onto the next interface, RunnableFuture<V> .

1:  RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));  

so we see an initialization of object FutureTask with a callable class Summer class which we created in the previous learning series. Interface RunnableScheduledFuture which extend the previous interface RunnableFuture has another additional method to implement upon on.

1:  RunnableScheduledFuture<Integer> rsf = new Summer1();  
2:  System.out.println(rsf.isPeriodic());RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));  

In the class Summer1, you should determine if the class is periodic or not. ScheduledExecutorService is pretty common if you google this interface and given the code below.

1:  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);  
2:  scheduler.scheduleAtFixedRate(() -> System.out.println("hihi"), 1, 1, TimeUnit.SECONDS);  
3:  Thread.sleep(3000);  
4:  scheduler.shutdown();  

so we see a thread is executed every second.

1:  ScheduledFuture<Integer> sf = new ScheduledFutureImpl();  
2:  sf.isCancelled();  

ScheduledFuture<V> is a delayed result-bearing action that can be cancelled. Usually a scheduled future is the result of scheduling a task with a ScheduledExecutorService. This class is pretty common if you have a future task which get delay for whatever reason or it may get cancel, you want to look further into this class.

ThreadFactory is another interface which creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.

1:  ThreadFactory tf = Executors.defaultThreadFactory();  
2:  tf.newThread(()->System.out.println("ThreadFactory")).start();  

In this last series, we take a look at the last interface, TransferQueue.  A TransferQueue may be useful for example in message passing applications in which producers sometimes (using method transfer(E)) await receipt of elements by consumers invoking take or poll, while at other times enqueue elements (via method put) without waiting for receipt.

1:  TransferQueue<Integer> tq = new LinkedTransferQueue<Integer>();  

That's it for this learning series. Thank you. Oh, and the source code.

https://github.com/jasonwee/videoOnCloud/commit/ce479e5befaf7abe84d3d85930d5196a639e2643

https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/MyRejectedExecutionHandlerImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/ExampleThreadPoolExecutor.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/org/just4fun/concurrent/DemoExecutor.java

https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/LearnConcurrentInterfaceP2.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/QueueManagedBlocker.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/ScheduledFutureImpl.java
https://github.com/jasonwee/videoOnCloud/blob/master/src/java/play/learn/java/concurrent/Summer1.java