Hi,
I have a scenario where I need to consume messages published to RabbitMQ and create a User Task instance for each message. So I designed the BPMN like this.
Here is the code for the receiver:
public class DocSignalReceiver extends AbstractBpmnActivityBehavior {

    private static final String DOC_NAME = "documentName";
    private static final String QUEUE_NAME = "TestMQ";
    public static final String EXECUTION_ID = "executionId";

    @Autowired
    ProcessEngine engine;

    public void execute(final ActivityExecution execution) throws Exception {
        /*
         * Intentionally empty: the activity waits here until
         * the external service places a message.
         */
    }

    public void signal(ActivityExecution execution, String signalName, Object signalData) throws Exception {
        // Connect to the local broker and make sure the queue exists.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Asynchronous consumer: handleDelivery runs later, on a RabbitMQ client thread.
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    createWorkItem(message);
                } finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // Leaves the activity right after registering the consumer.
        leave(execution);
    }

    // createWorkItem(String) is not shown here.
}
When I run the following test case, I get an exception:
public class TestAsynchronousServiceTask {

    @Rule
    public ProcessEngineRule processEngineRule = new ProcessEngineRule();

    @Test
    @Deployment(resources = { "testprocess.bpmn" })
    public void testServiceInvocationSuccessful() {
        final ProcessEngine processEngine = processEngineRule.getProcessEngine();
        final RuntimeService runtimeService = processEngineRule.getRuntimeService();
        final TaskService taskService = processEngineRule.getTaskService();

        // start the process instance
        ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testprocess", "");

        Map<String, Object> callbackPayload = Collections.<String,Object>singletonMap("DocumentName", "Test.pdf");
        Execution execution = processEngine.getRuntimeService()
                .createExecutionQuery().processInstanceId(processInstance.getId()).singleResult();
        processEngine.getRuntimeService().signal(execution.getId(), callbackPayload);
    }
}
The exception is thrown on the last line of the test case and says that the execution id does not exist.
I have two fundamental questions:
- Is my thought process right, or do I need to use send and receive activities?
- On the last line, the execution id I get is the same as the process id. Is that right?
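To make the first question concrete, here is a minimal, engine-free sketch of the flow I am after: each message taken from a queue becomes one work item. It is only an illustration under assumptions: a BlockingQueue stands in for the RabbitMQ queue, and WorkItem, consume, and the STOP marker are hypothetical names that are not part of my process or of any library API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueToTaskSketch {

    // Hypothetical stand-in for a user task created from one message.
    static final class WorkItem {
        final String documentName;

        WorkItem(String documentName) {
            this.documentName = documentName;
        }
    }

    // Marker message that tells the consumer loop to stop (an assumption of this sketch).
    static final String STOP = "__STOP__";

    // Takes messages until STOP is seen and creates one work item per message.
    static List<WorkItem> consume(BlockingQueue<String> queue) {
        List<WorkItem> created = new ArrayList<>();
        try {
            while (true) {
                String message = queue.take(); // blocks until a message arrives
                if (STOP.equals(message)) {
                    break;
                }
                created.add(new WorkItem(message));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was created so far.
            Thread.currentThread().interrupt();
        }
        return created;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The "external service" publishes from another thread.
        Thread publisher = new Thread(() -> {
            queue.add("Test.pdf");
            queue.add("Contract.pdf");
            queue.add(STOP);
        });
        publisher.start();

        List<WorkItem> items = consume(queue);
        publisher.join();

        for (WorkItem item : items) {
            System.out.println("Created work item for " + item.documentName);
        }
    }
}
```

Running main prints one "Created work item for ..." line per published document name.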
Please help me solve this issue.
Regards,
Subbu