You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

349 lines
7.8 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of the IDs of all players that have been in this thread',
  13. playersCurrent: 'An array of the IDs of the players that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, the thread is destroyed once it has been empty for this many milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const connections = require('../security/connections');
  24. const { mapList } = require('./mapManager');
  25. const { registerCallback } = require('./atlas/registerCallback');
  26. const events = require('../misc/events');
  27. //Internals
  28. const threads = [];
  29. const listenersOnZoneIdle = [];
  30. //Helpers
  /**
   * Finds a spawned thread by map name.
   * @param {string} name - Value to compare against each thread's `name`.
   * @returns {object|undefined} The first matching entry of `threads`, or undefined when none matches.
   */
  const getThreadFromName = name => threads.find(({ name: threadName }) => threadName === name);
  /**
   * Finds a spawned thread by its id.
   * @param {string} threadId - Value to compare against each thread's `id`.
   * @returns {object|undefined} The first matching entry of `threads`, or undefined when none matches.
   */
  const getThreadFromId = threadId => threads.find(({ id }) => id === threadId);
  /**
   * Asks a thread's worker for its player count.
   * Registers a callback, sends `getPlayerCount` (carrying the callback id) to the worker and
   * returns the `playerCount` field of whatever the callback is eventually invoked with.
   * NOTE(review): presumably the worker answers through the registered callback id - confirm in registerCallback.
   * The name is missing a "t" ("ge" instead of "get"); it is exported under this name, so it stays as is.
   * @param {object} thread - Thread record whose `worker` receives the request.
   * @returns {Promise<*>} The reported `playerCount`.
   */
  const gePlayerCountInThread = async thread => {
    const reply = await new Promise(resolve => {
      const callbackId = registerCallback(resolve);

      thread.worker.send({
        method: 'getPlayerCount',
        args: { callbackId }
      });
    });

    return reply.playerCount;
  };
  /**
   * Takes a player id out of a thread's list of current players.
   * The historical `players` list is left untouched.
   * @param {object} thread - Thread record whose `playersCurrent` is modified in place.
   * @param {*} playerId - Id to remove (compared with strict equality).
   */
  const removePlayerFromThread = (thread, playerId) => {
    const isSamePlayer = id => id === playerId;

    thread.playersCurrent.spliceWhere(isSamePlayer);
  };
  /**
   * Handlers for messages sent by map threads, keyed by `message.method`.
   * Every handler is called with the thread record the message came from and the message itself.
   */
  const messageHandlers = {
    //Answers the worker's ready signal with the `init` message describing the zone it should load
    onReady (thread) {
      thread.worker.send({
        method: 'init',
        args: {
          zoneName: thread.name,
          zoneId: thread.id,
          path: thread.path
        }
      });
    },

    //Marks the thread as ready and resolves the promise that spawnThread handed out
    onInitialized (thread) {
      thread.isReady = true;

      thread.cbOnInitialized(thread);
      delete thread.cbOnInitialized;
      delete thread.promise;
    },

    //Forwards a single event to the objects module
    event (thread, message) {
      objects.sendEvent(message, thread);
    },

    //Forwards a batch of events to the objects module
    events (thread, message) {
      objects.sendEvents(message, thread);
    },

    //Forwards an object update to the objects module
    object (thread, message) {
      objects.updateObject(message);
    },

    //Passes `message.obj` to the tracker of the object whose id equals `message.serverId`
    track (thread, message) {
      const player = objects.objects.find(o => o.id === message.serverId);
      if (!player)
        return;

      player.auth.gaTracker.track(message.obj);
    },

    //Relays a module/method call to the thread of the zone that the named player is in
    callDifferentThread (thread, message) {
      const obj = connections.players.find(p => (p.name === message.playerName));
      if (!obj)
        return;

      const newThread = getThreadFromName(obj.zoneName);
      if (!newThread)
        return;

      newThread.worker.send({
        module: message.data.module,
        method: message.data.method,
        args: message.data.args
      });
    },

    /*
      Moves an object from the sending thread into `newZone`.
      Falls back to the configured default zone when `newZone` is not in the map list.
      Does nothing further when the object is no longer known to the main thread.
    */
    async rezone (thread, message) {
      const { args: { obj, newZone, keepPos = true } } = message;

      //When messages are sent from map threads, they have an id (id of the object in the map thread)
      // as well as a serverId (id of the object in the main thread)
      const serverId = obj.serverId;

      removePlayerFromThread(thread, serverId);

      obj.id = serverId;
      obj.destroyed = false;

      const serverObj = objects.objects.find(o => o.id === serverId);

      //Without this guard the property writes below throw and this async handler
      // rejects with nobody awaiting it
      if (!serverObj)
        return;

      const mapExists = mapList.some(m => m.name === newZone);
      const zoneName = mapExists ? newZone : clientConfig.config.defaultZone;

      serverObj.zoneName = zoneName;
      obj.zoneName = zoneName;

      //The zoneId belongs to the thread that is being left
      delete serverObj.zoneId;
      delete obj.zoneId;

      const isRezone = true;
      await atlas.addObject(obj, keepPos, isRezone);
    },

    //Notifies everybody waiting for zones to become idle (see returnWhenThreadsIdle)
    onZoneIdle (thread) {
      listenersOnZoneIdle.forEach(l => l(thread));
    }
  };
  /**
   * Entry point for every message received from a worker.
   * - Messages with a `module` are dispatched to `global[module][method](message)`.
   *   A missing method, or a method that throws, is logged and ends the process.
   * - An `onCrashed` event kills the worker and ends the process.
   * - Everything else is dispatched to `messageHandlers[message.method]`.
   * @param {object} thread - Thread record the message originated from.
   * @param {object} message - Message as sent by the worker.
   */
  const onMessage = (thread, message) => {
    if (message.module) {
      const target = global[message.module];
      const hasMethod = Boolean(target) && typeof target[message.method] === 'function';

      if (!hasMethod) {
        /* eslint-disable-next-line no-console */
        console.log('No global method found', message.module, message.method);
        process.exit();

        return;
      }

      try {
        //Called through `target` so the method keeps its module as `this`
        target[message.method](message);
      } catch (e) {
        //Log the real failure instead of reporting it as a missing method
        /* eslint-disable-next-line no-console */
        console.log('Error in global method', message.module, message.method, e);
        process.exit();
      }

      return;
    }

    if (message.event === 'onCrashed') {
      thread.worker.kill();
      process.exit();

      return;
    }

    messageHandlers[message.method](thread, message);
  };
  /**
   * Creates the record for a new map thread, forks its worker process and registers it in `threads`.
   * Instanced maps get a generated guid as id, all others use the map name.
   * `beforeSpawnThread` is emitted with the record (and the object it is spawned for) before the fork;
   * the fields named in `thread.sendArgsToWorker` are passed to the worker as one JSON argument.
   * @param {object} options
   * @param {object} options.map - Map definition: `name`, `path`, `instanced`, `destroyWhenEmptyForMs` (defaults to -1).
   * @param {object} [options.obj] - Object the thread is spawned for; only handed to the event.
   * @returns {Promise<object>} Settles through `cbOnInitialized`, which the `onInitialized` handler calls with the thread.
   */
  const spawnThread = async ({ map: { name, path, instanced, destroyWhenEmptyForMs = -1 }, obj }) => {
    let resolveOnInitialized;
    const readyPromise = new Promise(resolve => {
      resolveOnInitialized = resolve;
    });

    const threadId = instanced ? _.getGuid() : name;

    const thread = {
      id: threadId,
      name,
      instanced,
      path,
      worker: null,
      isReady: false,
      promise: readyPromise,
      cbOnInitialized: resolveOnInitialized,
      players: [],
      playersCurrent: [],
      birthEpoch: Date.now(),
      destroyWhenEmptyForMs,
      emptySinceEpoch: null,
      sendArgsToWorker: ['name', 'id']
    };

    events.emit('beforeSpawnThread', {
      thread,
      spawnForObject: obj
    });

    //Built after the event has been emitted, so it reflects the record as the listeners left it
    const argEntries = thread.sendArgsToWorker.map(key => [key, thread[key]]);
    const workerArgs = JSON.stringify(Object.fromEntries(argEntries));

    const worker = childProcess.fork('./world/worker', [workerArgs]);
    thread.worker = worker;
    worker.on('message', message => onMessage(thread, message));

    threads.push(thread);

    return readyPromise;
  };
  /**
   * Resolves the thread an object should be placed in, spawning one when needed.
   * Lookup order:
   *  1. a thread matching both `zoneId` and the map name,
   *  2. a thread for the map that this object has been in before,
   *  3. a freshly spawned thread.
   * Unknown zone names fall back to the configured default zone; the lookups then use
   * that default map's name rather than the unknown name.
   * @param {object} options
   * @param {string} options.zoneName - Requested map name.
   * @param {string} [options.zoneId] - Id of the thread the object was last in.
   * @param {object} options.obj - Object being placed; `obj.id` is matched against `thread.players`.
   * @returns {Promise<{resetObjPosition: boolean, thread: object}>} `resetObjPosition` is true
   *  when a new thread had to be spawned for an instanced map.
   */
  const getThread = async ({ zoneName, zoneId, obj }) => {
    const result = {
      resetObjPosition: false,
      thread: null
    };

    let map = mapList.find(m => m.name === zoneName);

    if (!map)
      map = mapList.find(m => m.name === clientConfig.config.defaultZone);

    //Search with the name of the map that will actually be used. Searching with an
    // unknown zoneName can never match and spawned a new default-zone thread every time
    const lookupName = map ? map.name : zoneName;

    let thread = threads.find(t => t.id === zoneId && t.name === lookupName);

    //Maybe this player has been in a thread for this map before
    if (!thread)
      thread = threads.find(t => t.name === lookupName && t.players.includes(obj.id));

    if (!thread) {
      if (map.instanced)
        result.resetObjPosition = true;

      thread = await spawnThread({
        map,
        obj
      });
    }

    //An existing thread may still be initializing
    if (!thread.isReady)
      await thread.promise;

    result.thread = thread;

    return result;
  };
  /**
   * Stops a thread: kills its worker process and drops the record from `threads`.
   * @param {object} thread - Thread record to destroy.
   */
  const killThread = thread => {
    const { worker } = thread;
    worker.kill();

    threads.spliceWhere(candidate => candidate === thread);
  };
  /**
   * Sends a message to the worker of the thread with the given id.
   * Nothing happens when no such thread exists.
   * @param {object} options
   * @param {string} options.threadId - Id of the target thread.
   * @param {object} options.msg - Message passed to the worker as is.
   */
  const sendMessageToThread = ({ threadId, msg }) => {
    const target = threads.find(({ id }) => id === threadId);
    if (!target)
      return;

    target.worker.send(msg);
  };
  /**
   * Sends the same message to the worker of every thread in `threads`.
   * @param {object} message - Message passed to each worker as is.
   */
  const messageAllThreads = message => {
    for (const { worker } of threads)
      worker.send(message);
  };
  /**
   * Resolves once every thread has reported that it is idle.
   * Each worker is sent `notifyOnceIdle`; the `onZoneIdle` message handler calls the
   * listener registered here, which counts the reports.
   * Resolves immediately when there are no threads, since nobody could report in that case.
   * @returns {Promise<void>}
   */
  const returnWhenThreadsIdle = async () => {
    if (threads.length === 0)
      return;

    await new Promise(res => {
      let doneCount = 0;

      const onZoneIdle = () => {
        doneCount++;

        //Compared against the live length, so threads killed in the meantime are not waited for
        if (doneCount < threads.length)
          return;

        listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
        res();
      };

      listenersOnZoneIdle.push(onZoneIdle);

      threads.forEach(t => {
        t.worker.send({
          method: 'notifyOnceIdle'
        });
      });
    });
  };
  /**
   * Spawns a thread for every map that is enabled, not instanced and not opted out
   * through `autoSpawn: false`, then waits until all of them are initialized.
   * @returns {Promise<void>}
   */
  const spawnMapThreads = async () => {
    const promises = mapList
      .filter(m => !m.disabled && !m.instanced && m.autoSpawn !== false)
      //spawnThread expects the map definition under the `map` key
      .map(map => spawnThread({ map }));

    await Promise.all(promises);
  };
  /**
   * Records that a player has entered a thread.
   * The id is added to the history list (`players`) and to the list of current players
   * (`playersCurrent`), each at most once, and the thread is no longer considered empty.
   * @param {object} thread - Thread record to update in place.
   * @param {*} playerId - Id of the entering player.
   */
  const addPlayerToThread = (thread, playerId) => {
    //Players re-enter threads; without this check the history list grows on every entry
    if (!thread.players.includes(playerId))
      thread.players.push(playerId);

    if (!thread.playersCurrent.includes(playerId))
      thread.playersCurrent.push(playerId);

    if (thread.emptySinceEpoch)
      thread.emptySinceEpoch = null;
  };
  /**
   * Housekeeping pass that destroys threads which have been empty for too long.
   * Threads that are not ready yet, or whose `destroyWhenEmptyForMs` is -1, are skipped.
   * A thread without current players gets `emptySinceEpoch` stamped the first time this is
   * noticed and is killed once that stamp is at least `destroyWhenEmptyForMs` old.
   */
  const update = () => {
    //killThread removes entries from `threads`, so walk over a snapshot
    for (const thread of [...threads]) {
      const canExpire = thread.isReady && thread.destroyWhenEmptyForMs !== -1;
      if (!canExpire)
        continue;

      const hasNoPlayers = thread.playersCurrent.length === 0;
      if (!thread.emptySinceEpoch && hasNoPlayers)
        thread.emptySinceEpoch = Date.now();

      if (!thread.emptySinceEpoch)
        continue;

      const emptyForMs = Date.now() - thread.emptySinceEpoch;
      if (emptyForMs < thread.destroyWhenEmptyForMs)
        continue;

      killThread(thread);
    }
  };
  /**
   * Starts the thread manager: spawns the map threads, waits for them to be
   * initialized and then schedules `update` to run every 5 seconds.
   * @returns {Promise<void>}
   */
  const init = async () => {
    const updateIntervalMs = 5000;

    await spawnMapThreads();

    setInterval(update, updateIntervalMs);
  };
  264. //Exports
  265. module.exports = {
  266. init,
  267. getThread,
  268. killThread,
  269. getThreadFromId,
  270. messageAllThreads,
  271. sendMessageToThread,
  272. returnWhenThreadsIdle,
  273. addPlayerToThread,
  274. removePlayerFromThread,
  275. gePlayerCountInThread
  276. };