芝麻web文件管理V1.00
编辑当前文件:/home/soundframestudio/smith.soundframestudios.net/wp-content/plugins/worker/src/Gelf/Publisher.php
hostname = $hostname; $this->port = $port; $this->fallbackPort = $fallbackPort; $this->chunkSize = $chunkSize; } /** * Publishes a Gelf_Message, returns false if an error occurred during write. * * @throws UnexpectedValueException * * @param Gelf_Message $message * * @return boolean */ public function publish(Gelf_Message $message) { if (self::$brokenSocket) { return false; } // Check if required message parameters are set if (!$message->getShortMessage() || !$message->getHost()) { throw new UnexpectedValueException( 'Missing required data parameter: "version", "short_message" and "host" are required.' ); } // Set Graylog protocol version $message->setVersion(self::GRAYLOG2_PROTOCOL_VERSION); // Encode the message as json string and compress it using gzip $preparedMessage = $this->getPreparedMessage($message); // Infinite-loop break. self::$brokenSocket = true; // Open a connection to GrayLog server. $socket = $this->getSocketConnection(); if (!$socket) { return false; } self::$brokenSocket = false; // Several udp writes are required to publish the message if ($this->isMessageSizeGreaterChunkSize($preparedMessage)) { // A unique id which consists of the microtime and a random value $messageId = $this->getMessageId(); // Split the message into chunks. $messageChunks = $this->getMessageChunks($preparedMessage); $messageChunksCount = count($messageChunks); // Send chunks to GrayLog server. foreach (array_values($messageChunks) as $messageChunkIndex => $messageChunk) { $bytesWritten = $this->writeMessageChunkToSocket( $socket, $messageId, $messageChunk, $messageChunkIndex, $messageChunksCount ); if (false === $bytesWritten) { // Abort due to write error return false; } } } else { // A single write is enough to get the message published if (false === $this->writeMessageToSocket($socket, $preparedMessage)) { // Abort due to write error return false; } } // This increases stability a lot if messages are sent in a loop // A value of 20 means 0.02 ms usleep(20); // Message successful sent return true; } /** * @param Gelf_Message $message * * @return string */ protected function getPreparedMessage(Gelf_Message $message) { return gzcompress(json_encode($message->toArray())); } /** * @return resource|false */ protected function getSocketConnection() { if (!$this->streamSocketClient) { $hostname = gethostbyname($this->hostname); $this->streamSocketClient = stream_socket_client(sprintf('udp://%s:%d', $hostname, $this->port)); if ($this->streamSocketClient === false && $this->fallbackPort) { $this->streamSocketClient = stream_socket_client(sprintf('tcp://%s:%d', $hostname, $this->fallbackPort)); } } return $this->streamSocketClient; } /** * @param string $preparedMessage * * @return boolean */ protected function isMessageSizeGreaterChunkSize($preparedMessage) { return (strlen($preparedMessage) > $this->chunkSize); } /** * @return float */ protected function getMessageId() { return (float)(microtime(true).mt_rand(0, 10000)); } /** * @param string $preparedMessage * * @return array */ protected function getMessageChunks($preparedMessage) { return str_split($preparedMessage, $this->chunkSize); } /** * @param float $messageId * @param string $data * @param integer $sequence * @param integer $sequenceSize * * @throws InvalidArgumentException * @return string */ protected function prependChunkInformation($messageId, $data, $sequence, $sequenceSize) { if (!is_string($data) || $data === '') { throw new InvalidArgumentException('Data must be a string and not be empty.'); } if (!is_integer($sequence) || !is_integer($sequenceSize)) { throw new InvalidArgumentException('Sequence number and size must be integer.'); } if ($sequenceSize <= 0) { throw new InvalidArgumentException('Sequence size must be greater than 0.'); } if ($sequence > $sequenceSize) { throw new InvalidArgumentException('Sequence size must be greater than sequence number.'); } return pack('CC', 30, 15).substr(md5($messageId, true), 0, 8).pack('CC', $sequence, $sequenceSize).$data; } /** * @param resource $socket * @param float $messageId * @param string $messageChunk * @param integer $messageChunkIndex * @param integer $messageChunksCount * * @return integer|boolean */ protected function writeMessageChunkToSocket($socket, $messageId, $messageChunk, $messageChunkIndex, $messageChunksCount) { return fwrite( $socket, $this->prependChunkInformation($messageId, $messageChunk, $messageChunkIndex, $messageChunksCount) ); } /** * @param resource $socket * @param string $preparedMessage * * @return integer|boolean */ protected function writeMessageToSocket($socket, $preparedMessage) { return fwrite($socket, $preparedMessage); } }