调整版本并做测试
This commit is contained in:
111
pyenjoy/kit/SyncWriteMap.py
Normal file
111
pyenjoy/kit/SyncWriteMap.py
Normal file
@@ -0,0 +1,111 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal SyncWriteMap - Thread-Safe Dictionary
|
||||
"""
|
||||
|
||||
from typing import Dict, Generic, TypeVar, Optional, Callable
|
||||
from threading import Lock
|
||||
|
||||
K = TypeVar('K')
|
||||
V = TypeVar('V')
|
||||
|
||||
class SyncWriteDict(Dict[K, V]):
|
||||
"""Thread-safe dictionary with synchronized write operations"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._lock = Lock()
|
||||
|
||||
def put(self, key: K, value: V) -> Optional[V]:
|
||||
"""Put key-value pair with synchronization"""
|
||||
with self._lock:
|
||||
old_value = self.get(key)
|
||||
super().__setitem__(key, value)
|
||||
return old_value
|
||||
|
||||
def put_if_absent(self, key: K, value: V) -> V:
|
||||
"""Put value only if key doesn't exist"""
|
||||
with self._lock:
|
||||
if key not in self:
|
||||
super().__setitem__(key, value)
|
||||
return value
|
||||
return self[key]
|
||||
|
||||
def remove(self, key: K) -> Optional[V]:
|
||||
"""Remove key with synchronization"""
|
||||
with self._lock:
|
||||
return super().pop(key, None)
|
||||
|
||||
def clear(self):
|
||||
"""Clear all items with synchronization"""
|
||||
with self._lock:
|
||||
super().clear()
|
||||
|
||||
def update(self, other: dict = None, **kwargs):
|
||||
"""Update dictionary with synchronization"""
|
||||
with self._lock:
|
||||
if other:
|
||||
super().update(other)
|
||||
if kwargs:
|
||||
super().update(kwargs)
|
||||
|
||||
def compute_if_absent(self, key: K, mapping_function: Callable[[K], V]) -> V:
|
||||
"""Compute value if key is absent"""
|
||||
with self._lock:
|
||||
if key not in self:
|
||||
value = mapping_function(key)
|
||||
super().__setitem__(key, value)
|
||||
return value
|
||||
return self[key]
|
||||
|
||||
def compute_if_present(self, key: K, remapping_function: Callable[[K, V], V]) -> Optional[V]:
|
||||
"""Compute new value if key exists"""
|
||||
with self._lock:
|
||||
if key in self:
|
||||
old_value = self[key]
|
||||
new_value = remapping_function(key, old_value)
|
||||
if new_value is None:
|
||||
super().__delitem__(key)
|
||||
else:
|
||||
super().__setitem__(key, new_value)
|
||||
return new_value
|
||||
return None
|
||||
|
||||
def compute(self, key: K, remapping_function: Callable[[K, Optional[V]], Optional[V]]) -> Optional[V]:
|
||||
"""Compute new value for key"""
|
||||
with self._lock:
|
||||
old_value = self.get(key)
|
||||
new_value = remapping_function(key, old_value)
|
||||
if new_value is None:
|
||||
if key in self:
|
||||
super().__delitem__(key)
|
||||
else:
|
||||
super().__setitem__(key, new_value)
|
||||
return new_value
|
||||
|
||||
def replace(self, key: K, old_value: V, new_value: V) -> bool:
|
||||
"""Replace value if old value matches"""
|
||||
with self._lock:
|
||||
if key in self and self[key] == old_value:
|
||||
super().__setitem__(key, new_value)
|
||||
return True
|
||||
return False
|
||||
|
||||
def replace_value(self, key: K, value: V) -> bool:
|
||||
"""Replace value regardless of old value"""
|
||||
with self._lock:
|
||||
if key in self:
|
||||
super().__setitem__(key, value)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class SyncWriteMap(SyncWriteDict):
|
||||
"""Thread-safe dictionary (alias for compatibility)"""
|
||||
|
||||
def __init__(self, initial_capacity: int = None, load_factor: float = None,
|
||||
mapping: Dict[K, V] = None):
|
||||
super().__init__(mapping or {})
|
||||
# Note: initial_capacity and load_factor are ignored in Python dict implementation
|
||||
# but kept for API compatibility
|
||||
Reference in New Issue
Block a user