Custom connector that can send messages to Azure Service Bus via AMQP 1.0

Hi all,

I’m trying to build a custom connector that can send messages to Azure Service Bus via AMQP 1.0.

I tested the connector individually, it works. But after I added this connector to Camunda as an extension, it gives me this error when I try to run a process that is using the connector.

Caused by: java.lang.ClassCastException: org.apache.qpid.jms.JmsQueue cannot be cast to javax.jms.Destination
at org.camunda.bpm.extension.mail.send.SendMailConnector.execute(SendMailConnector.java:87)

Connector code:

    ConnectionStringBuilder csb = new ConnectionStringBuilder(ConnectionString);
    
    // set up JNDI context
    Hashtable<String, String> hashtable = new Hashtable<>();
    hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
    hashtable.put("queue.QUEUE", QueueName);

    hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

    Context context = new InitialContext(hashtable);

    Destination queue = (Destination) context.lookup("QUEUE");  //  ERROR occurs here.
    ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

    Connection connection = cf.createConnection(csbSasKeyName, csbSasKey);
    javax.jms.Session session = connection.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE);

    MessageProducer producer = session.createProducer(queue);

    BytesMessage m = session.createBytesMessage();
    m.writeBytes(String.valueOf(1).getBytes());
    producer.send(m);

    System.out.printf("Sent message %d.\n");

    producer.close();
    session.close();
    connection.stop();
    connection.close();

Dependencies I’m using:

com.microsoft.azure
azure-servicebus
1.1.2

<dependency>
	<groupId>org.apache.qpid</groupId>
	<artifactId>proton-j</artifactId>
	<version>[0.26.0,]</version>
</dependency>

<dependency>
	<groupId>org.apache.qpid</groupId>
	<artifactId>qpid-jms-client</artifactId>
	<version>[0.27.0,]</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>[1.2.0,]</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>[1.7.25,]</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
</dependency>

Any idea why does this append?

Thank you.

Hi @Ruiheng_Zhao,

I don’t know Qpid but it looks odd that you lookup for “QUEUE” and try to cast it to a Destination. Maybe you should declare the destination in the config/context instead of the queue. For example:

hashtable.put("destination.topicExchange", "amq.topic");
...
Destination destination = (Destination) context.lookup("topicExchange");

Does this help you?

Best regards,
Philipp