fix concurrent modification in portfolio by locking sequence number map
This commit is contained in:
parent
81eaeb6df0
commit
c214919aa5
2 changed files with 49 additions and 23 deletions
|
@ -187,7 +187,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||||
@Override
|
@Override
|
||||||
public void readPersisted(Runnable completeHandler) {
|
public void readPersisted(Runnable completeHandler) {
|
||||||
persistenceManager.readPersisted(persisted -> {
|
persistenceManager.readPersisted(persisted -> {
|
||||||
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
|
synchronized (persisted.getMap()) {
|
||||||
|
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
|
||||||
|
}
|
||||||
completeHandler.run();
|
completeHandler.run();
|
||||||
},
|
},
|
||||||
completeHandler);
|
completeHandler);
|
||||||
|
@ -198,7 +200,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||||
public void readPersistedSync() {
|
public void readPersistedSync() {
|
||||||
SequenceNumberMap persisted = persistenceManager.getPersisted();
|
SequenceNumberMap persisted = persistenceManager.getPersisted();
|
||||||
if (persisted != null) {
|
if (persisted != null) {
|
||||||
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
|
synchronized (persisted.getMap()) {
|
||||||
|
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -641,9 +645,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||||
}
|
}
|
||||||
removeFromMapAndDataStore(toRemoveList);
|
removeFromMapAndDataStore(toRemoveList);
|
||||||
|
|
||||||
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) {
|
synchronized (sequenceNumberMap.getMap()) {
|
||||||
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
|
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) {
|
||||||
requestPersistence();
|
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
|
||||||
|
requestPersistence();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,6 @@ package haveno.network.p2p.storage.persistence;
|
||||||
|
|
||||||
import haveno.common.proto.persistable.PersistableEnvelope;
|
import haveno.common.proto.persistable.PersistableEnvelope;
|
||||||
import haveno.network.p2p.storage.P2PDataStorage;
|
import haveno.network.p2p.storage.P2PDataStorage;
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -33,8 +31,6 @@ import java.util.stream.Collectors;
|
||||||
* Hence this Persistable class.
|
* Hence this Persistable class.
|
||||||
*/
|
*/
|
||||||
public class SequenceNumberMap implements PersistableEnvelope {
|
public class SequenceNumberMap implements PersistableEnvelope {
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
private Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> map = new ConcurrentHashMap<>();
|
private Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> map = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public SequenceNumberMap() {
|
public SequenceNumberMap() {
|
||||||
|
@ -46,20 +42,24 @@ public class SequenceNumberMap implements PersistableEnvelope {
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private SequenceNumberMap(Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> map) {
|
private SequenceNumberMap(Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> map) {
|
||||||
this.map.putAll(map);
|
synchronized (this.map) {
|
||||||
|
this.map.putAll(map);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public protobuf.PersistableEnvelope toProtoMessage() {
|
public protobuf.PersistableEnvelope toProtoMessage() {
|
||||||
return protobuf.PersistableEnvelope.newBuilder()
|
synchronized (map) {
|
||||||
.setSequenceNumberMap(protobuf.SequenceNumberMap.newBuilder()
|
return protobuf.PersistableEnvelope.newBuilder()
|
||||||
.addAllSequenceNumberEntries(map.entrySet().stream()
|
.setSequenceNumberMap(protobuf.SequenceNumberMap.newBuilder()
|
||||||
.map(entry -> protobuf.SequenceNumberEntry.newBuilder()
|
.addAllSequenceNumberEntries(map.entrySet().stream()
|
||||||
.setBytes(entry.getKey().toProtoMessage())
|
.map(entry -> protobuf.SequenceNumberEntry.newBuilder()
|
||||||
.setMapValue(entry.getValue().toProtoMessage())
|
.setBytes(entry.getKey().toProtoMessage())
|
||||||
.build())
|
.setMapValue(entry.getValue().toProtoMessage())
|
||||||
.collect(Collectors.toList())))
|
.build())
|
||||||
.build();
|
.collect(Collectors.toList())))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SequenceNumberMap fromProto(protobuf.SequenceNumberMap proto) {
|
public static SequenceNumberMap fromProto(protobuf.SequenceNumberMap proto) {
|
||||||
|
@ -74,20 +74,40 @@ public class SequenceNumberMap implements PersistableEnvelope {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> getMap() {
|
||||||
|
synchronized (map) {
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMap(Map<P2PDataStorage.ByteArray, P2PDataStorage.MapValue> map) {
|
||||||
|
synchronized (this.map) {
|
||||||
|
this.map = map;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Delegates
|
// Delegates
|
||||||
public int size() {
|
public int size() {
|
||||||
return map.size();
|
synchronized (map) {
|
||||||
|
return map.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean containsKey(P2PDataStorage.ByteArray key) {
|
public boolean containsKey(P2PDataStorage.ByteArray key) {
|
||||||
return map.containsKey(key);
|
synchronized (map) {
|
||||||
|
return map.containsKey(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public P2PDataStorage.MapValue get(P2PDataStorage.ByteArray key) {
|
public P2PDataStorage.MapValue get(P2PDataStorage.ByteArray key) {
|
||||||
return map.get(key);
|
synchronized (map) {
|
||||||
|
return map.get(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void put(P2PDataStorage.ByteArray key, P2PDataStorage.MapValue value) {
|
public void put(P2PDataStorage.ByteArray key, P2PDataStorage.MapValue value) {
|
||||||
map.put(key, value);
|
synchronized (map) {
|
||||||
|
map.put(key, value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue