averainy's Blog

averainy

15 Jun 2023

Angular RxJS: Limiting Concurrent HTTP Requests

详细内容青参考Angular/RxJS: Limiting Concurrent HTTP Requests
其逻辑主要是用到了mergemap的concurrent参数,mergemap会尽可能多的执行请求,加上concurrent参数之后就能够使得一次执行concurrent个请求,这样在上传文件的时候可以避免服务端因为请求过多而返回异常的问题。

import { HttpClient, HttpErrorResponse, HttpRequest } from "@angular/common/http";
import { from, of } from "rxjs";
import { catchError, mergeMap, tap } from "rxjs/operators";

// constructor, other things, etc...

ngOnInit(): void {
    this.uploadLimitConcurrency().then(res => {
        if (res instanceof Error) {
        // throw error
        } else {
        // your results
        }
    })
}

async uploadLimitConcurrency(): Promise<any> {
    const uploadRequests: Array<HttpRequest> = [...] // your many HTTP requests
    const CONCURRENT_UPLOADS = 3 // # of concurrent uploads to go at once
    let results: Array<any> = []
    let requestsCompleted: number = 0
    const concurrentPipe = await from(uploadRequests).pipe(
        mergeMap(obj => this.httpClient.request(obj), CONCURRENT_UPLOADS), // this allows us to limit concurrent uploads
        tap((res: any) => {
        if(res?.status == 200) { // do something with the results
            // push the headers of the response to results
            results.push(res)

            // you can keep track of progress of overall upload
            requestsCompleted++
        }
        }),
        catchError((err: HttpErrorResponse) => {
        if (err.status == 403) {
            return of(new Error(`A 403 error ocurred while trying to upload the file: ${err.error}`))
        }
        return of(new Error(`Some other error occurred: ${err.error}`))
        })
    ).toPromise()

    // handle error outside the pipe
    if (concurrentPipe instanceof Error) {
        console.log(concurrentPipe) // contains error
        return concurrentPipe
    }

    return results
}