1

I am writing a small project where the main process subscribes on RabbitMQ and waits for messages to come. Then there is a Thread that keeps a queue of related responses grouped by correlation id and is going to push them through socket connection to another component once 3 seconds has passed from the first response. The problem is that no matter what I try, the queue is not getting modified. For example when I get a message from RabbitMQ, I call update_queue, pass the message and try to update the queue with a response. However the queue is always empty.

       <?php

ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
error_reporting(E_ALL);

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Aggregator {

    private $task;

    public function __construct(){
        $this->task = new Task();
        $this->task->start();
    }

    public function subscribe(){
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare('aggregator', false, false, false, false);
        $channel->basic_consume('aggregator', '', false, true, false, false, array($this, 'on_response'));

        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

        while(count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    public function on_response($msg){
        echo "Pushing to queue";
        $this->task->update_queue($msg);
    }
}

class Task extends Thread {

    private $queue;
    private $socket;
    const TIMEOUT = 3;
    //Flag used to pause queue check when its being updated
    private $pause;


    public function __construct(){
        $this->queue = array();
        $this->pause = false;
        //$this->socket = stream_socket_client('localhost:9999/echo');
    }

    public function update_queue($msg){
        echo "Updating queue \n";
        $this->pause = true;
        $response = json_decode($msg->body);

        if(!array_key_exists($msg->get('correlation_id'), $this->queue)){
            //Gets in here and correlation_id is a non-null value
            echo "Correlation does not exist" . $msg->get('correlation_id').  "\n";
            $this->queue[$msg->get('correlation_id')] = array();
            $this->queue[$msg->get('correlation_id')]['created_at'] = time();
            $this->queue[$msg->get('correlation_id')]['response'] = array();
        }

        //Null here
        var_dump($this->queue[$msg->get('correlation_id')]);

        //Response is a non-nul value
        $this->queue[$msg->get('correlation_id')]['response'][] = $response;
        $this->pause = false;

        //Always 0 here
        echo "Count: " . count($this->queue) . "\n";

        echo "Queue updated \n";
    }

    public function __destruct(){
        //fclose($this->socket);
    }

    public function run(){

        while(true){
            if($this->pause){continue;}

            //Always 0 here!
            //echo "Count: " . count($this->queue) . "\n";


            foreach($this->queue as $k => $v){
                echo $v['created_at'] . "\n";
                if(self::TIMEOUT + $v['created_at'] < time()){

                    $response = array();
                    $response['correlation_id'] = $k;
                    $response['response'] = $v['response'];

                    var_dump($response);

                    $response = json_encode($response);
                    //fwrite($this->sockett, $response);
                    //unset($this->queue[$k]);
                }
            }
        }
    }
}

$aggregator = new Aggregator();
$aggregator->subscribe();
?>
Adam
  • 1,054
  • 1
  • 12
  • 26
  • I don't understand the utility of using a multi-threading in this case. In your code you just extends a class, but you didn't really use the multi-threading. If you are looking to use the multi-threading feature, check [my example](http://stackoverflow.com/questions/28493421/send-multiple-numbers-sms-requests-in-one-second-php/33254770#33254770) and use it correctly. – Alaanor Dec 22 '15 at 19:13

0 Answers0