@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Namespace Aphlict clients by request path, plus other fixes

Summary:
Fixes T7130. Fixes T7041. Fixes T7012.

Major change here is partitioning clients. In the Phacility cluster, being able to get a huge pile of instances on a single server -- without needing to run a process per instance -- is desirable.

To accomplish this, just bucket clients by the path they connect with. This will let us set client URIs to `/instancename/` and then route connections to a small set of servers. This degrades cleanly in the common case and has no effect on installs which don't do instancing.

Also fix two unrelated issues:

- Fix the timeouts, which were incorrectly initializing in `open()` (which is called during reconnect, causing them to reset every time). Instead, initialize in the constructor. Cap timeout at 5 minutes.
- Probably fix subscriptions, which were using a property with an object definition. Since this is by-ref, all concrete instances of the object share the same property, so all users would be subscribed to everything. Probably.

Test Plan:
- Hit notification status page, saw version bump and instance/path name.
- Saw instance/path name in client and server logs.
- Stopped server, saw reconnects after 2, 4, 16, ... seconds.
- Sent test notification; received test notification.
- Didn't explicitly test the subscription thing but it should be obvious by looking at `/notification/status/` shortly after a push.

Reviewers: joshuaspence, btrahan

Reviewed By: btrahan

Subscribers: epriestley

Maniphest Tasks: T7041, T7012, T7130

Differential Revision: https://secure.phabricator.com/D11769

