Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import bftp from 'basic-ftp';
- import { FtpJobFactory } from './FtpJobFactory';
- import { JobExecutor } from './JobExecutor';
- import { JobDataInterface } from './interfaces/JobDataInterface';
- import { FtpEventConstants } from './interfaces/FtpEventConstants';
- import { FtpConfigInterface } from './interfaces/FtpConfigInterface';
/**
 * Runs FTP jobs over a fixed-size pool of `basic-ftp` clients.
 *
 * A job handed to `push` starts immediately when a pooled client is idle.
 * Otherwise it is parked in an in-memory backlog and picked up, oldest first,
 * by whichever client finishes its current job next.
 *
 * Every pooled client carries an ad-hoc `busy` flag that this class reads and
 * writes to decide whether the client may take a job.
 * NOTE(review): `busy` is presumably added to the client type by a module
 * augmentation elsewhere in the project — confirm.
 */
export class FtpQueueService<EventConstants extends FtpEventConstants> {
  /** Jobs waiting for a free connection, oldest first. */
  protected queue: JobDataInterface[];
  /** Pool of FTP clients; the array index is the connection id used by every method. */
  protected connections: bftp.Client[];
  /** Handle of the no-op timer started by the constructor. */
  protected interval: NodeJS.Timeout;

  /**
   * Creates the client pool. No network connection is opened here; call
   * `initFTPConnections` to connect the clients.
   *
   * @param config Source of the `DEFAULT_FTP_*` credentials and endpoint used for every connection.
   * @param connectionsNumber Number of clients to create for the pool.
   * @param jobExecutor Runs a prepared job against a client.
   * @param jobMaker Turns queued job data into the job object handed to `jobExecutor`.
   */
  constructor(
    protected config: FtpConfigInterface,
    protected connectionsNumber: number,
    protected jobExecutor: JobExecutor<EventConstants>,
    protected jobMaker: FtpJobFactory,
  ) {
    this.connections = [];
    this.queue = [];
    // Add events here, possibly move callback out from here
    for (let i = 0; i < connectionsNumber; i += 1) {
      const client = new bftp.Client(0);
      client.busy = false;
      this.connections.push(client);
    }
    // The callback is empty: the timer does no work of its own.
    // NOTE(review): presumably kept only so the process stays alive while the
    // pool is idle; nothing in this class ever clears it — confirm that
    // teardown code calls clearInterval(this.interval).
    this.interval = setInterval(() => {}, 10000);
  }

  /**
   * Connects every pooled client, one after another.
   *
   * A client that fails to connect is logged and skipped, so this method
   * resolves even when some (or all) connections could not be opened.
   */
  async initFTPConnections() {
    for (let connectionId = 0; connectionId < this.connections.length; connectionId += 1) {
      try {
        await this.connectCliect(connectionId);
      } catch (e) {
        // Log the failure itself plus the endpoint; the password is
        // deliberately left out so credentials never reach the logs.
        console.error('FTP connection failed', {
          connectionId,
          error: e,
          user: this.config.DEFAULT_FTP_USER,
          host: this.config.DEFAULT_FTP_HOST,
          port: this.config.DEFAULT_FTP_PORT,
        });
      }
    }
  }

  /**
   * Submits a job: runs it right away on an idle connection, or queues it.
   *
   * @param job Job data to run.
   * @returns The new backlog length when the job was queued, `undefined` when
   *   it was started immediately.
   */
  push(job: JobDataInterface) {
    if (this.tryToRunJob(job)) {
      return undefined;
    }
    return this.queue.push(job);
  }

  /**
   * Removes and returns the oldest waiting job (first in, first out), or
   * `undefined` when the backlog is empty.
   */
  protected pop(): JobDataInterface | undefined {
    return this.queue.shift();
  }

  /** Returns the pooled client stored under `connectionId`. */
  protected getConnection(connectionId: number): bftp.Client {
    return this.connections[connectionId];
  }

