You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

335 lines
7.5 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of all player id's that have been in this thread',
  13. playersCurrent: 'An array of player id's that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, defines whether the thread should be destroyed if it has been empty for this amount of milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const connections = require('../security/connections');
  24. const { mapList } = require('./mapManager');
  25. const { registerCallback } = require('./atlas/registerCallback');
  26. //Internals
  27. const threads = [];
  28. const listenersOnZoneIdle = [];
  29. //Helpers
  30. const getThreadFromName = name => {
  31. return threads.find(t => t.name === name);
  32. };
  33. const getThreadFromId = threadId => {
  34. return threads.find(t => t.id === threadId);
  35. };
  36. const gePlayerCountInThread = async thread => {
  37. const { playerCount } = await new Promise(res => {
  38. const cb = registerCallback(res);
  39. thread.worker.send({
  40. method: 'getPlayerCount',
  41. args: {
  42. callbackId: cb
  43. }
  44. });
  45. });
  46. return playerCount;
  47. };
  48. const removePlayerFromThread = (thread, playerId) => {
  49. thread.playersCurrent.spliceWhere(p => p === playerId);
  50. };
  51. const messageHandlers = {
  52. onReady: function (thread) {
  53. thread.worker.send({
  54. method: 'init',
  55. args: {
  56. zoneName: thread.name,
  57. zoneId: thread.id,
  58. path: thread.path
  59. }
  60. });
  61. },
  62. onInitialized: function (thread) {
  63. thread.isReady = true;
  64. thread.cbOnInitialized(thread);
  65. delete thread.cbOnInitialized;
  66. delete thread.promise;
  67. },
  68. event: function (thread, message) {
  69. objects.sendEvent(message, thread);
  70. },
  71. events: function (thread, message) {
  72. objects.sendEvents(message, thread);
  73. },
  74. object: function (thread, message) {
  75. objects.updateObject(message);
  76. },
  77. track: function (thread, message) {
  78. let player = objects.objects.find(o => o.id === message.serverId);
  79. if (!player)
  80. return;
  81. player.auth.gaTracker.track(message.obj);
  82. },
  83. callDifferentThread: function (thread, message) {
  84. let obj = connections.players.find(p => (p.name === message.playerName));
  85. if (!obj)
  86. return;
  87. let newThread = getThreadFromName(obj.zoneName);
  88. if (!newThread)
  89. return;
  90. newThread.worker.send({
  91. module: message.data.module,
  92. method: message.data.method,
  93. args: message.data.args
  94. });
  95. },
  96. rezone: async function (thread, message) {
  97. const { args: { obj, newZone, keepPos = true } } = message;
  98. removePlayerFromThread(thread, obj.serverId);
  99. //When messages are sent from map threads, they have an id (id of the object in the map thread)
  100. // as well as a serverId (id of the object in the main thread)
  101. const serverId = obj.serverId;
  102. obj.id = serverId;
  103. obj.destroyed = false;
  104. const serverObj = objects.objects.find(o => o.id === obj.id);
  105. const mapExists = mapList.some(m => m.name === newZone);
  106. if (mapExists) {
  107. serverObj.zoneName = newZone;
  108. obj.zoneName = newZone;
  109. } else {
  110. obj.zoneName = clientConfig.config.defaultZone;
  111. serverObj.zoneName = clientConfig.config.defaultZone;
  112. }
  113. delete serverObj.zoneId;
  114. delete obj.zoneId;
  115. const isRezone = true;
  116. await atlas.addObject(obj, keepPos, isRezone);
  117. },
  118. onZoneIdle: function (thread) {
  119. listenersOnZoneIdle.forEach(l => l(thread));
  120. }
  121. };
  122. const onMessage = (thread, message) => {
  123. if (message.module) {
  124. try {
  125. global[message.module][message.method](message);
  126. } catch (e) {
  127. /* eslint-disable-next-line no-console */
  128. console.log('No global method found', message.module, message.method);
  129. process.exit();
  130. }
  131. } else if (message.event === 'onCrashed') {
  132. thread.worker.kill();
  133. process.exit();
  134. } else
  135. messageHandlers[message.method](thread, message);
  136. };
  137. const spawnThread = async ({ name, path, instanced, destroyWhenEmptyForMs = -1 }) => {
  138. let cbOnInitialized;
  139. const promise = new Promise(resolveOnReady => {
  140. cbOnInitialized = resolveOnReady;
  141. });
  142. const worker = childProcess.fork('./world/worker', [name]);
  143. const id = instanced ? _.getGuid() : name;
  144. const thread = {
  145. id,
  146. name,
  147. instanced,
  148. path,
  149. worker,
  150. isReady: false,
  151. promise,
  152. cbOnInitialized,
  153. players: [],
  154. playersCurrent: [],
  155. birthEpoch: +new Date(),
  156. destroyWhenEmptyForMs,
  157. emptySinceEpoch: null
  158. };
  159. worker.on('message', onMessage.bind(null, thread));
  160. threads.push(thread);
  161. return promise;
  162. };
  163. const getThread = async ({ zoneName, zoneId, playerId }) => {
  164. const result = {
  165. resetObjPosition: false,
  166. thread: null
  167. };
  168. let map = mapList.find(m => m.name === zoneName);
  169. if (!map)
  170. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  171. let thread = threads.find(t => t.id === zoneId && t.name === zoneName);
  172. //Maybe this player has been in a thread for this map before
  173. if (!thread)
  174. thread = threads.find(t => t.name === zoneName && t.players.includes(playerId));
  175. if (!thread) {
  176. if (map.instanced) {
  177. result.resetObjPosition = true;
  178. thread = await spawnThread(map);
  179. } else
  180. thread = getThreadFromName(map.name);
  181. }
  182. if (!thread.isReady)
  183. await thread.promise;
  184. result.thread = thread;
  185. return result;
  186. };
  187. const killThread = thread => {
  188. thread.worker.kill();
  189. threads.spliceWhere(t => t === thread);
  190. };
  191. const sendMessageToThread = ({ threadId, msg }) => {
  192. const thread = threads.find(t => t.id === threadId);
  193. if (thread)
  194. thread.worker.send(msg);
  195. };
  196. const messageAllThreads = message => {
  197. threads.forEach(t => t.worker.send(message));
  198. };
  199. const returnWhenThreadsIdle = async () => {
  200. return new Promise(res => {
  201. let doneCount = 0;
  202. const onZoneIdle = thread => {
  203. doneCount++;
  204. if (doneCount.length < threads.length)
  205. return;
  206. listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
  207. res();
  208. };
  209. listenersOnZoneIdle.push(onZoneIdle);
  210. threads.forEach(t => {
  211. t.worker.send({
  212. method: 'notifyOnceIdle'
  213. });
  214. });
  215. });
  216. };
  217. const spawnMapThreads = async () => {
  218. const promises = mapList
  219. .filter(m => !m.disabled && !m.instanced)
  220. .map(m => spawnThread(m));
  221. await Promise.all(promises);
  222. };
  223. const addPlayerToThread = (thread, playerId) => {
  224. thread.players.push(playerId);
  225. thread.playersCurrent.push(playerId);
  226. if (thread.emptySinceEpoch)
  227. thread.emptySinceEpoch = null;
  228. };
  229. const update = () => {
  230. let tLen = threads.length;
  231. for (let i = 0; i < tLen; i++) {
  232. const t = threads[i];
  233. if (!t.isReady || t.destroyWhenEmptyForMs === -1)
  234. continue;
  235. if (!t.emptySinceEpoch && t.playersCurrent.length === 0)
  236. t.emptySinceEpoch = +new Date();
  237. if (!t.emptySinceEpoch)
  238. continue;
  239. const ageInMs = (+new Date() - t.emptySinceEpoch);
  240. if (ageInMs < t.destroyWhenEmptyForMs)
  241. continue;
  242. killThread(t);
  243. i--;
  244. tLen--;
  245. }
  246. };
  247. const init = async () => {
  248. await spawnMapThreads();
  249. setInterval(update, 5000);
  250. };
  251. //Exports
  252. module.exports = {
  253. init,
  254. getThread,
  255. killThread,
  256. getThreadFromId,
  257. messageAllThreads,
  258. sendMessageToThread,
  259. returnWhenThreadsIdle,
  260. addPlayerToThread,
  261. removePlayerFromThread,
  262. gePlayerCountInThread
  263. };