You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

353 lines
8.0 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of all player id's that have been in this thread',
  13. playersCurrent: 'An array of player id's that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, defines whether the thread should be destroyed if it has been empty for this amount of milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const connections = require('../security/connections');
  24. const { mapList } = require('./mapManager');
  25. const { registerCallback } = require('./atlas/registerCallback');
  26. const events = require('../misc/events');
  27. //Internals
  28. const threads = [];
  29. const listenersOnZoneIdle = [];
  30. //Helpers
  31. const getThreadFromName = name => {
  32. return threads.find(t => t.name === name);
  33. };
  34. const getThreadFromId = threadId => {
  35. return threads.find(t => t.id === threadId);
  36. };
  37. const gePlayerCountInThread = async thread => {
  38. const { playerCount } = await new Promise(res => {
  39. const cb = registerCallback(res);
  40. thread.worker.send({
  41. method: 'getPlayerCount',
  42. args: {
  43. callbackId: cb
  44. }
  45. });
  46. });
  47. return playerCount;
  48. };
  49. const removePlayerFromThread = (thread, playerId) => {
  50. thread.playersCurrent.spliceWhere(p => p === playerId);
  51. };
  52. const messageHandlers = {
  53. onReady: function (thread) {
  54. thread.worker.send({
  55. method: 'init',
  56. args: {
  57. zoneName: thread.name,
  58. zoneId: thread.id,
  59. path: thread.path
  60. }
  61. });
  62. },
  63. onInitialized: function (thread) {
  64. thread.isReady = true;
  65. thread.cbOnInitialized(thread);
  66. delete thread.cbOnInitialized;
  67. delete thread.promise;
  68. },
  69. event: function (thread, message) {
  70. objects.sendEvent(message, thread);
  71. },
  72. events: function (thread, message) {
  73. objects.sendEvents(message, thread);
  74. },
  75. object: function (thread, message) {
  76. objects.updateObject(message);
  77. },
  78. track: function (thread, message) {
  79. let player = objects.objects.find(o => o.id === message.serverId);
  80. if (!player)
  81. return;
  82. player.auth.gaTracker.track(message.obj);
  83. },
  84. callDifferentThread: function (thread, message) {
  85. let obj = connections.players.find(p => (p.name === message.playerName));
  86. if (!obj)
  87. return;
  88. let newThread = getThreadFromName(obj.zoneName);
  89. if (!newThread)
  90. return;
  91. newThread.worker.send({
  92. module: message.data.module,
  93. method: message.data.method,
  94. args: message.data.args
  95. });
  96. },
  97. rezone: async function (thread, message) {
  98. const { args: { obj, newZone, keepPos = true } } = message;
  99. removePlayerFromThread(thread, obj.serverId);
  100. //When messages are sent from map threads, they have an id (id of the object in the map thread)
  101. // as well as a serverId (id of the object in the main thread)
  102. const serverId = obj.serverId;
  103. obj.id = serverId;
  104. obj.destroyed = false;
  105. const serverObj = objects.objects.find(o => o.id === obj.id);
  106. const mapExists = mapList.some(m => m.name === newZone);
  107. if (mapExists) {
  108. serverObj.zoneName = newZone;
  109. obj.zoneName = newZone;
  110. } else {
  111. obj.zoneName = clientConfig.config.defaultZone;
  112. serverObj.zoneName = clientConfig.config.defaultZone;
  113. }
  114. delete serverObj.zoneId;
  115. delete obj.zoneId;
  116. const isRezone = true;
  117. await atlas.addObject(obj, keepPos, isRezone);
  118. },
  119. onZoneIdle: function (thread) {
  120. listenersOnZoneIdle.forEach(l => l(thread));
  121. }
  122. };
  123. const onMessage = (thread, message) => {
  124. if (message.module) {
  125. try {
  126. global[message.module][message.method](message);
  127. } catch (e) {
  128. /* eslint-disable-next-line no-console */
  129. console.log('No global method found', message.module, message.method);
  130. process.exit();
  131. }
  132. } else if (message.event === 'onCrashed') {
  133. thread.worker.kill();
  134. process.exit();
  135. } else
  136. messageHandlers[message.method](thread, message);
  137. };
  138. const spawnThread = async ({ map: { name, path, instanced, destroyWhenEmptyForMs }, obj }) => {
  139. destroyWhenEmptyForMs = destroyWhenEmptyForMs ?? consts.destroyThreadWhenEmptyForMs;
  140. let cbOnInitialized;
  141. const promise = new Promise(resolveOnReady => {
  142. cbOnInitialized = resolveOnReady;
  143. });
  144. let id = instanced ? _.getGuid() : name;
  145. const thread = {
  146. id,
  147. name,
  148. instanced,
  149. path,
  150. worker: null,
  151. isReady: false,
  152. promise,
  153. cbOnInitialized,
  154. players: [],
  155. playersCurrent: [],
  156. birthEpoch: +new Date(),
  157. destroyWhenEmptyForMs,
  158. emptySinceEpoch: null,
  159. sendArgsToWorker: ['name', 'id'],
  160. workerArgs: null
  161. };
  162. const emBeforeSpawnThread = {
  163. thread,
  164. spawnForObject: obj
  165. };
  166. events.emit('beforeSpawnThread', emBeforeSpawnThread);
  167. thread.workerArgs = Object.fromEntries(
  168. thread.sendArgsToWorker.map(a => [a, thread[a]])
  169. );
  170. _.log(`Spawning: ${JSON.stringify(thread.workerArgs, null, '\t')}`);
  171. thread.worker = childProcess.fork('./world/worker', [JSON.stringify(thread.workerArgs)]);
  172. thread.worker.on('message', onMessage.bind(null, thread));
  173. threads.push(thread);
  174. return promise;
  175. };
  176. const getThread = async ({ zoneName, zoneId, obj }) => {
  177. const result = {
  178. resetObjPosition: false,
  179. thread: null
  180. };
  181. let map = mapList.find(m => m.name === zoneName);
  182. if (!map)
  183. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  184. let thread = threads.find(t => t.id === zoneId && t.name === zoneName);
  185. //Maybe this player has been in a thread for this map before
  186. if (!thread)
  187. thread = threads.find(t => t.name === zoneName && t.players.includes(obj.id));
  188. if (!thread) {
  189. if (map.instanced)
  190. result.resetObjPosition = true;
  191. thread = await spawnThread({
  192. map,
  193. obj
  194. });
  195. }
  196. if (!thread.isReady)
  197. await thread.promise;
  198. result.thread = thread;
  199. return result;
  200. };
  201. const killThread = thread => {
  202. _.log(`Killing: ${thread.workerArgs.id}`);
  203. thread.worker.kill();
  204. threads.spliceWhere(t => t === thread);
  205. };
  206. const sendMessageToThread = ({ threadId, msg }) => {
  207. const thread = threads.find(t => t.id === threadId);
  208. if (thread)
  209. thread.worker.send(msg);
  210. };
  211. const messageAllThreads = message => {
  212. threads.forEach(t => t.worker.send(message));
  213. };
  214. const returnWhenThreadsIdle = async () => {
  215. return new Promise(res => {
  216. let doneCount = 0;
  217. const onZoneIdle = thread => {
  218. doneCount++;
  219. if (doneCount.length < threads.length)
  220. return;
  221. listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
  222. res();
  223. };
  224. listenersOnZoneIdle.push(onZoneIdle);
  225. threads.forEach(t => {
  226. t.worker.send({
  227. method: 'notifyOnceIdle'
  228. });
  229. });
  230. });
  231. };
  232. const spawnMapThreads = async () => {
  233. const promises = mapList
  234. .filter(m => m.autoSpawn === true)
  235. .map(m => spawnThread({ map: m }));
  236. await Promise.all(promises);
  237. };
  238. const addPlayerToThread = (thread, playerId) => {
  239. thread.players.push(playerId);
  240. thread.playersCurrent.push(playerId);
  241. if (thread.emptySinceEpoch)
  242. thread.emptySinceEpoch = null;
  243. };
  244. const update = () => {
  245. let tLen = threads.length;
  246. for (let i = 0; i < tLen; i++) {
  247. const t = threads[i];
  248. if (!t.isReady || t.destroyWhenEmptyForMs === -1)
  249. continue;
  250. if (!t.emptySinceEpoch && t.playersCurrent.length === 0)
  251. t.emptySinceEpoch = +new Date();
  252. if (!t.emptySinceEpoch)
  253. continue;
  254. const ageInMs = (+new Date() - t.emptySinceEpoch);
  255. if (ageInMs < t.destroyWhenEmptyForMs)
  256. continue;
  257. killThread(t);
  258. i--;
  259. tLen--;
  260. }
  261. };
  262. const init = async () => {
  263. await spawnMapThreads();
  264. setInterval(update, 5000);
  265. };
  266. //Exports
  267. module.exports = {
  268. init,
  269. getThread,
  270. killThread,
  271. getThreadFromId,
  272. messageAllThreads,
  273. sendMessageToThread,
  274. returnWhenThreadsIdle,
  275. addPlayerToThread,
  276. removePlayerFromThread,
  277. gePlayerCountInThread
  278. };