|
|
|
@ -382,20 +382,19 @@ void WebhookActor::load_updates() {
|
|
|
|
for (auto &update : updates) {
|
|
|
|
for (auto &update : updates) {
|
|
|
|
VLOG(webhook) << "Load update " << update.id;
|
|
|
|
VLOG(webhook) << "Load update " << update.id;
|
|
|
|
if (update_map_.find(update.id) != update_map_.end()) {
|
|
|
|
if (update_map_.find(update.id) != update_map_.end()) {
|
|
|
|
LOG(ERROR) << "Receive duplicated event from tqueue " << update.id;
|
|
|
|
LOG(ERROR) << "Receive duplicated event " << update.id << " from tqueue";
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
auto &dest = update_map_[update.id];
|
|
|
|
auto &dest = update_map_[update.id];
|
|
|
|
dest.id_ = update.id;
|
|
|
|
dest.id_ = update.id;
|
|
|
|
dest.json_ = update.data.str();
|
|
|
|
dest.json_ = update.data.str();
|
|
|
|
dest.state_ = Update::State::Begin;
|
|
|
|
|
|
|
|
dest.delay_ = 1;
|
|
|
|
dest.delay_ = 1;
|
|
|
|
dest.wakeup_at_ = now;
|
|
|
|
dest.wakeup_at_ = now;
|
|
|
|
CHECK(update.expires_at >= unix_time_now);
|
|
|
|
CHECK(update.expires_at >= unix_time_now);
|
|
|
|
dest.expires_at_ = update.expires_at;
|
|
|
|
dest.expires_at_ = update.expires_at;
|
|
|
|
dest.queue_id_ = update.extra;
|
|
|
|
dest.queue_id_ = update.extra;
|
|
|
|
tqueue_offset_ = update.id.next().move_as_ok();
|
|
|
|
tqueue_offset_ = update.id.next().move_as_ok();
|
|
|
|
begin_updates_n_++;
|
|
|
|
pending_update_count_++;
|
|
|
|
|
|
|
|
|
|
|
|
if (dest.queue_id_ == 0) {
|
|
|
|
if (dest.queue_id_ == 0) {
|
|
|
|
dest.queue_id_ = unique_queue_id_++;
|
|
|
|
dest.queue_id_ = unique_queue_id_++;
|
|
|
|
@ -485,14 +484,13 @@ void WebhookActor::on_update_error(td::TQueue::EventId event_id, td::Slice error
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
auto &update = it->second;
|
|
|
|
auto &update = it->second;
|
|
|
|
update.state_ = Update::State::Begin;
|
|
|
|
|
|
|
|
update.delay_ = next_delay;
|
|
|
|
update.delay_ = next_delay;
|
|
|
|
update.wakeup_at_ = now + next_effective_delay;
|
|
|
|
update.wakeup_at_ = now + next_effective_delay;
|
|
|
|
update.fail_count_++;
|
|
|
|
update.fail_count_++;
|
|
|
|
queues_.emplace(update.wakeup_at_, update.queue_id_);
|
|
|
|
queues_.emplace(update.wakeup_at_, update.queue_id_);
|
|
|
|
VLOG(webhook) << "Delay update " << event_id << " for " << (update.wakeup_at_ - now) << " seconds because of "
|
|
|
|
VLOG(webhook) << "Delay update " << event_id << " for " << (update.wakeup_at_ - now) << " seconds because of "
|
|
|
|
<< error << " after " << update.fail_count_ << " fails";
|
|
|
|
<< error << " after " << update.fail_count_ << " fails";
|
|
|
|
begin_updates_n_++;
|
|
|
|
pending_update_count_++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
td::Status WebhookActor::send_update() {
|
|
|
|
td::Status WebhookActor::send_update() {
|
|
|
|
@ -533,8 +531,7 @@ td::Status WebhookActor::send_update() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
auto &connection = *Connection::from_list_node(ready_connections_.get());
|
|
|
|
auto &connection = *Connection::from_list_node(ready_connections_.get());
|
|
|
|
begin_updates_n_--;
|
|
|
|
pending_update_count_--;
|
|
|
|
update.state_ = Update::State::Send;
|
|
|
|
|
|
|
|
connection.event_id_ = update.id_;
|
|
|
|
connection.event_id_ = update.id_;
|
|
|
|
|
|
|
|
|
|
|
|
VLOG(webhook) << "Send update " << update.id_ << " from queue " << queue_id << " into connection " << connection.id_
|
|
|
|
VLOG(webhook) << "Send update " << update.id_ << " from queue " << queue_id << " into connection " << connection.id_
|
|
|
|
@ -548,8 +545,8 @@ td::Status WebhookActor::send_update() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void WebhookActor::send_updates() {
|
|
|
|
void WebhookActor::send_updates() {
|
|
|
|
VLOG(webhook) << "Have " << begin_updates_n_ << " pending updates to send";
|
|
|
|
VLOG(webhook) << "Have " << pending_update_count_ << " pending updates to send";
|
|
|
|
while (begin_updates_n_ > 0) {
|
|
|
|
while (pending_update_count_ > 0) {
|
|
|
|
if (send_update().is_error()) {
|
|
|
|
if (send_update().is_error()) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|