You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

336 lines
7.7 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of the ids of all players that have been in this thread',
  13. playersCurrent: 'An array of the ids of the players that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, the thread is destroyed once it has been empty for this many milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const { mapList } = require('./mapManager');
  24. const { registerCallback } = require('./atlas/registerCallback');
  25. const events = require('../misc/events');
  26. //Internals
  //All threads this module currently tracks: spawnThread appends to it, killThread removes from it
  const threads = [];
  //Callbacks that are invoked with the thread whenever an 'onZoneIdle' message is handled
  const listenersOnZoneIdle = [];
  29. //Helpers
  //Looks up a tracked thread by its id. Yields undefined when no thread has that id
  const getThreadFromId = threadId => {
    for (const candidate of threads) {
      if (candidate.id === threadId)
        return candidate;
    }

    return undefined;
  };
  //Asks the worker of the given thread for its player count.
  // A callback is registered through registerCallback and its id is sent along with the
  // 'getPlayerCount' request; the returned promise settles once that callback is invoked
  // with an object that carries a playerCount property (presumably by the worker's reply).
  const gePlayerCountInThread = async thread => {
    const reply = await new Promise(resolveReply => {
      const callbackId = registerCallback(resolveReply);
      const request = {
        method: 'getPlayerCount',
        args: { callbackId }
      };

      thread.worker.send(request);
    });

    return reply.playerCount;
  };
  //Removes the player id from the list of players that are currently in the thread.
  // The history kept in thread.players is left untouched
  const removePlayerFromThread = (thread, playerId) => {
    const { playersCurrent } = thread;
    playersCurrent.spliceWhere(currentId => currentId === playerId);
  };
  //Handlers for messages that arrive from map workers, keyed by message.method.
  // Each handler is called with the thread whose worker sent the message and the message itself
  const messageHandlers = {
    //The worker reported ready: tell it which zone to initialize
    onReady: function (thread) {
      thread.worker.send({
        method: 'init',
        args: {
          zoneName: thread.name,
          zoneId: thread.id,
          path: thread.path
        }
      });
    },

    //The worker finished initializing: flag the thread as ready and resolve the promise
    // that callers may be waiting on. The resolver and promise are not needed afterwards
    onInitialized: function (thread) {
      thread.isReady = true;

      thread.cbOnInitialized(thread);
      delete thread.cbOnInitialized;
      delete thread.promise;
    },

    //Forwards a single event to the objects module
    event: function (thread, message) {
      objects.sendEvent(message, thread);
    },

    //Forwards a batch of events to the objects module
    events: function (thread, message) {
      objects.sendEvents(message, thread);
    },

    //Forwards an object update to the objects module
    object: function (thread, message) {
      objects.updateObject(message);
    },

    //Forwards a tracking payload to the tracker of the player identified by message.serverId.
    // Does nothing when that player is not known to the main thread
    track: function (thread, message) {
      let player = objects.objects.find(o => o.id === message.serverId);
      if (!player)
        return;

      player.auth.gaTracker.track(message.obj);
    },

    //Moves an object from this thread to another zone.
    // message.args holds: obj (the object as the map thread knows it), newZone (name of the
    // target map) and keepPos (defaults to true). When newZone is not in mapList, the
    // configured default zone is used instead
    rezone: async function (thread, message) {
      const { args: { obj, newZone, keepPos = true } } = message;

      removePlayerFromThread(thread, obj.serverId);

      //When messages are sent from map threads, they have an id (id of the object in the map thread)
      // as well as a serverId (id of the object in the main thread)
      const serverId = obj.serverId;
      obj.id = serverId;
      obj.destroyed = false;

      const serverObj = objects.objects.find(o => o.id === obj.id);
      //The object may no longer exist in the main thread by the time the rezone request
      // arrives. Bail out (as the track handler does) instead of throwing inside this
      // un-awaited async handler
      if (!serverObj)
        return;

      const mapExists = mapList.some(m => m.name === newZone);
      const targetZone = mapExists ? newZone : clientConfig.config.defaultZone;

      serverObj.zoneName = targetZone;
      obj.zoneName = targetZone;

      //Drop the old zone id on both representations of the object
      delete serverObj.zoneId;
      delete obj.zoneId;

      const isRezone = true;
      await atlas.addObject(obj, keepPos, isRezone);
    },

    //Notifies every registered idle listener that this thread reported being idle
    onZoneIdle: function (thread) {
      listenersOnZoneIdle.forEach(l => l(thread));
    }
  };
  //Entry point for every message a worker sends to the main thread.
  // - Messages with a module are dispatched to global[module][method]. When
  //   includeThreadInArgs is set, the id of the sending thread is attached as threadId first
  // - An 'onCrashed' event kills the worker and exits the process
  // - Everything else is dispatched to messageHandlers by message.method
  const onMessage = (thread, message) => {
    if (message.module) {
      try {
        if (message.includeThreadInArgs)
          message.threadId = thread.id;

        global[message.module][message.method](message);
      } catch (e) {
        //The failure is not necessarily a missing method: the method itself may have thrown.
        // Log the caught error as well so the real cause is not lost before exiting
        /* eslint-disable-next-line no-console */
        console.log('No global method found', message.module, message.method, e);
        process.exit();
      }
    } else if (message.event === 'onCrashed') {
      thread.worker.kill();
      process.exit();
    } else
      messageHandlers[message.method](thread, message);
  };
  //Creates the bookkeeping record for a new thread, forks its worker process and starts
  // tracking it.
  // map: the map to spawn a thread for (name, path, instanced and, optionally, destroyWhenEmptyForMs)
  // obj: the object the thread is being spawned for, if any; only handed to the
  //      'beforeSpawnThread' event
  // Returns the promise that is resolved through thread.cbOnInitialized
  const spawnThread = async ({ map, obj }) => {
    const { name, path, instanced } = map;
    //Maps that don't specify their own timeout use the one from consts
    const destroyWhenEmptyForMs = map.destroyWhenEmptyForMs ?? consts.destroyThreadWhenEmptyForMs;

    //Keep hold of the resolver so it can be stored on the thread
    let resolveInitialized;
    const promise = new Promise(resolve => {
      resolveInitialized = resolve;
    });

    const thread = {
      //Instanced threads get a fresh guid, all others are identified by the map name
      id: instanced ? _.getGuid() : name,
      name,
      instanced,
      path,
      worker: null,
      isReady: false,
      promise,
      preConfig: {},
      cbOnInitialized: resolveInitialized,
      players: [],
      playersCurrent: [],
      birthEpoch: Date.now(),
      destroyWhenEmptyForMs,
      emptySinceEpoch: null,
      sendArgsToWorker: ['name', 'id', 'preConfig'],
      workerArgs: null
    };

    //Listeners receive the thread itself, so they can alter it before the worker args are built
    events.emit('beforeSpawnThread', {
      thread,
      spawnForObject: obj
    });

    //Only the properties listed in sendArgsToWorker are handed to the worker
    const argEntries = thread.sendArgsToWorker.map(key => [key, thread[key]]);
    thread.workerArgs = Object.fromEntries(argEntries);

    _.log(`Spawning: ${JSON.stringify({ id: thread.id, name: thread.name }, null, '\t')}`);

    const serializedArgs = JSON.stringify(thread.workerArgs);
    thread.worker = childProcess.fork('./world/worker', [serializedArgs]);
    thread.worker.on('message', message => onMessage(thread, message));

    threads.push(thread);

    return promise;
  };
  //Finds (or spawns) the thread an object should be placed in and waits until it is ready.
  // zoneName: name of the requested map. Unknown names fall back to the configured default zone
  // zoneId: id of the specific thread to look for, or null to accept any thread of that map
  // obj: the object that needs a thread; its id is used to find threads it has visited before
  // Returns { thread, resetObjPosition }, where resetObjPosition is true when a new thread
  //  had to be spawned for an instanced map
  const getThread = async ({ zoneName, zoneId, obj }) => {
    const result = {
      resetObjPosition: false,
      thread: null
    };

    let map = mapList.find(m => m.name === zoneName);
    if (!map)
      map = mapList.find(m => m.name === clientConfig.config.defaultZone);

    //Search by the name of the map that was actually resolved. Searching by an unknown
    // zoneName can never match a thread, which would spawn a new default zone thread on
    // every call instead of reusing the existing one
    const mapName = map ? map.name : zoneName;

    let thread = threads.find(t => (zoneId === null || t.id === zoneId) && t.name === mapName);

    //Maybe this player has been in a thread for this map before
    if (!thread)
      thread = threads.find(t => t.name === mapName && t.players.includes(obj.id));

    if (!thread) {
      if (map.instanced)
        result.resetObjPosition = true;

      thread = await spawnThread({
        map,
        obj
      });
    }

    //An existing thread may still be initializing
    if (!thread.isReady)
      await thread.promise;

    result.thread = thread;

    return result;
  };
  //Logs the kill, terminates the worker of the thread and stops tracking the thread
  const killThread = thread => {
    const { worker, workerArgs } = thread;

    _.log(`Killing: ${workerArgs.id}`);
    worker.kill();

    threads.spliceWhere(tracked => tracked === thread);
  };
  //Sends msg to the worker of the thread with the given id.
  // Nothing happens when no tracked thread has that id
  const sendMessageToThread = ({ threadId, msg }) => {
    const target = threads.find(({ id }) => id === threadId);
    if (!target)
      return;

    target.worker.send(msg);
  };
  //Sends the same message to the worker of every tracked thread
  const messageAllThreads = message => {
    for (const { worker } of threads)
      worker.send(message);
  };
  //Resolves once every tracked thread has reported being idle.
  // Each worker is sent a 'notifyOnceIdle' request; the reports arrive through the
  // listeners in listenersOnZoneIdle. Resolves right away when there are no threads
  const returnWhenThreadsIdle = async () => {
    await new Promise(res => {
      //Without threads there is nobody to report back, so there is nothing to wait for
      if (threads.length === 0) {
        res();

        return;
      }

      let doneCount = 0;

      const onZoneIdle = () => {
        doneCount++;

        //Keep waiting until as many reports have come in as there are threads
        if (doneCount < threads.length)
          return;

        listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
        res();
      };

      listenersOnZoneIdle.push(onZoneIdle);

      threads.forEach(t => {
        t.worker.send({
          method: 'notifyOnceIdle'
        });
      });
    });
  };
  //Spawns a thread for every map whose autoSpawn flag is exactly true and waits until
  // all of those spawnThread calls have settled successfully
  const spawnMapThreads = async () => {
    const autoSpawnMaps = mapList.filter(({ autoSpawn }) => autoSpawn === true);
    const pendingThreads = autoSpawnMaps.map(map => spawnThread({ map }));

    await Promise.all(pendingThreads);
  };
  //Registers a player as being inside the thread.
  // The id is recorded in the history (thread.players) and in the list of current players,
  // and the thread stops counting as empty
  const addPlayerToThread = (thread, playerId) => {
    //The history is only ever used to check whether a player has been here before. Record
    // each id once so it doesn't grow with every re-entry into a long lived thread
    if (!thread.players.includes(playerId))
      thread.players.push(playerId);

    thread.playersCurrent.push(playerId);

    if (thread.emptySinceEpoch)
      thread.emptySinceEpoch = null;
  };
  //Periodic housekeeping: kills threads that have been empty for longer than allowed.
  // Threads that aren't ready yet, or whose destroyWhenEmptyForMs is -1, are left alone
  const update = () => {
    //Walk a snapshot, since killThread removes entries from threads while we iterate
    for (const thread of [...threads]) {
      const neverDestroy = thread.destroyWhenEmptyForMs === -1;
      if (!thread.isReady || neverDestroy)
        continue;

      //Start the clock the first time the thread is seen without players
      const hasNoPlayers = thread.playersCurrent.length === 0;
      if (!thread.emptySinceEpoch && hasNoPlayers)
        thread.emptySinceEpoch = Date.now();

      if (!thread.emptySinceEpoch)
        continue;

      const emptyForMs = Date.now() - thread.emptySinceEpoch;
      if (emptyForMs >= thread.destroyWhenEmptyForMs)
        killThread(thread);
    }
  };
  //Spawns the threads of all auto spawning maps and, once that is done, schedules the
  // housekeeping in update to run every five seconds
  const init = async () => {
    await spawnMapThreads();

    const updateIntervalMs = 5000;
    setInterval(update, updateIntervalMs);
  };
  252. //Exports
  253. module.exports = {
  254. init,
  255. getThread,
  256. killThread,
  257. getThreadFromId,
  258. messageAllThreads,
  259. sendMessageToThread,
  260. returnWhenThreadsIdle,
  261. addPlayerToThread,
  262. removePlayerFromThread,
  263. gePlayerCountInThread
  264. };