You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

343 lines
7.7 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of all player id's that have been in this thread',
  13. playersCurrent: 'An array of player id's that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, defines whether the thread should be destroyed if it has been empty for this amount of milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const connections = require('../security/connections');
  24. const { mapList } = require('./mapManager');
  25. const { registerCallback } = require('./atlas/registerCallback');
  26. const events = require('../misc/events');
  27. //Internals
  28. const threads = [];
  29. const listenersOnZoneIdle = [];
  30. //Helpers
  31. const getThreadFromName = name => {
  32. return threads.find(t => t.name === name);
  33. };
  34. const getThreadFromId = threadId => {
  35. return threads.find(t => t.id === threadId);
  36. };
  37. const gePlayerCountInThread = async thread => {
  38. const { playerCount } = await new Promise(res => {
  39. const cb = registerCallback(res);
  40. thread.worker.send({
  41. method: 'getPlayerCount',
  42. args: {
  43. callbackId: cb
  44. }
  45. });
  46. });
  47. return playerCount;
  48. };
  49. const removePlayerFromThread = (thread, playerId) => {
  50. thread.playersCurrent.spliceWhere(p => p === playerId);
  51. };
  52. const messageHandlers = {
  53. onReady: function (thread) {
  54. thread.worker.send({
  55. method: 'init',
  56. args: {
  57. zoneName: thread.name,
  58. zoneId: thread.id,
  59. path: thread.path
  60. }
  61. });
  62. },
  63. onInitialized: function (thread) {
  64. thread.isReady = true;
  65. thread.cbOnInitialized(thread);
  66. delete thread.cbOnInitialized;
  67. delete thread.promise;
  68. },
  69. event: function (thread, message) {
  70. objects.sendEvent(message, thread);
  71. },
  72. events: function (thread, message) {
  73. objects.sendEvents(message, thread);
  74. },
  75. object: function (thread, message) {
  76. objects.updateObject(message);
  77. },
  78. track: function (thread, message) {
  79. let player = objects.objects.find(o => o.id === message.serverId);
  80. if (!player)
  81. return;
  82. player.auth.gaTracker.track(message.obj);
  83. },
  84. callDifferentThread: function (thread, message) {
  85. let obj = connections.players.find(p => (p.name === message.playerName));
  86. if (!obj)
  87. return;
  88. let newThread = getThreadFromName(obj.zoneName);
  89. if (!newThread)
  90. return;
  91. newThread.worker.send({
  92. module: message.data.module,
  93. method: message.data.method,
  94. args: message.data.args
  95. });
  96. },
  97. rezone: async function (thread, message) {
  98. const { args: { obj, newZone, keepPos = true } } = message;
  99. removePlayerFromThread(thread, obj.serverId);
  100. //When messages are sent from map threads, they have an id (id of the object in the map thread)
  101. // as well as a serverId (id of the object in the main thread)
  102. const serverId = obj.serverId;
  103. obj.id = serverId;
  104. obj.destroyed = false;
  105. const serverObj = objects.objects.find(o => o.id === obj.id);
  106. const mapExists = mapList.some(m => m.name === newZone);
  107. if (mapExists) {
  108. serverObj.zoneName = newZone;
  109. obj.zoneName = newZone;
  110. } else {
  111. obj.zoneName = clientConfig.config.defaultZone;
  112. serverObj.zoneName = clientConfig.config.defaultZone;
  113. }
  114. delete serverObj.zoneId;
  115. delete obj.zoneId;
  116. const isRezone = true;
  117. await atlas.addObject(obj, keepPos, isRezone);
  118. },
  119. onZoneIdle: function (thread) {
  120. listenersOnZoneIdle.forEach(l => l(thread));
  121. }
  122. };
  123. const onMessage = (thread, message) => {
  124. if (message.module) {
  125. try {
  126. global[message.module][message.method](message);
  127. } catch (e) {
  128. /* eslint-disable-next-line no-console */
  129. console.log('No global method found', message.module, message.method);
  130. process.exit();
  131. }
  132. } else if (message.event === 'onCrashed') {
  133. thread.worker.kill();
  134. process.exit();
  135. } else
  136. messageHandlers[message.method](thread, message);
  137. };
  138. const spawnThread = async ({ name, path, instanced, destroyWhenEmptyForMs = -1 }, obj) => {
  139. let cbOnInitialized;
  140. const promise = new Promise(resolveOnReady => {
  141. cbOnInitialized = resolveOnReady;
  142. });
  143. const worker = childProcess.fork('./world/worker', [name]);
  144. let id = instanced ? _.getGuid() : name;
  145. const emBeforeGetThreadId = {
  146. id,
  147. obj,
  148. name,
  149. instanced
  150. };
  151. events.emit('beforeGetThreadId', emBeforeGetThreadId);
  152. id = emBeforeGetThreadId.id;
  153. const thread = {
  154. id,
  155. name,
  156. instanced,
  157. path,
  158. worker,
  159. isReady: false,
  160. promise,
  161. cbOnInitialized,
  162. players: [],
  163. playersCurrent: [],
  164. birthEpoch: +new Date(),
  165. destroyWhenEmptyForMs,
  166. emptySinceEpoch: null
  167. };
  168. worker.on('message', onMessage.bind(null, thread));
  169. threads.push(thread);
  170. return promise;
  171. };
  172. const getThread = async ({ zoneName, zoneId, obj }) => {
  173. const result = {
  174. resetObjPosition: false,
  175. thread: null
  176. };
  177. let map = mapList.find(m => m.name === zoneName);
  178. if (!map)
  179. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  180. let thread = threads.find(t => t.id === zoneId && t.name === zoneName);
  181. //Maybe this player has been in a thread for this map before
  182. if (!thread)
  183. thread = threads.find(t => t.name === zoneName && t.players.includes(obj.id));
  184. if (!thread) {
  185. if (map.instanced)
  186. result.resetObjPosition = true;
  187. thread = await spawnThread(map, obj);
  188. }
  189. if (!thread.isReady)
  190. await thread.promise;
  191. result.thread = thread;
  192. return result;
  193. };
  194. const killThread = thread => {
  195. thread.worker.kill();
  196. threads.spliceWhere(t => t === thread);
  197. };
  198. const sendMessageToThread = ({ threadId, msg }) => {
  199. const thread = threads.find(t => t.id === threadId);
  200. if (thread)
  201. thread.worker.send(msg);
  202. };
  203. const messageAllThreads = message => {
  204. threads.forEach(t => t.worker.send(message));
  205. };
  206. const returnWhenThreadsIdle = async () => {
  207. return new Promise(res => {
  208. let doneCount = 0;
  209. const onZoneIdle = thread => {
  210. doneCount++;
  211. if (doneCount.length < threads.length)
  212. return;
  213. listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
  214. res();
  215. };
  216. listenersOnZoneIdle.push(onZoneIdle);
  217. threads.forEach(t => {
  218. t.worker.send({
  219. method: 'notifyOnceIdle'
  220. });
  221. });
  222. });
  223. };
  224. const spawnMapThreads = async () => {
  225. const promises = mapList
  226. .filter(m => !m.disabled && !m.instanced && m.autoSpawn !== false)
  227. .map(m => spawnThread(m));
  228. await Promise.all(promises);
  229. };
  230. const addPlayerToThread = (thread, playerId) => {
  231. thread.players.push(playerId);
  232. thread.playersCurrent.push(playerId);
  233. if (thread.emptySinceEpoch)
  234. thread.emptySinceEpoch = null;
  235. };
  236. const update = () => {
  237. let tLen = threads.length;
  238. for (let i = 0; i < tLen; i++) {
  239. const t = threads[i];
  240. if (!t.isReady || t.destroyWhenEmptyForMs === -1)
  241. continue;
  242. if (!t.emptySinceEpoch && t.playersCurrent.length === 0)
  243. t.emptySinceEpoch = +new Date();
  244. if (!t.emptySinceEpoch)
  245. continue;
  246. const ageInMs = (+new Date() - t.emptySinceEpoch);
  247. if (ageInMs < t.destroyWhenEmptyForMs)
  248. continue;
  249. killThread(t);
  250. i--;
  251. tLen--;
  252. }
  253. };
  254. const init = async () => {
  255. await spawnMapThreads();
  256. setInterval(update, 5000);
  257. };
  258. //Exports
  259. module.exports = {
  260. init,
  261. getThread,
  262. killThread,
  263. getThreadFromId,
  264. messageAllThreads,
  265. sendMessageToThread,
  266. returnWhenThreadsIdle,
  267. addPlayerToThread,
  268. removePlayerFromThread,
  269. gePlayerCountInThread
  270. };