Files
ActivityPubD/source/net/request_pool.d

85 lines
1.8 KiB
D

module net.request_pool;
import core.thread;
import core.sync.semaphore;
import std.container;
import std.parallelism;
import requests;
import slf4d;
struct PRequest {
string url;
string method = "GET";
QueryParam[] params;
string[string] headers;
string body = "";
string contentType = "text/plain";
}
class RequestPool {
private int m_totalWorkers;
private TaskPool m_taskPool;
private Logger _l;
private Request[] m_requests;
this(int totalWorkers = 4) {
this.m_totalWorkers = totalWorkers;
this._l = getLogger();
for (int i = 0; i < totalWorkers + 1; i++) {
Request rq = Request();
// TODO: add custom fields (such as user agent)
this.m_requests ~= rq;
}
}
void startBackground() {
_l.debugF!"Starting RequestPool with %d workers"(this.m_totalWorkers);
this.m_taskPool = new TaskPool(this.m_totalWorkers);
}
Response request(PRequest request, bool blocking = false) {
auto t = task(&this.m_run, request);
this.m_taskPool.put(t);
if (blocking)
return t.yieldForce();
return null;
}
void stop() {
this.m_taskPool.finish(true);
}
Response m_run(PRequest request) {
Request rq = this.m_requests[this.m_taskPool.workerIndex];
rq.addHeaders(request.headers);
_l.debugF!"[%d][%s] %s"(this.m_taskPool.workerIndex, request.method, request.url);
Response rs;
switch (request.method) {
case "GET":
rs = rq.get(request.url, request.params);
break;
case "POST":
rs = rq.post(request.url, request.body, request.contentType);
break;
default:
errorF!"Unknown request method: %s"(request.method);
return null;
}
_l.debugF!"[%d][%s] %s result code: %d"(this.m_taskPool.workerIndex, request.method, request.url, rs
.code);
return rs;
}
}