import IORedis from 'ioredis'
import { Job, Queue, Worker } from 'bullmq'
import config from '../utilities/config'
import { Server as SocketServer } from 'socket.io'
import { TSocket } from '../utilities/types'
import { queueLogger } from '../utilities/logger'
import fs from 'fs'
import path from 'path'

/**
 * Owns the BullMQ queue/worker pair named `jobs` and dispatches each queued
 * job to a job class loaded dynamically from `<cwd>/src/jobs`.
 *
 * `boot` must be called once before `newJob`; the fields below are only
 * assigned there.
 */
class QueueManager {
  private connection!: IORedis
  private queue!: Queue
  private worker!: Worker
  private io!: SocketServer

  /**
   * Connects to Redis, then creates the queue and the worker.
   *
   * Terminates the process with exit code 1 when the initial Redis ping
   * rejects.
   *
   * @param io Socket.io server handed to every job's `execute` call.
   */
  public async boot(io: SocketServer): Promise<void> {
    this.io = io
    this.connection = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null })

    try {
      // NOTE(review): with `maxRetriesPerRequest: null` ioredis appears to queue
      // commands while disconnected, so this ping may wait indefinitely instead
      // of rejecting when Redis is unreachable — confirm whether a timeout is wanted.
      await this.connection.ping()
      queueLogger.info('Successfully connected to Redis')
    } catch (error) {
      queueLogger.error('Failed to connect to Redis:', error)
      process.exit(1)
    }

    this.queue = new Queue('jobs', { connection: this.connection })
    this.worker = new Worker('jobs', this.processJob.bind(this), {
      connection: this.connection,
      concurrency: 10000
    })

    this.worker.on('completed', (job) => {
      queueLogger.info(`Job ${job?.id} has completed`)
    })
    this.worker.on('failed', (job, err) => {
      queueLogger.error(`Job ${job?.id} failed with error: ${err}`)
    })
    // Worker-level errors (as opposed to job failures) are emitted on 'error';
    // without a listener they would go unhandled.
    this.worker.on('error', (err) => {
      queueLogger.error(`Worker error: ${err}`)
    })

    queueLogger.info('Queue manager loaded')
  }

  /**
   * Worker processor: loads `<cwd>/src/jobs/<jobName>.(ts|js)`, instantiates
   * its default export with `params` and awaits `execute`.
   *
   * A missing job file or a non-constructor default export is logged as a
   * warning and the job finishes without running anything. Any other error is
   * logged and rethrown so the worker records the job as failed.
   *
   * @param job Queued job whose `data` carries `jobName`, `params` and an
   *            optional `socketId`.
   * @throws Whatever the job module or its `execute` throws, or an `Error`
   *         when `jobName` is not a usable name inside the jobs directory.
   */
  private async processJob(job: Job): Promise<void> {
    const { jobName, params, socketId } = job.data
    queueLogger.info(`Processing job: ${jobName}`)

    try {
      if (typeof jobName !== 'string' || jobName.length === 0) {
        throw new Error('Job name must be a non-empty string')
      }

      const jobsDir = path.join(process.cwd(), 'src', 'jobs')
      const extension = config.ENV === 'development' ? '.ts' : '.js'
      const jobPath = path.join(jobsDir, `${jobName}${extension}`)

      // The name comes from queue data and ends up in import(); refuse anything
      // that resolves outside the jobs directory (e.g. "../../x").
      const relativePath = path.relative(jobsDir, jobPath)
      const escapesJobsDir =
        relativePath === '..' ||
        relativePath.startsWith(`..${path.sep}`) ||
        path.isAbsolute(relativePath)
      if (escapesJobsDir) {
        throw new Error(`Job name resolves outside the jobs directory: ${jobName}`)
      }

      if (!fs.existsSync(jobPath)) {
        queueLogger.warn(`Job file not found: ${jobPath}`)
        return
      }

      const JobModule = await import(jobPath)
      const JobClass = JobModule.default
      if (!JobClass || typeof JobClass !== 'function') {
        queueLogger.warn(`Invalid job class in file: ${jobPath}`)
        return
      }

      const jobInstance = new JobClass(params)

      if (socketId && this.io) {
        const socket = this.io.sockets.sockets.get(socketId)
        if (socket) {
          await jobInstance.execute(this.io, socket)
        } else {
          // The originating socket is no longer connected; run without it.
          queueLogger.warn(`Socket not found for job: ${socketId}`)
          await jobInstance.execute(this.io)
        }
      } else {
        await jobInstance.execute(this.io)
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error)
      queueLogger.error(`Error processing job ${jobName}: ${message}`)
      // Rethrow so the job is marked failed (and the 'failed' listener and any
      // configured retries apply) instead of being reported as completed.
      throw error
    }
  }

  /**
   * Enqueues a job for the worker.
   *
   * @param jobName Base name of the job file under `src/jobs` (no extension).
   * @param params  Value passed to the job class constructor.
   * @param socket  Optional socket; only its `id` is stored with the job.
   */
  public async newJob(jobName: string, params: unknown, socket?: TSocket): Promise<void> {
    const jobData = { jobName, params, socketId: socket?.id }
    await this.queue.add('job', jobData)
  }
}

export default new QueueManager()