summaryrefslogtreecommitdiff
blob: 5f5c4b6cb7b12602a10034be846ffaef1e7201b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
From 64048b4c218099b6adcf46cd7b4d1dc9c658009e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= <edvin.torok@citrix.com>
Date: Wed, 12 Oct 2022 19:13:04 +0100
Subject: [PATCH 107/126] tools/ocaml: Limit maximum in-flight requests /
 outstanding replies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Introduce a limit on the number of outstanding reply packets in the xenbus
queue.  This limits the number of in-flight requests: when the output queue is
full we'll stop processing inputs until the output queue has room again.

To avoid a busy loop on the Unix socket we only add it to the watched input
file descriptor set if we'd be able to call `input` on it.  Even though Dom0
is trusted and exempt from quotas a flood of events might cause a backlog
where events are produced faster than daemons in Dom0 can consume them, which
could lead to an unbounded queue size and OOM.

Therefore the xenbus queue limit must apply to all connections, Dom0 is not
exempt from it, although if everything works correctly it will eventually
catch up.

This prevents a malicious guest from sending more commands while it has
outstanding watch events or command replies in its input ring.  However if it
can cause the generation of watch events by other means (e.g. by Dom0, or
another cooperative guest) and stop reading its own ring then watch events
would've queued up without limit.

The xenstore protocol doesn't have a back-pressure mechanism, and doesn't
allow dropping watch events.  In fact, dropping watch events is known to break
some pieces of normal functionality.  This leaves little choice to safely
implement the xenstore protocol without exposing the xenstore daemon to
out-of-memory attacks.

Implement the fix as pipes with bounded buffers:
* Use a bounded buffer for watch events
* The watch structure will have a bounded receiving pipe of watch events
* The source will have an "overflow" pipe of pending watch events it couldn't
  deliver

Items are queued up on one end and are sent as far along the pipe as possible:

  source domain -> watch -> xenbus of target -> xenstore ring/socket of target

If the pipe is "full" at any point then back-pressure is applied and we prevent
more items from being queued up.  For the source domain this means that we'll
stop accepting new commands as long as its pipe buffer is not empty.

Before we try to enqueue an item we first check whether it is possible to send
it further down the pipe, by attempting to recursively flush the pipes. This
ensures that we retain the order of events as much as possible.

We might break causality of watch events if the target domain's queue is full
and we need to start using the watch's queue.  This is a breaking change in
the xenstore protocol, but only for domains which are not processing their
incoming ring as expected.

When a watch is deleted its entire pending queue is dropped (no code is needed
for that, because it is part of the 'watch' type).

There is a cache of watches that have pending events that we attempt to flush
at every cycle if possible.

Introduce 3 limits here:
* quota-maxwatchevents on watch event destination: when this is hit the
  source will not be allowed to queue up more watch events.
* quota-maxoustanding which is the number of responses not read from the ring:
  once exceeded, no more inputs are processed until all outstanding replies
  are consumed by the client.
* overflow queue on the watch event source: all watches that cannot be stored
  on destination are queued up here, a single command can trigger multiple
  watches (e.g. due to recursion).

The overflow queue currently doesn't have an upper bound, it is difficult to
accurately calculate one as it depends on whether you are Dom0 and how many
watches each path has registered and how many watch events you can trigger
with a single command (e.g. a commit).  However these events were already
using memory, this just moves them elsewhere, and as long as we correctly
block a domain it shouldn't result in unbounded memory usage.

Note that Dom0 is not excluded from these checks, it is important that Dom0 is
especially not excluded when it is the source, since there are many ways in
which a guest could trigger Dom0 to send it watch events.

This should protect against malicious frontends as long as the backend follows
the PV xenstore protocol and only exposes paths needed by the frontend, and
changes those paths at most once as a reaction to guest events, or protocol
state.

The queue limits are per watch, and per domain-pair, so even if one
communication channel would be "blocked", others would keep working, and the
domain itself won't get blocked as long as it doesn't overflow the queue of
watch events.

