Friday, July 14, 2017

elasticsearch 2.2.1 upsert fails with exception: document already exists

So why is the following operation throwing an exception?

[2016-06-26 00:00:51,357][INFO ][rest.suppressed     ] /<my_index>/<my_type>/1123456789.123456789/_update Params: {version_type=force, doc_as_upsert=true, index=<my_index>, op_type=create, id=1123456789.123456789, type=<my_type>, consistency=one, version=1466892051270, ttl=1814400000ms}
RemoteTransportException[[node1][][indices:data/write/update[s]]]; nested: DocumentAlreadyExistsException[[session][1123456789.123456789]: document already exists];
Caused by: [<my_index>][[<my_index>][3]] DocumentAlreadyExistsException[[<my_type>][1123456789.123456789]: document already exists]
    at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(
    at org.elasticsearch.index.engine.InternalEngine.innerCreate(
    at org.elasticsearch.index.engine.InternalEngine.create(
    at org.elasticsearch.index.shard.IndexShard.create(
    at org.elasticsearch.index.engine.Engine$Create.execute(
    at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(
    at$PrimaryPhase.doRun(
    at
    at$PrimaryOperationTransportHandler.messageReceived(
    at$PrimaryOperationTransportHandler.messageReceived(
    at org.elasticsearch.transport.TransportService$4.doRun(
    at
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
    at

Well, some background:
* elasticsearch 2.2.1
* java8
* upsert operation
* version external
* version type force
* ttl enabled
* consistency one

As usual, I googled first. This issue comes close, but it is solved in Elasticsearch version 5 (alpha). Upgrading is not an option yet, at least not in production with an alpha release. So let's trace into the code. At the top of the stack:

at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(

I extracted the method for easier discussion:

private void innerCreateNoLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {

  // same logic as index
  long updatedVersion;
  long expectedVersion = create.version();
  if (create.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
    if (create.origin() == Operation.Origin.RECOVERY) {
      return;
    } else {
      throw new VersionConflictEngineException(shardId, create.type(),, currentVersion, expectedVersion);
    }
  }
  updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);

  // if the doc exists
  boolean doUpdate = false;
  if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
    if (create.origin() == Operation.Origin.RECOVERY) {
      return;
    } else if (create.origin() == Operation.Origin.REPLICA) {
      // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
      // conflict, so we must also update here on the replica to remain consistent:
      doUpdate = true;
    } else if (create.origin() == Operation.Origin.PRIMARY && create.autoGeneratedId() && create.canHaveDuplicates() &&
        currentVersion == 1 && create.version() == Versions.MATCH_ANY) {
      /**
       * If bulk index request fails due to a disconnect, unavailable shard etc. then the request is
       * retried before it actually fails. However, the documents might already be indexed.
       * For autogenerated ids this means that a version conflict will be reported in the bulk request
       * although the document was indexed properly.
       * To avoid this we have to make sure that the index request is treated as an update and set updatedVersion to 1.
       * See also discussion on
       */
      doUpdate = true;
      updatedVersion = 1;
    } else {
      // On primary, we throw DAEE if the _uid is already in the index with an older version:
      assert create.origin() == Operation.Origin.PRIMARY;
      throw new DocumentAlreadyExistsException(shardId, create.type(),;
    }
  }

  create.updateVersion(updatedVersion);

  if (doUpdate) {
    if ( > 1) {
      indexWriter.updateDocuments(create.uid(),;
    } else {
      indexWriter.updateDocument(create.uid(),;
    }
  } else {
    if ( > 1) {
      indexWriter.addDocuments(;
    } else {
      indexWriter.addDocument(;
    }
  }
  Translog.Location translogLocation = translog.add(new Translog.Create(create));

  versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
  create.setTranslogLocation(translogLocation);
  indexingService.postCreateUnderLock(create);
}
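To make the branching easier to follow, here is a simplified, self-contained model of the "if the doc exists" part of innerCreateNoLock. The class CreateDecision, its enums and the decide() method are hypothetical names for illustration only; the boolean parameters stand in for create.origin(), the versionValue/currentVersion test, create.autoGeneratedId(), create.canHaveDuplicates() and the Versions.MATCH_ANY comparison. It only covers what happens after the version check has passed (the trace above ends in DocumentAlreadyExistsException, not VersionConflictEngineException), and it writes nothing to Lucene, the translog or the version map.

```java
// Hypothetical, simplified model of the "doc exists" branch of
// InternalEngine.innerCreateNoLock (Elasticsearch 2.2.1). Names are illustrative,
// not Elasticsearch APIs; no Lucene, translog or version-map writes are modelled.
class CreateDecision {

    // Stand-in for Operation.Origin.
    enum Origin { PRIMARY, REPLICA, RECOVERY }

    // What the engine ends up doing with the create operation.
    enum Outcome { ADD, UPDATE, SKIP, THROW_DOCUMENT_ALREADY_EXISTS }

    /**
     * Assumes the version check has already passed.
     *
     * @param docExists         live (non-deleted) entry in the version map, or a version found in the index
     * @param autoGeneratedId   mirrors create.autoGeneratedId()
     * @param canHaveDuplicates mirrors create.canHaveDuplicates()
     * @param currentVersion    version currently stored for the _uid
     * @param versionIsMatchAny mirrors create.version() == Versions.MATCH_ANY
     */
    static Outcome decide(Origin origin, boolean docExists, boolean autoGeneratedId,
                          boolean canHaveDuplicates, long currentVersion, boolean versionIsMatchAny) {
        if (!docExists) {
            return Outcome.ADD;              // plain indexWriter.addDocument(s)
        }
        switch (origin) {
            case RECOVERY:
                return Outcome.SKIP;         // operation already applied, nothing to do
            case REPLICA:
                return Outcome.UPDATE;       // #7142: the primary already accepted it
            default:                         // PRIMARY
                if (autoGeneratedId && canHaveDuplicates && currentVersion == 1 && versionIsMatchAny) {
                    return Outcome.UPDATE;   // retried bulk request with an auto-generated id
                }
                return Outcome.THROW_DOCUMENT_ALREADY_EXISTS;
        }
    }

    public static void main(String[] args) {
        // The upsert from the log: explicit id (not auto-generated), explicit version,
        // running on the primary, and the _uid is already in the index.
        long existingVersion = 5L; // arbitrary; it does not matter because the id is not auto-generated
        System.out.println(decide(Origin.PRIMARY, true, false, false, existingVersion, false));
        // prints THROW_DOCUMENT_ALREADY_EXISTS
    }
}
```

For the upsert in the log (explicit id, so the auto-generated-id escape hatch does not apply, executed on the primary), once the _uid is already present the only reachable outcome in this model is the exception.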

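It looks like it is safe to retry this upsert operation. My reading of the code: DocumentAlreadyExistsException is only thrown on the primary, after the version check has already passed, and only when the document's _uid is already in the index. So most likely another write created the document between the upsert's lookup and its create, and a retry should find the document and update it instead. Read further discussion here.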
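Here is a minimal in-memory sketch of that reasoning, assuming the exception comes from two upserts for the same id racing between lookup and create. Everything in it (UpsertRetrySketch, Shard, the stand-in DocumentAlreadyExistsException, and the betweenLookupAndWrite hook used to inject the competing writer) is hypothetical and is not Elasticsearch code or API; it only shows why a single retry lands on the update path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model (not Elasticsearch code) of two writers upserting
// the same id: the loser of the race gets "document already exists", and a retry
// then finds the document and takes the update path.
class UpsertRetrySketch {

    // Stand-in for Elasticsearch's DocumentAlreadyExistsException.
    static class DocumentAlreadyExistsException extends RuntimeException {
        DocumentAlreadyExistsException(String id) {
            super("[" + id + "]: document already exists");
        }
    }

    // Toy shard: id -> document source.
    static class Shard {
        final Map<String, String> docs = new HashMap<>();

        boolean exists(String id) {
            return docs.containsKey(id);
        }

        // Like a create on the primary: refuse an id that is already present.
        void create(String id, String doc) {
            if (docs.putIfAbsent(id, doc) != null) {
                throw new DocumentAlreadyExistsException(id);
            }
        }

        // Toy update: replaces the source (a real doc_as_upsert update merges the partial doc).
        void update(String id, String doc) {
            docs.put(id, doc);
        }
    }

    // Upsert = look the id up, then create or update. The hook lets the demo run a
    // competing writer between the lookup and the write (hypothetical, for illustration).
    static void upsert(Shard shard, String id, String doc, Runnable betweenLookupAndWrite) {
        boolean exists = shard.exists(id);
        betweenLookupAndWrite.run();
        if (exists) {
            shard.update(id, doc);
        } else {
            shard.create(id, doc); // throws if another writer created the id meanwhile
        }
    }

    // Retry once: after the exception the id exists, so the second attempt updates.
    static void upsertWithRetry(Shard shard, String id, String doc, Runnable betweenLookupAndWrite) {
        try {
            upsert(shard, id, doc, betweenLookupAndWrite);
        } catch (DocumentAlreadyExistsException e) {
            upsert(shard, id, doc, () -> { });
        }
    }

    public static void main(String[] args) {
        Shard shard = new Shard();
        String id = "1123456789.123456789";
        // Writer A creates the document right after writer B's lookup.
        upsertWithRetry(shard, id, "from B", () -> shard.create(id, "from A"));
        System.out.println(shard.docs.get(id)); // prints: from B
    }
}
```

In a real client the same try/catch would wrap the actual update call. I'd cap it at one retry in this sketch, since the second attempt only goes back to the create path if the document disappears again in between.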

What do you think? Leave your comment below!
