You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

306 lines
6.2 KiB

  1. //System Imports
  2. const childProcess = require('child_process');
  3. //Imports
  4. const objects = require('../objects/objects');
  5. const { mapList } = require('./mapManager');
  6. const { registerCallback } = require('./atlas/registerCallback');
  7. //Internals
  8. const threads = [];
  9. const listenersOnZoneIdle = [];
  10. //Helpers
  11. const getThreadFromName = name => {
  12. return threads.find(t => t.name === name);
  13. };
  14. const getThreadFromId = threadId => {
  15. return threads.find(t => t.id === threadId);
  16. };
  17. const getPlayerCountInThread = async thread => {
  18. const { playerCount } = await new Promise(res => {
  19. const cb = registerCallback(res);
  20. thread.worker.send({
  21. method: 'getPlayerCount',
  22. args: {
  23. callbackId: cb
  24. }
  25. });
  26. });
  27. return playerCount;
  28. };
  29. const messageHandlers = {
  30. onReady: function (thread) {
  31. thread.worker.send({
  32. method: 'init',
  33. args: {
  34. zoneName: thread.name,
  35. zoneId: thread.id,
  36. path: thread.path
  37. }
  38. });
  39. },
  40. onInitialized: function (thread) {
  41. thread.isReady = true;
  42. thread.cbOnInitialized(thread);
  43. delete thread.cbOnInitialized;
  44. delete thread.promise;
  45. },
  46. event: function (thread, message) {
  47. objects.sendEvent(message, thread);
  48. },
  49. events: function (thread, message) {
  50. objects.sendEvents(message, thread);
  51. },
  52. object: function (thread, message) {
  53. objects.updateObject(message);
  54. },
  55. track: function (thread, message) {
  56. let player = objects.objects.find(o => o.id === message.serverId);
  57. if (!player)
  58. return;
  59. player.auth.gaTracker.track(message.obj);
  60. },
  61. callDifferentThread: function (thread, message) {
  62. let obj = cons.players.find(p => (p.name === message.playerName));
  63. if (!obj)
  64. return;
  65. let newThread = getThreadFromName(obj.zoneName);
  66. if (!newThread)
  67. return;
  68. newThread.worker.send({
  69. module: message.data.module,
  70. method: message.data.method,
  71. args: message.data.args
  72. });
  73. },
  74. rezone: async function (thread, message) {
  75. const { args: { obj, newZone, keepPos = true } } = message;
  76. if (thread.instanced && (await getPlayerCountInThread(thread)) === 0) {
  77. thread.worker.kill();
  78. threads.spliceWhere(t => t === thread);
  79. }
  80. //When messages are sent from map threads, they have an id (id of the object in the map thread)
  81. // as well as a serverId (id of the object in the main thread)
  82. const serverId = obj.serverId;
  83. obj.id = serverId;
  84. obj.destroyed = false;
  85. const serverObj = objects.objects.find(o => o.id === obj.id);
  86. const mapExists = mapList.some(m => m.name === newZone);
  87. if (mapExists) {
  88. serverObj.zoneName = newZone;
  89. obj.zoneName = newZone;
  90. } else {
  91. obj.zoneName = clientConfig.config.defaultZone;
  92. serverObj.zoneName = clientConfig.config.defaultZone;
  93. }
  94. delete serverObj.zoneId;
  95. delete obj.zoneId;
  96. const isRezone = true;
  97. await atlas.addObject(obj, keepPos, isRezone);
  98. },
  99. onZoneIdle: function (thread) {
  100. listenersOnZoneIdle.forEach(l => l(thread));
  101. }
  102. };
  103. const onMessage = (thread, message) => {
  104. if (message.module) {
  105. try {
  106. global[message.module][message.method](message);
  107. } catch (e) {
  108. /* eslint-disable-next-line no-console */
  109. console.log('No global method found', message.module, message.method);
  110. process.exit();
  111. }
  112. } else if (message.event === 'onCrashed') {
  113. thread.worker.kill();
  114. process.exit();
  115. } else
  116. messageHandlers[message.method](thread, message);
  117. };
  118. const spawnThread = async ({ name, path, instanced }) => {
  119. let cbOnInitialized;
  120. const promise = new Promise(resolveOnReady => {
  121. cbOnInitialized = resolveOnReady;
  122. });
  123. const worker = childProcess.fork('./world/worker', [name]);
  124. const id = instanced ? _.getGuid() : name;
  125. const thread = {
  126. id,
  127. name,
  128. instanced,
  129. path,
  130. worker,
  131. isReady: false,
  132. promise,
  133. cbOnInitialized
  134. };
  135. worker.on('message', onMessage.bind(null, thread));
  136. threads.push(thread);
  137. return promise;
  138. };
  139. const doesThreadExist = ({ zoneName, zoneId }) => {
  140. let map = mapList.find(m => m.name === zoneName);
  141. if (!map)
  142. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  143. const exists = threads.some(t => t.id === zoneId && t.name === zoneName);
  144. if (exists)
  145. return true;
  146. if (map.instanced)
  147. return false;
  148. const thread = getThreadFromName(map.name);
  149. return !!thread;
  150. };
  151. const getThread = async ({ zoneName, zoneId }) => {
  152. const result = {
  153. resetObjPosition: false,
  154. thread: null
  155. };
  156. let map = mapList.find(m => m.name === zoneName);
  157. if (!map)
  158. map = mapList.find(m => m.name === clientConfig.config.defaultZone);
  159. let thread = threads.find(t => t.id === zoneId && t.name === zoneName);
  160. if (!thread) {
  161. if (map.instanced) {
  162. result.resetObjPosition = true;
  163. thread = await spawnThread(map);
  164. } else
  165. thread = getThreadFromName(map.name);
  166. }
  167. if (!thread) {
  168. io.logError({
  169. sourceModule: 'threadManager',
  170. sourceMethod: 'getThread',
  171. error: 'No thread found',
  172. info: {
  173. requestedZoneName: zoneName,
  174. requestedZoneId: zoneId,
  175. useMapName: map.name
  176. }
  177. });
  178. process.exit();
  179. }
  180. if (!thread.isReady)
  181. await thread.promise;
  182. result.thread = thread;
  183. return result;
  184. };
  185. const killThread = thread => {
  186. thread.worker.kill();
  187. threads.spliceWhere(t => t === thread);
  188. };
  189. const killThreadIfEmpty = async thread => {
  190. const playerCount = await getPlayerCountInThread(thread);
  191. if (playerCount === 0)
  192. killThread(thread);
  193. };
  194. const spawnMapThreads = async () => {
  195. const promises = mapList
  196. .filter(m => !m.disabled && !m.instanced)
  197. .map(m => spawnThread(m));
  198. await Promise.all(promises);
  199. };
  200. const sendMessageToThread = ({ threadId, msg }) => {
  201. const thread = threads.find(t => t.id === threadId);
  202. if (thread)
  203. thread.worker.send(msg);
  204. };
  205. const messageAllThreads = message => {
  206. threads.forEach(t => t.worker.send(message));
  207. };
  208. const returnWhenThreadsIdle = async () => {
  209. return new Promise(res => {
  210. let doneCount = 0;
  211. const onZoneIdle = thread => {
  212. doneCount++;
  213. if (doneCount.length < threads.length)
  214. return;
  215. listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
  216. res();
  217. };
  218. listenersOnZoneIdle.push(onZoneIdle);
  219. threads.forEach(t => {
  220. t.worker.send({
  221. method: 'notifyOnceIdle'
  222. });
  223. });
  224. });
  225. };
  226. //Exports
  227. module.exports = {
  228. getThread,
  229. killThread,
  230. getThreadFromId,
  231. doesThreadExist,
  232. spawnMapThreads,
  233. messageAllThreads,
  234. killThreadIfEmpty,
  235. sendMessageToThread,
  236. returnWhenThreadsIdle,
  237. getPlayerCountInThread
  238. };