forked from noxious/server
Mass replace parameter order (socket,io)>(io,socket), worked on queueing system
This commit is contained in:
@ -31,7 +31,7 @@ class CharacterManager {
|
||||
}
|
||||
|
||||
public hasResetMovement(character: ExtendedCharacter) {
|
||||
return this.characters.find(x => x.id === character.id)?.resetMovement;
|
||||
return this.characters.find((x) => x.id === character.id)?.resetMovement
|
||||
}
|
||||
|
||||
public getCharactersInZone(zone: Zone) {
|
||||
|
@ -1,29 +1,91 @@
|
||||
import IORedis from 'ioredis';
|
||||
import IORedis from 'ioredis'
|
||||
import { Job, Queue, Worker } from 'bullmq'
|
||||
import config from '../utilities/config'
|
||||
import { Server as SocketServer } from 'socket.io'
|
||||
import { TSocket } from '../utilities/types'
|
||||
import { queueLogger } from '../utilities/logger'
|
||||
|
||||
class QueueManager {
|
||||
private connection!: IORedis;
|
||||
public queue!: Queue;
|
||||
public worker!: Worker;
|
||||
private connection!: IORedis
|
||||
private queue!: Queue
|
||||
private worker!: Worker
|
||||
private io!: SocketServer
|
||||
|
||||
public async boot(io: SocketServer) {
|
||||
this.io = io
|
||||
|
||||
public async boot() {
|
||||
this.connection = new IORedis(config.REDIS_URL, {
|
||||
maxRetriesPerRequest: null
|
||||
});
|
||||
// this.queue = new Queue('myqueue', { connection: this.connection });
|
||||
// this.worker = new Worker('myqueue', async (job: Job)=>{
|
||||
// console.log('hallo')
|
||||
// console.log(job.data);
|
||||
// console.log(job.data.data.classobj);
|
||||
// const test = job.data.data.classobj();
|
||||
// console.log(test);
|
||||
// console.log(job.data);
|
||||
// }, { connection: this.connection, concurrency: 10000 });
|
||||
//
|
||||
})
|
||||
|
||||
console.log('hallo')
|
||||
try {
|
||||
await this.connection.ping()
|
||||
queueLogger.info('Successfully connected to Redis')
|
||||
} catch (error) {
|
||||
queueLogger.error('Failed to connect to Redis:', error)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
this.queue = new Queue('jobs')
|
||||
this.worker = new Worker('jobs', this.processJob.bind(this), {
|
||||
connection: this.connection,
|
||||
concurrency: 10000
|
||||
})
|
||||
|
||||
this.worker.on('completed', (job) => {
|
||||
queueLogger.info(`Job ${job?.id} has completed`)
|
||||
})
|
||||
|
||||
this.worker.on('failed', (job, err) => {
|
||||
queueLogger.error(`Job ${job?.id} failed with error: ${err}`)
|
||||
})
|
||||
|
||||
queueLogger.info('Queue manager loaded')
|
||||
}
|
||||
|
||||
private async processJob(job: Job) {
|
||||
const { jobClass, params, socketId } = job.data
|
||||
|
||||
console.log('Processing job:', job)
|
||||
try {
|
||||
const JobClass = await import(`../jobs/${jobClass}`)
|
||||
|
||||
if (!JobClass) {
|
||||
queueLogger.warn(`Job class not found: ${jobClass}`)
|
||||
return
|
||||
}
|
||||
|
||||
console.log('Job class:', JobClass)
|
||||
|
||||
console.log('Job class:', JobClass.name)
|
||||
|
||||
const job = new JobClass(params)
|
||||
|
||||
if (socketId && this.io) {
|
||||
const socket = this.io.sockets.sockets.get(socketId)
|
||||
if (socket) {
|
||||
await job.execute(this.io, socket)
|
||||
} else {
|
||||
queueLogger.warn(`Socket not found for job: ${socketId}`)
|
||||
await job.execute(this.io)
|
||||
}
|
||||
} else {
|
||||
await job.execute(this.io)
|
||||
}
|
||||
} catch (error: any) {
|
||||
queueLogger.error(`Error processing job: ${error.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
public async addToQueue(jobClass: any, params: any, socket?: TSocket) {
|
||||
const jobData = {
|
||||
jobClass: jobClass.name,
|
||||
params,
|
||||
socketId: socket?.id
|
||||
}
|
||||
|
||||
await this.queue.add('job', jobData)
|
||||
}
|
||||
}
|
||||
|
||||
export default new QueueManager();
|
||||
export default new QueueManager()
|
||||
|
Reference in New Issue
Block a user