|
- /*
- This module contains an array of all threads. Each thread looks like this:
-
- {
- id: 'The zoneId',
- name: 'The name of the map',
- instanced: 'Boolean value indicating whether the thread is instanced or not',
- path: 'The path to the map file',
- worker: 'The actual thread that has been spawned',
- isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players',
- promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas',
- cbOnInitialized: 'The promise resolver',
- players: 'An array of all player ids that have been in this thread',
- playersCurrent: 'An array of player ids that are currently in this thread',
- birthEpoch: 'Defines the epoch when this was created',
- destroyWhenEmptyForMs: 'If not equal to -1, the thread is destroyed once it has been empty for this many milliseconds',
- emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty'
- }
- */
-
- //System Imports
- const childProcess = require('child_process');
-
- //Imports
- const objects = require('../objects/objects');
- const connections = require('../security/connections');
- const { mapList } = require('./mapManager');
- const { registerCallback } = require('./atlas/registerCallback');
- const events = require('../misc/events');
-
- //Internals
//Every thread record created by spawnThread; killThread removes entries again
const threads = [];
//Callbacks invoked by the 'onZoneIdle' message handler each time a worker reports that it is idle
const listenersOnZoneIdle = [];
-
- //Helpers
//Returns the first thread record whose name equals the given map name, or undefined when there is none
const getThreadFromName = name => {
	for (const candidate of threads) {
		if (candidate.name === name)
			return candidate;
	}

	return undefined;
};
-
//Returns the first thread record whose id equals threadId, or undefined when there is none
const getThreadFromId = threadId => {
	for (const candidate of threads) {
		if (candidate.id === threadId)
			return candidate;
	}

	return undefined;
};
-
/*
	Asks the worker of the given thread for its player count.

	A callback is registered through registerCallback and its id is sent along with the
	'getPlayerCount' message. The returned promise resolves with the playerCount property
	of whatever value that callback is eventually invoked with.
*/
const gePlayerCountInThread = async thread => {
	const reply = await new Promise(resolve => {
		const callbackId = registerCallback(resolve);
		const request = {
			method: 'getPlayerCount',
			args: { callbackId }
		};

		thread.worker.send(request);
	});

	return reply.playerCount;
};
-
//Takes playerId out of the thread's playersCurrent list; the players history list is left untouched
const removePlayerFromThread = (thread, playerId) => {
	const isSamePlayer = id => id === playerId;

	thread.playersCurrent.spliceWhere(isSamePlayer);
};
-
/*
	Handlers for messages received from map worker processes, keyed by message.method.
	onMessage calls them as messageHandlers[method](thread, message), where thread is the
	record of the worker that sent the message.
*/
const messageHandlers = {
	//The worker process is up: tell it which zone to load
	onReady: function (thread) {
		thread.worker.send({
			method: 'init',
			args: {
				zoneName: thread.name,
				zoneId: thread.id,
				path: thread.path
			}
		});
	},

	//The worker finished initializing: flag the thread as ready and resolve the promise created by spawnThread
	onInitialized: function (thread) {
		thread.isReady = true;

		thread.cbOnInitialized(thread);
		//Neither is needed once the thread is ready
		delete thread.cbOnInitialized;
		delete thread.promise;
	},

	//Hands a single event message over to objects.sendEvent
	event: function (thread, message) {
		objects.sendEvent(message, thread);
	},

	//Hands a batch-of-events message over to objects.sendEvents
	events: function (thread, message) {
		objects.sendEvents(message, thread);
	},

	//Hands an object update message over to objects.updateObject
	object: function (thread, message) {
		objects.updateObject(message);
	},

	//Forwards message.obj to the tracker of the player whose id equals message.serverId, if that player exists
	track: function (thread, message) {
		let player = objects.objects.find(o => o.id === message.serverId);
		if (!player)
			return;

		player.auth.gaTracker.track(message.obj);
	},

	//Relays message.data (module, method, args) to the worker of the zone that the named player is in
	callDifferentThread: function (thread, message) {
		let obj = connections.players.find(p => (p.name === message.playerName));
		if (!obj)
			return;

		let newThread = getThreadFromName(obj.zoneName);
		if (!newThread)
			return;

		newThread.worker.send({
			module: message.data.module,
			method: message.data.method,
			args: message.data.args
		});
	},

	/*
		Moves an object from the sending thread to another zone.

		message.args:
			obj: the object as serialized by the map thread
			newZone: name of the zone to move to; the default zone is used when no map has this name
			keepPos: passed on to atlas.addObject, defaults to true
	*/
	rezone: async function (thread, message) {
		const { args: { obj, newZone, keepPos = true } } = message;

		removePlayerFromThread(thread, obj.serverId);

		//When messages are sent from map threads, they have an id (id of the object in the map thread)
		// as well as a serverId (id of the object in the main thread)
		obj.id = obj.serverId;
		obj.destroyed = false;

		const serverObj = objects.objects.find(o => o.id === obj.id);

		//Same guard as track and callDifferentThread: without it a missing object makes this
		// async handler reject, and nothing catches that rejection
		if (!serverObj)
			return;

		const mapExists = mapList.some(m => m.name === newZone);
		const zoneName = mapExists ? newZone : clientConfig.config.defaultZone;

		serverObj.zoneName = zoneName;
		obj.zoneName = zoneName;

		//Drop the old zone id from both copies of the object
		delete serverObj.zoneId;
		delete obj.zoneId;

		const isRezone = true;
		await atlas.addObject(obj, keepPos, isRezone);
	},

	//Notifies every listener registered in listenersOnZoneIdle that this thread reported being idle
	onZoneIdle: function (thread) {
		listenersOnZoneIdle.forEach(l => l(thread));
	}
};
-
/*
	Entry point for every message a worker sends to the main process.

	- Messages with a module property are dispatched to global[module][method]. If that
	  call throws, a notice is logged and the process exits.
	- A message whose event is 'onCrashed' kills the worker and exits the process.
	- Everything else is dispatched to messageHandlers[method].
*/
const onMessage = (thread, message) => {
	const targetsGlobalModule = Boolean(message.module);

	if (targetsGlobalModule) {
		try {
			if (message.includeThreadInArgs) {
				message.threadId = thread.id;
			}

			global[message.module][message.method](message);
		} catch (e) {
			/* eslint-disable-next-line no-console */
			console.log('No global method found', message.module, message.method);
			process.exit();
		}

		return;
	}

	if (message.event === 'onCrashed') {
		thread.worker.kill();
		process.exit();

		return;
	}

	messageHandlers[message.method](thread, message);
};
-
/*
	Creates a thread record for the given map, forks a worker process for it and adds the
	record to the threads list.

	map: { name, path, instanced, destroyWhenEmptyForMs }
		When destroyWhenEmptyForMs is null or undefined, consts.destroyThreadWhenEmptyForMs is used.
	obj: passed along as spawnForObject in the 'beforeSpawnThread' event

	Returns the promise stored on the record; it is resolved by calling the record's
	cbOnInitialized.
*/
const spawnThread = async ({ map, obj }) => {
	const { name, path, instanced } = map;
	const destroyWhenEmptyForMs = map.destroyWhenEmptyForMs ?? consts.destroyThreadWhenEmptyForMs;

	let resolveOnInitialized;
	const readyPromise = new Promise(resolve => {
		resolveOnInitialized = resolve;
	});

	const thread = {
		//Instanced maps get a unique id, all other maps use their name
		id: instanced ? _.getGuid() : name,
		name,
		instanced,
		path,
		worker: null,
		isReady: false,
		promise: readyPromise,
		preConfig: {},
		cbOnInitialized: resolveOnInitialized,
		players: [],
		playersCurrent: [],
		birthEpoch: Date.now(),
		destroyWhenEmptyForMs,
		emptySinceEpoch: null,
		sendArgsToWorker: ['name', 'id', 'preConfig'],
		workerArgs: null
	};

	//Listeners receive the record itself and may change it before the worker is forked
	events.emit('beforeSpawnThread', {
		thread,
		spawnForObject: obj
	});

	//Only the properties listed in sendArgsToWorker are handed to the worker process
	const argPairs = thread.sendArgsToWorker.map(key => [key, thread[key]]);
	thread.workerArgs = Object.fromEntries(argPairs);

	const logInfo = JSON.stringify({ id: thread.id, name: thread.name }, null, '\t');
	_.log(`Spawning: ${logInfo}`);

	const serializedArgs = JSON.stringify(thread.workerArgs);
	thread.worker = childProcess.fork('./world/worker', [serializedArgs]);
	thread.worker.on('message', message => onMessage(thread, message));

	threads.push(thread);

	return readyPromise;
};
-
/*
	Finds, or spawns, the thread an object should be placed in and waits until it is ready.

	zoneName: name of the wanted map; the default zone's map is used when no map has this name
	zoneId: id of the thread the object was last in
	obj: the object that needs a thread

	Resolves to { resetObjPosition, thread }. resetObjPosition is only true when a new
	thread had to be spawned for an instanced map.
*/
const getThread = async ({ zoneName, zoneId, obj }) => {
	const isNamed = wanted => m => m.name === wanted;
	const map = mapList.find(isNamed(zoneName)) || mapList.find(isNamed(clientConfig.config.defaultZone));

	//Prefer an exact id match; otherwise maybe this player has been in a thread for this map before
	let thread =
		threads.find(t => t.id === zoneId && t.name === zoneName) ||
		threads.find(t => t.name === zoneName && t.players.includes(obj.id));

	let resetObjPosition = false;

	if (!thread) {
		if (map.instanced) {
			resetObjPosition = true;
		}

		thread = await spawnThread({ map, obj });
	}

	if (!thread.isReady) {
		await thread.promise;
	}

	return {
		resetObjPosition,
		thread
	};
};
-
//Logs the kill, terminates the thread's worker process and removes the record from the threads list
const killThread = thread => {
	const { id: workerId } = thread.workerArgs;
	_.log(`Killing: ${workerId}`);

	thread.worker.kill();

	const isThisThread = candidate => candidate === thread;
	threads.spliceWhere(isThisThread);
};
-
/*
	Sends msg to the worker of the thread whose id equals threadId.
	Nothing happens when no such thread exists.
*/
const sendMessageToThread = ({ threadId, msg }) => {
	const thread = threads.find(t => t.id === threadId);
	if (thread)
		thread.worker.send(msg);
};
-
//Sends the same message to the worker of every thread in the threads list
const messageAllThreads = message => {
	for (const thread of threads) {
		thread.worker.send(message);
	}
};
-
/*
	Asks every thread to report once it is idle ('notifyOnceIdle') and resolves after as
	many idle notifications have come in as there are threads.

	Resolves right away when there are no threads, since nothing would ever report back.
*/
const returnWhenThreadsIdle = async () => {
	return new Promise(res => {
		if (threads.length === 0) {
			res();

			return;
		}

		let doneCount = 0;

		const onZoneIdle = () => {
			doneCount++;

			//doneCount is a plain counter, so compare the number itself
			if (doneCount < threads.length)
				return;

			listenersOnZoneIdle.spliceWhere(l => l === onZoneIdle);
			res();
		};

		listenersOnZoneIdle.push(onZoneIdle);

		threads.forEach(t => {
			t.worker.send({
				method: 'notifyOnceIdle'
			});
		});
	});
};
-
//Spawns a thread for every map whose autoSpawn flag is exactly true and waits until all of them have resolved
const spawnMapThreads = async () => {
	const pending = [];

	for (const map of mapList) {
		if (map.autoSpawn === true) {
			pending.push(spawnThread({ map }));
		}
	}

	await Promise.all(pending);
};
-
/*
	Registers playerId with the thread: in players (everyone who has ever been in the
	thread) and in playersCurrent (everyone in it right now), and clears emptySinceEpoch
	because the thread is no longer empty.

	An id that is already in a list is not added again, so a player that keeps re-entering
	the same thread does not make the lists grow.
*/
const addPlayerToThread = (thread, playerId) => {
	if (!thread.players.includes(playerId))
		thread.players.push(playerId);

	if (!thread.playersCurrent.includes(playerId))
		thread.playersCurrent.push(playerId);

	if (thread.emptySinceEpoch)
		thread.emptySinceEpoch = null;
};
-
/*
	Periodic housekeeping: kills every ready thread that has been without current players
	for at least its destroyWhenEmptyForMs. Threads with destroyWhenEmptyForMs === -1 and
	threads that are not ready yet are left alone.
*/
const update = () => {
	//Stamps the moment a thread is first seen empty and reports whether it has been empty for too long
	const hasBeenEmptyTooLong = thread => {
		if (!thread.isReady || thread.destroyWhenEmptyForMs === -1) {
			return false;
		}

		if (!thread.emptySinceEpoch && thread.playersCurrent.length === 0) {
			thread.emptySinceEpoch = Date.now();
		}

		if (!thread.emptySinceEpoch) {
			return false;
		}

		const emptyForMs = Date.now() - thread.emptySinceEpoch;

		return !(emptyForMs < thread.destroyWhenEmptyForMs);
	};

	let index = 0;
	let remaining = threads.length;

	while (index < remaining) {
		const thread = threads[index];

		if (hasBeenEmptyTooLong(thread)) {
			//killThread removes the record from the list, so the next thread moves into this index
			killThread(thread);
			remaining--;
		} else {
			index++;
		}
	}
};
-
//Spawns the auto-spawn map threads, then schedules update to run every 5000 milliseconds
const init = async () => {
	await spawnMapThreads();

	const updateIntervalMs = 5000;
	setInterval(update, updateIntervalMs);
};
-
- //Exports
- module.exports = {
- init,
- getThread,
- killThread,
- getThreadFromId,
- messageAllThreads,
- sendMessageToThread,
- returnWhenThreadsIdle,
- addPlayerToThread,
- removePlayerFromThread,
- gePlayerCountInThread
- };
|