import AttachmentUploadRequest from "../request/AttachmentUploadRequest";
import AttachmentUploadRequestQueue from '../request/AttachmentUploadRequestQueue';
import AttachmentUploadWorkerError from '../errors/AttachmentUploadWorkerError';
import AttachmentUploadWorkerFactory from './AttachmentUploadWorkerFactory';
import ClientLogger from "../../../logging/ClientLogger";
import { Semaphore } from 'async-mutex';
import Task from "../../../util/concurrency/Task";

/**
 * Pulls attachment upload requests from a queue and runs one worker per request,
 * using a semaphore with {@link AttachmentUploadScheduler.MAX_WORKERS} permits to
 * cap how many workers run at the same time.
 *
 * The ids of requests that have been handed out but not yet finished are tracked in
 * `workInProgressAttachmentIds`; a copy of that set is passed to the queue on every
 * `getNext` call (presumably so the queue can skip them — confirm against the queue).
 */
export default class AttachmentUploadScheduler implements Task<void> {

    /** Number of semaphore permits, i.e. the cap on concurrently running workers. */
    private static readonly MAX_WORKERS: number = 5;

    private readonly attachmentUploadWorkerFactory: AttachmentUploadWorkerFactory;
    private readonly attachmentUploadRequestQueue: AttachmentUploadRequestQueue;
    private readonly logger: ClientLogger;
    private readonly semaphore: Semaphore = new Semaphore(AttachmentUploadScheduler.MAX_WORKERS);
    private readonly workInProgressAttachmentIds: Set<string> = new Set<string>([]);

    /**
     * @param attachmentUploadWorkerFactory creates the worker that processes a single request
     * @param attachmentUploadRequestQueue source of requests; successfully processed attachments are removed from it
     * @param logger receives an error entry for every failed worker
     */
    constructor(
        attachmentUploadWorkerFactory: AttachmentUploadWorkerFactory,
        attachmentUploadRequestQueue: AttachmentUploadRequestQueue,
        logger: ClientLogger
    ) {
        this.attachmentUploadWorkerFactory = attachmentUploadWorkerFactory;
        this.attachmentUploadRequestQueue = attachmentUploadRequestQueue;
        this.logger = logger;
    }

    /**
     * Dispatches workers until the queue yields no further request.
     *
     * Workers are started without being awaited, so the returned promise can resolve
     * while previously started workers are still running.
     *
     * @returns a promise that resolves once the queue returns no request; it rejects if
     *          the queue, the semaphore, or the synchronous start of a worker throws
     */
    public async run(): Promise<void> {
        let attachmentUploadRequest: AttachmentUploadRequest | null = await this.reserveNextRequest();
        while (attachmentUploadRequest) {
            const [, release] = await this.semaphore.acquire();
            this.startWorker(attachmentUploadRequest, release);
            // When the permit just taken was the last one, hold off asking the queue
            // for more work until some worker has given its permit back.
            if (this.semaphore.isLocked()) {
                await this.semaphore.waitForUnlock();
            }
            attachmentUploadRequest = await this.reserveNextRequest();
        }
    }

    /**
     * Fetches the next request from the queue and, if there is one, records its id as in progress.
     *
     * @returns the reserved request, or `null` when the queue has nothing to hand out
     */
    private async reserveNextRequest(): Promise<AttachmentUploadRequest | null> {
        const request: AttachmentUploadRequest | null =
            await this.attachmentUploadRequestQueue.getNext(new Set(this.workInProgressAttachmentIds));
        // Only track real requests; an empty queue must not put `undefined` into the id set.
        if (request) {
            this.workInProgressAttachmentIds.add(request.record.id);
        }
        return request;
    }

    /**
     * Starts a worker for `request` without waiting for it to finish.
     *
     * Whatever the outcome, the request's id is dropped from the in-progress set and the
     * semaphore permit is released exactly once.
     *
     * @param request the reserved request to process
     * @param release releaser of the semaphore permit acquired for this worker
     * @throws whatever the factory or the worker throws synchronously while starting
     */
    private startWorker(request: AttachmentUploadRequest, release: () => void): void {
        const requestId: string = request.record.id;
        const finish = (): void => {
            this.workInProgressAttachmentIds.delete(requestId);
            release();
        };

        let upload: Promise<string>;
        try {
            const worker: Task<string> = this.attachmentUploadWorkerFactory.createWorker(request);
            upload = worker.run();
        } catch (error) {
            // The worker never started: undo the reservation before propagating.
            finish();
            throw error;
        }

        upload
            .then((attachmentId) => {
                this.attachmentUploadRequestQueue.removeAttachmentFromUploadQueue(attachmentId);
            })
            .catch((error) => {
                // The rejection is not guaranteed to be an AttachmentUploadWorkerError
                // (e.g. the handler above threw), so fall back to the id of the request.
                const attachmentId: string =
                    (error as AttachmentUploadWorkerError | undefined)?.getAttachmentId?.() ?? requestId;
                this.logger.error(
                    `AttachmentUploadWorker failed for attachment id: ${attachmentId}`,
                    error,
                    ['AttachmentUploadWorkerFailure']
                );
            })
            // Cleanup is keyed on the request id and lives in `finally`, so a failure in
            // either handler above cannot leak the permit and stall the scheduler.
            .finally(finish);
    }
}
