Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 

357 rindas
8.2 KiB

  1. /*
  2. This module contains an array of all threads. Each thread looks like this:
  3. {
  4. id: 'The zoneId',
  5. name: 'The name of the map',
  6. instanced: 'Boolean value indicating whether the thread is instanced or not',
  7. path: 'The path to the map file',
  8. worker: 'The actual thread that has been spawned',
  9. isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
  10. promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
  11. cbOnInitialized: 'The promise resolver',
  12. players: 'An array of all player id's that have been in this thread',
  13. playersCurrent: 'An array of player id's that are currently in this thread',
  14. birthEpoch: 'Defines the epoch when this was created',
  15. destroyWhenEmptyForMs: 'If not equal to -1, defines whether the thread should be destroyed if it has been empty for this amount of milliseconds',
  16. emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
  17. }
  18. */
  19. //System Imports
  20. const childProcess = require('child_process');
  21. //Imports
  22. const objects = require('../objects/objects');
  23. const connections = require('../security/connections');
  24. const { mapList } = require('./mapManager');
  25. const { registerCallback } = require('./atlas/registerCallback');
  26. const events = require('../misc/events');
  27. //Internals
  28. const threads = [];
  29. const listenersOnZoneIdle = [];
  30. //Helpers
  31. const getThreadFromName = name => {
  32. return threads.find(t => t.name === name);
  33. };
  34. const getThreadFromId = threadId => {
  35. return threads.find(t => t.id === threadId);
  36. };
  37. const gePlayerCountInThread = async thread => {
  38. const { playerCount } = await new Promise(res => {
  39. const cb = registerCallback(res);
  40. thread.worker.send({
  41. method: 'getPlayerCount',
  42. args: {
  43. callbackId: cb
  44. }
  45. });
  46. });
  47. return playerCount;
  48. };
  49. const removePlayerFromThread = (thread, playerId) => {
  50. thread.playersCurrent.spliceWhere(p => p === playerId);
  51. };
  52. const messageHandlers = {
  53. onReady: function (thread) {
  54. thread.worker.send({
  55. method: 'init',
  56. args: {
  57. zoneName: thread.name,
  58. zoneId: thread.id,
  59. path: thread.path
  60. }
  61. });
  62. },
  63. onInitialized: function (thread) {
  64. thread.isReady = true;
  65. thread.cbOnInitialized(thread);
  66. delete thread.cbOnInitialized;
  67. delete thread.promise;
  68. },
  69. event: function (thread, message) {
  70. objects.sendEvent(message, thread);
  71. },
  72. events: function (thread, message) {
  73. objects.sendEvents(message, thread);
  74. },
  75. object: function (thread, message) {
  76. objects.updateObject(message);
  77. },
  78. track: function (thread, message) {
  79. let player = objects.objects.find(o => o.id === message.serverId);
  80. if (!player)
  81. return;
  82. player.auth.gaTracker.track(message.obj);
  83. },
  84. callDifferentThread: function (thread, message) {
  85. let obj = connections.players.find(p => (p.name === message.playerName));
  86. if (!obj)
  87. return;
  88. let newThread = getThreadFromName(obj.zoneName);
  89. if (!newThread)
  90. return;
  91. newThread.worker.send({
  92. module: message.data.module,
  93. method: message.data.method,
  94. args: message.data.args
  95. });
  96. },
  97. rezone: async function (thread, message) {
  98. const { args: { obj, newZone, keepPos = true } } = message;
  99. removePlayerFromThread(thread, obj.serverId);
  100. //When messages are sent from map threads, they have an id (id of the object in the map thread)
  101. // as well as a serverId (id of the object in the main thread)
  102. const serverId = obj.serverId;
  103. obj.id = serverId;
  104. obj.destroyed = false;
  105. const serverObj = objects.objects.find(o => o.id === obj.id);
  106. const mapExists = mapList.some(m => m.name === newZone);
  107. if (mapExists) {
  108. serverObj.zoneName = newZone;
  109. obj.zoneName = newZone;
  110. } else {
  111. obj.zoneName = clientConfig.config.defaultZone;
  112. serverObj.zoneName = clientConfig.config.defaultZone;
  113. }
  114. delete serverObj.zoneId;
  115. delete obj.zoneId;
  116. const isRezone = true;
  117. await atlas.addObject(obj, keepPos, isRezone);
  118. },
  119. onZoneIdle: function (thread) {
  120. listenersOnZoneIdle.forEach(l => l(thread));
  121. }
  122. };
  123. const onMessage = (thread, message) => {
  124. if (message.module) {
  125. try {
  126. if (message.includeThreadInArgs)
  127. message.threadId = thread.id;
  128. global[message.module][message.method](message);
  129. } catch (e) {
  130. /* eslint-disable-next-line no-console */
  131. console.log('No global method found', message.module, message.method);
  132. process.exit();
  133. }
  134. } else if (message.event === 'onCrashed') {
  135. thread.worker.kill();
  136. process.exit();
  137. } else
  138. messageHandlers[message.method](thread, message);
  139. };
  140. const spawnThread = async ({ map: { name, path, instanced, destroyWhenEmptyForMs }, obj }) => {
  141. destroyWhenEmptyForMs = destroyWhenEmptyForMs ?? consts.destroyThreadWhenEmptyForMs;
  142. let cbOnInitialized;
  143. const promise = new Promise(resolveOnReady => {
  144. cbOnInitialized = resolveOnReady;
  145. });
  146. let id = instanced ? _.getGuid() : name;
  147. const thread = {
  148. id,
  149. name,
  150. instanced,
  151. path,
  152. worker: null,
  153. isReady: false,
  154. promise,
  155. preConfig: {},
  156. cbOnInitialized,
  157. players: [],
  158. playersCurrent: [],
  159. birthEpoch: +new Date(),
  160. destroyWhenEmptyForMs,
  161. emptySinceEpoch: null,
  162. sendArgsToWorker: ['name', 'id', 'preConfig'],
  163. workerArgs: null
  164. };
  165. const emBeforeSpawnThread = {
  166. thread,
  167. spawnForObject: obj
  168. };
  169. events.emit('beforeSpawnThread', emBeforeSpawnThread);
  170. thread.workerArgs = Object.fromEntries(
  171. thread.sendArgsToWorker.map(a => [a, thread[a]])
  172. );
  173. _.log(`Spawning: ${JSON.stringify({ id: thread.id, name: thread.name }, null, '\t')}`);
  174. thread.worker = childProcess.fork('./world/worker', [JSON.stringify(thread.workerArgs)]);
  175. thread.worker.on('message', onMessage.bind(null, thread));
  176. threads.push(thread);
  177. return promise;
  178. };
  179. const getThread = async ({ zoneName, zoneId, obj }) => {
  180. const result = {
  181. resetObjPosition: false,
  182. thread: null
  183. };
  184. let map = mapList.find(m => m.name === zoneName);
  185. if (!map)
  186. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  187. let thread = threads.find(t => t.id === zoneId && t.name === zoneName);
  188. //Maybe this player has been in a thread for this map before
  189. if (!thread)
  190. thread = threads.find(t => t.name === zoneName && t.players.includes(obj.id));
  191. if (!thread) {
  192. if (map.instanced)
  193. result.resetObjPosition = true;
  194. thread = await spawnThread({
  195. map,
  196. obj
  197. });
  198. }
  199. if (!thread.isReady)
  200. await thread.promise;
  201. result.thread = thread;
  202. return result;
  203. };
  204. const killThread = thread => {
  205. _.log(`Killing: ${thread.workerArgs.id}`);
  206. thread.worker.kill();
  207. threads.spliceWhere(t => t === thread);
  208. };
  209. const sendMessageToThread = ({ threadId, msg }) => {
  210. const thread = threads.find(t => t.id === threadId);
  211. if (thread)
  212. thread.worker.send(msg);
  213. };
  214. const messageAllThreads = message => {
  215. threads.forEach(t => t.worker.send(message));
  216. };
  217. const returnWhenThreadsIdle = async () => {
  218. return new Promise(res => {
  219. let doneCount = 0;
  220. const onZoneIdle = thread => {
  221. doneCount++;
  222. if (doneCount.length < threads.length)
  223. return;
  224. listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
  225. res();
  226. };
  227. listenersOnZoneIdle.push(onZoneIdle);
  228. threads.forEach(t => {
  229. t.worker.send({
  230. method: 'notifyOnceIdle'
  231. });
  232. });
  233. });
  234. };
  235. const spawnMapThreads = async () => {
  236. const promises = mapList
  237. .filter(m => m.autoSpawn === true)
  238. .map(m => spawnThread({ map: m }));
  239. await Promise.all(promises);
  240. };
  241. const addPlayerToThread = (thread, playerId) => {
  242. thread.players.push(playerId);
  243. thread.playersCurrent.push(playerId);
  244. if (thread.emptySinceEpoch)
  245. thread.emptySinceEpoch = null;
  246. };
  247. const update = () => {
  248. let tLen = threads.length;
  249. for (let i = 0; i < tLen; i++) {
  250. const t = threads[i];
  251. if (!t.isReady || t.destroyWhenEmptyForMs === -1)
  252. continue;
  253. if (!t.emptySinceEpoch && t.playersCurrent.length === 0)
  254. t.emptySinceEpoch = +new Date();
  255. if (!t.emptySinceEpoch)
  256. continue;
  257. const ageInMs = (+new Date() - t.emptySinceEpoch);
  258. if (ageInMs < t.destroyWhenEmptyForMs)
  259. continue;
  260. killThread(t);
  261. i--;
  262. tLen--;
  263. }
  264. };
  265. const init = async () => {
  266. await spawnMapThreads();
  267. setInterval(update, 5000);
  268. };
  269. //Exports
  270. module.exports = {
  271. init,
  272. getThread,
  273. killThread,
  274. getThreadFromId,
  275. messageAllThreads,
  276. sendMessageToThread,
  277. returnWhenThreadsIdle,
  278. addPlayerToThread,
  279. removePlayerFromThread,
  280. gePlayerCountInThread
  281. };