From 4b1f6884ae7d6cc6430cd6559a22046cd6d365f8 Mon Sep 17 00:00:00 2001 From: Zaxiure Date: Sun, 22 Sep 2024 15:34:35 +0200 Subject: [PATCH] Example code --- src/jobs/SomeJob.ts | 5 +++++ src/managers/queueManager.ts | 19 ++++++++++++++++++- src/server.ts | 5 +++-- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/jobs/SomeJob.ts b/src/jobs/SomeJob.ts index 9009868..ae23bbe 100644 --- a/src/jobs/SomeJob.ts +++ b/src/jobs/SomeJob.ts @@ -1,9 +1,14 @@ import { TSocket } from '../utilities/types' import { Server as SocketServer } from 'socket.io' +import QueueManager from '../managers/queueManager' export default class SomeJob { constructor(private params: any) {} + public static initializeSocket(socket: TSocket) { + socket.on('character:connect', () => QueueManager.newJob('SomeJob', {}, socket)); + } + async execute(io: SocketServer, socket?: TSocket) { // Handle the event if (socket) { diff --git a/src/managers/queueManager.ts b/src/managers/queueManager.ts index ab54911..7196e62 100644 --- a/src/managers/queueManager.ts +++ b/src/managers/queueManager.ts @@ -6,15 +6,18 @@ import { TSocket } from '../utilities/types' import { queueLogger } from '../utilities/logger' import fs from 'fs' import path from 'path' +import { Dirent } from 'node:fs' class QueueManager { private connection!: IORedis private queue!: Queue private worker!: Worker private io!: SocketServer + private socket!: TSocket - public async boot(io: SocketServer) { + public async boot(io: SocketServer, socket: TSocket) { this.io = io + this.socket = socket this.connection = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null @@ -46,6 +49,20 @@ class QueueManager { }) queueLogger.info('Queue manager loaded') + await this.initializeSocketJobs() + } + + private async initializeSocketJobs() { + const dir = path.join(__dirname, '../jobs') + + const files: Dirent[] = await fs.promises.readdir(dir, { withFileTypes: true }) + + for (const file of files) { + const fullPath = path.join(dir, file.name) + + const module = await import(fullPath) + module.default.initializeSocket(this.socket); + } } private async processJob(job: Job) { diff --git a/src/server.ts b/src/server.ts index d6af995..0d0ee90 100644 --- a/src/server.ts +++ b/src/server.ts @@ -58,8 +58,7 @@ export class Server { appLogger.error(`Socket.IO failed to start: ${error.message}`) } - // Load queue manager - await QueueManager.boot(this.io) + // Add http API routes await addHttpRoutes(this.app) @@ -88,6 +87,8 @@ export class Server { private async handleConnection(socket: TSocket) { const eventsPath = path.join(__dirname, 'socketEvents') try { + // Load queue manager + await QueueManager.boot(this.io, socket); await this.loadEventHandlers(eventsPath, socket) } catch (error: any) { appLogger.error(`Failed to load event handlers: ${error.message}`)