@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Begin generalizing Aphlict server to prepare for clustering/sensible config file

Summary:
Ref T10697. Currently, `aphlict` takes a ton of command line flags to configure exactly one admin server and exactly one client server.

I want to replace this with a config file. Additionally, I plan to support:

- arbitrary numbers of listening client ports;
- arbitrary numbers of listening admin ports;
- SSL on any port.

For now, just transform the arguments to look like they're a config file. In the future, I'll load from a config file instead.

This greater generality will allow you to do stuff like run separate HTTP and HTTPS admin ports if you really want. I don't think there's a ton of use for this, but it tends to make the code cleaner anyway and there may be some weird cross-datacneter cases for it. Certainly, we undershot with the initial design and lots of users want to terminate SSL in nginx and run only HTTP on this server.

(Some sort-of-plausible use cases are running separate HTTP and HTTPS client servers, if your Phabricator install supports both, or running multiple HTTPS servers with different certificates if you have a bizarre VPN.)

Test Plan: Started Aphlict, connected to it, sent myself test notifications, viewed status page, reviewed logfile.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T10697

Differential Revision: https://secure.phabricator.com/D15700

+167 -67
+62 -19
support/aphlict/server/aphlict_server.js
··· 99 99 if (ssl_config.enabled) { 100 100 ssl_config.key = fs.readFileSync(config['ssl-key']); 101 101 ssl_config.cert = fs.readFileSync(config['ssl-cert']); 102 + } else { 103 + ssl_config.key = null; 104 + ssl_config.cert = null; 102 105 } 106 + 107 + var servers = []; 108 + 109 + servers.push({ 110 + type: 'client', 111 + port: config['client-port'], 112 + listen: config['client-host'], 113 + 'ssl.key': ssl_config.key, 114 + 'ssl.certificate': ssl_config.cert 115 + }); 116 + 117 + servers.push({ 118 + type: 'admin', 119 + port: config['admin-port'], 120 + listen: config['admin-host'], 121 + 'ssl.key': null, 122 + 'ssl.cert': null 123 + }); 103 124 104 125 // If we're just doing a configuration test, exit here before starting any 105 126 // servers. ··· 109 130 return; 110 131 } 111 132 112 - var server; 113 - if (ssl_config.enabled) { 114 - server = https.createServer({ 115 - key: ssl_config.key, 116 - cert: ssl_config.cert 117 - }, function(req, res) { 118 - res.writeHead(501); 119 - res.end('HTTP/501 Use Websockets\n'); 120 - }); 121 - } else { 122 - server = http.createServer(function() {}); 123 - } 133 + var aphlict_servers = []; 134 + var aphlict_clients = []; 135 + var aphlict_admins = []; 136 + 137 + var ii; 138 + for (ii = 0; ii < servers.length; ii++) { 139 + var server = servers[ii]; 140 + var is_client = (server.type == 'client'); 141 + 142 + var http_server; 143 + if (server['ssl.key']) { 144 + var https_config = { 145 + key: server['ssl.key'], 146 + cert: server['ssl.cert'] 147 + }; 148 + 149 + http_server = https.createServer(https_config); 150 + } else { 151 + http_server = http.createServer(); 152 + } 124 153 125 - var client_server = new JX.AphlictClientServer(server); 126 - var admin_server = new JX.AphlictAdminServer(); 154 + var aphlict_server; 155 + if (is_client) { 156 + aphlict_server = new JX.AphlictClientServer(http_server); 157 + } else { 158 + aphlict_server = new JX.AphlictAdminServer(http_server); 159 + } 127 160 128 - client_server.setLogger(debug); 129 - admin_server.setLogger(debug); 130 - admin_server.setClientServer(client_server); 161 + aphlict_server.setLogger(debug); 162 + aphlict_server.listen(server.port, server.listen); 131 163 132 - client_server.listen(config['client-port'], config['client-host']); 133 - admin_server.listen(config['admin-port'], config['admin-host']); 164 + aphlict_servers.push(aphlict_server); 165 + 166 + if (is_client) { 167 + aphlict_clients.push(aphlict_server); 168 + } else { 169 + aphlict_admins.push(aphlict_server); 170 + } 171 + } 172 + 173 + for (ii = 0; ii < aphlict_admins.length; ii++) { 174 + var admin_server = aphlict_admins[ii]; 175 + admin_server.setClientServers(aphlict_clients); 176 + } 134 177 135 178 debug.log('Started Server (PID %d)', process.pid);
+79 -42
support/aphlict/server/lib/AphlictAdminServer.js
··· 9 9 10 10 JX.install('AphlictAdminServer', { 11 11 12 - construct: function() { 13 - this.setLogger(new JX.AphlictLog()); 14 - 12 + construct: function(server) { 15 13 this._startTime = new Date().getTime(); 16 14 this._messagesIn = 0; 17 15 this._messagesOut = 0; 18 16 19 - var handler = this._handler.bind(this); 20 - this._server = http.createServer(handler); 17 + server.on('request', JX.bind(this, this._onrequest)); 18 + this._server = server; 19 + this._clientServers = []; 20 + }, 21 + 22 + properties: { 23 + clientServers: null, 24 + logger: null, 21 25 }, 22 26 23 27 members: { ··· 26 30 _server: null, 27 31 _startTime: null, 28 32 29 - getListenerList: function(instance) { 30 - return this.getClientServer().getListenerList(instance); 33 + getListenerLists: function(instance) { 34 + var clients = this.getClientServers(); 35 + 36 + var lists = []; 37 + for (var ii = 0; ii < clients.length; ii++) { 38 + lists.push(clients[ii].getListenerList(instance)); 39 + } 40 + return lists; 41 + }, 42 + 43 + log: function() { 44 + var logger = this.getLogger(); 45 + if (!logger) { 46 + return; 47 + } 48 + 49 + logger.log.apply(logger, arguments); 50 + 51 + return this; 31 52 }, 32 53 33 54 listen: function() { 34 55 return this._server.listen.apply(this._server, arguments); 35 56 }, 36 57 37 - _handler: function(request, response) { 58 + _onrequest: function(request, response) { 38 59 var self = this; 39 60 var u = url.parse(request.url, true); 40 61 var instance = u.query.instance || '/'; ··· 52 73 try { 53 74 var msg = JSON.parse(body); 54 75 55 - self.getLogger().log( 76 + self.log( 56 77 'Received notification (' + instance + '): ' + 57 78 JSON.stringify(msg)); 58 79 ++self._messagesIn; ··· 61 82 self._transmit(instance, msg); 62 83 response.writeHead(200, {'Content-Type': 'text/plain'}); 63 84 } catch (err) { 64 - self.getLogger().log( 85 + self.log( 65 86 '<%s> Internal Server Error! %s', 66 87 request.socket.remoteAddress, 67 88 err); 68 89 response.writeHead(500, 'Internal Server Error'); 69 90 } 70 91 } catch (err) { 71 - self.getLogger().log( 92 + self.log( 72 93 '<%s> Bad Request! %s', 73 94 request.socket.remoteAddress, 74 95 err); ··· 82 103 response.end(); 83 104 } 84 105 } else if (u.pathname == '/status/') { 85 - var status = { 86 - 'instance': instance, 87 - 'uptime': (new Date().getTime() - this._startTime), 88 - 'clients.active': this.getListenerList(instance) 89 - .getActiveListenerCount(), 90 - 'clients.total': this.getListenerList(instance) 91 - .getTotalListenerCount(), 92 - 'messages.in': this._messagesIn, 93 - 'messages.out': this._messagesOut, 94 - 'version': 7 95 - }; 96 - 97 - response.writeHead(200, {'Content-Type': 'application/json'}); 98 - response.write(JSON.stringify(status)); 99 - response.end(); 106 + this._handleStatusRequest(request, response, instance); 100 107 } else { 101 108 response.writeHead(404, 'Not Found'); 102 109 response.end(); 103 110 } 104 111 }, 105 112 113 + _handleStatusRequest: function(request, response, instance) { 114 + var active_count = 0; 115 + var total_count = 0; 116 + 117 + var lists = this.getListenerLists(instance); 118 + for (var ii = 0; ii < lists.length; ii++) { 119 + var list = lists[ii]; 120 + active_count += list.getActiveListenerCount(); 121 + total_count += list.getTotalListenerCount(); 122 + } 123 + 124 + var server_status = { 125 + 'instance': instance, 126 + 'uptime': (new Date().getTime() - this._startTime), 127 + 'clients.active': active_count, 128 + 'clients.total': total_count, 129 + 'messages.in': this._messagesIn, 130 + 'messages.out': this._messagesOut, 131 + 'version': 7 132 + }; 133 + 134 + response.writeHead(200, {'Content-Type': 'application/json'}); 135 + response.write(JSON.stringify(server_status)); 136 + response.end(); 137 + }, 138 + 106 139 /** 107 140 * Transmits a message to all subscribed listeners. 108 141 */ 109 142 _transmit: function(instance, message) { 110 - var listeners = this.getListenerList(instance) 111 - .getListeners() 112 - .filter(function(client) { 113 - return client.isSubscribedToAny(message.subscribers); 114 - }); 143 + var lists = this.getListenerLists(instance); 115 144 116 - for (var i = 0; i < listeners.length; i++) { 117 - var listener = listeners[i]; 145 + for (var ii = 0; ii < lists.length; ii++) { 146 + var list = lists[ii]; 147 + var listeners = list.getListeners(); 148 + this._transmitToListeners(list, listeners, message); 149 + } 150 + }, 151 + 152 + _transmitToListeners: function(list, listeners, message) { 153 + for (var ii = 0; ii < listeners.length; ii++) { 154 + var listener = listeners[ii]; 155 + 156 + if (!listener.isSubscribedToAny(message.subscribers)) { 157 + continue; 158 + } 118 159 119 160 try { 120 161 listener.writeMessage(message); 121 162 122 163 ++this._messagesOut; 123 - this.getLogger().log( 164 + this.log( 124 165 '<%s> Wrote Message', 125 166 listener.getDescription()); 126 167 } catch (error) { 127 - this.getListenerList(instance).removeListener(listener); 128 - this.getLogger().log( 168 + list.removeListener(listener); 169 + 170 + this.log( 129 171 '<%s> Write Error: %s', 130 172 listener.getDescription(), 131 173 error); 132 174 } 133 175 } 134 - }, 135 - }, 136 - 137 - properties: { 138 - clientServer: null, 139 - logger: null, 176 + } 140 177 } 141 178 142 179 });
+26 -6
support/aphlict/server/lib/AphlictClientServer.js
··· 12 12 JX.install('AphlictClientServer', { 13 13 14 14 construct: function(server) { 15 - this.setLogger(new JX.AphlictLog()); 15 + server.on('request', JX.bind(this, this._onrequest)); 16 + 16 17 this._server = server; 17 18 this._lists = {}; 19 + }, 20 + 21 + properties: { 22 + logger: null, 18 23 }, 19 24 20 25 members: { ··· 28 33 return this._lists[path]; 29 34 }, 30 35 36 + log: function() { 37 + var logger = this.getLogger(); 38 + if (!logger) { 39 + return; 40 + } 41 + 42 + logger.log.apply(logger, arguments); 43 + 44 + return this; 45 + }, 46 + 47 + _onrequest: function(request, response) { 48 + // The websocket code upgrades connections before they get here, so 49 + // this only handles normal HTTP connections. We just fail them with 50 + // a 501 response. 51 + response.writeHead(501); 52 + response.end('HTTP/501 Use Websockets\n'); 53 + }, 54 + 31 55 listen: function() { 32 56 var self = this; 33 57 var server = this._server.listen.apply(this._server, arguments); ··· 38 62 var listener = self.getListenerList(path).addListener(ws); 39 63 40 64 function log() { 41 - self.getLogger().log( 65 + self.log( 42 66 util.format('<%s>', listener.getDescription()) + 43 67 ' ' + 44 68 util.format.apply(null, arguments)); ··· 97 121 98 122 }, 99 123 100 - }, 101 - 102 - properties: { 103 - logger: null, 104 124 } 105 125 106 126 });