Similarly a malicious backend could cause the frontend to get blocked, but
this watch queue protects the frontend as well as long as it follows the PV
protocol.  (Although note that protection against malicious backends is only a
best effort at the moment)

This is part of XSA-326 / CVE-2022-42318.

Signed-off-by: Edwin Török <edvin.torok@citrix.com>
Acked-by: Christian Lindig <christian.lindig@citrix.com>
(cherry picked from commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96)
---
 tools/ocaml/libs/xb/xb.ml                |  61 +++++++--
 tools/ocaml/libs/xb/xb.mli               |  11 +-
 tools/ocaml/libs/xs/queueop.ml           |  25 ++--
 tools/ocaml/libs/xs/xsraw.ml             |   4 +-
 tools/ocaml/xenstored/connection.ml      | 155 +++++++++++++++++++++--
 tools/ocaml/xenstored/connections.ml     |  57 +++++++--
 tools/ocaml/xenstored/define.ml          |   7 +
 tools/ocaml/xenstored/oxenstored.conf.in |   2 +
 tools/ocaml/xenstored/process.ml         |  31 ++++-
 tools/ocaml/xenstored/xenstored.ml       |   2 +
 10 files changed, 296 insertions(+), 59 deletions(-)

diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml
index 4197a3888a68..b292ed7a874d 100644
--- a/tools/ocaml/libs/xb/xb.ml
+++ b/tools/ocaml/libs/xb/xb.ml
@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
 
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
 
