Skip to content

Commit b8fff1f

Browse files
authored
Merge pull request #4577 from esl/skip-messages-without-body
Skip all messages without body in mod_event_pusher_rabbit
2 parents 3a099dc + 6e5a6a6 commit b8fff1f

File tree

2 files changed

+112
-72
lines changed

2 files changed

+112
-72
lines changed

big_tests/tests/mod_event_pusher_rabbit_SUITE.erl

Lines changed: 94 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -54,50 +54,47 @@ all() ->
5454
].
5555

5656
groups() ->
57-
[
58-
{pool_startup, [],
59-
[
60-
rabbit_pool_starts_with_default_config
61-
]},
62-
{module_startup, [],
63-
[
64-
exchanges_are_created_on_module_startup
65-
]},
66-
{only_presence_module_startup, [],
67-
[
68-
only_presence_exchange_is_created_on_module_startup
69-
]},
70-
{presence_status_publish, [],
71-
[
72-
connected_users_push_presence_events_when_change_status,
73-
presence_messages_are_properly_formatted
74-
]},
75-
{only_presence_status_publish, [],
76-
[
77-
connected_users_push_presence_events_when_change_status,
78-
presence_messages_are_properly_formatted,
79-
messages_published_events_are_not_executed
80-
]},
81-
{chat_message_publish, [],
82-
[
83-
chat_message_sent_event,
84-
chat_message_sent_event_properly_formatted,
85-
chat_message_received_event,
86-
chat_message_received_event_properly_formatted
87-
]},
88-
{group_chat_message_publish, [],
89-
[
90-
group_chat_message_sent_event,
91-
group_chat_message_sent_event_properly_formatted,
92-
group_chat_message_received_event,
93-
group_chat_message_received_event_properly_formatted
94-
]},
95-
{instrumentation, [],
96-
[
97-
connections_events_are_executed,
98-
messages_published_events_are_executed
99-
]}
100-
].
57+
[{pool_startup, [], pool_startup_tests()},
58+
{module_startup, [], module_startup_tests()},
59+
{only_presence_module_startup, [], only_presence_module_startup_tests()},
60+
{presence_status_publish, [], presence_status_publish_tests()},
61+
{only_presence_status_publish, [], only_presence_status_publish_tests()},
62+
{chat_message_publish, [], chat_message_publish_tests()},
63+
{group_chat_message_publish, [], group_chat_message_publish_tests()},
64+
{instrumentation, [], instrumentation_tests()}].
65+
66+
pool_startup_tests() ->
67+
[rabbit_pool_starts_with_default_config].
68+
69+
module_startup_tests() ->
70+
[exchanges_are_created_on_module_startup].
71+
72+
only_presence_module_startup_tests() ->
73+
[only_presence_exchange_is_created_on_module_startup].
74+
75+
presence_status_publish_tests() ->
76+
[connected_users_push_presence_events_when_change_status,
77+
presence_messages_are_properly_formatted].
78+
79+
only_presence_status_publish_tests() ->
80+
[messages_published_events_are_not_executed | presence_status_publish_tests()].
81+
82+
chat_message_publish_tests() ->
83+
[chat_message_sent_event,
84+
chat_message_sent_event_properly_formatted,
85+
chat_message_received_event,
86+
chat_message_received_event_properly_formatted].
87+
88+
group_chat_message_publish_tests() ->
89+
[group_chat_message_sent_event,
90+
group_chat_message_sent_event_properly_formatted,
91+
group_chat_message_received_event,
92+
group_chat_message_received_event_properly_formatted,
93+
group_chat_displayed_marker_is_skipped].
94+
95+
instrumentation_tests() ->
96+
[connections_events_are_executed,
97+
messages_published_events_are_executed].
10198

10299
suite() ->
103100
escalus:suite().
@@ -460,6 +457,31 @@ group_chat_message_received_event_properly_formatted(Config) ->
460457
get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK))
461458
end).
462459

460+
group_chat_displayed_marker_is_skipped(Config) ->
461+
escalus:story(
462+
Config, [{alice, 1}],
463+
fun(Alice) ->
464+
%% GIVEN basic variables
465+
Room = ?config(room, Config),
466+
RoomAddr = muc_helper:room_address(Room),
467+
AliceJID = client_lower_short_jid(Alice),
468+
RoutingKeys = [group_chat_msg_sent_rk(AliceJID), group_chat_msg_recv_rk(AliceJID)],
469+
SentBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, [AliceJID]),
470+
RecvBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, [AliceJID]),
471+
%% GIVEN users in room
472+
escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
473+
% wait for all room stanzas to be processed
474+
escalus:wait_for_stanzas(Alice, 2),
475+
listen_to_events_from_rabbit(SentBindings ++ RecvBindings, Config),
476+
%% WHEN Alice sends a displayed marker (which is a groupchat message w/o body)
477+
Marker = escalus_stanza:chat_marker(RoomAddr, <<"displayed">>, <<"id-123">>),
478+
escalus:send(Alice, escalus_stanza:setattr(Marker, <<"type">>, <<"groupchat">>)),
479+
escalus:assert(is_chat_marker, [<<"displayed">>, <<"id-123">>],
480+
escalus:wait_for_stanza(Alice)),
481+
%% THEN there are no sent/recv events in Rabbit
482+
assert_no_message_from_rabbit(tl(RoutingKeys))
483+
end).
484+
463485
%%--------------------------------------------------------------------
464486
%% GROUP instrumentation
465487
%%--------------------------------------------------------------------
@@ -676,15 +698,28 @@ send_presence_stanza(User, NumOfMsgs) ->
676698
[escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
677699
|| X <- lists:seq(1, NumOfMsgs)].
678700

679-
-spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
680-
map() | no_return().
701+
-spec get_decoded_message_from_rabbit(RoutingKey :: binary()) -> map().
681702
get_decoded_message_from_rabbit(RoutingKey) ->
682703
receive
683704
{#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
684705
jiffy:decode(Msg, [return_maps])
685706
after
686-
5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
687-
[RoutingKey]))
707+
5000 -> ct:fail("Timeout when decoding message, rk=~p", [RoutingKey])
708+
end.
709+
710+
-spec assert_no_message_from_rabbit(RoutingKeys :: [binary()]) -> ok.
711+
assert_no_message_from_rabbit(RoutingKeys) ->
712+
receive
713+
{#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
714+
case lists:member(RoutingKey, RoutingKeys) of
715+
true ->
716+
ct:fail("Unexpected rabbit message, rk=~p~nmessage: ~p", [RoutingKey, Msg]);
717+
false ->
718+
ct:log("Skipping rabbit message, rk=~p,~nmessage:~p", [RoutingKey, Msg]),
719+
assert_no_message_from_rabbit(RoutingKeys)
720+
end
721+
after
722+
500 -> ok % To save time, this timeout is shorter than in the positive test
688723
end.
689724

690725
%%--------------------------------------------------------------------
@@ -736,21 +771,17 @@ client_lower_full_jid(Client) ->
736771

737772
nick(User) -> escalus_utils:get_username(User).
738773

739-
maybe_prepare_muc(TestCase, Config) when
740-
TestCase == group_chat_message_sent_event orelse
741-
TestCase == group_chat_message_received_event orelse
742-
TestCase == group_chat_message_sent_event_properly_formatted orelse
743-
TestCase == group_chat_message_received_event_properly_formatted ->
744-
prepare_muc(Config);
745-
maybe_prepare_muc(_, Config) -> Config.
746-
747-
maybe_cleanup_muc(TestCase, Config) when
748-
TestCase == group_chat_message_sent_event orelse
749-
TestCase == group_chat_message_received_event orelse
750-
TestCase == group_chat_message_sent_event_properly_formatted orelse
751-
TestCase == group_chat_message_received_event_properly_formatted ->
752-
cleanup_muc(Config);
753-
maybe_cleanup_muc(_, _) -> ok.
774+
maybe_prepare_muc(TestCase, Config) ->
775+
case lists:member(TestCase, group_chat_message_publish_tests()) of
776+
true -> prepare_muc(Config);
777+
false -> Config
778+
end.
779+
780+
maybe_cleanup_muc(TestCase, Config) ->
781+
case lists:member(TestCase, group_chat_message_publish_tests()) of
782+
true -> cleanup_muc(Config);
783+
false -> ok
784+
end.
754785

755786
prepare_muc(Config) ->
756787
[User | _] = ?config(escalus_users, Config),

src/event_pusher/mod_event_pusher_rabbit.erl

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,15 @@ create_exchange(HostType, #{name := ExName, type := Type}) ->
118118
-spec handle_event(mongooseim:host_type(), #user_status_event{} | #chat_event{},
119119
exchange_opts()) -> ok.
120120
handle_event(HostType, Event, ExchangeOpts = #{name := ExchangeName}) ->
121-
RoutingKey = routing_key(Event, ExchangeOpts),
122-
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
123-
AMQPMessage = mongoose_amqp:message(iolist_to_binary(jiffy:encode(message(Event)))),
124-
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}).
121+
case message(Event) of
122+
Message = #{} ->
123+
RoutingKey = routing_key(Event, ExchangeOpts),
124+
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
125+
AMQPMessage = mongoose_amqp:message(iolist_to_binary(jiffy:encode(Message))),
126+
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage});
127+
skip ->
128+
ok
129+
end.
125130

126131
%%%===================================================================
127132
%%% Helpers
@@ -147,15 +152,19 @@ routing_key(#chat_event{direction = in, from = From}, #{sent_topic := Topic}) ->
147152
routing_key(#chat_event{direction = out, to = To}, #{recv_topic := Topic}) ->
148153
user_topic_routing_key(To, Topic).
149154

150-
-spec message(#user_status_event{} | #chat_event{}) -> #{atom() => binary()}.
155+
-spec message(#user_status_event{} | #chat_event{}) -> skip | #{atom() => binary()}.
151156
message(#user_status_event{jid = JID, status = Status}) ->
152157
#{user_id => jid:to_binary(jid:to_lower(JID)),
153158
present => is_user_online(Status)};
154159
message(#chat_event{packet = Packet, from = From, to = To}) ->
155-
MsgBody = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
156-
#{to_user_id => jid:to_binary(jid:to_lower(To)),
157-
message => MsgBody,
158-
from_user_id => jid:to_binary(jid:to_lower(From))}.
160+
case exml_query:path(Packet, [{element, <<"body">>}, cdata]) of
161+
undefined ->
162+
skip; % skip (group)chat messages w/o body, e.g. displayed markers
163+
MsgBody ->
164+
#{from_user_id => jid:to_binary(jid:to_lower(From)),
165+
to_user_id => jid:to_binary(jid:to_lower(To)),
166+
message => MsgBody}
167+
end.
159168

160169
-spec event_to_key(mod_event_pusher:event()) -> {ok, exchange_key()} | skip.
161170
event_to_key(#user_status_event{}) -> {ok, presence_exchange};

0 commit comments

Comments
 (0)