In the second part of this series, we discuss how to enable two-way communication between the user process and the worker processes. Read part I of the series here.
Process as a running function
Suppose we want to write a process that sends a message over the network every 10 seconds. We first implement this as a function, and then use MATLAB’s parfeval function to run it on a worker.
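function processToBeRun(ipAddress, port)
    % Open a TCP connection and terminate each line with CR/LF
    d = tcpclient(ipAddress, port);
    configureTerminator(d,"CR/LF");
    % Send the message every 10 seconds until the process is cancelled
    while true
        writeline(d, "Hello world!");
        pause(10);
    end
end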
We can run this function on a worker using
processRunning = parfeval(@processToBeRun,0,ipAddress,port);
The variable processRunning is a Future object that can be used to query the function’s running status and see the errors reported, if any. The second argument, 0, is the number of function outputs to be gathered. Because our process is not meant to return useful data when it terminates, no outputs are requested.
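As a rough illustration of what the Future offers, the sketch below inspects a running task and then stops it. To keep it runnable without a TCP server, it schedules pause as a stand-in for processToBeRun; that substitution is an assumption made for the example only, and an open parallel pool is assumed.

```matlab
% Sketch: pause(60) stands in for processToBeRun so that no TCP server is needed.
processRunning = parfeval(@pause, 0, 60);

% The State property reports where the task is in its life cycle,
% for example 'queued', 'running' or 'finished'.
disp(processRunning.State);

% A function such as processToBeRun loops forever, so it must be stopped explicitly.
cancel(processRunning);

% After cancellation (or a failure on the worker), the Error property
% holds an MException describing what happened.
if ~isempty(processRunning.Error)
    disp(processRunning.Error.message);
end
```

The same State and Error properties can be checked at any time while the real processToBeRun is running.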
Process receives data from user
The first refinement is to allow the user to change the message that is sent on each cycle. To do this, we need to establish a data queue: the user process puts data into the queue and the worker process receives it. MATLAB offers two kinds of data queue: parallel.pool.DataQueue and parallel.pool.PollableDataQueue. The former relies on a callback function to handle each item of data that arrives, whereas the latter must be polled programmatically to retrieve the data. Given that our worker process needs to interface with the hardware, it is desirable that we control programmatically when the process looks for and processes data, rather than relying on a callback. If a callback function were used, it might be executed too frequently when the user sends an overwhelming amount of data, and thereby interrupt the TCP communication routine.
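The contrast between the two queue types can be sketched in a few lines. For brevity, this example creates both queues and sends to them from the same MATLAB session, which is an assumption made for illustration; in the article's setting the sender and receiver are different processes.

```matlab
% Callback style: the function given to afterEach runs for every item that arrives.
callbackQueue = parallel.pool.DataQueue;
afterEach(callbackQueue, @(item) fprintf("Callback received: %s\n", item));
send(callbackQueue, "hello");

% Polling style: items wait in the queue until the receiver asks for them.
pollableQueue = parallel.pool.PollableDataQueue;
send(pollableQueue, "hello");
[data, ok] = poll(pollableQueue, 1);   % wait up to 1 second for an item
if ok
    fprintf("Polled: %s\n", data);
end
```

With the pollable queue, nothing happens until poll is called, which is what lets the worker decide when in its cycle to look for new data.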
All MATLAB data queues are unidirectional. They only allow data to travel in one direction, either from the user to the worker or the other way around. There is no bidirectional data queue. Imagine a queue as a pipe in which data can only flow from one end to the other. The queue needs to be created on the receiving end. Therefore, to receive data on the worker, the PollableDataQueue object needs to be created on the worker. How, then, can we get hold of the other end in the user process?
The solution is as follows. Prior to scheduling our processToBeRun to run, execute the following code:
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
workerQueueUserEnd = fetchOutputs(parfevalOnAll(@(x) x.Value, 1, workerQueueConstant));
The above commands first create a parallel.pool.Constant from the class initialiser of parallel.pool.PollableDataQueue. C = parallel.pool.Constant(fcn) uses the function handle fcn to create a Constant object C. Use the Value property to get the result from running fcn() with one output. In our case, the Value property on each worker holds the result of running the parallel.pool.PollableDataQueue initialiser on that worker, which is a parallel.pool.PollableDataQueue object.
The second line of the code reads the Value property of the Constant on all parallel workers and fetches the outputs. The result, workerQueueUserEnd, is an array of parallel.pool.PollableDataQueue objects, each leading to a parallel worker.
The following code will, therefore, send the same data to all parallel workers. Each worker can poll their queue to get the data.
message = "This message goes to every worker!";
for i=1:length(workerQueueUserEnd)
    send(workerQueueUserEnd(i),message);
end
Process sends data to user
Sending data from a process to the user is more straightforward to implement using queues, although this method is only suitable for small amounts of data. Because a queue needs to be created on the receiving end, we simply create the queue in the user process and pass it to the worker process as an argument. The user process is not managing a tightly clocked cycle; instead, it needs to handle arriving data promptly and dispose of it quickly, so a pollable queue is not suitable. A parallel.pool.DataQueue is used instead, and a listener function is attached using the afterEach method to handle each item of data as soon as it arrives.
The user-end code looks like this:
% Create a data queue to receive data from worker
queue = parallel.pool.DataQueue;
% Attach a listener to display anything received from the worker
afterEach(queue, @disp);
% Schedule the function to run with access to both queues
% Note that queue and workerQueueConstant have been passed as arguments
processRunning = parfeval(@processToBeRun,0,queue,workerQueueConstant,ipAddress,port);
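% Send a message to the worker through the pollable queues.
% We do not yet know which queue leads to our process, so send to every worker.
for i=1:length(workerQueueUserEnd)
    send(workerQueueUserEnd(i), "Write this string instead!");
end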
The processToBeRun code now looks like this, with queues enabling bidirectional communication. The following code checks for any message that may have been sent by the user in each cycle. If nothing is received, it writes a default message to the TCP server. If anything is received, it writes the received message instead.
function processToBeRun(queue,workerQueueConstant,ipAddress, port)
    % Bi-directional communications
    queueToUser = queue; % A queue
    queueFromUser = workerQueueConstant.Value; % A pollable queue
    d = tcpclient(ipAddress, port);
    configureTerminator(d,"CR/LF");
    % Send a message to the user
    send(queueToUser, "TCP configuration successful, starting to send messages...");
    while true
        % Check if the user has sent anything
        [userMessage,ok] = poll(queueFromUser);
        if ~ok % If nothing received from user
            writeline(d, "Default message.");
        else % If a message is received from user
            writeline(d,userMessage);
        end
        pause(10);
    end
end
When executed, the user will see this message displayed: TCP configuration successful, starting to send messages...
This message is sent by processToBeRun, arrives at the user end through the data queue, and is displayed by the disp function attached as the afterEach listener to that queue. After being displayed, the message is discarded.
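The listener does not have to be disp. As a minimal sketch, the following attaches a hypothetical listener, logMessage, that prefixes each message with its arrival time; the name and the output format are assumptions made for illustration.

```matlab
queue = parallel.pool.DataQueue;

% Hypothetical listener: print each message with the time at which it arrived.
logMessage = @(msg) fprintf("[%s] %s\n", ...
    char(datetime("now", "Format", "HH:mm:ss")), string(msg));

afterEach(queue, logMessage);
```

Any function that accepts one argument can be attached in this way, which is useful once the worker starts sending more than plain status text.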
Identifying processes
Now that we have a means of sending data to all parallel workers, how do we know which worker our processToBeRun is executing on? Because workerQueueUserEnd is obtained by using fetchOutputs on parfevalOnAll, the order of the queues in workerQueueUserEnd cannot be relied upon. That is to say, workerQueueUserEnd(1) does not necessarily lead to the first worker in the parallel pool, or to the worker to which the first process will be allocated.
The solution, which I will refer to as ‘identifying’, is simple in principle. We send the value 1 to workerQueueUserEnd(1), send 2 to workerQueueUserEnd(2), and so on. Within processToBeRun, we poll queueFromUser to see the value i sent to us, and send the same number back to the user through queueToUser. In this way, the user learns that workerQueueUserEnd(i) is the queue to use to communicate with processToBeRun.
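The identification handshake can be sketched as follows. This is only an illustration under stated assumptions: echoId is a hypothetical stand-in for the identification step inside processToBeRun, replyQueue is a pollable queue (rather than the DataQueue used earlier) so that the user can wait for the reply, and the 30-second timeouts are arbitrary. An open parallel pool is assumed.

```matlab
% Create one pollable queue per worker and collect the user ends.
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
workerQueueUserEnd = fetchOutputs(parfevalOnAll(@(x) x.Value, 1, workerQueueConstant));

% Hypothetical stand-in for processToBeRun: wait up to 30 s for a number
% on the worker's own queue and echo it back to the user.
replyQueue = parallel.pool.PollableDataQueue;
echoId = @(c, q) send(q, poll(c.Value, 30));
processRunning = parfeval(echoId, 0, workerQueueConstant, replyQueue);

% Send each queue its own index; only the worker running echoId replies.
for i = 1:numel(workerQueueUserEnd)
    send(workerQueueUserEnd(i), i);
end

% The number that comes back is the index of the queue leading to the process.
processIndex = poll(replyQueue, 30);
fprintf("The process is reachable through workerQueueUserEnd(%d)\n", processIndex);
```

Note that the numbers sent to the other workers stay in their queues until something polls them, so a real protocol would need to discard or recognise stale identification messages.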
To implement this method of identification, a communications protocol needs to be designed such that processToBeRun returns the number i to the user when such a message is received. I recommend making each data object sent through a queue a struct or, better still, an object. In this way, a field can be added to the object to define its type. processToBeRun can then be designed to handle each type of data differently using a switch statement. Similarly, the queue listener function on the user’s end must recognise the different types of data arriving and handle them accordingly.
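One possible shape for such a protocol is sketched below. The field names type and payload, and the message types "identify" and "setMessage", are hypothetical choices made for this example and are not prescribed by MATLAB.

```matlab
% Hypothetical message layout: a "type" field plus a "payload" field.
incoming = struct("type", "identify", "payload", 3);

switch incoming.type
    case "identify"
        % Echo the number back so the user learns which queue leads here
        reply = struct("type", "identity", "payload", incoming.payload);
    case "setMessage"
        % Acknowledge a request to change the text written to the TCP server
        reply = struct("type", "ack", "payload", incoming.payload);
    otherwise
        reply = struct("type", "error", "payload", "Unknown message type");
end

disp(reply);
```

Inside processToBeRun, incoming would come from poll(queueFromUser) and reply would be passed to send(queueToUser, reply); the listener on the user's end can apply the same switch to the type field of each reply.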
Next up…
In the next article of this series, we discuss how to handle large data transfers and data streaming between the workers and the user using memory maps, which are faster than queues.