forked from noxious/server
105 lines
2.8 KiB
TypeScript
105 lines
2.8 KiB
TypeScript
import fs from 'fs'
|
|
|
|
import { Job, Queue, Worker } from 'bullmq'
|
|
import IORedis from 'ioredis'
|
|
import { Server as SocketServer } from 'socket.io'
|
|
|
|
import config from '#application/config'
|
|
import Logger, { LoggerType } from '#application/logger'
|
|
import Storage from '#application/storage'
|
|
import { TSocket } from '#application/types'
|
|
import SocketManager from '#managers/socketManager'
|
|
|
|
/** Payload stored on every queued job by `QueueManager.newJob`. */
interface QueuedJobData {
  /** Base file name (without extension) of the job module inside the `jobs` app directory. */
  jobName: string
  /** Arguments handed to the job class constructor. */
  params: unknown
  /** Id of the socket that requested the job, when there was one. */
  socketId?: string
}

/**
 * Owns the Redis-backed BullMQ queue and the worker that executes queued jobs.
 *
 * `boot()` must be called before `newJob()`: the Redis connection, the queue
 * and the worker are all created there.
 */
class QueueManager {
  private connection!: IORedis
  private queue!: Queue
  private worker!: Worker
  private io!: SocketServer
  private logger = Logger.type(LoggerType.QUEUE)

  /**
   * Connects to Redis, then creates the `jobs` queue and its worker and wires
   * up the worker's event logging.
   *
   * Terminates the process with exit code 1 when the initial Redis ping fails.
   */
  public async boot(): Promise<void> {
    this.io = SocketManager.getIO()

    this.connection = new IORedis(config.REDIS_URL, {
      // BullMQ workers require this to be null on the connection they are given.
      maxRetriesPerRequest: null
    })

    // NOTE(review): with `maxRetriesPerRequest: null` ioredis may keep the ping
    // queued while reconnecting instead of rejecting - confirm this catch is
    // actually reachable when Redis is down at startup.
    try {
      await this.connection.ping()
      this.logger.info('Successfully connected to Redis')
    } catch (error) {
      this.logger.error('Failed to connect to Redis:', error)
      process.exit(1)
    }

    this.queue = new Queue('jobs', {
      connection: this.connection
    })

    this.worker = new Worker('jobs', this.processJob.bind(this), {
      connection: this.connection,
      concurrency: 10000
    })

    this.worker.on('completed', (job) => {
      this.logger.info(`Job ${job?.id} has completed`)
    })

    this.worker.on('failed', (job, err) => {
      this.logger.error(`Job ${job?.id} failed with error: ${err}`)
    })

    // BullMQ recommends always attaching an `error` listener to a worker so
    // that connection-level errors are reported instead of going unhandled.
    this.worker.on('error', (err) => {
      this.logger.error(`Queue worker error: ${err}`)
    })

    this.logger.info('Queue manager loaded')
  }

  /**
   * Worker processor: loads the job module named in the payload, instantiates
   * its default export with the payload params and awaits its `execute` method.
   *
   * `execute` receives the socket.io server and, when the originating socket is
   * still connected, that socket as a second argument.
   *
   * A missing job file or a non-constructor default export is logged as a
   * warning and the job is skipped.
   *
   * @throws Rethrows any error raised while loading or executing the job, so
   *         BullMQ marks the job as failed and the `failed` listener fires
   *         instead of the job being reported as completed.
   */
  private async processJob(job: Job<QueuedJobData>): Promise<void> {
    const { jobName, params, socketId } = job.data

    try {
      // Job modules are loaded as TypeScript in development and as JavaScript otherwise.
      const extension = config.ENV === 'development' ? '.ts' : '.js'
      const jobPath = Storage.getAppPath('jobs', `${jobName}${extension}`)

      if (!fs.existsSync(jobPath)) {
        this.logger.warn(`Job file not found: ${jobPath}`)
        return
      }

      const JobModule = await import(jobPath)
      const JobClass = JobModule.default

      if (!JobClass || typeof JobClass !== 'function') {
        this.logger.warn(`Invalid job class in file: ${jobPath}`)
        return
      }

      const jobInstance = new JobClass(params)

      if (socketId && this.io) {
        const socket = this.io.sockets.sockets.get(socketId)
        if (socket) {
          await jobInstance.execute(this.io, socket)
        } else {
          // The requesting socket is gone; still run the job without it.
          this.logger.warn(`Socket not found for job: ${socketId}`)
          await jobInstance.execute(this.io)
        }
      } else {
        await jobInstance.execute(this.io)
      }
    } catch (error: unknown) {
      // Thrown values are not guaranteed to be Error instances.
      const message = error instanceof Error ? error.message : String(error)
      this.logger.error(`Error processing job ${jobName}: ${message}`)
      // Propagate so the job is not silently recorded as completed.
      throw error
    }
  }

  /**
   * Adds a job to the queue.
   *
   * @param jobName Base file name of the job module to run.
   * @param params  Arguments passed to the job class constructor.
   * @param socket  Optional requesting socket; only its id is stored in the payload.
   */
  public async newJob(jobName: string, params: any, socket?: TSocket): Promise<void> {
    const jobData = {
      jobName,
      params,
      socketId: socket?.id
    }
    await this.queue.add('job', jobData)
  }
}
|
|
|
|
// Default export is a QueueManager instance created when this module is evaluated.
export default new QueueManager()
|