this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix build issue and darlingserver CPU consumption

We weren't sleeping properly because we were using edge-triggered epoll with EPOLLOUT (which kept returning true since there was space in the buffer). Now we use edge-triggered epoll.

Note that we have to keep track of when the descriptor is readable/writable ourselves because as long as the descriptor remains writable, epoll won't notify us; we use an eventfd to notify the main loop when we have a message ready to send. The main loop will monitor it when the descriptor is writable; whenever the descriptor becomes unwritable, we stop monitoring the eventfd (since we can't send anything until it becomes writable again).

+101 -16
+1 -1
src/CMakeLists.txt
··· 56 56 #add_subdirectory(libdyld) 57 57 add_subdirectory(buildtools) 58 58 add_subdirectory(libelfloader/wrapgen) 59 - add_subdirectory(startup) 60 59 add_subdirectory(darlingserver) 60 + add_subdirectory(startup) 61 61 62 62 include_directories(${CMAKE_SOURCE_DIR}/basic-headers) 63 63
+6 -2
src/darlingserver/internal-include/darlingserver/message.hpp
··· 27 27 #include <deque> 28 28 #include <mutex> 29 29 #include <optional> 30 + #include <functional> 30 31 31 32 namespace DarlingServer { 32 33 class Address { ··· 157 158 private: 158 159 std::deque<Message> _messages; 159 160 std::mutex _lock; 161 + std::function<void()> _messageArrivalNotificationCallback = nullptr; 160 162 161 163 public: 164 + void setMessageArrivalNotificationCallback(std::function<void()> messageArrivalNotificationCallback); 165 + 162 166 void push(Message&& message); 163 167 std::optional<Message> pop(); 164 168 165 - void sendMany(int socket); 166 - void receiveMany(int socket); 169 + bool sendMany(int socket); 170 + bool receiveMany(int socket); 167 171 }; 168 172 }; 169 173
+3
src/darlingserver/internal-include/darlingserver/server.hpp
··· 40 40 MessageQueue _inbox; 41 41 MessageQueue _outbox; 42 42 WorkQueue<Message> _workQueue; 43 + bool _canRead = false; 44 + bool _canWrite = true; 45 + int _wakeupFD; 43 46 44 47 void _worker(DarlingServer::Message message); 45 48
+33 -4
src/darlingserver/src/message.cpp
··· 453 453 }; 454 454 455 455 void DarlingServer::MessageQueue::push(Message&& message) { 456 - std::scoped_lock lock(_lock); 456 + std::unique_lock lock(_lock); 457 457 _messages.push_back(std::move(message)); 458 + 459 + if (_messages.size() > 0) { 460 + auto callback = _messageArrivalNotificationCallback; 461 + lock.unlock(); 462 + if (callback) { 463 + callback(); 464 + } 465 + } 458 466 }; 459 467 460 468 std::optional<DarlingServer::Message> DarlingServer::MessageQueue::pop() { ··· 468 476 } 469 477 }; 470 478 471 - void DarlingServer::MessageQueue::sendMany(int socket) { 479 + bool DarlingServer::MessageQueue::sendMany(int socket) { 480 + bool canSendMore = true; 472 481 std::scoped_lock lock(_lock); 473 482 struct mmsghdr mmsgs[16]; 474 483 size_t len = 0; ··· 490 499 491 500 if (ret < 0) { 492 501 if (errno == EAGAIN) { 502 + canSendMore = false; 493 503 break; 494 504 } else if (errno == EINTR) { 495 505 ret = 0; ··· 502 512 _messages.pop_front(); 503 513 } 504 514 } 515 + 516 + return canSendMore; 505 517 }; 506 518 507 - void DarlingServer::MessageQueue::receiveMany(int socket) { 508 - std::scoped_lock lock(_lock); 519 + bool DarlingServer::MessageQueue::receiveMany(int socket) { 520 + bool canReadMore = true; 521 + std::unique_lock lock(_lock); 509 522 struct mmsghdr mmsgs[16]; 510 523 int ret = 0; 511 524 ··· 521 534 522 535 if (ret < 0) { 523 536 if (errno == EAGAIN) { 537 + canReadMore = false; 524 538 break; 525 539 } else if (errno == EINTR) { 526 540 ret = 0; ··· 536 550 _messages.push_back(std::move(messages[i])); 537 551 } 538 552 } 553 + 554 + if (_messages.size() > 0) { 555 + auto callback = _messageArrivalNotificationCallback; 556 + lock.unlock(); 557 + if (callback) { 558 + callback(); 559 + } 560 + } 561 + 562 + return canReadMore; 563 + }; 564 + 565 + void DarlingServer::MessageQueue::setMessageArrivalNotificationCallback(std::function<void()> messageArrivalNotificationCallback) { 566 + std::unique_lock lock(_lock); 567 + _messageArrivalNotificationCallback = messageArrivalNotificationCallback; 539 568 };
+2 -1
src/darlingserver/src/process.cpp
··· 41 41 void DarlingServer::Process::_unregisterThreads() { 42 42 std::unique_lock lock(_rwlock); 43 43 while (!_threads.empty()) { 44 - auto thread = _threads.front().lock(); 44 + auto thread = _threads.back().lock(); 45 45 lock.unlock(); 46 46 if (thread) { 47 47 thread->_process = std::weak_ptr<Process>(); 48 48 threadRegistry().unregisterEntry(thread); 49 49 } 50 50 lock.lock(); 51 + _threads.pop_back(); 51 52 } 52 53 }; 53 54
+52 -8
src/darlingserver/src/server.cpp
··· 30 30 #include <thread> 31 31 #include <array> 32 32 #include <darlingserver/registry.hpp> 33 + #include <sys/eventfd.h> 33 34 34 35 static DarlingServer::Server* sharedInstancePointer = nullptr; 35 36 ··· 63 64 throw std::system_error(errno, std::generic_category(), "Failed to bind socket"); 64 65 } 65 66 67 + _wakeupFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); 68 + if (_wakeupFD < 0) { 69 + throw std::system_error(errno, std::generic_category(), "Failed to create eventfd for on-demand epoll wakeups"); 70 + } 71 + 66 72 _epollFD = epoll_create1(EPOLL_CLOEXEC); 67 73 if (_epollFD < 0) { 68 74 throw std::system_error(errno, std::generic_category(), "Failed to create epoll context"); ··· 70 76 71 77 struct epoll_event settings; 72 78 settings.data.ptr = this; 73 - settings.events = EPOLLIN | EPOLLOUT; 79 + settings.events = EPOLLIN | EPOLLOUT | EPOLLET; 74 80 75 81 if (epoll_ctl(_epollFD, EPOLL_CTL_ADD, _listenerSocket, &settings) < 0) { 76 82 throw std::system_error(errno, std::generic_category(), "Failed to add listener socket to epoll context"); 77 83 } 84 + 85 + settings.data.ptr = &_wakeupFD; 86 + settings.events = EPOLLIN | EPOLLONESHOT; 87 + 88 + if (epoll_ctl(_epollFD, EPOLL_CTL_ADD, _wakeupFD, &settings) < 0) { 89 + throw std::system_error(errno, std::generic_category(), "Failed to add eventfd to epoll context"); 90 + } 91 + 92 + _outbox.setMessageArrivalNotificationCallback([this]() { 93 + // we don't really have to worry about the eventfd overflowing; 94 + // if it does, that means the main loop has been waiting a LONG time for the listener socket to become writable again. 95 + // in that case, we don't really care if the eventfd is being incremented; we can't send anything anyways. 96 + // once the socket becomes writable again, the eventfd will be monitored again. 97 + eventfd_write(_wakeupFD, 1); 98 + }); 78 99 }; 79 100 80 101 DarlingServer::Server::~Server() { 81 102 close(_epollFD); 103 + close(_wakeupFD); 82 104 close(_listenerSocket); 83 105 unlink(_socketPath.c_str()); 84 106 }; 85 107 86 108 void DarlingServer::Server::start() { 87 109 while (true) { 110 + if (_canRead) { 111 + _canRead = _inbox.receiveMany(_listenerSocket); 112 + 113 + // TODO: receive messages directly onto the work queue 114 + while (auto msg = _inbox.pop()) { 115 + _workQueue.push(std::move(msg.value())); 116 + } 117 + } 118 + 119 + if (_canWrite) { 120 + // reset the eventfd by reading from it 121 + eventfd_t value; 122 + eventfd_read(_wakeupFD, &value); 123 + _canWrite = _outbox.sendMany(_listenerSocket); 124 + } 125 + 126 + struct epoll_event settings; 127 + settings.data.ptr = &_wakeupFD; 128 + settings.events = (_canWrite) ? (EPOLLIN | EPOLLONESHOT) : 0; 129 + 130 + if (epoll_ctl(_epollFD, EPOLL_CTL_MOD, _wakeupFD, &settings) < 0) { 131 + throw std::system_error(errno, std::generic_category(), "Failed to modify eventfd in epoll context"); 132 + } 133 + 88 134 struct epoll_event events[16]; 89 135 int ret = epoll_wait(_epollFD, events, 16, -1); 90 136 ··· 101 147 102 148 if (event->data.ptr == this) { 103 149 if (event->events & EPOLLIN) { 104 - _inbox.receiveMany(_listenerSocket); 105 - 106 - // TODO: receive messages directly onto the work queue 107 - while (auto msg = _inbox.pop()) { 108 - _workQueue.push(std::move(msg.value())); 109 - } 150 + _canRead = true; 110 151 } 111 152 112 153 if (event->events & EPOLLOUT) { 113 - _outbox.sendMany(_listenerSocket); 154 + _canWrite = true; 114 155 } 156 + } else if (event->data.ptr == &_wakeupFD) { 157 + // we allow the loop to go back to the top and try to send some messages 158 + // (if _canWrite is true, the eventfd will be reset; otherwise, there's no point in resetting it) 115 159 } else if (event->events & EPOLLIN) { 116 160 std::shared_ptr<Process>& process = *reinterpret_cast<std::shared_ptr<Process>*>(event->data.ptr); 117 161
+4
src/startup/mldr/CMakeLists.txt
··· 4 4 5 5 include_directories(include) 6 6 7 + set_source_files_properties(${CMAKE_BINARY_DIR}/src/darlingserver/src/rpc.c PROPERTIES 8 + GENERATED TRUE 9 + ) 10 + 7 11 add_library(mldr_dserver_rpc ${CMAKE_BINARY_DIR}/src/darlingserver/src/rpc.c) 8 12 add_library(mldr32_dserver_rpc ${CMAKE_BINARY_DIR}/src/darlingserver/src/rpc.c) 9 13