From 9cdc706a4b077080ff0ce24b8213a5028b3324b3 Mon Sep 17 00:00:00 2001 From: Shaun Date: Sat, 20 May 2023 16:13:41 +0200 Subject: [PATCH] work --- src/server/index.js | 9 +-- src/server/world/atlas.js | 20 +++--- src/server/world/threadManager.js | 103 +++++++++++++++++++++++++----- 3 files changed, 100 insertions(+), 32 deletions(-) diff --git a/src/server/index.js b/src/server/index.js index f6e7a5a0..5d4a1871 100644 --- a/src/server/index.js +++ b/src/server/index.js @@ -1,5 +1,5 @@ +//Imports require('./globals'); - const server = require('./server/index'); const components = require('./components/components'); const mods = require('./misc/mods'); @@ -16,9 +16,10 @@ const mapManager = require('./world/mapManager'); const fixes = require('./fixes/fixes'); const profanities = require('./misc/profanities'); const routerConfig = require('./security/routerConfig'); -const { spawnMapThreads } = require('./world/threadManager'); +const threadManager = require('./world/threadManager'); -let startup = { +//Module +const startup = { init: function () { io.init(this.onDbReady.bind(this)); }, @@ -58,7 +59,7 @@ let startup = { await leaderboard.init(); - await spawnMapThreads(); + await threadManager.init(); }, onError: async function (e) { diff --git a/src/server/world/atlas.js b/src/server/world/atlas.js index dcd1389f..a4fe4486 100644 --- a/src/server/world/atlas.js +++ b/src/server/world/atlas.js @@ -2,7 +2,7 @@ const objects = require('../objects/objects'); const events = require('../misc/events'); const { - getThread, killThread, sendMessageToThread, getThreadFromId, returnWhenThreadsIdle, gePlayerCountInThread + getThread, killThread, sendMessageToThread, getThreadFromId, returnWhenThreadsIdle, gePlayerCountInThread, addPlayerToThread, removePlayerFromThread } = require('./threadManager'); const { registerCallback, removeCallback } = require('./atlas/registerCallback'); @@ -29,7 +29,8 @@ module.exports = { const { thread, resetObjPosition } = await getThread({ zoneName, - zoneId + zoneId, + playerId: obj.id }); if (resetObjPosition) { @@ -47,6 +48,8 @@ module.exports = { const simpleObj = obj.getSimple ? obj.getSimple(true, true) : obj; + addPlayerToThread(thread, obj.id); + sendMessageToThread({ threadId: obj.zoneId, msg: { @@ -60,7 +63,7 @@ module.exports = { }); }, - removeObjectFromInstancedZone: async function (thread, objId, callback) { + /*removeObjectFromInstancedZone: async function (thread, objId, callback) { await new Promise(res => { const cb = this.registerCallback(res); @@ -77,12 +80,9 @@ module.exports = { if (callback) callback(); - }, + },*/ removeObject: async function (obj, skipLocal, callback) { - //We need to store the player id because the calling thread might delete it (connections.unzone) - const playerId = obj.id; - if (!skipLocal) objects.removeObject(obj); @@ -93,11 +93,7 @@ module.exports = { return; } - if (thread.instanced && (await gePlayerCountInThread(thread)) === 1) { - this.removeObjectFromInstancedZone(thread, playerId, callback); - - return; - } + removePlayerFromThread(thread, obj.id); let callbackId = null; if (callback) diff --git a/src/server/world/threadManager.js b/src/server/world/threadManager.js index 707f64bb..ede4ba3c 100644 --- a/src/server/world/threadManager.js +++ b/src/server/world/threadManager.js @@ -1,3 +1,23 @@ +/* + This module contains an array of all threads. Each thread looks like this: + + { + id: 'The zoneId', + name: 'The name of the map', + instanced: 'Boolean value indicating whether the thread is instanced or not', + path: 'The path to the map file', + worker: 'The actual thread that has been spawned', + isReady: 'Boolean value that turns from false to true as soon as the thread is ready to accept players', + promise: 'Used by the getThread method to wait for the thread to be ready, so it can be sent to the atlas', + cbOnInitialized: 'The promise resolver', + players: 'An array of all player id's that have been in this thread', + playersCurrent: 'An array of player id's that are currently in this thread', + birthEpoch: 'Defines the epoch when this was created', + destroyWhenEmptyForMs: 'If not equal to -1, defines whether the thread should be destroyed if it has been empty for this amount of milliseconds', + emptySinceEpoch: 'An epoch of when the thread became empty, or null if not empty' + } +*/ + //System Imports const childProcess = require('child_process'); @@ -35,6 +55,10 @@ const gePlayerCountInThread = async thread => { return playerCount; }; +const removePlayerFromThread = (thread, playerId) => { + thread.playersCurrent.spliceWhere(p => p === playerId); +}; + const messageHandlers = { onReady: function (thread) { thread.worker.send({ @@ -94,10 +118,7 @@ const messageHandlers = { rezone: async function (thread, message) { const { args: { obj, newZone, keepPos = true } } = message; - if (thread.instanced && (await gePlayerCountInThread(thread)) === 0) { - thread.worker.kill(); - threads.spliceWhere(t => t === thread); - } + removePlayerFromThread(thread, obj.serverId); //When messages are sent from map threads, they have an id (id of the object in the map thread) // as well as a serverId (id of the object in the main thread) @@ -144,7 +165,7 @@ const onMessage = (thread, message) => { messageHandlers[message.method](thread, message); }; -const spawnThread = async ({ name, path, instanced }) => { +const spawnThread = async ({ name, path, instanced, destroyWhenEmptyForMs = -1 }) => { let cbOnInitialized; const promise = new Promise(resolveOnReady => { @@ -163,7 +184,12 @@ const spawnThread = async ({ name, path, instanced }) => { worker, isReady: false, promise, - cbOnInitialized + cbOnInitialized, + players: [], + playersCurrent: [], + birthEpoch: +new Date(), + destroyWhenEmptyForMs, + emptySinceEpoch: null }; worker.on('message', onMessage.bind(null, thread)); @@ -173,7 +199,7 @@ const spawnThread = async ({ name, path, instanced }) => { return promise; }; -const getThread = async ({ zoneName, zoneId }) => { +const getThread = async ({ zoneName, zoneId, playerId }) => { const result = { resetObjPosition: false, thread: null @@ -185,6 +211,10 @@ const getThread = async ({ zoneName, zoneId }) => { map = mapList.find(m => m.name === clientConfig.config.defaultZone); let thread = threads.find(t => t.id === zoneId && t.name === zoneName); + + //Maybe this player has been in a thread for this map before + if (!thread) + thread = threads.find(t => t.name === zoneName && t.players.includes(playerId)); if (!thread) { if (map.instanced) { @@ -208,14 +238,6 @@ const killThread = thread => { threads.spliceWhere(t => t === thread); }; -const spawnMapThreads = async () => { - const promises = mapList - .filter(m => !m.disabled && !m.instanced) - .map(m => spawnThread(m)); - - await Promise.all(promises); -}; - const sendMessageToThread = ({ threadId, msg }) => { const thread = threads.find(t => t.id === threadId); if (thread) @@ -250,14 +272,63 @@ const returnWhenThreadsIdle = async () => { }); }; +const spawnMapThreads = async () => { + const promises = mapList + .filter(m => !m.disabled && !m.instanced) + .map(m => spawnThread(m)); + + await Promise.all(promises); +}; + +const addPlayerToThread = (thread, playerId) => { + thread.players.push(playerId); + thread.playersCurrent.push(playerId); + + if (thread.emptySinceEpoch) + thread.emptySinceEpoch = null; +}; + +const update = () => { + let tLen = threads.length; + for (let i = 0; i < tLen; i++) { + const t = threads[i]; + + if (!t.isReady || t.destroyWhenEmptyForMs === -1) + continue; + + if (!t.emptySinceEpoch && t.playersCurrent.length === 0) + t.emptySinceEpoch = +new Date(); + + if (!t.emptySinceEpoch) + continue; + + const ageInMs = (+new Date() - t.emptySinceEpoch); + + if (ageInMs < t.destroyWhenEmptyForMs) + continue; + + killThread(t); + i--; + tLen--; + } +}; + +const init = async () => { + await spawnMapThreads(); + + setInterval(update, 5000); +}; + //Exports module.exports = { + init, getThread, killThread, getThreadFromId, - spawnMapThreads, messageAllThreads, sendMessageToThread, returnWhenThreadsIdle, + addPlayerToThread, + removePlayerFromThread, gePlayerCountInThread };