+76 -43
+11 -11
resources/celerity/map.php
··· 7 7 */ 8 8 return array( 9 9 'names' => array( 10 - 'core.pkg.css' => 'f8f4b8dc', 11 - 'core.pkg.js' => '65e04767', 10 + 'core.pkg.css' => '86353aff', 11 + 'core.pkg.js' => '23d653bb', 12 12 'darkconsole.pkg.js' => '8ab24e01', 13 13 'differential.pkg.css' => '380f07e5', 14 14 'differential.pkg.js' => '7b5a4aa4', ··· 124 124 'rsrc/css/phui/phui-action-list.css' => '9ee9910a', 125 125 'rsrc/css/phui/phui-action-panel.css' => '4bcb288d', 126 126 'rsrc/css/phui/phui-box.css' => '7b3a2eed', 127 - 'rsrc/css/phui/phui-button.css' => 'ffe12633', 127 + 'rsrc/css/phui/phui-button.css' => '008ba5e2', 128 128 'rsrc/css/phui/phui-crumbs-view.css' => '594d719e', 129 - 'rsrc/css/phui/phui-document.css' => '8240b0b1', 129 + 'rsrc/css/phui/phui-document.css' => 'a494bdf8', 130 130 'rsrc/css/phui/phui-error-view.css' => 'ad042fdd', 131 131 'rsrc/css/phui/phui-feed-story.css' => 'c9f3a0b5', 132 132 'rsrc/css/phui/phui-fontkit.css' => '9ae12677', ··· 210 210 'rsrc/externals/javelin/lib/Scrollbar.js' => '5b2f5a08', 211 211 'rsrc/externals/javelin/lib/URI.js' => '6eff08aa', 212 212 'rsrc/externals/javelin/lib/Vector.js' => '2caa8fb8', 213 - 'rsrc/externals/javelin/lib/WebSocket.js' => '3f840822', 213 + 'rsrc/externals/javelin/lib/WebSocket.js' => 'e292eaf4', 214 214 'rsrc/externals/javelin/lib/Workflow.js' => '84d6aea0', 215 215 'rsrc/externals/javelin/lib/__tests__/Cookie.js' => '5ed109e8', 216 216 'rsrc/externals/javelin/lib/__tests__/DOM.js' => 'c984504b', ··· 697 697 'javelin-view-interpreter' => 'f829edb3', 698 698 'javelin-view-renderer' => '6c2b09a2', 699 699 'javelin-view-visitor' => 'efe49472', 700 - 'javelin-websocket' => '3f840822', 700 + 'javelin-websocket' => 'e292eaf4', 701 701 'javelin-workflow' => '84d6aea0', 702 702 'lightbox-attachment-css' => '7acac05d', 703 703 'maniphest-batch-editor' => '8f380ebc', ··· 773 773 'phui-action-header-view-css' => '89c497e7', 774 774 'phui-action-panel-css' => '4bcb288d', 775 775 'phui-box-css' => '7b3a2eed', 776 - 'phui-button-css' => 'ffe12633', 776 + 'phui-button-css' => '008ba5e2', 777 777 'phui-calendar-css' => '8675968e', 778 778 'phui-calendar-day-css' => 'de035c8a', 779 779 'phui-calendar-list-css' => 'c1d0ca59', 780 780 'phui-calendar-month-css' => 'a92e47d2', 781 781 'phui-crumbs-view-css' => '594d719e', 782 - 'phui-document-view-css' => '8240b0b1', 782 + 'phui-document-view-css' => 'a494bdf8', 783 783 'phui-error-view-css' => 'ad042fdd', 784 784 'phui-feed-story-css' => 'c9f3a0b5', 785 785 'phui-font-icon-base-css' => '3dad2ae3', ··· 1063 1063 'javelin-behavior', 1064 1064 'javelin-dom', 1065 1065 'phortune-credit-card-form', 1066 - ), 1067 - '3f840822' => array( 1068 - 'javelin-install', 1069 1066 ), 1070 1067 '40a6a403' => array( 1071 1068 'javelin-install', ··· 1806 1803 'javelin-behavior', 1807 1804 'javelin-stratcom', 1808 1805 'javelin-dom', 1806 + ), 1807 + 'e292eaf4' => array( 1808 + 'javelin-install', 1809 1809 ), 1810 1810 'e32d14ab' => array( 1811 1811 'javelin-behavior',
+10 -3
src/applications/notification/client/PhabricatorNotificationClient.php
··· 2 2 3 3 final class PhabricatorNotificationClient { 4 4 5 - const EXPECT_VERSION = 6; 5 + const EXPECT_VERSION = 7; 6 6 7 7 public static function getServerStatus() { 8 8 $uri = PhabricatorEnv::getEnvConfig('notification.server-uri'); 9 9 $uri = id(new PhutilURI($uri)) 10 - ->setPath('/status/'); 10 + ->setPath('/status/') 11 + ->setQueryParam('instance', self::getInstance()); 11 12 12 13 list($body) = id(new HTTPSFuture($uri)) 13 14 ->setTimeout(3) ··· 40 41 private static function postMessage(array $data) { 41 42 $server_uri = PhabricatorEnv::getEnvConfig('notification.server-uri'); 42 43 $server_uri = id(new PhutilURI($server_uri)) 43 - ->setPath('/'); 44 + ->setPath('/') 45 + ->setQueryParam('instance', self::getInstance()); 44 46 45 47 id(new HTTPSFuture($server_uri, json_encode($data))) 46 48 ->setMethod('POST') 47 49 ->setTimeout(1) 48 50 ->resolvex(); 51 + } 52 + 53 + private static function getInstance() { 54 + $client_uri = PhabricatorEnv::getEnvConfig('notification.client-uri'); 55 + return id(new PhutilURI($client_uri))->getPath(); 49 56 } 50 57 51 58 }
+1
src/applications/notification/controller/PhabricatorNotificationStatusController.php
··· 44 44 $value = phutil_format_relative_time_detailed($value); 45 45 break; 46 46 case 'log': 47 + case 'instance': 47 48 break; 48 49 default: 49 50 $value = number_format($value);
+23 -18
support/aphlict/server/lib/AphlictAdminServer.js
··· 5 5 require('./AphlictListenerList'); 6 6 7 7 var http = require('http'); 8 + var url = require('url'); 8 9 9 10 JX.install('AphlictAdminServer', { 10 11 ··· 25 26 _server: null, 26 27 _startTime: null, 27 28 28 - getListeners: function() { 29 - return this.getListenerList().getListeners(); 30 - }, 31 - 32 - getListenerList: function() { 33 - return this.getClientServer().getListenerList(); 29 + getListenerList: function(instance) { 30 + return this.getClientServer().getListenerList(instance); 34 31 }, 35 32 36 33 listen: function() { ··· 39 36 40 37 _handler: function(request, response) { 41 38 var self = this; 39 + var u = url.parse(request.url, true); 40 + var instance = u.query.instance || '/'; 42 41 43 42 // Publishing a notification. 44 - if (request.url == '/') { 43 + if (u.pathname == '/') { 45 44 if (request.method == 'POST') { 46 45 var body = ''; 47 46 ··· 54 53 var msg = JSON.parse(body); 55 54 56 55 self.getLogger().log( 57 - 'Received notification: ' + JSON.stringify(msg)); 56 + 'Received notification (' + instance + '): ' + 57 + JSON.stringify(msg)); 58 58 ++self._messagesIn; 59 59 60 60 try { 61 - self._transmit(msg); 61 + self._transmit(instance, msg); 62 62 response.writeHead(200, {'Content-Type': 'text/plain'}); 63 63 } catch (err) { 64 64 self.getLogger().log( ··· 81 81 response.writeHead(405, 'Method Not Allowed'); 82 82 response.end(); 83 83 } 84 - } else if (request.url == '/status/') { 84 + } else if (u.pathname == '/status/') { 85 85 var status = { 86 + 'instance': instance, 86 87 'uptime': (new Date().getTime() - this._startTime), 87 - 'clients.active': this.getListenerList().getActiveListenerCount(), 88 - 'clients.total': this.getListenerList().getTotalListenerCount(), 88 + 'clients.active': this.getListenerList(instance) 89 + .getActiveListenerCount(), 90 + 'clients.total': this.getListenerList(instance) 91 + .getTotalListenerCount(), 89 92 'messages.in': this._messagesIn, 90 93 'messages.out': this._messagesOut, 91 - 'version': 6 94 + 'version': 7 92 95 }; 93 96 94 97 response.writeHead(200, {'Content-Type': 'application/json'}); ··· 103 106 /** 104 107 * Transmits a message to all subscribed listeners. 105 108 */ 106 - _transmit: function(message) { 107 - var listeners = this.getListeners().filter(function(client) { 108 - return client.isSubscribedToAny(message.subscribers); 109 - }); 109 + _transmit: function(instance, message) { 110 + var listeners = this.getListenerList(instance) 111 + .getListeners() 112 + .filter(function(client) { 113 + return client.isSubscribedToAny(message.subscribers); 114 + }); 110 115 111 116 for (var i = 0; i < listeners.length; i++) { 112 117 var listener = listeners[i]; ··· 119 124 '<%s> Wrote Message', 120 125 listener.getDescription()); 121 126 } catch (error) { 122 - this.getListenerList().removeListener(listener); 127 + this.getListenerList(instance).removeListener(listener); 123 128 this.getLogger().log( 124 129 '<%s> Write Error: %s', 125 130 listener.getDescription(),
+14 -5
support/aphlict/server/lib/AphlictClientServer.js
··· 5 5 require('./AphlictListenerList'); 6 6 require('./AphlictLog'); 7 7 8 + var url = require('url'); 8 9 var util = require('util'); 9 10 var WebSocket = require('ws'); 10 11 11 12 JX.install('AphlictClientServer', { 12 13 13 14 construct: function(server) { 14 - this.setListenerList(new JX.AphlictListenerList()); 15 15 this.setLogger(new JX.AphlictLog()); 16 16 this._server = server; 17 + this._lists = {}; 17 18 }, 18 19 19 20 members: { 20 21 _server: null, 22 + _lists: null, 23 + 24 + getListenerList: function(path) { 25 + if (!this._lists[path]) { 26 + this._lists[path] = new JX.AphlictListenerList(path); 27 + } 28 + return this._lists[path]; 29 + }, 21 30 22 31 listen: function() { 23 32 var self = this; ··· 25 34 var wss = new WebSocket.Server({server: server}); 26 35 27 36 wss.on('connection', function(ws) { 28 - var listener = self.getListenerList().addListener(ws); 37 + var path = url.parse(ws.upgradeReq.url).pathname; 38 + var listener = self.getListenerList(path).addListener(ws); 29 39 30 40 function log() { 31 41 self.getLogger().log( ··· 70 80 }); 71 81 72 82 ws.on('close', function() { 73 - self.getListenerList().removeListener(listener); 83 + self.getListenerList(path).removeListener(listener); 74 84 log('Disconnected.'); 75 85 }); 76 86 77 87 wss.on('close', function() { 78 - self.getListenerList().removeListener(listener); 88 + self.getListenerList(path).removeListener(listener); 79 89 log('Disconnected.'); 80 90 }); 81 91 ··· 90 100 }, 91 101 92 102 properties: { 93 - listenerList: null, 94 103 logger: null, 95 104 } 96 105
+6 -3
support/aphlict/server/lib/AphlictListener.js
··· 3 3 var JX = require('./javelin').JX; 4 4 5 5 JX.install('AphlictListener', { 6 - construct: function(id, socket) { 6 + construct: function(id, socket, path) { 7 7 this._id = id; 8 8 this._socket = socket; 9 + this._path = path; 10 + this._subscriptions = {}; 9 11 }, 10 12 11 13 members: { 12 14 _id: null, 13 15 _socket: null, 14 - _subscriptions: {}, 16 + _path: null, 17 + _subscriptions: null, 15 18 16 19 getID: function() { 17 20 return this._id; ··· 47 50 }, 48 51 49 52 getDescription: function() { 50 - return 'Listener/' + this.getID(); 53 + return 'Listener/' + this.getID() + this._path; 51 54 }, 52 55 53 56 writeMessage: function(message) {
+7 -2
support/aphlict/server/lib/AphlictListenerList.js
··· 5 5 require('./AphlictListener'); 6 6 7 7 JX.install('AphlictListenerList', { 8 - construct: function() { 8 + construct: function(path) { 9 + this._path = path; 9 10 this._listeners = {}; 10 11 }, 11 12 12 13 members: { 13 14 _listeners: null, 15 + _path: null, 14 16 _nextID: 0, 15 17 _totalListenerCount: 0, 16 18 17 19 addListener: function(socket) { 18 - var listener = new JX.AphlictListener(this._generateNextID(), socket); 20 + var listener = new JX.AphlictListener( 21 + this._generateNextID(), 22 + socket, 23 + this._path); 19 24 20 25 this._listeners[listener.getID()] = listener; 21 26 this._totalListenerCount++;
+4 -1
webroot/rsrc/externals/javelin/lib/WebSocket.js
··· 11 11 12 12 construct: function(uri) { 13 13 this.setURI(uri); 14 + this._resetDelay(); 14 15 }, 15 16 16 17 properties: { ··· 70 71 } 71 72 72 73 this._shouldClose = false; 73 - this._resetDelay(); 74 74 75 75 this._socket = new WebSocket(this.getURI()); 76 76 this._socket.onopen = JX.bind(this, this._onopen); ··· 171 171 // connection, the close handler will send us back here. We'll reconnect 172 172 // more and more slowly until we eventually get a valid connection. 173 173 this._delayUntilReconnect = this._delayUntilReconnect * 2; 174 + 175 + // Max out at 5 minutes between attempts. 176 + this._delayUntilReconnect = Math.min(this._delayUntilReconnect, 300000); 174 177 this.open(); 175 178 } 176 179