  /**
   * Runs `job` on the given connection, then keeps draining the backlog on
   * that same connection until it is empty.
   *
   * The connection is marked busy for the whole run and released at the end.
   * A job that fails is logged and dropped (not retried); the connection is
   * then rebuilt and the next waiting job is started. If the rebuild itself
   * fails, draining stops and the connection is released so a later `push`
   * can try again.
   *
   * @param job First job to run.
   * @param freeConnectionId Id of the connection to use.
   * @returns A promise that settles once this connection has gone idle. It
   *   never rejects: every failure is handled and logged here, because
   *   `push` does not observe the returned promise.
   */
  protected async executeJob(job: JobDataInterface, freeConnectionId: number) {
    // Claimed before the first await, i.e. synchronously inside push(), so a
    // second push in the same tick cannot select this connection.
    this.getConnection(freeConnectionId).busy = true;

    for (let current: JobDataInterface | undefined = job; current !== undefined; current = this.pop()) {
      try {
        await this.getEnsurecConnection(freeConnectionId);
        // getConnection is re-read on every use: the client object may have
        // been replaced by a reconnect in the step above.
        await this.jobExecutor.executeJob(this.jobMaker.makeJob(current), this.getConnection(freeConnectionId));
        // Return to the root directory so the next job starts from a known location.
        await this.getConnection(freeConnectionId).cd('/');
      } catch (e) {
        console.error('FTP job failed, regenerating connection', { connectionId: freeConnectionId, error: e });
        try {
          await this.regenerateConnection(freeConnectionId);
        } catch (reconnectError) {
          console.error('FTP reconnect failed', { connectionId: freeConnectionId, error: reconnectError });
          break;
        }
      }
    }

    this.getConnection(freeConnectionId).busy = false;
  }

  /**
   * Checks that the connection still answers by issuing `pwd`; when that
   * fails the connection is rebuilt instead.
   *
   * @param freeConnectionId Id of the connection to check.
   * @returns A promise for the `pwd` result, or for the reconnect result when
   *   the check failed. Rejects only when the reconnect fails.
   */
  protected getEnsurecConnection(freeConnectionId: number) {
    return this.getConnection(freeConnectionId)
      .pwd()
      .catch(() => this.regenerateConnection(freeConnectionId));
  }

  /**
   * Starts `job` on an idle connection when one exists.
   *
   * @param job Job data to run.
   * @returns The promise from `executeJob` when the job was started,
   *   `undefined` when every connection is busy.
   */
  protected tryToRunJob(job: JobDataInterface) {
    const freeConnectionId = this.getEmptyConnectionId();
    if (freeConnectionId === undefined) {
      return undefined;
    }
    return this.executeJob(job, freeConnectionId);
  }

  /** Returns the id of the first connection not marked busy, or `undefined` when all are busy. */
  protected getEmptyConnectionId(): number | undefined {
    const index = this.connections.findIndex((client) => Boolean(client) && !client.busy);
    return index === -1 ? undefined : index;
  }

  /**
   * Closes the client stored under `connectionId`, replaces it with a fresh
   * one and connects the replacement.
   *
   * The replacement inherits the old client's `busy` flag. A connection that
   * is rebuilt in the middle of a job therefore stays reserved for that job
   * instead of looking idle while it reconnects and runs; `executeJob`
   * releases it once it is done.
   *
   * @param connectionId Id of the connection to rebuild.
   * @returns The promise returned by the connect call on the new client.
   */
  protected regenerateConnection(connectionId: number) {
    const stale = this.getConnection(connectionId);
    const wasBusy = stale.busy;
    stale.close();

    const replacement = new bftp.Client(0);
    replacement.busy = wasBusy;
    this.connections[connectionId] = replacement;

    return this.connectCliect(connectionId);
  }

  /**
   * Opens the connection for the client stored under `connectionId` using the
   * `DEFAULT_FTP_*` settings from the config.
   *
   * @param connectionId Id of the connection to connect.
   * @returns The promise returned by the client's `access` call.
   */
  protected connectCliect(connectionId: number) {
    return this.connections[connectionId].access({
      user: this.config.DEFAULT_FTP_USER,
      password: this.config.DEFAULT_FTP_PASS,
      host: this.config.DEFAULT_FTP_HOST,
      port: this.config.DEFAULT_FTP_PORT,
    });
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement