Skip to content

Commit 8016f27

Browse files
authored
Merge pull request #4578 from esl/event-metadata
Use a hook with handlers in mod_event_pusher
2 parents b64d895 + a12ce09 commit 8016f27

13 files changed

+240
-107
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
-module(mod_event_pusher_filter).
2+
-moduledoc """
3+
An example filter and metadata provider for mod_event_pusher
4+
5+
It performs two actions:
6+
1. Filters out any events other than 'user_status_event'.
7+
2. Adds the following event metadata:
8+
- timestamp in milliseconds (taken from mongoose_acc)
9+
- number of user's active sessions
10+
The metadata can be injected into the event published by the configured backends
11+
(currently only 'rabbit' supports this).
12+
""".
13+
14+
-behaviour(gen_mod).
15+
16+
%% gen_mod callbacks
17+
-export([start/2, stop/1, hooks/1]).
18+
19+
%% hook handlers
20+
-export([push_event/3]).
21+
22+
-include_lib("../../include/mod_event_pusher_events.hrl").
23+
24+
%% gen_mod callbacks
25+
26+
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
27+
start(_HostType, _Opts) ->
28+
ok.
29+
30+
-spec stop(mongooseim:host_type()) -> ok.
31+
stop(_HostType) ->
32+
ok.
33+
34+
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
35+
hooks(HostType) ->
36+
[{push_event, HostType, fun ?MODULE:push_event/3, #{}, 10}]. % needs to run before other handlers
37+
38+
%% hook handlers
39+
40+
-doc "For user status events, add timestamp and session count. Filter out other events.".
41+
-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(),
42+
gen_hook:extra()) -> gen_hook:hook_fn_ret(mod_event_pusher:push_event_acc()).
43+
push_event(HookAcc = #{acc := Acc}, #{event := #user_status_event{jid = JID}}, _Extra) ->
44+
#{metadata := Metadata} = HookAcc,
45+
NewMetadata = Metadata#{timestamp => mongoose_acc:timestamp(Acc),
46+
session_count => count_user_sessions(JID)},
47+
{ok, HookAcc#{metadata := NewMetadata}};
48+
push_event(HookAcc, _Params, _Extra) ->
49+
{stop, HookAcc}.
50+
51+
%% helpers
52+
53+
-spec count_user_sessions(jid:jid()) -> non_neg_integer().
54+
count_user_sessions(JID) ->
55+
length(ejabberd_sm:get_user_present_resources(JID)).

big_tests/tests/mod_event_pusher_rabbit_SUITE.erl

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ all() ->
5050
{group, only_presence_status_publish},
5151
{group, chat_message_publish},
5252
{group, group_chat_message_publish},
53-
{group, instrumentation}
53+
{group, instrumentation},
54+
{group, filter_and_metadata}
5455
].
5556

5657
groups() ->
@@ -61,7 +62,8 @@ groups() ->
6162
{only_presence_status_publish, [], only_presence_status_publish_tests()},
6263
{chat_message_publish, [], chat_message_publish_tests()},
6364
{group_chat_message_publish, [], group_chat_message_publish_tests()},
64-
{instrumentation, [], instrumentation_tests()}].
65+
{instrumentation, [], instrumentation_tests()},
66+
{filter_and_metadata, [], filter_and_metadata_tests()}].
6567

6668
pool_startup_tests() ->
6769
[rabbit_pool_starts_with_default_config].
@@ -96,6 +98,10 @@ instrumentation_tests() ->
9698
[connections_events_are_executed,
9799
messages_published_events_are_executed].
98100

101+
filter_and_metadata_tests() ->
102+
[messages_published_events_are_not_executed,
103+
presence_messages_are_properly_formatted_with_metadata].
104+
99105
suite() ->
100106
escalus:suite().
101107

@@ -112,6 +118,7 @@ init_per_suite(Config) ->
112118
start_rabbit_wpool(domain()),
113119
{ok, _} = application:ensure_all_started(amqp_client),
114120
muc_helper:load_muc(),
121+
mongoose_helper:inject_module(mod_event_pusher_filter),
115122
escalus:init_per_suite(Config);
116123
false ->
117124
{skip, "RabbitMQ server is not available on default port."}
@@ -132,6 +139,9 @@ init_per_group(GroupName, Config0) ->
132139

133140
required_modules(pool_startup) ->
134141
[{mod_event_pusher, stopped}];
142+
required_modules(filter_and_metadata = GroupName) ->
143+
[{mod_event_pusher_filter, #{}},
144+
{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}];
135145
required_modules(GroupName) ->
136146
[{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].
137147

@@ -218,7 +228,7 @@ only_presence_exchange_is_created_on_module_startup(Config) ->
218228
?assertNot(is_exchange_present(Connection, {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType})).
219229

220230
%%--------------------------------------------------------------------
221-
%% GROUP (only_)presence_status_publish
231+
%% GROUP (only_)presence_status_publish, filter_and_metadata
222232
%%--------------------------------------------------------------------
223233

224234
connected_users_push_presence_events_when_change_status(Config) ->
@@ -236,20 +246,31 @@ connected_users_push_presence_events_when_change_status(Config) ->
236246
end).
237247

238248
presence_messages_are_properly_formatted(Config) ->
239-
escalus:story(
249+
escalus:fresh_story_with_config(
250+
Config, [{bob, 1}], fun presence_messages_are_properly_formatted_story/2).
251+
252+
presence_messages_are_properly_formatted_with_metadata(Config) ->
253+
escalus:fresh_story_with_config(
240254
Config, [{bob, 1}],
241-
fun(Bob) ->
242-
%% GIVEN
243-
BobJID = client_lower_short_jid(Bob),
244-
BobFullJID = client_lower_full_jid(Bob),
245-
listen_to_presence_events_from_rabbit([BobJID], Config),
246-
%% WHEN user logout
247-
escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
248-
%% THEN receive message
249-
?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
250-
get_decoded_message_from_rabbit(BobJID))
255+
fun(_, Bob) ->
256+
TS = rpc(mim(), erlang, system_time, [microsecond]),
257+
DecodedMessage = presence_messages_are_properly_formatted_story(Config, Bob),
258+
#{<<"timestamp">> := T, <<"session_count">> := 0} = DecodedMessage,
259+
?assert(is_integer(T) andalso T > TS)
251260
end).
252261

262+
presence_messages_are_properly_formatted_story(Config, Bob) ->
263+
%% GIVEN
264+
BobJID = client_lower_short_jid(Bob),
265+
BobFullJID = client_lower_full_jid(Bob),
266+
listen_to_presence_events_from_rabbit([BobJID], Config),
267+
%% WHEN user logout
268+
escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
269+
%% THEN receive message
270+
DecodedMessage = get_decoded_message_from_rabbit(BobJID),
271+
?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, DecodedMessage),
272+
DecodedMessage.
273+
253274
messages_published_events_are_not_executed(Config) ->
254275
escalus:story(
255276
Config, [{bob, 1}, {alice, 1}],
@@ -702,6 +723,7 @@ send_presence_stanza(User, NumOfMsgs) ->
702723
get_decoded_message_from_rabbit(RoutingKey) ->
703724
receive
704725
{#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
726+
ct:log("Decoded rabbit message, rk=~p~nmessage:~ts", [RoutingKey, Msg]),
705727
jiffy:decode(Msg, [return_maps])
706728
after
707729
5000 -> ct:fail("Timeout when decoding message, rk=~p", [RoutingKey])

doc/migrations/6.4.0_6.5.0.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ Instead of always sending all notifications, it only enables the ones with a rel
3232

3333
Make sure you have all the necessary sections present in the configuration file before upgrading.
3434

35+
### Custom backends for `mod_event_pusher`
36+
37+
If you have a custom backend implemented for `mod_event_pusher`, you need to update it to handle the `push_event` hook instead of implementing the `mod_event_pusher` behaviour (which no longer exists).
38+
3539
### Deprecation
3640

3741
MSSQL backend is deprecated and will be removed in the next release.

doc/modules/mod_event_pusher.md

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
## Module Description
22

3-
This module is a generic interface for event-pushing backends.
4-
It defines a single callback, `push_event/2` that forwards the event to all registered backends.
5-
Each backend decides how and if to handle the event in its `push_event/2` implementation.
6-
3+
This module is a generic interface for pushing **events** to the configured **backends**.
4+
The events include presence updates and incoming/outgoing messages.
75
Currently supported backends include [http], [push], [rabbit] and [sns].
86
Refer to their specific documentation to learn more about their functions and configuration options.
97

10-
### How it works
11-
12-
The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file.
13-
Common events like user presence changes (offline and online), chat and groupchat messages (incoming
14-
and outgoing) are already handled in the `mod_event_pusher_hook_translator` module, which is a proxy between various hooks and the `push_event/2` handler.
15-
168
!!! Warning
179
This module does not support [dynamic domains](../configuration/general.md#generalhost_types).
1810

@@ -46,6 +38,36 @@ The `[modules.mod_event_pusher]` section itself is omitted - this is allowed in
4638
[modules.mod_event_pusher.rabbit]
4739
```
4840

41+
## How it works
42+
43+
The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file.
44+
Common events like user presence changes (offline and online), chat and groupchat messages (incoming and outgoing) are handled in the `mod_event_pusher_hook_translator` module.
45+
Each event has a corresponding [hook](../developers-guide/Hooks-and-handlers.md), e.g. `user_send_message` is run when a user sends a message.
46+
`mod_event_pusher_hook_translator` has a handler function for each supported hook.
47+
48+
Handling an event includes the following steps:
49+
50+
1. The event hook is executed, and the corresponding handler function in `mod_event_pusher_hook_translator` is called.
51+
1. The handler function calls `mod_event_pusher:push_event(Acc, Event)`.
52+
1. `mod_event_pusher:push_event/2` runs another hook called `push_event`.
53+
1. All configured backend modules have handlers for the `push_event` hook, and all these handlers are called.
54+
55+
### Custom event processing
56+
57+
By implementing your own module handling the `push_event` hook, you can:
58+
59+
- Push the events to a new service, such as a message queue or a database.
60+
- Filter the events by returning `{ok, ...}` to keep an event, or `{stop, ...}` to drop it.
61+
- Add a map with **metadata** to the events. The keys need to be atoms, and the values need to be atoms, binaries or numbers.
62+
63+
There is an example [mod_event_pusher_filter.erl](https://github.com/esl/MongooseIM/blob/master/big_tests/tests/mod_event_pusher_filter.erl) module, demonstrating how to filter the events and append additional metadata.
64+
65+
!!! Note
66+
Execution order of handlers depends on their priorities. In particular, filtering events or adding metadata needs to happend before pushing notifications to external services. The example handler has the priority value of 10, while backends have the priority of 50.
67+
68+
!!! Warning
69+
Currently only the [rabbit](mod_event_pusher_rabbit.md#additional-metadata) backend supports adding metadata to the published notifications.
70+
4971
[http]: ./mod_event_pusher_http.md
5072
[push]: ./mod_event_pusher_push.md
5173
[rabbit]: ./mod_event_pusher_rabbit.md

doc/modules/mod_event_pusher_rabbit.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ and for "received" events:
185185
}
186186
```
187187

188+
## Additional metadata
189+
190+
If you decide to [customize the events](mod_event_pusher.md#event-customization) with additional metadata, the additional key-value pairs will be added directly to the JSON object. You can override existing properties, but it is counter-intuitive and thus not recommended.
191+
188192
## Metrics
189193

190194
The module provides some metrics related to RabbitMQ connections and messages

src/event_pusher/mod_event_pusher.erl

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,26 @@
2525

2626
-type backend() :: http | push | rabbit | sns.
2727
-type event() :: #user_status_event{} | #chat_event{} | #unack_msg_event{}.
28-
-export_type([event/0]).
28+
-type metadata() :: #{atom() => atom() | binary() | number()}.
29+
-type push_event_params() :: #{event := event()}.
30+
-type push_event_acc() :: #{acc := mongoose_acc:t(), metadata := metadata()}.
31+
-export_type([event/0, metadata/0, push_event_acc/0, push_event_params/0]).
2932

3033
-export([deps/2, start/2, stop/1, config_spec/0, push_event/2]).
3134

3235
-export([config_metrics/1]).
3336

3437
-ignore_xref([behaviour_info/1]).
3538

36-
%%--------------------------------------------------------------------
37-
%% Callbacks
38-
%%--------------------------------------------------------------------
39-
40-
-callback push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().
41-
4239
%%--------------------------------------------------------------------
4340
%% API
4441
%%--------------------------------------------------------------------
4542

4643
%% @doc Pushes the event to each backend registered with the event_pusher.
4744
-spec push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().
4845
push_event(Acc, Event) ->
49-
HostType = mongoose_acc:host_type(Acc),
50-
Backends = maps:keys(gen_mod:get_loaded_module_opts(HostType, ?MODULE)),
51-
lists:foldl(fun(B, Acc0) -> (backend_module(B)):push_event(Acc0, Event) end, Acc, Backends).
46+
#{acc := NewAcc} = mongoose_hooks:push_event(Acc, Event),
47+
NewAcc.
5248

5349
%%--------------------------------------------------------------------
5450
%% gen_mod API

src/event_pusher/mod_event_pusher_http.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
-ignore_xref([behaviour_info/1]).
1212

1313
-behaviour(gen_mod).
14-
-behaviour(mod_event_pusher).
1514
-behaviour(mongoose_module_metrics).
1615

1716
-callback should_make_req(Acc :: mongoose_acc:t(),
@@ -39,7 +38,10 @@
3938
-include("jlib.hrl").
4039

4140
%% API
42-
-export([start/2, stop/1, config_spec/0, push_event/2, instrumentation/1]).
41+
-export([start/2, stop/1, hooks/1, config_spec/0, instrumentation/1]).
42+
43+
%% hook handlers
44+
-export([push_event/3]).
4345

4446
%% config spec callbacks
4547
-export([fix_path/1]).
@@ -58,6 +60,10 @@ start(_HostType, _Opts) ->
5860
stop(_HostType) ->
5961
ok.
6062

63+
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
64+
hooks(HostType) ->
65+
[{push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}].
66+
6167
-spec config_spec() -> mongoose_config_spec:config_section().
6268
config_spec() ->
6369
#section{items = #{<<"handlers">> => #list{items = handler_config_spec(),
@@ -81,13 +87,16 @@ instrumentation(HostType) ->
8187
[{?SENT_METRIC, #{host_type => HostType},
8288
#{metrics => #{count => spiral, response_time => histogram, failure_count => spiral}}}].
8389

84-
push_event(Acc, #chat_event{direction = Dir, from = From, to = To, packet = Packet}) ->
85-
HostType = mongoose_acc:host_type(Acc),
90+
-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(),
91+
gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}.
92+
push_event(HookAcc, #{event := #chat_event{direction = Dir, from = From, to = To, packet = Packet}},
93+
#{host_type := HostType}) ->
94+
#{acc := Acc} = HookAcc,
8695
lists:map(fun(Opts) -> push_event(Acc, Dir, From, To, Packet, Opts) end,
8796
gen_mod:get_module_opt(HostType, ?MODULE, handlers)),
88-
Acc;
89-
push_event(Acc, _Event) ->
90-
Acc.
97+
{ok, HookAcc};
98+
push_event(HookAcc, _Params, _Extra) ->
99+
{ok, HookAcc}.
91100

92101
push_event(Acc, Dir, From, To, Packet, Opts = #{callback_module := Mod}) ->
93102
Body = exml_query:path(Packet, [{element, <<"body">>}, cdata], <<>>),

0 commit comments

Comments
 (0)