From 9f7c48f2c2ea4e9113512e6ac7e7150829fe0dbe Mon Sep 17 00:00:00 2001 From: Dennis Postma Date: Sun, 22 Sep 2024 00:20:49 +0200 Subject: [PATCH] Working proof of concept for queue --- .../{characterLeaveZone.ts => SomeJob.ts} | 0 src/managers/queueManager.ts | 42 ++++++++++++------- src/socketEvents/character/connect.ts | 3 +- src/utilities/config.ts | 2 +- 4 files changed, 28 insertions(+), 19 deletions(-) rename src/jobs/{characterLeaveZone.ts => SomeJob.ts} (100%) diff --git a/src/jobs/characterLeaveZone.ts b/src/jobs/SomeJob.ts similarity index 100% rename from src/jobs/characterLeaveZone.ts rename to src/jobs/SomeJob.ts diff --git a/src/managers/queueManager.ts b/src/managers/queueManager.ts index bff56d6..c0bb095 100644 --- a/src/managers/queueManager.ts +++ b/src/managers/queueManager.ts @@ -4,6 +4,8 @@ import config from '../utilities/config' import { Server as SocketServer } from 'socket.io' import { TSocket } from '../utilities/types' import { queueLogger } from '../utilities/logger' +import fs from 'fs' +import path from 'path' class QueueManager { private connection!: IORedis @@ -26,7 +28,10 @@ class QueueManager { process.exit(1) } - this.queue = new Queue('jobs') + this.queue = new Queue('jobs', { + connection: this.connection + }) + this.worker = new Worker('jobs', this.processJob.bind(this), { connection: this.connection, concurrency: 10000 @@ -44,46 +49,51 @@ class QueueManager { } private async processJob(job: Job) { - const { jobClass, params, socketId } = job.data + console.log('Processing job:', job.data.jobName) + const { jobName, params, socketId } = job.data - console.log('Processing job:', job) try { - const JobClass = await import(`../jobs/${jobClass}`) + const jobsDir = path.join(process.cwd(), 'src', 'jobs') + const extension = config.ENV === 'development' ? '.ts' : '.js' + const jobPath = path.join(jobsDir, `${jobName}${extension}`) - if (!JobClass) { - queueLogger.warn(`Job class not found: ${jobClass}`) + if (!fs.existsSync(jobPath)) { + queueLogger.warn(`Job file not found: ${jobPath}`) return } - console.log('Job class:', JobClass) + const JobModule = await import(jobPath) + const JobClass = JobModule.default - console.log('Job class:', JobClass.name) + if (!JobClass || typeof JobClass !== 'function') { + queueLogger.warn(`Invalid job class in file: ${jobPath}`) + return + } - const job = new JobClass(params) + const jobInstance = new JobClass(params) if (socketId && this.io) { const socket = this.io.sockets.sockets.get(socketId) if (socket) { - await job.execute(this.io, socket) + await jobInstance.execute(this.io, socket) } else { queueLogger.warn(`Socket not found for job: ${socketId}`) - await job.execute(this.io) + await jobInstance.execute(this.io) } } else { - await job.execute(this.io) + await jobInstance.execute(this.io) } } catch (error: any) { - queueLogger.error(`Error processing job: ${error.message}`) + queueLogger.error(`Error processing job ${jobName}: ${error.message}`) } } - public async addToQueue(jobClass: any, params: any, socket?: TSocket) { + public async newJob(jobName: string, params: any, socket?: TSocket) { const jobData = { - jobClass: jobClass.name, + jobName, params, socketId: socket?.id } - await this.queue.add('job', jobData) } } diff --git a/src/socketEvents/character/connect.ts b/src/socketEvents/character/connect.ts index 7ca259d..3be6b4a 100644 --- a/src/socketEvents/character/connect.ts +++ b/src/socketEvents/character/connect.ts @@ -3,7 +3,6 @@ import { TSocket, ExtendedCharacter } from '../../utilities/types' import CharacterRepository from '../../repositories/characterRepository' import CharacterManager from '../../managers/characterManager' import QueueManager from '../../managers/queueManager' -import SomeJob from '../../jobs/characterLeaveZone' type SocketResponseT = { character_id: number @@ -16,7 +15,7 @@ export default function (io: Server, socket: TSocket) { const character = await CharacterRepository.getByUserAndId(socket?.user?.id as number, data.character_id) if (!character) return socket.characterId = character.id - await QueueManager.addToQueue(SomeJob, { someParam: 'value' }, socket) + await QueueManager.newJob('SomeJob', { someParam: 'value' }, socket) CharacterManager.initCharacter(character as ExtendedCharacter) socket.emit('character:connect', character) } catch (error: any) { diff --git a/src/utilities/config.ts b/src/utilities/config.ts index efea8ed..8424179 100644 --- a/src/utilities/config.ts +++ b/src/utilities/config.ts @@ -3,7 +3,7 @@ import dotenv from 'dotenv' dotenv.config() class config { - static ENV: string = process.env.ENV || 'prod' + static ENV: string = process.env.ENV || 'development' static REDIS_URL: string = process.env.REDIS_URL || 'redis://@127.0.0.1:6379/4' static HOST: string = process.env.HOST || '0.0.0.0' static PORT: number = process.env.PORT ? parseInt(process.env.PORT) : 6969