@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at recaptime-dev/main 233 lines 7.3 kB view raw
1<?php 2 3/** 4 * Proxy an IO channel to an underlying command, with optional callbacks. This 5 * is a mostly a more general version of @{class:PhutilExecPassthru}. This 6 * class is used to proxy Git, SVN and Mercurial traffic to the commands which 7 * can actually serve it. 8 * 9 * Largely, this just reads an IO channel (like stdin from SSH) and writes 10 * the results into a command channel (like a command's stdin). Then it reads 11 * the command channel (like the command's stdout) and writes it into the IO 12 * channel (like stdout from SSH): 13 * 14 * IO Channel Command Channel 15 * stdin -> stdin 16 * stdout <- stdout 17 * stderr <- stderr 18 * 19 * You can provide **read and write callbacks** which are invoked as data 20 * is passed through this class. They allow you to inspect and modify traffic. 21 * 22 * IO Channel Passthru Command Channel 23 * stdout -> willWrite -> stdin 24 * stdin <- willRead <- stdout 25 * stderr <- (identity) <- stderr 26 * 27 * Primarily, this means: 28 * 29 * - the **IO Channel** can be a @{class:PhutilProtocolChannel} if the 30 * **write callback** can convert protocol messages into strings; and 31 * - the **write callback** can inspect and reject requests over the channel, 32 * e.g. to enforce policies. 33 * 34 * In practice, this is used when serving repositories to check each command 35 * issued over SSH and determine if it is a read command or a write command. 36 * Writes can then be checked for appropriate permissions. 37 */ 38final class PhabricatorSSHPassthruCommand extends Phobject { 39 40 private $commandChannel; 41 private $ioChannel; 42 private $errorChannel; 43 private $execFuture; 44 private $willWriteCallback; 45 private $willReadCallback; 46 private $pauseIOReads; 47 48 public function setCommandChannelFromExecFuture(ExecFuture $exec_future) { 49 $exec_channel = new PhutilExecChannel($exec_future); 50 $exec_channel->setStderrHandler(array($this, 'writeErrorIOCallback')); 51 52 $this->execFuture = $exec_future; 53 $this->commandChannel = $exec_channel; 54 55 return $this; 56 } 57 58 public function setIOChannel(PhutilChannel $io_channel) { 59 $this->ioChannel = $io_channel; 60 return $this; 61 } 62 63 public function setErrorChannel(PhutilChannel $error_channel) { 64 $this->errorChannel = $error_channel; 65 return $this; 66 } 67 68 public function setWillReadCallback($will_read_callback) { 69 $this->willReadCallback = $will_read_callback; 70 return $this; 71 } 72 73 public function setWillWriteCallback($will_write_callback) { 74 $this->willWriteCallback = $will_write_callback; 75 return $this; 76 } 77 78 public function writeErrorIOCallback(PhutilChannel $channel, $data) { 79 $this->errorChannel->write($data); 80 } 81 82 public function setPauseIOReads($pause) { 83 $this->pauseIOReads = $pause; 84 return $this; 85 } 86 87 public function execute() { 88 $command_channel = $this->commandChannel; 89 $io_channel = $this->ioChannel; 90 $error_channel = $this->errorChannel; 91 92 if (!$command_channel) { 93 throw new Exception( 94 pht( 95 'Set a command channel before calling %s!', 96 __FUNCTION__.'()')); 97 } 98 99 if (!$io_channel) { 100 throw new Exception( 101 pht( 102 'Set an IO channel before calling %s!', 103 __FUNCTION__.'()')); 104 } 105 106 if (!$error_channel) { 107 throw new Exception( 108 pht( 109 'Set an error channel before calling %s!', 110 __FUNCTION__.'()')); 111 } 112 113 $channels = array($command_channel, $io_channel, $error_channel); 114 115 // We want to limit the amount of data we'll hold in memory for this 116 // process. See T4241 for a discussion of this issue in general. 117 118 $buffer_size = (1024 * 1024); // 1MB 119 $io_channel->setReadBufferSize($buffer_size); 120 $command_channel->setReadBufferSize($buffer_size); 121 122 // TODO: This just makes us throw away stderr after the first 1MB, but we 123 // don't currently have the support infrastructure to buffer it correctly. 124 // It's difficult to imagine this causing problems in practice, though. 125 $this->execFuture->getStderrSizeLimit($buffer_size); 126 127 while (true) { 128 PhutilChannel::waitForAny($channels); 129 130 $io_channel->update(); 131 $command_channel->update(); 132 $error_channel->update(); 133 134 // If any channel is blocked on the other end, wait for it to flush before 135 // we continue reading. For example, if a user is running `git clone` on 136 // a 1GB repository, the underlying `git-upload-pack` may 137 // be able to produce data much more quickly than we can send it over 138 // the network. If we don't throttle the reads, we may only send a few 139 // MB over the I/O channel in the time it takes to read the entire 1GB off 140 // the command channel. That leaves us with 1GB of data in memory. 141 142 while ($command_channel->isOpen() && 143 $io_channel->isOpenForWriting() && 144 ($command_channel->getWriteBufferSize() >= $buffer_size || 145 $io_channel->getWriteBufferSize() >= $buffer_size || 146 $error_channel->getWriteBufferSize() >= $buffer_size)) { 147 PhutilChannel::waitForActivity(array(), $channels); 148 $io_channel->update(); 149 $command_channel->update(); 150 $error_channel->update(); 151 } 152 153 // If the subprocess has exited and we've read everything from it, 154 // we're all done. 155 $done = !$command_channel->isOpenForReading() && 156 $command_channel->isReadBufferEmpty(); 157 158 if (!$this->pauseIOReads) { 159 $in_message = $io_channel->read(); 160 if ($in_message !== null) { 161 $this->writeIORead($in_message); 162 } 163 } 164 165 $out_message = $command_channel->read(); 166 if (strlen($out_message)) { 167 $out_message = $this->willReadData($out_message); 168 if ($out_message !== null) { 169 $io_channel->write($out_message); 170 } 171 } 172 173 // If we have nothing left on stdin, close stdin on the subprocess. 174 if (!$io_channel->isOpenForReading()) { 175 $command_channel->closeWriteChannel(); 176 } 177 178 if ($done) { 179 break; 180 } 181 182 // If the client has disconnected, kill the subprocess and bail. 183 if (!$io_channel->isOpenForWriting()) { 184 $this->execFuture 185 ->setStdoutSizeLimit(0) 186 ->setStderrSizeLimit(0) 187 ->setReadBufferSize(null) 188 ->resolveKill(); 189 break; 190 } 191 } 192 193 list($err) = $this->execFuture 194 ->setStdoutSizeLimit(0) 195 ->setStderrSizeLimit(0) 196 ->setReadBufferSize(null) 197 ->resolve(); 198 199 return $err; 200 } 201 202 public function writeIORead($in_message) { 203 $in_message = $this->willWriteData($in_message); 204 if (strlen($in_message)) { 205 $this->commandChannel->write($in_message); 206 } 207 } 208 209 public function willWriteData($message) { 210 if ($this->willWriteCallback) { 211 return call_user_func($this->willWriteCallback, $this, $message); 212 } else { 213 if (strlen($message)) { 214 return $message; 215 } else { 216 return null; 217 } 218 } 219 } 220 221 public function willReadData($message) { 222 if ($this->willReadCallback) { 223 return call_user_func($this->willReadCallback, $this, $message); 224 } else { 225 if (strlen($message)) { 226 return $message; 227 } else { 228 return null; 229 } 230 } 231 } 232 233}