You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

184 lines
6.1 KiB

  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Dict, Iterable, List, Sequence
  15. from synapse.util.iterutils import (
  16. chunk_seq,
  17. sorted_topologically,
  18. sorted_topologically_batched,
  19. )
  20. from tests.unittest import TestCase
  21. class ChunkSeqTests(TestCase):
  22. def test_short_seq(self) -> None:
  23. parts = chunk_seq("123", 8)
  24. self.assertEqual(
  25. list(parts),
  26. ["123"],
  27. )
  28. def test_long_seq(self) -> None:
  29. parts = chunk_seq("abcdefghijklmnop", 8)
  30. self.assertEqual(
  31. list(parts),
  32. ["abcdefgh", "ijklmnop"],
  33. )
  34. def test_uneven_parts(self) -> None:
  35. parts = chunk_seq("abcdefghijklmnop", 5)
  36. self.assertEqual(
  37. list(parts),
  38. ["abcde", "fghij", "klmno", "p"],
  39. )
  40. def test_empty_input(self) -> None:
  41. parts: Iterable[Sequence] = chunk_seq([], 5)
  42. self.assertEqual(
  43. list(parts),
  44. [],
  45. )
  46. class SortTopologically(TestCase):
  47. def test_empty(self) -> None:
  48. "Test that an empty graph works correctly"
  49. graph: Dict[int, List[int]] = {}
  50. self.assertEqual(list(sorted_topologically([], graph)), [])
  51. def test_handle_empty_graph(self) -> None:
  52. "Test that a graph where a node doesn't have an entry is treated as empty"
  53. graph: Dict[int, List[int]] = {}
  54. # For disconnected nodes the output is simply sorted.
  55. self.assertEqual(list(sorted_topologically([1, 2], graph)), [1, 2])
  56. def test_disconnected(self) -> None:
  57. "Test that a graph with no edges work"
  58. graph: Dict[int, List[int]] = {1: [], 2: []}
  59. # For disconnected nodes the output is simply sorted.
  60. self.assertEqual(list(sorted_topologically([1, 2], graph)), [1, 2])
  61. def test_linear(self) -> None:
  62. "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
  63. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
  64. self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
  65. def test_subset(self) -> None:
  66. "Test that only sorting a subset of the graph works"
  67. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
  68. self.assertEqual(list(sorted_topologically([4, 3], graph)), [3, 4])
  69. def test_fork(self) -> None:
  70. "Test that a forked graph works"
  71. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
  72. # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
  73. # always get the same one.
  74. self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
  75. def test_duplicates(self) -> None:
  76. "Test that a graph with duplicate edges work"
  77. graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
  78. self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
  79. def test_multiple_paths(self) -> None:
  80. "Test that a graph with multiple paths between two nodes work"
  81. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
  82. self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
  83. class SortTopologicallyBatched(TestCase):
  84. "Test cases for `sorted_topologically_batched`"
  85. def test_empty(self) -> None:
  86. "Test that an empty graph works correctly"
  87. graph: Dict[int, List[int]] = {}
  88. self.assertEqual(list(sorted_topologically_batched([], graph)), [])
  89. def test_handle_empty_graph(self) -> None:
  90. "Test that a graph where a node doesn't have an entry is treated as empty"
  91. graph: Dict[int, List[int]] = {}
  92. # For disconnected nodes the output is simply sorted.
  93. self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
  94. def test_disconnected(self) -> None:
  95. "Test that a graph with no edges work"
  96. graph: Dict[int, List[int]] = {1: [], 2: []}
  97. # For disconnected nodes the output is simply sorted.
  98. self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
  99. def test_linear(self) -> None:
  100. "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
  101. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
  102. self.assertEqual(
  103. list(sorted_topologically_batched([4, 3, 2, 1], graph)),
  104. [[1], [2], [3], [4]],
  105. )
  106. def test_subset(self) -> None:
  107. "Test that only sorting a subset of the graph works"
  108. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
  109. self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
  110. def test_fork(self) -> None:
  111. "Test that a forked graph works"
  112. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
  113. # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
  114. # always get the same one.
  115. self.assertEqual(
  116. list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
  117. )
  118. def test_duplicates(self) -> None:
  119. "Test that a graph with duplicate edges work"
  120. graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
  121. self.assertEqual(
  122. list(sorted_topologically_batched([4, 3, 2, 1], graph)),
  123. [[1], [2], [3], [4]],
  124. )
  125. def test_multiple_paths(self) -> None:
  126. "Test that a graph with multiple paths between two nodes work"
  127. graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
  128. self.assertEqual(
  129. list(sorted_topologically_batched([4, 3, 2, 1], graph)),
  130. [[1], [2], [3], [4]],
  131. )