You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

279 lines
5.8 KiB

  1. //System Imports
  2. const childProcess = require('child_process');
  3. //Imports
  4. const objects = require('../objects/objects');
  5. const connections = require('../security/connections');
  6. const { mapList } = require('./mapManager');
  7. const { registerCallback } = require('./atlas/registerCallback');
  //Internal state
  // Callbacks run by the `onZoneIdle` message handler, each called with the reporting thread
  const listenersOnZoneIdle = [];
  // Thread records added by spawnThread and removed again when a thread is killed
  const threads = [];
  11. //Helpers
  // Returns the first registered thread whose `name` equals the given name,
  // or undefined when none matches.
  const getThreadFromName = name => threads.find(({ name: threadName }) => threadName === name);
  // Returns the first registered thread whose `id` equals threadId,
  // or undefined when none matches.
  const getThreadFromId = threadId => threads.find(({ id }) => id === threadId);
  // Asks a thread's worker how many players it holds.
  // Sends a `getPlayerCount` message carrying a callback id obtained from
  // registerCallback, and resolves with the `playerCount` field of the value
  // that callback is eventually invoked with.
  const getPlayerCountInThread = async thread => {
    const reply = await new Promise(resolve => {
      const callbackId = registerCallback(resolve);

      thread.worker.send({
        method: 'getPlayerCount',
        args: { callbackId }
      });
    });

    return reply.playerCount;
  };
  // Handlers for messages received from zone workers, keyed by `message.method`.
  // Each handler is called as handler(thread, message), where `thread` is the
  // record of the worker that sent the message.
  const messageHandlers = {
    // The worker has started: tell it which zone it should initialize.
    onReady: function (thread) {
      thread.worker.send({
        method: 'init',
        args: {
          zoneName: thread.name,
          zoneId: thread.id,
          path: thread.path
        }
      });
    },

    // The worker finished initializing: mark the thread ready and resolve the
    // promise created in spawnThread, then drop the one-shot bookkeeping fields.
    onInitialized: function (thread) {
      thread.isReady = true;
      thread.cbOnInitialized(thread);
      delete thread.cbOnInitialized;
      delete thread.promise;
    },

    // Forward a single event from the worker to the objects module.
    event: function (thread, message) {
      objects.sendEvent(message, thread);
    },

    // Forward a batch of events from the worker to the objects module.
    events: function (thread, message) {
      objects.sendEvents(message, thread);
    },

    // Forward an object update from the worker to the objects module.
    object: function (thread, message) {
      objects.updateObject(message);
    },

    // Forward a tracking payload to the tracker of the object identified by
    // message.serverId; silently ignored when no such object exists.
    track: function (thread, message) {
      const player = objects.objects.find(o => o.id === message.serverId);
      if (!player)
        return;

      player.auth.gaTracker.track(message.obj);
    },

    // Relay a module/method call to the worker that hosts the named player.
    // Ignored when the player or the thread for its zone cannot be found.
    callDifferentThread: function (thread, message) {
      const obj = connections.players.find(p => (p.name === message.playerName));
      if (!obj)
        return;

      const newThread = getThreadFromName(obj.zoneName);
      if (!newThread)
        return;

      newThread.worker.send({
        module: message.data.module,
        method: message.data.method,
        args: message.data.args
      });
    },

    // Move an object out of the sending thread's zone and into `newZone`
    // (or the configured default zone when `newZone` is not a known map).
    rezone: async function (thread, message) {
      const { args: { obj, newZone, keepPos = true } } = message;

      // An instanced thread that reports zero players is shut down and unregistered
      if (thread.instanced && (await getPlayerCountInThread(thread)) === 0) {
        thread.worker.kill();
        threads.spliceWhere(t => t === thread);
      }

      //When messages are sent from map threads, they have an id (id of the object in the map thread)
      // as well as a serverId (id of the object in the main thread)
      const serverId = obj.serverId;

      // The lookup runs after an await, so the object may no longer be registered
      // (presumably because the player left in the meantime). Bail out rather than
      // throw on an undefined serverObj, mirroring the guard in `track`.
      const serverObj = objects.objects.find(o => o.id === serverId);
      if (!serverObj)
        return;

      obj.id = serverId;
      obj.destroyed = false;

      const mapExists = mapList.some(m => m.name === newZone);
      const zoneName = mapExists ? newZone : clientConfig.config.defaultZone;
      serverObj.zoneName = zoneName;
      obj.zoneName = zoneName;

      // Clear the old zone id on both copies of the object before re-adding it
      delete serverObj.zoneId;
      delete obj.zoneId;

      const isRezone = true;
      await atlas.addObject(obj, keepPos, isRezone);
    },

    // A worker reported that it is idle: notify every registered idle listener.
    onZoneIdle: function (thread) {
      listenersOnZoneIdle.forEach(l => l(thread));
    }
  };
  // Routes a message received from a worker.
  //  - Messages with a `module` are dispatched to global[module][method](message);
  //    if that call throws for any reason, the failure is logged and the process exits.
  //  - An `onCrashed` event kills the worker and exits the process.
  //  - Anything else is passed to the matching entry in messageHandlers.
  const onMessage = (thread, message) => {
    const { module: moduleName, method } = message;

    if (moduleName) {
      try {
        global[moduleName][method](message);
      } catch (e) {
        /* eslint-disable-next-line no-console */
        console.log('No global method found', message.module, message.method);
        process.exit();
      }

      return;
    }

    if (message.event === 'onCrashed') {
      thread.worker.kill();
      process.exit();

      return;
    }

    messageHandlers[method](thread, message);
  };
  // Forks a worker process for the given map and registers a thread record for it.
  // Instanced maps get a generated guid as their id; other maps use their name.
  // Returns the thread's `promise`, which is resolved through `cbOnInitialized`
  // (invoked by the `onInitialized` message handler).
  const spawnThread = async ({ name, path, instanced }) => {
    const thread = {
      id: null,
      name,
      instanced,
      path,
      worker: null,
      isReady: false,
      promise: null,
      cbOnInitialized: null
    };

    thread.promise = new Promise(resolveOnReady => {
      thread.cbOnInitialized = resolveOnReady;
    });

    thread.worker = childProcess.fork('./world/worker', [name]);
    thread.id = instanced ? _.getGuid() : name;

    // Every message from this worker is routed with its thread record attached
    thread.worker.on('message', onMessage.bind(null, thread));

    threads.push(thread);

    return thread.promise;
  };
  // True when a registered thread matches both the given zone id and zone name.
  const doesThreadExist = ({ zoneName, zoneId }) => {
    const isMatch = ({ id, name }) => id === zoneId && name === zoneName;

    return threads.some(isMatch);
  };
  // Resolves the thread that should host an object for the given zone name/id.
  // Resolves with { resetObjPosition, thread }:
  //  - an already registered thread matching both id and name is reused;
  //  - otherwise an instanced map gets a freshly spawned thread (resetObjPosition: true);
  //  - otherwise the thread is looked up by the map's name.
  // An unknown zoneName falls back to the configured default zone's map.
  // Waits for the thread's `promise` when the thread is not ready yet.
  const getThread = async ({ zoneName, zoneId }) => {
    const map =
      mapList.find(m => m.name === zoneName) ||
      mapList.find(m => m.name === clientConfig.config.defaultZone);

    let resetObjPosition = false;
    let thread = threads.find(t => t.id === zoneId && t.name === zoneName);

    if (!thread && map.instanced) {
      resetObjPosition = true;
      thread = await spawnThread(map);
    } else if (!thread)
      thread = getThreadFromName(map.name);

    if (!thread.isReady)
      await thread.promise;

    return { resetObjPosition, thread };
  };
  // Kills the thread's worker process, then removes the thread from the registry.
  const killThread = thread => {
    const { worker } = thread;
    worker.kill();

    threads.spliceWhere(candidate => candidate === thread);
  };
  // Queries the thread's player count and kills the thread only when it is exactly 0.
  const killThreadIfEmpty = async thread => {
    const isEmpty = (await getPlayerCountInThread(thread)) === 0;
    if (!isEmpty)
      return;

    killThread(thread);
  };
  // Spawns a thread for every map that is neither disabled nor instanced and
  // waits until all of the returned promises have resolved.
  const spawnMapThreads = async () => {
    const pending = [];

    for (const map of mapList) {
      if (map.disabled || map.instanced)
        continue;

      pending.push(spawnThread(map));
    }

    await Promise.all(pending);
  };
  // Sends `msg` to the worker of the thread with the given id.
  // Does nothing when no registered thread has that id.
  const sendMessageToThread = ({ threadId, msg }) => {
    const target = threads.find(({ id }) => id === threadId);
    if (!target)
      return;

    target.worker.send(msg);
  };
  // Sends the same message to the worker of every registered thread.
  const messageAllThreads = message => {
    for (const { worker } of threads)
      worker.send(message);
  };
  // Asks every registered thread to report once it is idle and resolves after
  // as many idle notifications have arrived as there are registered threads.
  // Resolves immediately when no threads are registered, since no notification
  // could ever arrive in that case.
  const returnWhenThreadsIdle = async () => {
    if (threads.length === 0)
      return;

    return new Promise(res => {
      let doneCount = 0;

      const onZoneIdle = () => {
        doneCount++;

        // Compare the counter itself; a number has no `.length`, so the previous
        // `doneCount.length` check was always false and resolved on the first report.
        if (doneCount < threads.length)
          return;

        // All threads reported: unregister this one-shot listener before resolving
        listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
        res();
      };

      listenersOnZoneIdle.push(onZoneIdle);

      threads.forEach(t => {
        t.worker.send({
          method: 'notifyOnceIdle'
        });
      });
    });
  };
  206. //Exports
  207. module.exports = {
  208. getThread,
  209. killThread,
  210. getThreadFromId,
  211. doesThreadExist,
  212. spawnMapThreads,
  213. messageAllThreads,
  214. killThreadIfEmpty,
  215. sendMessageToThread,
  216. returnWhenThreadsIdle,
  217. getPlayerCountInThread
  218. };