@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1'use strict';
2
3var JX = require('./javelin').JX;
4
5require('./AphlictListenerList');
6
7var url = require('url');
8var util = require('util');
9var WebSocket = require('ws');
10
/**
 * Client-facing half of the Aphlict notification server.
 *
 * Wraps an HTTP server, attaches a WebSocket server to it, and registers every
 * accepted WebSocket as a listener on the listener list of the instance named
 * in the request path. Connected clients may send JSON messages carrying one
 * of the commands "subscribe", "unsubscribe", "replay" or "ping". Plain HTTP
 * requests are answered with a 501.
 */
JX.install('AphlictClientServer', {

  /**
   * @param server HTTP server object; must support on('request', ...) and
   *   listen(...).
   */
  construct: function(server) {
    server.on('request', JX.bind(this, this._onrequest));

    this._server = server;

    // Instance names are taken from the request path, which is untrusted.
    // A prototype-less map keeps names such as "constructor" or "__proto__"
    // from resolving to (or overwriting) inherited Object.prototype members.
    this._lists = Object.create(null);

    // NOTE(review): nothing in this class reads this field; getHistory() goes
    // through getAdminServers() instead. Confirm whether it is still needed.
    this._adminServers = [];
  },

  properties: {
    logger: null,
    adminServers: null
  },

  members: {
    _server: null,
    _lists: null,

    /**
     * Return the listener list for an instance, creating it on first use.
     *
     * @param instance Instance name, used as the map key.
     * @return The JX.AphlictListenerList for that instance.
     */
    getListenerList: function(instance) {
      if (!this._lists[instance]) {
        this._lists[instance] = new JX.AphlictListenerList(instance);
      }
      return this._lists[instance];
    },

    /**
     * Collect the history reported by every configured admin server.
     *
     * @param age Passed through unchanged to each admin server's
     *   getHistory(); the replay command passes a millisecond timestamp.
     * @return Flat array of all messages, in admin-server order. Empty when
     *   no admin servers are configured.
     */
    getHistory: function(age) {
      var results = [];

      // The "adminServers" property is declared with a null default, so it
      // may be unset; treat that as "no history" instead of throwing.
      var servers = this.getAdminServers() || [];
      for (var ii = 0; ii < servers.length; ii++) {
        var messages = servers[ii].getHistory(age);
        for (var jj = 0; jj < messages.length; jj++) {
          results.push(messages[jj]);
        }
      }

      return results;
    },

    /**
     * Forward the arguments to the configured logger's log() method. Does
     * nothing when no logger is configured.
     *
     * @return this, so calls can be chained whether or not a logger is set.
     */
    log: function() {
      var logger = this.getLogger();
      if (logger) {
        logger.log.apply(logger, arguments);
      }

      return this;
    },

    /**
     * Forward the arguments to the configured logger's trace() method. Does
     * nothing when no logger is configured.
     *
     * @return this, so calls can be chained whether or not a logger is set.
     */
    trace: function() {
      var logger = this.getLogger();
      if (logger) {
        logger.trace.apply(logger, arguments);
      }

      return this;
    },

    _onrequest: function(request, response) {
      // The websocket code upgrades connections before they get here, so
      // this only handles normal HTTP connections. We just fail them with
      // a 501 response.
      response.writeHead(501);
      response.end('HTTP/501 Use Websockets\n');
    },

    /**
     * Extract the instance name from a request path.
     *
     * The instance is whatever follows the first "~" (up to the next "~"),
     * with all "/" characters removed.
     *
     * @param path Request pathname.
     * @return The instance name, or 'default' when the path is empty, has no
     *   "~" marker, or has nothing usable after the marker.
     */
    _parseInstanceFromPath: function(path) {
      // If there's no "~" marker in the path, it's not an instance name.
      // Users sometimes configure nginx or Apache to proxy based on the
      // path.
      if (!path || path.indexOf('~') === -1) {
        return 'default';
      }

      var instance = path.split('~')[1];

      // Remove any "/" characters.
      instance = instance.replace(/\//g, '');
      if (!instance.length) {
        return 'default';
      }

      return instance;
    },

    /**
     * Start listening and begin accepting WebSocket clients.
     *
     * All arguments are forwarded unchanged to the wrapped server's listen()
     * method; a WebSocket server is then attached to the result.
     */
    listen: function() {
      var self = this;
      var server = this._server.listen.apply(this._server, arguments);
      var wss = new WebSocket.Server({server: server});

      // This function checks for upgradeReq which is only available in
      // ws2 by default, not ws3. See T12755 for more information.
      wss.on('connection', function(ws, request) {
        if ('upgradeReq' in ws) {
          request = ws.upgradeReq;
        }

        var path = url.parse(request.url).pathname;
        var instance = self._parseInstanceFromPath(path);

        var listener = self.getListenerList(instance).addListener(ws);

        // Prefix a formatted message with this listener's description.
        function msg(argv) {
          return util.format('<%s>', listener.getDescription()) +
            ' ' +
            util.format.apply(null, argv);
        }

        function log() {
          self.log(msg(arguments));
        }

        function trace() {
          self.trace(msg(arguments));
        }

        trace('Connected from %s.', ws._socket.remoteAddress);

        // An EventEmitter with no "error" listener throws the error instead,
        // which would let a single misbehaving connection take down the
        // whole process. Record it and let the "close" handler clean up.
        ws.on('error', function(err) {
          trace('Socket error: %s', err && err.message);
        });

        ws.on('message', function(data) {
          trace('Received message: %s', data);

          var message;
          try {
            message = JSON.parse(data);
          } catch (err) {
            log('Message is invalid: %s', err.message);
            return;
          }

          // JSON.parse() also succeeds for "null", numbers and strings; only
          // an object can carry a command, and reading a property of null
          // would throw.
          if (message === null || typeof message !== 'object') {
            log('Message is invalid: %s', 'expected a JSON object');
            return;
          }

          switch (message.command) {
            case 'subscribe':
              trace(
                'Subscribed to: %s',
                JSON.stringify(message.data));
              listener.subscribe(message.data);
              break;

            case 'unsubscribe':
              trace(
                'Unsubscribed from: %s',
                JSON.stringify(message.data));
              listener.unsubscribe(message.data);
              break;

            case 'replay':
              // "data" is optional client input; a missing or non-numeric
              // age falls back to the 60 second default.
              var replay_data = message.data || {};
              var age = Number(replay_data.age) || 60000;
              var min_age = (new Date().getTime() - age);

              var old_messages = self.getHistory(min_age);
              for (var ii = 0; ii < old_messages.length; ii++) {
                var old_message = old_messages[ii];

                if (!listener.isSubscribedToAny(old_message.subscribers)) {
                  continue;
                }

                try {
                  listener.writeMessage(old_message);
                } catch (error) {
                  // The write failed, so stop replaying to this listener.
                  break;
                }
              }
              break;

            case 'ping':
              var pong = {
                type: 'pong'
              };

              try {
                listener.writeMessage(pong);
              } catch (error) {
                // Ignore any issues here, we'll clean up elsewhere.
              }
              break;

            default:
              log(
                'Unrecognized command "%s".',
                message.command || '<undefined>');
          }
        });

        ws.on('close', function() {
          self.getListenerList(instance).removeListener(listener);
          trace('Disconnected.');
        });
      });

    }
  }

});