[073EN] The implementation of a multithread software in MATLAB – Part 2: Two Way Communication

In the second part of this series, we discuss how to enable two-way communication between the user process and the worker processes. Read part I of the series here.

Process as a running function

Suppose we want to write a process that sends a message over the network every 10 seconds. We first implement this as a function, and then use MATLAB’s parfeval function to run it on a worker.

function processToBeRun(ipAddress, port)

    d = tcpclient(ipAddress, port);
    configureTerminator(d,"CR/LF");

    while true
        writeline(d, "Hello world!");
        pause(10);
    end

end

We can run this function on a worker using

processRunning = parfeval(@processToBeRun,0,ipAddress,port);

Note the @: parfeval expects a function handle as its first argument. The variable processRunning is a Future object that can be used to query the function’s running status and to see any errors reported. The second argument, 0, is the number of function outputs to be gathered. Because our process is not meant to return useful data when it terminates, no output is requested.
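As a rough illustration of how the Future can be inspected, the sketch below schedules the function, prints its state, reports an error if there is one, and then cancels it. The address and port are placeholder values chosen for illustration, and the sketch assumes a parallel pool can be started and that processToBeRun is on the MATLAB path.

```matlab
% Placeholder connection details (assumptions for illustration only)
ipAddress = "127.0.0.1";
port = 5000;

% Schedule the function on a worker; no outputs are requested
processRunning = parfeval(@processToBeRun, 0, ipAddress, port);

% State is a character vector such as 'queued', 'running' or 'finished'
disp(processRunning.State);

% Error stays empty unless the function has thrown (or was cancelled)
if ~isempty(processRunning.Error)
    disp(processRunning.Error.message);
end

% The loop in processToBeRun never returns, so stop it explicitly
cancel(processRunning);
```

Cancelling is the only way to end this particular process, because its while loop has no exit condition.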

Process receives data from user

The first refinement we are going to add is to allow the user to manually change the message sent each time. To do this, we need to establish a data queue. The user process puts data into the queue and the worker process receives it. MATLAB offers two kinds of data queue: parallel.pool.DataQueue and parallel.pool.PollableDataQueue. The difference is that the former relies on a callback function to handle each data parcel that arrives, whereas the latter must be polled programmatically to get the data. Given that our worker process needs to interface with hardware, it is desirable to control programmatically when the process looks for and handles data, rather than relying on a callback function. If a callback function were used, it might be executed too frequently when the user sends an overwhelming amount of data, and thereby interrupt the TCP communication routine.
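The difference between the two queue types can be seen in a small sketch that runs entirely in the user process. Both queues are created and written to on the same side here purely for demonstration; the variable names are illustrative and not part of the article's code.

```matlab
% Callback-driven queue: the listener runs for each item that arrives
callbackQueue = parallel.pool.DataQueue;
afterEach(callbackQueue, @(data) fprintf("Callback received: %s\n", data));
send(callbackQueue, "hello");

% Pollable queue: nothing happens until we explicitly ask for data
pollableQueue = parallel.pool.PollableDataQueue;
send(pollableQueue, "hello");

% Wait up to 1 second for an item; ok is false if nothing arrived in time
[data, ok] = poll(pollableQueue, 1);
if ok
    fprintf("Polled: %s\n", data);
end
```

With the pollable queue, the code decides when to look for data, which is the property we want inside the worker's timed loop.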

All MATLAB data queues are unidirectional. They only allow data to go in one direction, either from the user to the worker or the other way around. There is no bidirectional data queue. Imagine a queue as a pipe where data can only flow from one end to the other. The queue needs to be created on the receiving end. Therefore, to receive data on the worker, the PollableDataQueue object needs to be created on the worker. How, then, can we get hold of the other end in the user process?

The solution is as follows. Prior to scheduling our processToBeRun to run, execute the following code:

workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
workerQueueUserEnd = fetchOutputs(parfevalOnAll(@(x) x.Value, 1, workerQueueConstant));

The first line creates a parallel.pool.Constant from the constructor of parallel.pool.PollableDataQueue. In general, C = parallel.pool.Constant(fcn) uses the function handle fcn to create a Constant object C, and on each worker the Value property of C holds the result of running fcn() there with one output. In our case, fcn is the parallel.pool.PollableDataQueue constructor, so the Value on each worker is a parallel.pool.PollableDataQueue object created on that worker.

The second line reads the Value property of the Constant on all parallel workers and fetches the outputs. The result, workerQueueUserEnd, is an array of parallel.pool.PollableDataQueue objects, each leading to one parallel worker.

The following code will, therefore, send the same data to all parallel workers. Each worker can poll its own queue to get the data.

message = "This message goes to every worker!";
for i=1:length(workerQueueUserEnd)
    send(workerQueueUserEnd(i),message);
end

Process sends data to user

Sending data from a process to the user is more straightforward to implement using queues, although this method is only suitable for small amounts of data. Because a queue needs to be created on the receiving end, we simply create the queue in the user process and pass it to the worker process as an argument. The user process does not manage a tightly clocked cycle; instead, it needs to handle arriving data promptly and dispose of it quickly, so a pollable queue is not suitable. A parallel.pool.DataQueue is used instead, and a listener function is attached with the afterEach method to handle each item as it arrives.

The user-end code looks like this:

% Create a data queue to receive data from worker
queue = parallel.pool.DataQueue;
% Attach a listener to display anything received from the worker
afterEach(queue, @disp);

% Schedule the function to run with access to both queues
% Note that queue and workerQueueConstant have been passed as arguments
processRunning = parfeval(@processToBeRun,0,queue,workerQueueConstant,ipAddress,port);

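% Send a message to the worker through the worker-bound queues.
% (Sending on queue would only deliver the message back to our own listener.)
% We do not yet know which queue leads to the worker running processToBeRun,
% so the message goes to every worker.
for i=1:length(workerQueueUserEnd)
    send(workerQueueUserEnd(i), "Write this string instead!");
end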

The processToBeRun code now looks like this, with queues enabling bidirectional communication. The following code checks for any message that may have been sent by the user in each cycle. If nothing is received, it writes a default message to the TCP server. If anything is received, it writes the received message instead.

function processToBeRun(queue,workerQueueConstant,ipAddress, port)

    % Bi-directional communications
    queueToUser = queue; % A queue
    queueFromUser = workerQueueConstant.Value; % A pollable queue

    d = tcpclient(ipAddress, port);
    configureTerminator(d,"CR/LF");

    % Send a message to the user
    send(queueToUser, "TCP configuration successful, starting to send messages...");

    while true
        % Check if the user has sent anything
        [userMessage,ok] = poll(queueFromUser);
        if ~ok % If nothing received from user
            writeline(d, "Default message.");
        else % If a message is received from user
            writeline(d,userMessage);
        end
        pause(10);
    end

end

When the code is executed, the user will see this message displayed: TCP configuration successful, starting to send messages... This message is sent by processToBeRun, arrives at the user end through the data queue, and is displayed by the disp function attached to that queue as the afterEach listener. After it is displayed, the message is discarded.

Identifying processes

Now that we have a means of sending data to all parallel workers, how do we know which worker our processToBeRun is executing on? Because workerQueueUserEnd is obtained by using fetchOutputs on parfevalOnAll, the queues in workerQueueUserEnd are not guaranteed to be in any particular order. That is to say, workerQueueUserEnd(1) does not necessarily lead to the first worker in the parallel pool, or to the worker that the first process will be allocated to.

The solution, which I will refer to as ‘identifying’, is simple in principle. We send the value of 1 to workerQueueUserEnd(1), send 2 to workerQueueUserEnd(2), and so on. Within processToBeRun, we poll queueFromUser to see the value i sent to us, and send the same number back to the user through queueToUser. In this way, the user can use workerQueueUserEnd(i) to communicate with processToBeRun.
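The handshake described above can be sketched as follows. To keep the example short, a hypothetical stand-in function takes the place of processToBeRun and hands the index back as a function output rather than through queueToUser; the 5 second timeout is also an arbitrary choice. The sketch assumes a parallel pool can be started.

```matlab
% Get the current pool, starting one if the preferences allow it
pool = gcp;

% Create one pollable queue per worker and fetch the user ends
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
workerQueueUserEnd = fetchOutputs(parfevalOnAll(@(x) x.Value, 1, workerQueueConstant));

% Send each queue its own index
for i = 1:numel(workerQueueUserEnd)
    send(workerQueueUserEnd(i), i);
end

% Hypothetical stand-in for processToBeRun: read the index from this
% worker's queue (waiting up to 5 seconds) and return it to the user
readIndex = @(c) poll(c.Value, 5);
identityFuture = parfeval(readIndex, 1, workerQueueConstant);
myQueueIndex = fetchOutputs(identityFuture);

% workerQueueUserEnd(myQueueIndex) now leads to the worker that ran readIndex
fprintf("The process is reachable through queue %d\n", myQueueIndex);
```

Note that the queues of the other workers still contain their index afterwards, so a process scheduled later would need to expect, or discard, that leftover value.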

To implement this method of identification, a communications protocol needs to be designed such that processToBeRun returns the number i to the user when such a message is received. I recommend making each data object sent through a queue a struct or, better still, an object. In this way, a field can be added to the object to define its type. processToBeRun can then handle each type of data differently using a switch statement. Similarly, the queue listener function on the user’s end must recognise the different types of data arriving and handle them accordingly.
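One possible shape for such a protocol is sketched below. The field names type and payload, and the message types 'identify', 'text' and 'error', are hypothetical choices made for this illustration. To keep the sketch runnable without a pool, the incoming messages are held in a cell array; inside processToBeRun each message would instead come from poll(queueFromUser), each reply would go out through send(queueToUser, ...), and each text would be passed to writeline.

```matlab
% Messages as they might arrive from the user (hypothetical protocol)
incoming = { ...
    struct('type', 'identify', 'payload', 3), ...
    struct('type', 'text',     'payload', "Hello"), ...
    struct('type', 'bogus',    'payload', []) };

replies = {};                 % what the worker would send back to the user
textsToWrite = strings(0, 1); % what the worker would write to the TCP server

for k = 1:numel(incoming)
    message = incoming{k};
    switch message.type
        case 'identify'
            % Echo the index so the user learns which queue reaches us
            replies{end+1} = struct('type', 'identify', 'payload', message.payload);
        case 'text'
            % Ordinary message to be forwarded over TCP
            textsToWrite(end+1) = message.payload;
        otherwise
            % Report anything the protocol does not define
            replies{end+1} = struct('type', 'error', ...
                'payload', "Unknown message type: " + message.type);
    end
end
```

The listener on the user's end can follow the same pattern, switching on the type field of each reply instead of passing everything to disp.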

Next up…

In the next article of this series, we discuss how to handle large data transfers and data streaming between the workers and the user using memory maps, which are faster than queues.