+(*
+	separate capacity reservation for replies and watch events:
+	this allows a domain to keep working even when under a constant flood of
+	watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+	| CommandReply
+	| Watchevent
+
+let string_of_packet_class = function
+	| CommandReply -> "command_reply"
+	| Watchevent -> "watch_event"
+
 type t =
 {
 	backend: backend;
-	pkt_out: Packet.t Queue.t;
+	pkt_out: (Packet.t, packet_class) Queue.t;
 	mutable partial_in: partial_buf;
 	mutable partial_out: string;
+	capacity: capacity
 }
 
+let to_read con =
+	match con.partial_in with
+		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+		| NoHdr   (i, _)    -> i
+
+let debug t =
+	Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+		(to_read t)
+		(String.length t.partial_out)
+		(Queue.length t.pkt_out)
+		(BoundedQueue.debug string_of_packet_class t.pkt_out)
+
 let init_partial_in () = NoHdr
 	(Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
 
@@ -199,7 +229,8 @@ let output con =
 	let s = if String.length con.partial_out > 0 then
 			con.partial_out
 		else if Queue.length con.pkt_out > 0 then
-			Packet.to_string (Queue.pop con.pkt_out)
+			let pkt = Queue.pop con.pkt_out in
+			Packet.to_string pkt
 		else
 			"" in
 	(* send data from s, and save the unsent data to partial_out *)
@@ -212,12 +243,15 @@ let output con =
 	(* after sending one packet, partial is empty *)
 	con.partial_out = ""
 
+(* we can only process an input packet if we're guaranteed to have room
+   to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
 (* NB: can throw Reconnect *)
 let input con =
-	let to_read =
-		match con.partial_in with
-		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-		| NoHdr   (i, _)    -> i in
+	if not (can_input con) then None
+	else
+	let to_read = to_read con in
 
 	(* try to get more data from input stream *)
 	let b = Bytes.make to_read '\000' in
@@ -243,11 +277,22 @@ let input con =
 		None
 	)
 
-let newcon backend = {
+let classify t =
+	match t.Packet.ty with
+	| Op.Watchevent -> Watchevent
+	| _ -> CommandReply
+
+let newcon ~capacity backend =
+	let limit = function
+		| CommandReply -> capacity.maxoutstanding
+		| Watchevent -> capacity.maxwatchevents
+	in
+	{
 	backend = backend;
-	pkt_out = Queue.create ();
+	pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
 	partial_in = init_partial_in ();
 	partial_out = "";
+	capacity = capacity;
 	}
 
 let open_fd fd = newcon (Fd { fd = fd; })
diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli
index 91c682162cea..71b2754ca788 100644
--- a/tools/ocaml/libs/xb/xb.mli
+++ b/tools/ocaml/libs/xb/xb.mli
@@ -66,10 +66,11 @@ type backend_mmap = {
 type backend_fd = { fd : Unix.file_descr; }
 type backend = Fd of backend_fd | Xenmmap of backend_mmap
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
 type t
 val init_partial_in : unit -> partial_buf
 val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
 val read_fd : backend_fd -> 'a -> bytes -> int -> int
 val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
 val read : t -> bytes -> int -> int
@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
 val write : t -> string -> int -> int
 val output : t -> bool
 val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
 val close : t -> unit
 val is_fd : t -> bool
 val is_mmap : t -> bool
 val output_len : t -> int
+val can_input: t -> bool
 val has_new_output : t -> bool
 val has_old_output : t -> bool
 val has_output : t -> bool
@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
 val has_more_input : t -> bool
 val is_selectable : t -> bool
 val get_fd : t -> Unix.file_descr
+val debug: t -> string
diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml
index 9ff5bbd529ce..4e532cdaeacb 100644
--- a/tools/ocaml/libs/xs/queueop.ml
+++ b/tools/ocaml/libs/xs/queueop.ml
@@ -16,9 +16,10 @@
 open Xenbus
 
 let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
 let queue_path ty (tid: int) (path: string) con =
 	let data = data_concat [ path; ] in
-	Xb.queue con (Xb.Packet.create tid 0 ty data)
+	queue con (Xb.Packet.create tid 0 ty data)
 
 (* operations *)
 let directory tid path con = queue_path Xb.Op.Directory tid path con
@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
 let getperms tid path con = queue_path Xb.Op.Getperms tid path con
 
 let debug commands con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
 
 let watch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
 
 let unwatch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
 
 let transaction_start con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
 
 let transaction_end tid commit con =
 	let data = data_concat [ (if commit then "T" else "F"); ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
 
 let introduce domid mfn port con =
 	let data = data_concat [ Printf.sprintf "%u" domid;
 	                         Printf.sprintf "%nu" mfn;
 	                         string_of_int port; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
 
 let release domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
 
 let resume domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
 
 let getdomainpath domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
 
 let write tid path value con =
 	let data = path ^ "\000" ^ value (* no NULL at the end *) in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
 
 let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
 let rm tid path con = queue_path Xb.Op.Rm tid path con
 
 let setperms tid path perms con =
 	let data = data_concat [ path; perms ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml
index 451f8b38dbcc..cbd17280600c 100644
--- a/tools/ocaml/libs/xs/xsraw.ml
+++ b/tools/ocaml/libs/xs/xsraw.ml
@@ -36,8 +36,10 @@ type con = {
 let close con =
 	Xb.close con.xb
 
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
 let open_fd fd = {
-	xb = Xb.open_fd fd;
+	xb = Xb.open_fd ~capacity fd;
 	watchevents = Queue.create ();
 }
 
diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
index cc20e047d2b9..9624a5f9da2c 100644
--- a/tools/ocaml/xenstored/connection.ml
+++ b/tools/ocaml/xenstored/connection.ml
@@ -20,12 +20,84 @@ open Stdext
 
 let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
 
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+    None - if there is no room to accept the item
+    Some () -  if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+	type 'a t
+
+	(** [create ~capacity ~destination] creates a bounded pipe with a
+	    local buffer holding at most [capacity] items.  Once the buffer is
+	    full it will not accept further items.  items from the pipe are
+	    flushed into [destination] as long as it accepts items.  The
+	    destination could be another pipe.
+	 *)
+	val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+	(** [is_empty t] returns whether the local buffer of [t] is empty. *)
+	val is_empty : _ t -> bool
+
+	(** [length t] the number of items in the internal buffer *)
+	val length: _ t -> int
+
+	(** [flush_pipe t] sends as many items from the local buffer as possible,
+			which could be none. *)
+	val flush_pipe: _ t -> unit
+
+	(** [push t item] tries to [flush_pipe] and then push [item]
+	    into the pipe if its [capacity] allows.
+	    Returns [None] if there is no more room
+	 *)
+	val push : 'a t -> 'a bounded_sender
+end = struct
+	(* items are enqueued in [q], and then flushed to [connect_to] *)
+	type 'a t =
+		{ q: 'a Queue.t
+		; destination: 'a bounded_sender
+		; capacity: int
+		}
+
+	let create ~capacity ~destination =
+		{ q = Queue.create (); capacity; destination }
+
+	let rec flush_pipe t =
+		if not Queue.(is_empty t.q) then
+			let item = Queue.peek t.q in
+			match t.destination item with
+			| None -> () (* no room *)
+			| Some () ->
+				(* successfully sent item to next stage *)
+				let _ = Queue.pop t.q in
+				(* continue trying to send more items *)
+				flush_pipe t
+
+	let push t item =
+		(* first try to flush as many items from this pipe as possible to make room,
+		   it is important to do this first to preserve the order of the items
+		 *)
+		flush_pipe t;
+		if Queue.length t.q < t.capacity then begin
+			(* enqueue, instead of sending directly.
+			   this ensures that [out] sees the items in the same order as we receive them
+			 *)
+			Queue.push item t.q;
+			Some (flush_pipe t)
+		end else None
+
+	let is_empty t = Queue.is_empty t.q
+	let length t = Queue.length t.q
+end
+
 type watch = {
 	con: t;
 	token: string;
 	path: string;
 	base: string;
 	is_relative: bool;
+	pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
 }
 
 and t = {
@@ -38,8 +110,36 @@ and t = {
 	anonid: int;
 	mutable stat_nb_ops: int;
 	mutable perm: Perms.Connection.t;
+	pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
 }
 
+module Watch = struct
+	module T = struct
+		type t = watch
+
+		let compare w1 w2 =
+			(* cannot compare watches from different connections *)
+			assert (w1.con == w2.con);
+			match String.compare w1.token w2.token with
+			| 0 -> String.compare w1.path w2.path
+			| n -> n
+	end
+	module Set = Set.Make(T)
+
+	let flush_events t =
+		BoundedPipe.flush_pipe t.pending_watchevents;
+		not (BoundedPipe.is_empty t.pending_watchevents)
+
+	let pending_watchevents t =
+		BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+	BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+	BoundedPipe.length t.pending_source_watchevents
+
 let mark_as_bad con =
 	match con.dom with
 	|None -> ()
@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
 	token = token;
 	path = path;
 	base = get_path con;
-	is_relative = path.[0] <> '/' && path.[0] <> '@'
+	is_relative = path.[0] <> '/' && path.[0] <> '@';
+	pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
 }
 
 let get_con w = w.con
@@ -93,6 +194,9 @@ let make_perm dom =
 	Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
 
 let create xbcon dom =
+	let destination (watch, pkt) =
+		BoundedPipe.push watch.pending_watchevents pkt
+	in
 	let id =
 		match dom with
 		| None -> let old = !anon_id_next in incr anon_id_next; old
@@ -109,6 +213,16 @@ let create xbcon dom =
 	anonid = id;
 	stat_nb_ops = 0;
 	perm = make_perm dom;
+
+	(* the actual capacity will be lower, this is used as an overflow
+	   buffer: anything that doesn't fit elsewhere gets put here, only
+	   limited by the amount of watches that you can generate with a
+	   single xenstore command (which is finite, although possibly very
+	   large in theory for Dom0).  Once the pipe here has any contents the
+	   domain is blocked from sending more commands until it is empty
+	   again though.
+	 *)
+	pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
 	}
 	in
 	Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
@@ -127,11 +241,17 @@ let set_target con target_domid =
 
 let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
 
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
 	if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+		Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
 	else
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+		Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+	let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+	(* should never happen: we only process an input packet when there is room for an output packet *)
+	(* and the limit for replies is different from the limit for watch events *)
+	assert (result <> None)
 
 let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
 let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
@@ -181,11 +301,11 @@ let del_watch con path token =
 	apath, w
 
 let del_watches con =
-  Hashtbl.clear con.watches;
+  Hashtbl.reset con.watches;
   con.nb_watches <- 0
 
 let del_transactions con =
-  Hashtbl.clear con.transactions
+  Hashtbl.reset con.transactions
 
 let list_watches con =
 	let ll = Hashtbl.fold
@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
 let lookup_watch_perms oldroot root path =
 	lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
 
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
 	let data = Utils.join_by_null [watch.path; watch.token; ""] in
-	send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+	let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
 
-let fire_single_watch (oldroot, root) watch =
+	match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+	| Some () -> () (* packet queued *)
+	| None ->
+			(* a well behaved Dom0 shouldn't be able to trigger this,
+			   if it happens it is likely a Dom0 bug causing runaway memory usage
+			 *)
+			failwith "watch event overflow, cannot happen"
+
+let fire_single_watch source (oldroot, root) watch =
 	let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
 	let perms = lookup_watch_perms oldroot root abspath in
 	if Perms.can_fire_watch watch.con.perm perms then
-		fire_single_watch_unchecked watch
+		fire_single_watch_unchecked source watch
 	else
 		let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
 		let con = get_domstr watch.con in
 		Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
 
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
 	let new_path =
 		if watch.is_relative && path.[0] = '/'
 		then begin
@@ -232,7 +360,7 @@ let fire_watch roots watch path =
 		end else
 			path
 	in
-	fire_single_watch roots { watch with path = new_path }
+	fire_single_watch source roots { watch with path = new_path }
 
 (* Search for a valid unused transaction id. *)
 let rec valid_transaction_id con proposed_id =
@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb
 let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
 let has_more_input con = Xenbus.Xb.has_more_input con.xb
 
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
 let has_output con = Xenbus.Xb.has_output con.xb
 let has_old_output con = Xenbus.Xb.has_old_output con.xb
 let has_new_output con = Xenbus.Xb.has_new_output con.xb
@@ -323,7 +452,7 @@ let prevents_live_update con = not (is_bad con)
 	&& (has_extra_connection_data con || has_transaction_data con)
 
 let has_more_work con =
-	has_more_input con || not (has_old_output con) && has_new_output con
+	(has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
 
 let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
 
diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml
index 3c7429fe7f61..7d68c583b43a 100644
--- a/tools/ocaml/xenstored/connections.ml
+++ b/tools/ocaml/xenstored/connections.ml
@@ -22,22 +22,30 @@ type t = {
 	domains: (int, Connection.t) Hashtbl.t;
 	ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
 	mutable watches: Connection.watch list Trie.t;
+	mutable has_pending_watchevents: Connection.Watch.Set.t
 }
 
 let create () = {
 	anonymous = Hashtbl.create 37;
 	domains = Hashtbl.create 37;
 	ports = Hashtbl.create 37;
-	watches = Trie.create ()
+	watches = Trie.create ();
+	has_pending_watchevents = Connection.Watch.Set.empty;
 }
 
+let get_capacity () =
+	(* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+	{ Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
 let add_anonymous cons fd =
-	let xbcon = Xenbus.Xb.open_fd fd in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_fd fd ~capacity in
 	let con = Connection.create xbcon None in
 	Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
 
 let add_domain cons dom =
-	let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
 	let con = Connection.create xbcon (Some dom) in
 	Hashtbl.add cons.domains (Domain.get_id dom) con;
 	match Domain.get_port dom with
@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
 	Hashtbl.fold (fun _ con (ins, outs) ->
 		if (only_if con) then (
 			let fd = Connection.get_fd con in
-			(fd :: ins,  if Connection.has_output con then fd :: outs else outs)
+			let in_fds = if Connection.can_input con then fd :: ins else ins in
+			let out_fds = if Connection.has_output con then fd :: outs else outs in
+			in_fds, out_fds
 		) else (ins, outs)
 	)
 	cons.anonymous ([], [])
@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
 	| [] -> None
 	| ws -> Some ws
 
+let del_watches cons con =
+	Connection.del_watches con;
+	cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+		Connection.get_con w != con
+
 let del_anonymous cons con =
 	try
 		Hashtbl.remove cons.anonymous (Connection.get_fd con);
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del anonymous %s" (Printexc.to_string exn)
@@ -85,7 +102,7 @@ let del_domain cons id =
 		    | Some p -> Hashtbl.remove cons.ports p
 		    | None -> ())
 		 | None -> ());
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del domain %u: %s" id (Printexc.to_string exn)
@@ -136,31 +153,33 @@ let del_watch cons con path token =
 		cons.watches <- Trie.set cons.watches key watches;
  	watch
 
-let del_watches cons con =
-	Connection.del_watches con;
-	cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
 (* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
 	let key = key_of_path path in
 	let path = Store.Path.to_string path in
 	let roots = oldroot, root in
 	let fire_watch _ = function
 		| None         -> ()
-		| Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+		| Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
 	in
 	let fire_rec _x = function
 		| None         -> ()
 		| Some watches ->
-			List.iter (Connection.fire_single_watch roots) watches
+			List.iter (Connection.fire_single_watch source roots) watches
 	in
 	Trie.iter_path fire_watch cons.watches key;
 	if recurse then
 		Trie.iter fire_rec (Trie.sub cons.watches key)
 
+let send_watchevents cons con =
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+	Connection.source_flush_watchevents con
+
 let fire_spec_watches root cons specpath =
+	let source = find_domain cons 0 in
 	iter cons (fun con ->
-		List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+		List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
 
 let set_target cons domain target_domain =
 	let con = find_domain cons domain in
@@ -197,6 +216,16 @@ let debug cons =
 	let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
 	String.concat "" (domains @ anonymous)
 
+let debug_watchevents cons con =
+	(* == (physical equality)
+	   has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+	   comparison to fail due to having a 'functional value' which cannot be compared.
+	 *)
+	let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+	let pending = s |> Connection.Watch.Set.elements
+		|> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+	Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
+
 let filter ~f cons =
 	let fold _ v acc = if f v then v :: acc else acc in
 	[]
diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml
index ba63a8147e09..327b6d795ec7 100644
--- a/tools/ocaml/xenstored/define.ml
+++ b/tools/ocaml/xenstored/define.ml
@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir
 let maxwatch = ref (100)
 let maxtransaction = ref (10)
 let maxrequests = ref (1024)   (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+	maximum outstanding watch events per watch,
+	recommended >= maxoutstanding to avoid blocking backend transactions due to
+	malicious frontends
+ *)
 
 let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
 let conflict_burst_limit = ref 5.0
diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in
index 4ae48e42d47d..9d034e744b4b 100644
--- a/tools/ocaml/xenstored/oxenstored.conf.in
+++ b/tools/ocaml/xenstored/oxenstored.conf.in
@@ -62,6 +62,8 @@ quota-maxwatch = 100
 quota-transaction = 10
 quota-maxrequests = 1024
 quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
 
 # Activate filed base backend
 persistent = false
diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
index cbf708213796..ce39ce28b5f3 100644
--- a/tools/ocaml/xenstored/process.ml
+++ b/tools/ocaml/xenstored/process.ml
@@ -57,7 +57,7 @@ let split_one_path data con =
 	| path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
 	| _                -> raise Invalid_Cmd_Args
 
-let process_watch t cons =
+let process_watch source t cons =
 	let oldroot = t.Transaction.oldroot in
 	let newroot = Store.get_root t.store in
 	let ops = Transaction.get_paths t |> List.rev in
@@ -67,8 +67,9 @@ let process_watch t cons =
 		| Xenbus.Xb.Op.Rm       -> true, None, oldroot
 		| Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
 		| _              -> raise (Failure "huh ?") in
-		Connections.fire_watches ?oldroot root cons (snd op) recurse in
-	List.iter (fun op -> do_op_watch op cons) ops
+		Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+	List.iter (fun op -> do_op_watch op cons) ops;
+	Connections.send_watchevents cons source
 
 let create_implicit_path t perm path =
 	let dirname = Store.Path.get_parent path in
@@ -234,6 +235,20 @@ let do_debug con t _domains cons data =
 	| "watches" :: _ ->
 		let watches = Connections.debug cons in
 		Some (watches ^ "\000")
+	| "xenbus" :: domid :: _ ->
+		let domid = int_of_string domid in
+		let con = Connections.find_domain cons domid in
+		let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+			(Xenbus.Xb.debug con.xb)
+			(Connection.source_pending_watchevents con)
+			(Connection.can_input con)
+			(Connection.has_more_input con)
+			(Connection.has_old_output con)
+			(Connection.has_new_output con)
+			(Connection.has_more_work con)
+			(Connections.debug_watchevents cons con)
+		in
+		Some s
 	| "mfn" :: domid :: _ ->
 		let domid = int_of_string domid in
 		let con = Connections.find_domain cons domid in
@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data =
 	fct con t doms cons data;
 	Packet.Ack (fun () ->
 		if Transaction.get_id t = Transaction.none then
-			process_watch t cons
+			process_watch con t cons
 	)
 
 let reply_data fct con t doms cons data =
@@ -501,7 +516,7 @@ let do_watch con t _domains cons data =
 	Packet.Ack (fun () ->
 		(* xenstore.txt says this watch is fired immediately,
 		   implying even if path doesn't exist or is unreadable *)
-		Connection.fire_single_watch_unchecked watch)
+		Connection.fire_single_watch_unchecked con watch)
 
 let do_unwatch con _t _domains cons data =
 	let (node, token) =
@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data =
 	if not success then
 		raise Transaction_again;
 	if commit then begin
-		process_watch t cons;
+		process_watch con t cons;
 		match t.Transaction.ty with
 		| Transaction.No ->
 			() (* no need to record anything *)
@@ -700,7 +715,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
 let do_input store cons doms con =
 	let newpacket =
 		try
-			Connection.do_input con
+			if Connection.can_input con then Connection.do_input con
+			else None
 		with Xenbus.Xb.Reconnect ->
 			info "%s requests a reconnect" (Connection.get_domstr con);
 			History.reconnect con;
@@ -728,6 +744,7 @@ let do_input store cons doms con =
 		Connection.incr_ops con
 
 let do_output _store _cons _doms con =
+	Connection.source_flush_watchevents con;
 	if Connection.has_output con then (
 		if Connection.has_new_output con then (
 			let packet = Connection.peek_output con in
diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
index 3b57ad016dfb..c799e20f1145 100644
--- a/tools/ocaml/xenstored/xenstored.ml
+++ b/tools/ocaml/xenstored/xenstored.ml
@@ -103,6 +103,8 @@ let parse_config filename =
 		("quota-maxentity", Config.Set_int Quota.maxent);
 		("quota-maxsize", Config.Set_int Quota.maxsize);
 		("quota-maxrequests", Config.Set_int Define.maxrequests);
+		("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+		("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
 		("quota-path-max", Config.Set_int Define.path_max);
 		("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
 		("test-eagain", Config.Set_bool Transaction.test_eagain);
-- 
2.37.4