Hey Bernd,
Thank you very much for this detailed feedback. I will try to answer your questions from my perspective.
1. Yes, we have a major overhaul coming soon which should improve things on several fronts. What do you mean by “a bit wired”? Go in many ways follows the “old school” Unix philosophy, which is quite different from the Java world, so yes, things may seem a bit different.
2. I’m not sure if I fully understand your question here. What do you mean by “access it as a JSON”? JSON itself is not a data structure with an access pattern but a serialization format (a way of representing data as a series of bytes), and as such you can marshal or unmarshal it to different data structures: maps, slices, or user-defined data types (which would be the preferred way of building production-grade services). I’ll try to elaborate on what I mean later on. I’m fully aware that the examples and tests around payload handling are a bit primitive for non-Go people, so I will try to explain how I would build a service with some job workers.
3 & 4. We are aware of those things, as you might have already seen on our issue tracker. Due to our new direction and the prioritization of the things we are currently working on, I can’t make any promises about when those tickets will be resolved.
In the examples you provided, you will get the payload as a map; more exactly, a specialized version of map with the following signature: map[string]interface{}. This means that the keys of this map will be string and the values will be interface{}. Every type in Go implements at least one interface: the empty interface, interface{}. As a result, the values of the map can be of any type. If we think about it (and check the JSON grammar in the spec), we can conclude that map[string]interface{} can hold everything a JSON object can express, so representing a JSON tree with it is perfectly valid.
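To make the map[string]interface{} idea concrete, here is a minimal sketch. It assumes the standard encoding/json package as a stand-in for the client's own decoding (the helper name decodeGeneric is made up for illustration), and it shows that every value has to be type-asserted before it can be used.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeGeneric is a hypothetical helper: it unmarshals a JSON object
// into the generic map form discussed above.
func decodeGeneric(raw []byte) (map[string]interface{}, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := decodeGeneric([]byte(`{"a": 1, "name": "demo", "tags": ["x", "y"]}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// Values are interface{}, so a type assertion is needed before use.
	// encoding/json decodes every JSON number into a float64.
	if id, ok := m["a"].(float64); ok {
		fmt.Println("a =", id)
	}
	if name, ok := m["name"].(string); ok {
		fmt.Println("name =", name)
	}
	// Nested arrays come back as []interface{}.
	if tags, ok := m["tags"].([]interface{}); ok {
		fmt.Println("tags has", len(tags), "entries")
	}
}
```

Note that nothing stops a caller from asserting the wrong type; the assertion simply fails (ok is false), which is the kind of check that is easy to forget.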
Therefore (if I understood you correctly), the following would be equivalent:
payload := job.GetPayload()
payload := job.GetPayloadAsJSON()
Now, the question is: if it’s there, should we use it in production? My answer is definitely no, because we lose type checking at runtime, and since the payload is an object coming from I/O, we should always keep type checks.
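As a rough illustration of what those type checks buy us, the sketch below decodes into a user-defined struct. It again uses encoding/json as a stand-in for the real payload decoding, and the Payload type and decodeTyped helper are hypothetical names, not part of the client.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload is a hypothetical typed payload used only for this sketch.
type Payload struct {
	ID   int    `json:"a"`
	Name string `json:"name"`
}

// decodeTyped decodes raw bytes into Payload and reports any mismatch
// between the incoming data and the declared field types.
func decodeTyped(raw []byte) (Payload, error) {
	var p Payload
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	good := []byte(`{"a": 1, "name": "demo"}`)
	bad := []byte(`{"a": "not-a-number", "name": "demo"}`)

	for _, raw := range [][]byte{good, bad} {
		p, err := decodeTyped(raw)
		if err != nil {
			// The malformed input is rejected at the boundary.
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("accepted: %+v\n", p)
	}
}
```

With the typed form, bad input is caught once, at the point where data enters the program, instead of at every place a value is used.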
I wrote a small example of how I think a production-grade service should be built. The example can be found here. Let me try to explain it. We have the following process:
First, we have to create a client and connect to Zeebe. In the example, I will use a special feature of Go: init functions. These are special package-level functions which get executed automatically before main (right after the package’s variables are initialized) and are used for variable initialization, checking/fixing the program’s state, registering, running one-time computations, etc.
var zbClient *zbc.Client

func init() {
	client, err := zbc.NewClient(Addr)
	if err != nil {
		log.Fatal(err)
	}
	zbClient = client
}
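If the ordering of init is unfamiliar, this small standalone sketch (no Zeebe involved; all names are made up for illustration) records the order in which Go runs package-level variable initializers, init, and main.

```go
package main

import "fmt"

// events records the order in which the initialization steps run.
var events []string

// config is a package-level variable; its initializer runs before init.
var config = loadConfig()

// loadConfig is a hypothetical initializer used only to observe ordering.
func loadConfig() string {
	events = append(events, "package variable")
	return "example-config"
}

// init runs after all package-level variables are initialized.
func init() {
	events = append(events, "init")
}

// main runs last.
func main() {
	events = append(events, "main")
	fmt.Println(events) // prints [package variable init main]
}
```

This is why a client created in init is safe to use from main, as long as init terminates the program when the connection fails.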
Now that we have our client, we can start our program’s resources:
func main() {
	_, err := zbClient.CreateWorkflowFromFile(Topic, zbcommon.BpmnXml, ProcessDefinitionPath)
	if err != nil {
		log.Fatal(err)
	}

	fooSub, err := zbClient.JobSubscription(Topic, "fooWorker", "foo", 30000, 30, new(FooJobHandler).Handle)
	if err != nil {
		log.Fatal(err)
	}
	fooSub.StartAsync()

	barSub, err := zbClient.JobSubscription(Topic, "fooWorker", "bar", 30000, 30, new(BarJobHandler).Handle)
	if err != nil {
		log.Fatal(err)
	}
	barSub.StartAsync()

	foobarSub, err := zbClient.JobSubscription(Topic, "fooWorker", "foobar", 30000, 30, new(FooBarJobHandler).Handle)
	if err != nil {
		log.Fatal(err)
	}
	foobarSub.StartAsync()

	http.HandleFunc("/create/instance", createInstance)
	log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", ServerInterface, ServerPort), nil))
}
We deployed our workflow and started all the workers required by our process definition. You may notice that I’m using an approach similar to what you would find in Java: we define our worker types and implement a Handle function which executes every job for a given task. You can use this or just plain functions. After that, we start an HTTP server with one GET endpoint, /create/instance.
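The "worker type with a Handle method, or just a plain function" choice works because a method bound to a value is itself a function value in Go. The following sketch uses hypothetical stand-in types (Job, HandlerFunc, CountingHandler), not the client's real ones, to show both forms being passed where a handler function is expected.

```go
package main

import "fmt"

// Job and HandlerFunc are hypothetical stand-ins, not types from the client.
type Job struct{ Type string }

type HandlerFunc func(job Job) string

// CountingHandler keeps state between jobs, which a struct makes easy.
type CountingHandler struct{ handled int }

// Handle has the HandlerFunc signature once it is bound to a receiver.
func (h *CountingHandler) Handle(job Job) string {
	h.handled++
	return fmt.Sprintf("%s handled (total %d)", job.Type, h.handled)
}

// plainHandle is a stateless function with the same signature.
func plainHandle(job Job) string {
	return job.Type + " handled"
}

func main() {
	counting := new(CountingHandler)
	// counting.Handle is a method value: a function bound to its receiver.
	handlers := []HandlerFunc{counting.Handle, plainHandle}
	for _, h := range handlers {
		fmt.Println(h(Job{Type: "foo"}))
	}
}
```

A struct-based handler is handy when the worker needs dependencies or counters; a plain function is enough for stateless work.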
Now, if we look into the process definition, you can see that we have some ioMapping records for our first foo task. Those mappings are the following:
<zeebe:ioMapping>
	<zeebe:input source="$.a" target="$.foo" />
	<zeebe:input source="$.name" target="$.name" />
</zeebe:ioMapping>
This is important to keep in mind during payload handling. Now let’s create our workflow instance with a payload, but first let’s look at how we define our payload in a way that preserves type checking.
type CreateWorkflowPayload struct {
	ID   int    `msgpack:"a"`
	Name string `msgpack:"name"`
}
And the payload for the foo job worker:
type FooJobPayload struct {
	ID    int    `msgpack:"foo"`
	Name  string `msgpack:"name"`
	Value int    `msgpack:"value"`
}
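To see why the two structs carry different tags, here is a simplified simulation of the input mapping. It is only a sketch under loud assumptions: it uses encoding/json tags instead of msgpack tags, and applyInputMapping and remap are hypothetical helpers that copy top-level keys, which is far less than what the broker really does with its JSONPath-style expressions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same shapes as the payload structs above, with json tags standing in
// for the msgpack tags (an assumption made to stay in the standard library).
type CreateWorkflowPayload struct {
	ID   int    `json:"a"`
	Name string `json:"name"`
}

type FooJobPayload struct {
	ID    int    `json:"foo"`
	Name  string `json:"name"`
	Value int    `json:"value"`
}

// applyInputMapping is a hypothetical, simplified model of an input
// mapping: it copies each top-level source key to its target key.
func applyInputMapping(in map[string]interface{}, mapping map[string]string) map[string]interface{} {
	out := make(map[string]interface{}, len(mapping))
	for source, target := range mapping {
		if v, ok := in[source]; ok {
			out[target] = v
		}
	}
	return out
}

// remap encodes the instance payload, applies the mapping from the
// process definition ($.a -> $.foo, $.name -> $.name), and decodes the
// result into the job worker's payload type.
func remap(p CreateWorkflowPayload) (FooJobPayload, error) {
	var job FooJobPayload
	raw, err := json.Marshal(p)
	if err != nil {
		return job, err
	}
	var doc map[string]interface{}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return job, err
	}
	mapped, err := json.Marshal(applyInputMapping(doc, map[string]string{"a": "foo", "name": "name"}))
	if err != nil {
		return job, err
	}
	err = json.Unmarshal(mapped, &job)
	return job, err
}

func main() {
	job, err := remap(CreateWorkflowPayload{ID: 1, Name: "myWorkflowInstance"})
	if err != nil {
		fmt.Println("remap failed:", err)
		return
	}
	fmt.Printf("%+v\n", job) // prints {ID:1 Name:myWorkflowInstance Value:0}
}
```

The value written under the key a arrives under the key foo, so the worker's struct has to be tagged with the mapping's target name, not the source name.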
The implementation of the handler for that endpoint looks like this:
func createInstance(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("name")
	// ... more code in example ...
	payload := &CreateWorkflowPayload{ID: 1, Name: name}
	instance := zbc.NewWorkflowInstance("demoProcess", zbc.LatestVersion, payload)
	_, err := zbClient.CreateWorkflowInstance(Topic, instance)
	if err != nil {
		log.Printf("[http] InternalServerError %d - /create/instance?name=%s - %s\n", http.StatusInternalServerError, name, err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return // stop here so nothing else is written after the error response
	}
	// ... more code in example ...
}
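The query-parameter part of a handler like this can be tried without a running server or broker by using net/http/httptest. The sketch below is an assumption-laden illustration: nameFromQuery and call are hypothetical, the handler never talks to Zeebe, and rejecting an empty name is my own choice rather than something taken from the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// nameFromQuery is a hypothetical handler that only echoes the "name"
// query parameter; it stands in for the Zeebe-backed handler.
func nameFromQuery(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("name")
	if name == "" {
		http.Error(w, "missing name parameter", http.StatusBadRequest)
		return // return after http.Error so no further output is written
	}
	fmt.Fprintf(w, "name=%s", name)
}

// call runs the handler against an in-memory request and returns the
// response status code and body.
func call(target string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, target, nil)
	rec := httptest.NewRecorder()
	nameFromQuery(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call("/create/instance?name=myWorkflowInstance")
	fmt.Println(code, body)

	code, body = call("/create/instance")
	fmt.Println(code, body)
}
```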
Now, if we execute a GET request against our HTTP server, the handler will create a new payload with ID=1 and Name=<GETQueryParameter>. Example:
curl http://localhost:4567/create/instance\?name\=myWorkflowInstance
If we look into our server log, we can see something like this:

Great! Let’s look into how these prints are achieved. To implement a Zeebe job handler, we need a function with the following signature:
type FooJobHandler struct {
	// NOTE: atomic values which this handler might use
}

func (handler *FooJobHandler) Handle(client zbsubscribe.ZeebeAPI, event *zbsubscriptions.SubscriptionEvent) {
	var payload FooJobPayload
	event.LoadPayload(&payload)
	log.Printf("[worker::foo] %+v\n", payload)

	payload.Name = "fooWasHere"
	payload.Value += 1

	event.UpdatePayload(&payload)
	client.CompleteJob(event)
}
We initially created our payload with a CreateWorkflowPayload object and now we are reading it with a FooJobPayload object - wat? The reason for this is ioMapping! A couple of things to note here:
- Technically we can use the same object if there is no ioMapping
- The msgpack:"" tags on payload objects have to match the ioMapping from the process definition
- Handling payload boils down to two actions: LoadPayload from the event into the user-defined object and UpdatePayload to the event before sending it to the broker
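The load, modify, update cycle from the last point can be sketched in isolation. Everything here is a stand-in: the event type and its Load and Update methods are hypothetical and use encoding/json, whereas the real subscription event has its own methods and encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for a subscription event that
// carries an encoded payload.
type event struct{ payload []byte }

// Load decodes the event payload into a user-defined object.
func (e *event) Load(v interface{}) error { return json.Unmarshal(e.payload, v) }

// Update encodes the object and stores it back on the event.
func (e *event) Update(v interface{}) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	e.payload = raw
	return nil
}

// FooJobPayload mirrors the worker payload, with json tags as a stand-in.
type FooJobPayload struct {
	ID    int    `json:"foo"`
	Name  string `json:"name"`
	Value int    `json:"value"`
}

// handle loads the payload, changes it, and writes it back to the event.
func handle(e *event) error {
	var p FooJobPayload
	if err := e.Load(&p); err != nil {
		return fmt.Errorf("load payload: %w", err)
	}
	p.Name = "fooWasHere"
	p.Value++
	if err := e.Update(&p); err != nil {
		return fmt.Errorf("update payload: %w", err)
	}
	return nil
}

func main() {
	e := &event{payload: []byte(`{"foo":1,"name":"myWorkflowInstance"}`)}
	if err := handle(e); err != nil {
		fmt.Println("handler failed:", err)
		return
	}
	fmt.Println(string(e.payload)) // prints {"foo":1,"name":"fooWasHere","value":1}
}
```

Checking the error from the load step is what turns the typed payload into an actual runtime guard: a payload that does not fit the struct stops the handler before any work is done.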
Why do we have both? Each method has its pros and cons and would generally be used in different stages of development. The typed way of dealing with the payload is safer, more readable, and more maintainable, but it demands a bit more effort from the development team because they have to define ahead of time what the payload will look like. The other, map[string]interface{} / JSON way of dealing with the payload is less safe and more cumbersome but puts fewer constraints on the development team, so it’s useful in the prototyping stage since it will always give you all the data (without type-assertion) that arrives in your worker.
You can find this example on the develop branch, so don’t forget to check out that branch in your GOPATH before you try this out. I hope I made some things a bit clearer. If not, feel free to talk to me on Slack.
P.S. When in doubt, always type it and never panic.