@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at upstream/main 207 lines 5.2 kB view raw
'use strict';

var JX = require('./javelin').JX;

require('./AphlictListenerList');

var url = require('url');
var util = require('util');
var WebSocket = require('ws');

/**
 * Client-facing server: accepts WebSocket connections on top of an HTTP
 * server, groups the connected listeners by instance name (derived from the
 * request path), and handles the "subscribe", "unsubscribe", "replay" and
 * "ping" commands sent by clients.
 */
JX.install('AphlictClientServer', {

  /**
   * @param server HTTP server object. It must support on('request', ...)
   *   and listen(...); it is later handed to WebSocket.Server.
   */
  construct: function(server) {
    server.on('request', JX.bind(this, this._onrequest));

    this._server = server;

    // Instance names come from the client-supplied request path, so use a
    // map without a prototype. On a plain "{}" a name like "constructor"
    // would resolve to an inherited Object.prototype member instead of a
    // listener list.
    this._lists = Object.create(null);
    this._adminServers = [];
  },

  properties: {
    logger: null,
    adminServers: null
  },

  members: {
    _server: null,
    _lists: null,

    /**
     * Get the listener list for an instance, creating it on first use.
     *
     * @param instance Instance name, used as the map key.
     * @return JX.AphlictListenerList for that instance.
     */
    getListenerList: function(instance) {
      if (!this._lists[instance]) {
        this._lists[instance] = new JX.AphlictListenerList(instance);
      }
      return this._lists[instance];
    },

    /**
     * Collect history from every configured admin server into one flat list.
     *
     * @param age Passed through unchanged to each admin server's
     *   getHistory(). The "replay" handler below passes an epoch-millisecond
     *   cutoff (now minus the requested age).
     * @return Array of messages, in admin server order.
     */
    getHistory: function(age) {
      var results = [];

      // The "adminServers" property is declared with a null default, so
      // treat "not configured" as "no servers" rather than throwing.
      var servers = this.getAdminServers() || [];
      for (var ii = 0; ii < servers.length; ii++) {
        var messages = servers[ii].getHistory(age);
        for (var jj = 0; jj < messages.length; jj++) {
          results.push(messages[jj]);
        }
      }

      return results;
    },

    /**
     * Forward all arguments to the configured logger's log() method.
     *
     * @return this, or undefined when no logger is configured.
     */
    log: function() {
      var logger = this.getLogger();
      if (!logger) {
        return;
      }

      logger.log.apply(logger, arguments);

      return this;
    },

    /**
     * Forward all arguments to the configured logger's trace() method.
     *
     * @return this, or undefined when no logger is configured.
     */
    trace: function() {
      var logger = this.getLogger();
      if (!logger) {
        return;
      }

      logger.trace.apply(logger, arguments);

      return this;
    },

    _onrequest: function(request, response) {
      // The websocket code upgrades connections before they get here, so
      // this only handles normal HTTP connections. We just fail them with
      // a 501 response.
      response.writeHead(501);
      response.end('HTTP/501 Use Websockets\n');
    },

    /**
     * Extract the instance name from a request path.
     *
     * @param path Request pathname. Anything that is not a string (for
     *   example a null pathname from url.parse()) maps to "default".
     * @return The text between the first and second "~" with every "/"
     *   removed, or "default" when there is no "~" or that text is empty.
     */
    _parseInstanceFromPath: function(path) {
      // If there's no "~" marker in the path, it's not an instance name.
      // Users sometimes configure nginx or Apache to proxy based on the
      // path.
      if (typeof path !== 'string' || path.indexOf('~') === -1) {
        return 'default';
      }

      var instance = path.split('~')[1];

      // Remove any "/" characters.
      instance = instance.replace(/\//g, '');
      if (!instance.length) {
        return 'default';
      }

      return instance;
    },

    /**
     * Start listening. All arguments are forwarded to the underlying
     * server's listen() method, and a WebSocket server is attached to the
     * result to handle client connections.
     */
    listen: function() {
      var self = this;
      var server = this._server.listen.apply(this._server, arguments);
      var wss = new WebSocket.Server({server: server});

      // This function checks for upgradeReq which is only available in
      // ws2 by default, not ws3. See T12755 for more information.
      wss.on('connection', function(ws, request) {
        if ('upgradeReq' in ws) {
          request = ws.upgradeReq;
        }

        var path = url.parse(request.url).pathname;
        var instance = self._parseInstanceFromPath(path);

        var listener = self.getListenerList(instance).addListener(ws);

        // Prefix every log line with "<listener description>" so lines from
        // different connections can be told apart.
        function msg(argv) {
          return util.format('<%s>', listener.getDescription()) +
            ' ' +
            util.format.apply(null, argv);
        }

        function log() {
          self.log(msg(arguments));
        }

        function trace() {
          self.trace(msg(arguments));
        }

        trace('Connected from %s.', ws._socket.remoteAddress);

        ws.on('message', function(data) {
          trace('Received message: %s', data);

          var message;
          try {
            message = JSON.parse(data);
          } catch (err) {
            log('Message is invalid: %s', err.message);
            return;
          }

          // JSON.parse() also accepts "null" and bare primitives. Reading
          // "message.command" on null would throw inside this event
          // handler, so reject anything that is not an object up front.
          if (message === null || typeof message !== 'object') {
            log('Message is invalid: expected a JSON object.');
            return;
          }

          switch (message.command) {
            case 'subscribe':
              trace(
                'Subscribed to: %s',
                JSON.stringify(message.data));
              listener.subscribe(message.data);
              break;

            case 'unsubscribe':
              trace(
                'Unsubscribed from: %s',
                JSON.stringify(message.data));
              listener.unsubscribe(message.data);
              break;

            case 'replay':
              // "data" is client-supplied and may be missing entirely; fall
              // back to the default age instead of throwing on
              // "undefined.age".
              var age = (message.data && message.data.age) || 60000;
              var min_age = (new Date().getTime() - age);

              var old_messages = self.getHistory(min_age);
              for (var ii = 0; ii < old_messages.length; ii++) {
                var old_message = old_messages[ii];

                if (!listener.isSubscribedToAny(old_message.subscribers)) {
                  continue;
                }

                try {
                  listener.writeMessage(old_message);
                } catch (error) {
                  // Stop replaying as soon as one write fails.
                  break;
                }
              }
              break;

            case 'ping':
              var pong = {
                type: 'pong'
              };

              try {
                listener.writeMessage(pong);
              } catch (error) {
                // Ignore any issues here, we'll clean up elsewhere.
              }
              break;

            default:
              log(
                'Unrecognized command "%s".',
                message.command || '<undefined>');
          }
        });

        ws.on('close', function() {
          self.getListenerList(instance).removeListener(listener);
          trace('Disconnected.');
        });
      });

    }
  }

});