@@ -0,0 +1 @@ | |||
Add config options to allow for auto-tuning of caches. |
@@ -784,6 +784,24 @@ caches: | |||
# | |||
#cache_entry_ttl: 30m | |||
# This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`, | |||
# `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain | |||
# a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize | |||
# this option, and all three of the options must be specified for this feature to work. | |||
#cache_autotuning: | |||
# This flag sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted. | |||
# They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in | |||
# the flag below, or until the `min_cache_ttl` is hit. | |||
#max_cache_memory_usage: 1024M | |||
# This flag sets a rough target for the desired memory usage of the caches. | |||
#target_cache_memory_usage: 758M | |||
# `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when | |||
# caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches | |||
# from being emptied while Synapse is evicting due to memory. | |||
#min_cache_ttl: 5m | |||
# Controls how long the results of a /sync request are cached for after | |||
# a successful response is returned. A higher duration can help clients with | |||
# intermittent connections, at the cost of higher memory usage. | |||
@@ -1119,7 +1119,17 @@ Caching can be configured through the following sub-options: | |||
with intermittent connections, at the cost of higher memory usage. | |||
By default, this is zero, which means that sync responses are not cached | |||
at all. | |||
* `cache_autotuning` and its sub-options `max_cache_memory_usage`, `target_cache_memory_usage`, and | |||
`min_cache_ttl` work in conjunction with each other to maintain a balance between cache memory | |||
usage and cache entry availability. You must be using [jemalloc](https://github.com/matrix-org/synapse#help-synapse-is-slow-and-eats-all-my-ramcpu) | |||
to utilize this option, and all three of the options must be specified for this feature to work. | |||
* `max_cache_memory_usage` sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted. | |||
They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in | |||
the flag below, or until the `min_cache_ttl` is hit. | |||
* `target_cache_memory_usage` sets a rough target for the desired memory usage of the caches. | |||
* `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when | |||
caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches | |||
from being emptied while Synapse is evicting due to memory. | |||
Example configuration: | |||
```yaml | |||
@@ -1127,8 +1137,11 @@ caches: | |||
global_factor: 1.0 | |||
per_cache_factors: | |||
get_users_who_share_room_with_user: 2.0 | |||
expire_caches: false | |||
sync_response_cache_duration: 2m | |||
cache_autotuning: | |||
max_cache_memory_usage: 1024M | |||
target_cache_memory_usage: 758M | |||
min_cache_ttl: 5m | |||
``` | |||
### Reloading cache factors | |||
@@ -176,6 +176,24 @@ class CacheConfig(Config): | |||
# | |||
#cache_entry_ttl: 30m | |||
# This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`, | |||
# `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain | |||
# a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize | |||
# this option, and all three of the options must be specified for this feature to work. | |||
#cache_autotuning: | |||
# This flag sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted. | |||
# They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in | |||
# the flag below, or until the `min_cache_ttl` is hit. | |||
#max_cache_memory_usage: 1024M | |||
# This flag sets a rough target for the desired memory usage of the caches. | |||
#target_cache_memory_usage: 758M | |||
# `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when | |||
# caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches | |||
# from being emptied while Synapse is evicting due to memory. | |||
#min_cache_ttl: 5m | |||
# Controls how long the results of a /sync request are cached for after | |||
# a successful response is returned. A higher duration can help clients with | |||
# intermittent connections, at the cost of higher memory usage. | |||
@@ -263,6 +281,21 @@ class CacheConfig(Config): | |||
) | |||
self.expiry_time_msec = self.parse_duration(expiry_time) | |||
self.cache_autotuning = cache_config.get("cache_autotuning") | |||
if self.cache_autotuning: | |||
max_memory_usage = self.cache_autotuning.get("max_cache_memory_usage") | |||
self.cache_autotuning["max_cache_memory_usage"] = self.parse_size( | |||
max_memory_usage | |||
) | |||
target_mem_size = self.cache_autotuning.get("target_cache_memory_usage") | |||
self.cache_autotuning["target_cache_memory_usage"] = self.parse_size( | |||
target_mem_size | |||
) | |||
min_cache_ttl = self.cache_autotuning.get("min_cache_ttl") | |||
self.cache_autotuning["min_cache_ttl"] = self.parse_duration(min_cache_ttl) | |||
self.sync_response_cache_duration = self.parse_duration( | |||
cache_config.get("sync_response_cache_duration", 0) | |||
) | |||
@@ -18,6 +18,7 @@ import os | |||
import re | |||
from typing import Iterable, Optional, overload | |||
import attr | |||
from prometheus_client import REGISTRY, Metric | |||
from typing_extensions import Literal | |||
@@ -27,52 +28,24 @@ from synapse.metrics._types import Collector | |||
logger = logging.getLogger(__name__) | |||
def _setup_jemalloc_stats() -> None: | |||
"""Checks to see if jemalloc is loaded, and hooks up a collector to record | |||
statistics exposed by jemalloc. | |||
""" | |||
# Try to find the loaded jemalloc shared library, if any. We need to | |||
# introspect into what is loaded, rather than loading whatever is on the | |||
# path, as if we load a *different* jemalloc version things will seg fault. | |||
# We look in `/proc/self/maps`, which only exists on linux. | |||
if not os.path.exists("/proc/self/maps"): | |||
logger.debug("Not looking for jemalloc as no /proc/self/maps exist") | |||
return | |||
# We're looking for a path at the end of the line that includes | |||
# "libjemalloc". | |||
regex = re.compile(r"/\S+/libjemalloc.*$") | |||
jemalloc_path = None | |||
with open("/proc/self/maps") as f: | |||
for line in f: | |||
match = regex.search(line.strip()) | |||
if match: | |||
jemalloc_path = match.group() | |||
if not jemalloc_path: | |||
# No loaded jemalloc was found. | |||
logger.debug("jemalloc not found") | |||
return | |||
logger.debug("Found jemalloc at %s", jemalloc_path) | |||
jemalloc = ctypes.CDLL(jemalloc_path) | |||
@attr.s(slots=True, frozen=True, auto_attribs=True) | |||
class JemallocStats: | |||
jemalloc: ctypes.CDLL | |||
@overload | |||
def _mallctl( | |||
name: str, read: Literal[True] = True, write: Optional[int] = None | |||
self, name: str, read: Literal[True] = True, write: Optional[int] = None | |||
) -> int: | |||
... | |||
@overload | |||
def _mallctl(name: str, read: Literal[False], write: Optional[int] = None) -> None: | |||
def _mallctl( | |||
self, name: str, read: Literal[False], write: Optional[int] = None | |||
) -> None: | |||
... | |||
def _mallctl( | |||
name: str, read: bool = True, write: Optional[int] = None | |||
self, name: str, read: bool = True, write: Optional[int] = None | |||
) -> Optional[int]: | |||
"""Wrapper around `mallctl` for reading and writing integers to | |||
jemalloc. | |||
@@ -120,7 +93,7 @@ def _setup_jemalloc_stats() -> None: | |||
# Where oldp/oldlenp is a buffer where the old value will be written to | |||
# (if not null), and newp/newlen is the buffer with the new value to set | |||
# (if not null). Note that they're all references *except* newlen. | |||
result = jemalloc.mallctl( | |||
result = self.jemalloc.mallctl( | |||
name.encode("ascii"), | |||
input_var_ref, | |||
input_len_ref, | |||
@@ -136,21 +109,80 @@ def _setup_jemalloc_stats() -> None: | |||
return input_var.value | |||
def _jemalloc_refresh_stats() -> None: | |||
def refresh_stats(self) -> None: | |||
"""Request that jemalloc updates its internal statistics. This needs to | |||
be called before querying for stats, otherwise it will return stale | |||
values. | |||
""" | |||
try: | |||
_mallctl("epoch", read=False, write=1) | |||
self._mallctl("epoch", read=False, write=1) | |||
except Exception as e: | |||
logger.warning("Failed to reload jemalloc stats: %s", e) | |||
def get_stat(self, name: str) -> int: | |||
"""Request the stat of the given name at the time of the last | |||
`refresh_stats` call. This may throw if we fail to read | |||
the stat. | |||
""" | |||
return self._mallctl(f"stats.{name}") | |||
_JEMALLOC_STATS: Optional[JemallocStats] = None | |||
def get_jemalloc_stats() -> Optional[JemallocStats]: | |||
"""Returns an interface to jemalloc, if it is being used. | |||
Note that this will always return None until `setup_jemalloc_stats` has been | |||
called. | |||
""" | |||
return _JEMALLOC_STATS | |||
def _setup_jemalloc_stats() -> None: | |||
"""Checks to see if jemalloc is loaded, and hooks up a collector to record | |||
statistics exposed by jemalloc. | |||
""" | |||
global _JEMALLOC_STATS | |||
# Try to find the loaded jemalloc shared library, if any. We need to | |||
# introspect into what is loaded, rather than loading whatever is on the | |||
# path, as if we load a *different* jemalloc version things will seg fault. | |||
# We look in `/proc/self/maps`, which only exists on linux. | |||
if not os.path.exists("/proc/self/maps"): | |||
logger.debug("Not looking for jemalloc as no /proc/self/maps exist") | |||
return | |||
# We're looking for a path at the end of the line that includes | |||
# "libjemalloc". | |||
regex = re.compile(r"/\S+/libjemalloc.*$") | |||
jemalloc_path = None | |||
with open("/proc/self/maps") as f: | |||
for line in f: | |||
match = regex.search(line.strip()) | |||
if match: | |||
jemalloc_path = match.group() | |||
if not jemalloc_path: | |||
# No loaded jemalloc was found. | |||
logger.debug("jemalloc not found") | |||
return | |||
logger.debug("Found jemalloc at %s", jemalloc_path) | |||
jemalloc_dll = ctypes.CDLL(jemalloc_path) | |||
stats = JemallocStats(jemalloc_dll) | |||
_JEMALLOC_STATS = stats | |||
class JemallocCollector(Collector): | |||
"""Metrics for internal jemalloc stats.""" | |||
def collect(self) -> Iterable[Metric]: | |||
_jemalloc_refresh_stats() | |||
stats.refresh_stats() | |||
g = GaugeMetricFamily( | |||
"jemalloc_stats_app_memory_bytes", | |||
@@ -184,7 +216,7 @@ def _setup_jemalloc_stats() -> None: | |||
"metadata", | |||
): | |||
try: | |||
value = _mallctl(f"stats.{t}") | |||
value = stats.get_stat(t) | |||
except Exception as e: | |||
# There was an error fetching the value, skip. | |||
logger.warning("Failed to read jemalloc stats.%s: %s", t, e) | |||
@@ -13,6 +13,7 @@ | |||
# limitations under the License. | |||
import logging | |||
import math | |||
import threading | |||
import weakref | |||
from enum import Enum | |||
@@ -40,6 +41,7 @@ from twisted.internet.interfaces import IReactorTime | |||
from synapse.config import cache as cache_config | |||
from synapse.metrics.background_process_metrics import wrap_as_background_process | |||
from synapse.metrics.jemalloc import get_jemalloc_stats | |||
from synapse.util import Clock, caches | |||
from synapse.util.caches import CacheMetric, EvictionReason, register_cache | |||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry | |||
@@ -106,10 +108,16 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node() | |||
@wrap_as_background_process("LruCache._expire_old_entries") | |||
async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: | |||
async def _expire_old_entries( | |||
clock: Clock, expiry_seconds: int, autotune_config: Optional[dict] | |||
) -> None: | |||
"""Walks the global cache list to find cache entries that haven't been | |||
accessed in the given number of seconds. | |||
accessed in the given number of seconds, or if a given memory threshold has been breached. | |||
""" | |||
if autotune_config: | |||
max_cache_memory_usage = autotune_config["max_cache_memory_usage"] | |||
target_cache_memory_usage = autotune_config["target_cache_memory_usage"] | |||
min_cache_ttl = autotune_config["min_cache_ttl"] / 1000 | |||
now = int(clock.time()) | |||
node = GLOBAL_ROOT.prev_node | |||
@@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: | |||
logger.debug("Searching for stale caches") | |||
evicting_due_to_memory = False | |||
# determine if we're evicting due to memory | |||
jemalloc_interface = get_jemalloc_stats() | |||
if jemalloc_interface and autotune_config: | |||
try: | |||
jemalloc_interface.refresh_stats() | |||
mem_usage = jemalloc_interface.get_stat("allocated") | |||
if mem_usage > max_cache_memory_usage: | |||
logger.info("Begin memory-based cache eviction.") | |||
evicting_due_to_memory = True | |||
except Exception: | |||
logger.warning( | |||
"Unable to read allocated memory, skipping memory-based cache eviction." | |||
) | |||
while node is not GLOBAL_ROOT: | |||
# Only the root node isn't a `_TimedListNode`. | |||
assert isinstance(node, _TimedListNode) | |||
if node.last_access_ts_secs > now - expiry_seconds: | |||
# if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's | |||
# nothing to do here | |||
if ( | |||
node.last_access_ts_secs > now - expiry_seconds | |||
and not evicting_due_to_memory | |||
): | |||
break | |||
# if the entry is newer than min_cache_ttl then do not evict it, and stop here since every remaining entry is newer still | |||
if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl: | |||
break | |||
cache_entry = node.get_cache_entry() | |||
@@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: | |||
assert cache_entry is not None | |||
cache_entry.drop_from_cache() | |||
# Check mem allocation periodically if we are evicting a bunch of caches | |||
if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0: | |||
try: | |||
jemalloc_interface.refresh_stats() | |||
mem_usage = jemalloc_interface.get_stat("allocated") | |||
if mem_usage < target_cache_memory_usage: | |||
evicting_due_to_memory = False | |||
logger.info("Stop memory-based cache eviction.") | |||
except Exception: | |||
logger.warning( | |||
"Unable to read allocated memory, this may affect memory-based cache eviction." | |||
) | |||
# If we've failed to read the current memory usage then we | |||
# should stop trying to evict based on memory usage | |||
evicting_due_to_memory = False | |||
# If we do lots of work at once we yield to allow other stuff to happen. | |||
if (i + 1) % 10000 == 0: | |||
logger.debug("Waiting during drop") | |||
await clock.sleep(0) | |||
if node.last_access_ts_secs > now - expiry_seconds: | |||
await clock.sleep(0.5) | |||
else: | |||
await clock.sleep(0) | |||
logger.debug("Waking during drop") | |||
node = next_node | |||
@@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: | |||
def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: | |||
"""Start a background job that expires all cache entries if they have not | |||
been accessed for the given number of seconds. | |||
been accessed for the given number of seconds, or if a given memory usage threshold has been | |||
breached. | |||
""" | |||
if not hs.config.caches.expiry_time_msec: | |||
if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning: | |||
return | |||
logger.info( | |||
"Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000 | |||
) | |||
if hs.config.caches.expiry_time_msec: | |||
expiry_time = hs.config.caches.expiry_time_msec / 1000 | |||
logger.info("Expiring LRU caches after %d seconds", expiry_time) | |||
else: | |||
expiry_time = math.inf | |||
global USE_GLOBAL_LIST | |||
USE_GLOBAL_LIST = True | |||
clock = hs.get_clock() | |||
clock.looping_call( | |||
_expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000 | |||
_expire_old_entries, | |||
30 * 1000, | |||
clock, | |||
expiry_time, | |||
hs.config.caches.cache_autotuning, | |||
) | |||
@@ -14,8 +14,9 @@ | |||
from typing import List | |||
from unittest.mock import Mock | |||
from unittest.mock import Mock, patch | |||
from synapse.metrics.jemalloc import JemallocStats | |||
from synapse.util.caches.lrucache import LruCache, setup_expire_lru_cache_entries | |||
from synapse.util.caches.treecache import TreeCache | |||
@@ -316,3 +317,58 @@ class TimeEvictionTestCase(unittest.HomeserverTestCase): | |||
self.assertEqual(cache.get("key1"), None) | |||
self.assertEqual(cache.get("key2"), 3) | |||
class MemoryEvictionTestCase(unittest.HomeserverTestCase): | |||
@override_config( | |||
{ | |||
"caches": { | |||
"cache_autotuning": { | |||
"max_cache_memory_usage": "700M", | |||
"target_cache_memory_usage": "500M", | |||
"min_cache_ttl": "5m", | |||
} | |||
} | |||
} | |||
) | |||
@patch("synapse.util.caches.lrucache.get_jemalloc_stats") | |||
def test_evict_memory(self, jemalloc_interface) -> None: | |||
mock_jemalloc_class = Mock(spec=JemallocStats) | |||
jemalloc_interface.return_value = mock_jemalloc_class | |||
# set the return value of get_stat() to be greater than max_cache_memory_usage | |||
mock_jemalloc_class.get_stat.return_value = 924288000 | |||
setup_expire_lru_cache_entries(self.hs) | |||
cache = LruCache(4, clock=self.hs.get_clock()) | |||
cache["key1"] = 1 | |||
cache["key2"] = 2 | |||
# advance the reactor less than the min_cache_ttl | |||
self.reactor.advance(60 * 2) | |||
# our items should still be in the cache | |||
self.assertEqual(cache.get("key1"), 1) | |||
self.assertEqual(cache.get("key2"), 2) | |||
# advance the reactor past the min_cache_ttl | |||
self.reactor.advance(60 * 6) | |||
# the items should be cleared from cache | |||
self.assertEqual(cache.get("key1"), None) | |||
self.assertEqual(cache.get("key2"), None) | |||
# add more stuff to caches | |||
cache["key1"] = 1 | |||
cache["key2"] = 2 | |||
# set the return value of get_stat() to be lower than target_cache_memory_usage | |||
mock_jemalloc_class.get_stat.return_value = 10000 | |||
# advance the reactor past the min_cache_ttl | |||
self.reactor.advance(60 * 6) | |||
# the items should still be in the cache | |||
self.assertEqual(cache.get("key1"), 1) | |||
self.assertEqual(cache.get("key2"), 2) |