Friday, July 14, 2017

Elasticsearch 2.2.1 upsert throws DocumentAlreadyExistsException

So why is the following operation throwing an exception?

[2016-06-26 00:00:51,357][INFO ][rest.suppressed     ] /<my_index>/<my_type>/1123456789.123456789/_update Params: {version_type=force, doc_as_upsert=true, index=<my_index>, op_type=create, id=1123456789.123456789, type=<my_type>, consistency=one, version=1466892051270, ttl=1814400000ms}
RemoteTransportException[[node1][123.123.123.123:8700][indices:data/write/update[s]]]; nested: DocumentAlreadyExistsException[[session][1123456789.123456789]: document already exists];
Caused by: [<my_index>][[<my_index>][3]] DocumentAlreadyExistsException[[<my_type>][1123456789.123456789]: document already exists]
    at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:421)
    at org.elasticsearch.index.engine.InternalEngine.innerCreate(InternalEngine.java:378)
    at org.elasticsearch.index.engine.InternalEngine.create(InternalEngine.java:349)
    at org.elasticsearch.index.shard.IndexShard.create(IndexShard.java:545)
    at org.elasticsearch.index.engine.Engine$Create.execute(Engine.java:810)
    at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:236)
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:157)
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:65)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:595)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:263)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:260)
    at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:350)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Some background:
* Elasticsearch 2.2.1
* Java 8
* upsert operation
* version external
* version type force
* ttl enabled
* consistency one

As usual, I googled first. This comes close, but it is only solved in Elasticsearch 5 alpha. Upgrading is not an option yet, at least not for production with an alpha release. So let's trace into the code. At the top of the stack:

at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:421)

I have extracted the method here for easier discussion.

private void innerCreateNoLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {

  // same logic as index
  long updatedVersion;
  long expectedVersion = create.version();
  if (create.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
    if (create.origin() == Operation.Origin.RECOVERY) {
      return;
    } else {
      throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
    }
  }
  updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);

  // if the doc exists
  boolean doUpdate = false;
  if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
    if (create.origin() == Operation.Origin.RECOVERY) {
      return;
    } else if (create.origin() == Operation.Origin.REPLICA) {
      // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
      // conflict, so we must also update here on the replica to remain consistent:
      doUpdate = true;
    } else if (create.origin() == Operation.Origin.PRIMARY && create.autoGeneratedId() && create.canHaveDuplicates() &&
        currentVersion == 1 && create.version() == Versions.MATCH_ANY) {
      /**
       * If bulk index request fails due to a disconnect, unavailable shard etc. then the request is
       * retried before it actually fails. However, the documents might already be indexed.
       * For autogenerated ids this means that a version conflict will be reported in the bulk request
       * although the document was indexed properly.
       * To avoid this we have to make sure that the index request is treated as an update and set updatedVersion to 1.
       * See also discussion on https://github.com/elasticsearch/elasticsearch/pull/9125
       */
      doUpdate = true;
      updatedVersion = 1;
    } else {
      // On primary, we throw DAEE if the _uid is already in the index with an older version:
      assert create.origin() == Operation.Origin.PRIMARY;
      throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
    }
  }

  create.updateVersion(updatedVersion);

  if (doUpdate) {
    if (create.docs().size() > 1) {
      indexWriter.updateDocuments(create.uid(), create.docs());
    } else {
      indexWriter.updateDocument(create.uid(), create.docs().get(0));
    }
  } else {
    if (create.docs().size() > 1) {
      indexWriter.addDocuments(create.docs());
    } else {
      indexWriter.addDocument(create.docs().get(0));
    }
  }
  Translog.Location translogLocation = translog.add(new Translog.Create(create));

  versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
  create.setTranslogLocation(translogLocation);
  indexingService.postCreateUnderLock(create);
}
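To make the branching easier to follow, here is a minimal standalone sketch of just the "doc exists" decision in the method above. It is not Elasticsearch code: the class, the enums, the decide method and the MATCH_ANY placeholder value are all names I made up for illustration, and the version conflict check at the top of the real method is left out.

```java
// Simplified model of the "if the doc exists" branch of innerCreateNoLock.
// All names here are illustrative stand-ins, not Elasticsearch classes.
final class CreateDecisionSketch {
    enum Origin { PRIMARY, REPLICA, RECOVERY }
    enum Outcome { ADD, UPDATE, SKIP, DOCUMENT_ALREADY_EXISTS }

    // Placeholder standing in for Versions.MATCH_ANY; the real constant may differ.
    static final long MATCH_ANY = -3L;

    static Outcome decide(boolean docExists, Origin origin, boolean autoGeneratedId,
                          boolean canHaveDuplicates, long currentVersion, long requestVersion) {
        if (!docExists) {
            return Outcome.ADD;                     // plain addDocument path
        }
        if (origin == Origin.RECOVERY) {
            return Outcome.SKIP;                    // the method simply returns
        }
        if (origin == Origin.REPLICA) {
            return Outcome.UPDATE;                  // replica follows the primary's decision
        }
        if (autoGeneratedId && canHaveDuplicates
                && currentVersion == 1 && requestVersion == MATCH_ANY) {
            return Outcome.UPDATE;                  // retried request with an auto-generated id
        }
        return Outcome.DOCUMENT_ALREADY_EXISTS;     // primary, explicit id, doc already there
    }

    public static void main(String[] args) {
        // Roughly the case in the log: primary, explicit id, explicit version.
        // The current version used here is a made-up value.
        System.out.println(decide(true, Origin.PRIMARY, false, false,
                1466892051000L, 1466892051270L));   // prints DOCUMENT_ALREADY_EXISTS
    }
}
```

With an explicit id on the primary, the only way out once the document exists is the exception.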

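Tracing the branches: the document already exists, the operation originates on the primary, and the id is supplied by the client rather than auto-generated, so execution falls through to the final else branch and throws DocumentAlreadyExistsException. Nothing is written to the index or the translog before the throw, so it looks like it is safe to retry this upsert operation. Read further discussion here.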
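A client-side retry could look roughly like the sketch below. This is only an illustration under my own assumptions: AlreadyExists is a hypothetical stand-in for DocumentAlreadyExistsException, UpsertAttempt is a hypothetical callback that performs one upsert call, and the simulated failure in main is made up. It does not use the Elasticsearch client at all.

```java
// Minimal retry loop for an upsert that may lose a create race.
// AlreadyExists and UpsertAttempt are hypothetical names, not Elasticsearch types.
final class UpsertRetrySketch {
    /** Stand-in for Elasticsearch's DocumentAlreadyExistsException. */
    static final class AlreadyExists extends RuntimeException {
        AlreadyExists(String message) { super(message); }
    }

    /** One upsert attempt; a real caller would issue the update request here. */
    interface UpsertAttempt { void run(); }

    /** Runs the attempt and retries up to maxRetries times; returns the number of attempts made. */
    static int upsertWithRetry(UpsertAttempt attempt, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (AlreadyExists e) {
                if (attempts > maxRetries) {
                    throw e; // give up and surface the original exception
                }
                // Otherwise loop: the document now exists, so the next attempt
                // is expected to take the update path instead of create.
            }
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulate losing the race once, then succeeding.
        int attempts = upsertWithRetry(() -> {
            if (calls[0]++ == 0) {
                throw new AlreadyExists("document already exists");
            }
        }, 3);
        System.out.println("succeeded after " + attempts + " attempts"); // succeeded after 2 attempts
    }
}
```

The retry count is bounded so that a persistent failure still surfaces instead of looping forever.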

What do you think? Leave your comment below!
