1: [2016-06-26 00:00:51,357][INFO ][rest.suppressed ] /<my_index>/<my_type>/1123456789.123456789/_update Params: {version_type=force, doc_as_upsert=true, index=<my_index>, op_type=create, id=1123456789.123456789, type=<my_type>, consistency=one, version=1466892051270, ttl=1814400000ms}
2: RemoteTransportException[[node1][][indices:data/write/update[s]]]; nested: DocumentAlreadyExistsException[[session][1123456789.123456789]: document already exists];
3: Caused by: [<my_index>][[<my_index>][3]] DocumentAlreadyExistsException[[<my_type>][1123456789.123456789]: document already exists]
4: at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:421)
5: at org.elasticsearch.index.engine.InternalEngine.innerCreate(InternalEngine.java:378)
6: at org.elasticsearch.index.engine.InternalEngine.create(InternalEngine.java:349)
7: at org.elasticsearch.index.shard.IndexShard.create(IndexShard.java:545)
8: at org.elasticsearch.index.engine.Engine$Create.execute(Engine.java:810)
9: at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:236)
10: at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:157)
11: at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:65)
12: at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:595)
13: at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
14: at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:263)
15: at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:260)
16: at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:350)
17: at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
18: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
20: at java.lang.Thread.run(Thread.java:745)
well, some background,
* elasticsearch 2.2.1
* java8
* upsert operation
* version external
* version type force
* ttl enabled
* consistency one
As usual, google first, this come close but solve in elasticsearch version 5a. Upgrade not a chance yet. At least not for production with alpha release. Now we trace into the code. On the top of the stack,
at org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:421)
I extracted out the method for easier discussion.
1: private void innerCreateNoLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {
3: // same logic as index
4: long updatedVersion;
5: long expectedVersion = create.version();
6: if (create.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
7: if (create.origin() == Operation.Origin.RECOVERY) {
8: return;
9: } else {
10: throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
11: }
12: }
13: updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
15: // if the doc exists
16: boolean doUpdate = false;
17: if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
18: if (create.origin() == Operation.Origin.RECOVERY) {
19: return;
20: } else if (create.origin() == Operation.Origin.REPLICA) {
21: // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
22: // conflict, so we must also update here on the replica to remain consistent:
23: doUpdate = true;
24: } else if (create.origin() == Operation.Origin.PRIMARY && create.autoGeneratedId() && create.canHaveDuplicates() &&
25: currentVersion == 1 && create.version() == Versions.MATCH_ANY) {
26: /**
27: * If bulk index request fails due to a disconnect, unavailable shard etc. then the request is
28: * retried before it actually fails. However, the documents might already be indexed.
29: * For autogenerated ids this means that a version conflict will be reported in the bulk request
30: * although the document was indexed properly.
31: * To avoid this we have to make sure that the index request is treated as an update and set updatedVersion to 1.
32: * See also discussion on https://github.com/elasticsearch/elasticsearch/pull/9125
33: */
34: doUpdate = true;
35: updatedVersion = 1;
36: } else {
37: // On primary, we throw DAEE if the _uid is already in the index with an older version:
38: assert create.origin() == Operation.Origin.PRIMARY;
39: throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
40: }
41: }
43: create.updateVersion(updatedVersion);
45: if (doUpdate) {
46: if (create.docs().size() > 1) {
47: indexWriter.updateDocuments(create.uid(), create.docs());
48: } else {
49: indexWriter.updateDocument(create.uid(), create.docs().get(0));
50: }
51: } else {
52: if (create.docs().size() > 1) {
53: indexWriter.addDocuments(create.docs());
54: } else {
55: indexWriter.addDocument(create.docs().get(0));
56: }
57: }
58: Translog.Location translogLocation = translog.add(new Translog.Create(create));
60: versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
61: create.setTranslogLocation(translogLocation);
62: indexingService.postCreateUnderLock(create);
63: }
It looks like it is safe to retry on this upsert operation. Read further discussion here.
What do you think? Leave your comment below!
What do you think? Leave your comment below!
No comments:
Post a Comment