import IORedis from 'ioredis' import { Job, Queue, Worker } from 'bullmq' import config from '../utilities/config' import { Server as SocketServer } from 'socket.io' import { TSocket } from '../utilities/types' import { queueLogger } from '../utilities/logger' import fs from 'fs' import path from 'path' class QueueManager { private connection!: IORedis private queue!: Queue private worker!: Worker private io!: SocketServer public async boot(io: SocketServer) { this.io = io this.connection = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null }) try { await this.connection.ping() queueLogger.info('Successfully connected to Redis') } catch (error) { queueLogger.error('Failed to connect to Redis:', error) process.exit(1) } this.queue = new Queue('jobs', { connection: this.connection }) this.worker = new Worker('jobs', this.processJob.bind(this), { connection: this.connection, concurrency: 10000 }) this.worker.on('completed', (job) => { queueLogger.info(`Job ${job?.id} has completed`) }) this.worker.on('failed', (job, err) => { queueLogger.error(`Job ${job?.id} failed with error: ${err}`) }) queueLogger.info('Queue manager loaded') } private async processJob(job: Job) { const { jobName, params, socketId } = job.data; const jobsDir = path.join(process.cwd(), 'src', 'jobs'); const extension = config.ENV === 'development' ? '.ts' : '.js'; const jobPath = path.join(jobsDir, `${jobName}${extension}`); queueLogger.info(`Processing job: ${jobName}`); if (!fs.existsSync(jobPath)) { queueLogger.warn(`Job file not found: ${jobPath}`); return; } try { const module = await import(jobPath); if (typeof module.default !== 'function') { queueLogger.warn(`Unrecognized export in ${jobName}`); return; } const JobClass = module.default; const jobInstance = new JobClass(params); if (socketId && this.io) { const socket = this.io.sockets.sockets.get(socketId); if (socket) { await jobInstance.execute(this.io, socket); } else { queueLogger.warn(`Socket not found for job: ${socketId}`); await jobInstance.execute(this.io); } } else { await jobInstance.execute(this.io); } } catch (error) { queueLogger.error(`Error processing job ${jobName}: ${error instanceof Error ? error.message : String(error)}`); } } public async newJob(jobName: string, params: any, socket?: TSocket) { const jobData = { jobName, params, socketId: socket?.id } await this.queue.add('job', jobData) } } export default new QueueManager()