1 module libfuture.common;
2 
3 import core.thread : Fiber;
4 import core.sync.event : Event;
5 import core.time : Duration;
6 
/**
 * Runs `fiber` until it has terminated and returns its result, if it has one.
 *
 * Every iteration resumes `fiber` once. If the fiber is still unfinished and
 * the caller is itself executing inside a fiber, the caller yields as well, so
 * each suspension of the awaited fiber shows up as a suspension of the awaiting
 * one. When called outside of any fiber, the loop resumes `fiber` back-to-back
 * until it terminates (a forced wait).
 *
 * Params:
 *   fiber = the fiber to drive to completion
 *
 * Returns: `fiber.result` when `T` has a `result` member that can be returned;
 *          nothing otherwise.
 *
 * Throws: anything that `fiber.call()` propagates.
 */
auto await(T : Fiber)(auto ref T fiber)
{
    for (;;)
    {
        if (fiber.state == Fiber.State.TERM)
            break;

        fiber.call();

        // Only hand control back to our own resumer when there is one,
        // i.e. when we are running inside a fiber ourselves.
        const unfinished = fiber.state != Fiber.State.TERM;
        if (unfinished && Fiber.getThis() !is null)
            Fiber.yield();
    }

    // Plain fibers (and Future!void) expose no `result`; then nothing is returned.
    static if (__traits(compiles, { return fiber.result; }))
        return fiber.result;
}
24 
@("await: Works as a force wait if not in Fiber")
unittest
{
    // A fiber that needs three resumptions before it terminates.
    int step;
    auto worker = new Fiber({
        step = 1;
        Fiber.yield();
        step = 2;
        Fiber.yield();
        step = 3;
    });

    // Nothing runs until the fiber is resumed for the first time.
    assert(step == 0);

    // Outside of a fiber, await keeps resuming until the fiber has terminated.
    await(worker);
    assert(step == 3);
}
41 
@("await: 内部のFiber.yield()を直接書いたときと同じ動作(yield 1回)")
unittest
{
    // Awaiting `inner` from `outer` must behave exactly as if the single
    // Fiber.yield() of `inner` had been written inline in `outer`.
    int acc = 1;

    auto inner = new Fiber({
        acc += 1;
        Fiber.yield();
        acc += 1;
    });
    auto outer = new Fiber({
        acc *= 2;
        await(inner);
        acc *= 2;
    });

    assert(acc == 1);

    // First resumption: (1 * 2) + 1, then suspended at the inner yield.
    outer.call();
    assert(acc == 3);

    // Second resumption: (3 + 1) * 2, both fibers run to their end.
    outer.call();
    assert(acc == 8);

    assert(outer.state == Fiber.State.TERM);
    assert(inner.state == Fiber.State.TERM);
}
64 
@("await: 内部のFiber.yield()を直接書いたときと同じ動作(yield 2回)")
unittest
{
    // Same as the single-yield case, but `inner` suspends twice, so `outer`
    // needs three resumptions in total.
    int acc = 1;

    auto inner = new Fiber({
        acc += 1;
        Fiber.yield();
        acc += 1;
        Fiber.yield();
        acc += 1;
    });
    auto outer = new Fiber({
        acc *= 2;
        await(inner);
        acc *= 2;
    });

    assert(acc == 1);

    // Up to the first inner yield: (1 * 2) + 1.
    outer.call();
    assert(acc == 3);

    // Between the two inner yields: 3 + 1.
    outer.call();
    assert(acc == 4);

    // After the second inner yield: (4 + 1) * 2.
    outer.call();
    assert(acc == 10);

    assert(outer.state == Fiber.State.TERM);
    assert(inner.state == Fiber.State.TERM);
}
91 
/**
 * A `Fiber` that records an outcome: it is either fulfilled (carrying a `T`,
 * or nothing when `T` is `void`) or rejected with a `Throwable`.
 *
 * The outcome is published through a manual-reset `Event`, so the `resolve`
 * and `reject` callbacks handed to user code may be invoked from another
 * thread. While the outcome is pending the fiber keeps yielding; once it is
 * rejected, the stored `Throwable` is thrown from inside the fiber, so it
 * propagates out of `call()` to whoever resumed the fiber.
 */
class Future(T) : Fiber
{
    /// Resolution state of the future.
    enum ResolveState
    {
        Running, /// no outcome has been set yet
        Fulfilled, /// `resolve` has been called
        Rejected /// `reject` has been called
    }

    // Manual-reset event, initially unset; set by setResult / setReject.
    private Event event;
    private ResolveState resolveState = ResolveState.Running;
    static if (!is(T == void))
    {
        // Value passed to `resolve`; stays T.init until fulfilled.
        private T value;
    }
    // Throwable passed to `reject`; null unless rejected.
    private Throwable throwable;

    static if (is(T == void))
    {
        /**
         * Params:
         *   factory = called with the `resolve` and `reject` callbacks when the
         *             fiber first runs; the delegate it returns is executed
         *             immediately, after which the fiber waits for an outcome.
         */
        this(void delegate() delegate(void delegate(), void delegate(Throwable)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult, &setReject)();
                yieldImpl();
            });
        }

        /**
         * Params:
         *   factory = called with the `resolve` callback when the fiber first
         *             runs; the delegate it returns is executed immediately,
         *             after which the fiber waits for an outcome.
         */
        this(void delegate() delegate(void delegate()) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult)();
                yieldImpl();
            });
        }

        /**
         * Params:
         *   action = body of the fiber; the future is fulfilled as soon as it
         *            returns.
         */
        this(void delegate() action)
        {
            event.initialize(true, false);
            super({
                action();
                setResult();
            });
        }

        /**
         * Params:
         *   dg = called with the `resolve` callback when the fiber first runs;
         *        afterwards the fiber waits for an outcome.
         */
        this(void delegate(void delegate() resolve) dg)
        {
            event.initialize(true, false);
            super({
                dg(&setResult);
                yieldImpl();
            });
        }
    }
    else
    {
        /**
         * Params:
         *   factory = called with the `resolve` and `reject` callbacks when the
         *             fiber first runs; the delegate it returns is executed
         *             immediately, after which the fiber waits for an outcome.
         */
        this(void delegate() delegate(void delegate(T), void delegate(Throwable)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult, &setReject)();
                yieldImpl();
            });
        }

        /**
         * Params:
         *   factory = called with the `resolve` callback when the fiber first
         *             runs; the delegate it returns is executed immediately,
         *             after which the fiber waits for an outcome.
         */
        this(void delegate() delegate(void delegate(T)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult)();
                yieldImpl();
            });
        }

        /**
         * Params:
         *   action = body of the fiber; its return value fulfils the future.
         */
        this(T delegate() action)
        {
            event.initialize(true, false);
            super({
                setResult(action());
            });
        }

        /**
         * Params:
         *   dg = called with the `resolve` callback when the fiber first runs;
         *        afterwards the fiber waits for an outcome.
         */
        this(void delegate(void delegate(T) resolve) dg)
        {
            event.initialize(true, false);
            super({
                dg(&setResult);
                yieldImpl();
            });
        }
    }

    static if (!is(T == void))
    {
        /**
         * Forces the future to completion and returns the fulfilled value.
         *
         * The fiber is resumed repeatedly until it has terminated. Between two
         * resumptions the calling thread waits on the outcome event for at most
         * one millisecond: an outcome set from another thread wakes the wait at
         * once, while a fiber that is suspended for some other reason (for
         * example because it awaits a cooperative fiber) is still driven
         * forward instead of blocking forever.
         *
         * Returns: the value passed to `resolve`, or `T.init` if the fiber
         *          terminated without an outcome.
         *
         * Throws: the `Throwable` passed to `reject` if the future was rejected,
         *         and anything else that `call()` propagates.
         */
        T result()
        {
            import core.time : msecs;

            while (this.state != Fiber.State.TERM)
            {
                call();
                if (this.state != Fiber.State.TERM)
                {
                    // Returns immediately once the outcome has been set.
                    event.wait(1.msecs);
                }
            }

            // A rejected future has no value; surface the rejection again
            // rather than silently handing out T.init.
            if (resolveState == ResolveState.Rejected)
            {
                throw throwable;
            }
            return value;
        }
    }

    /**
     * Suspends the fiber until an outcome has been set, then raises the
     * rejection (if any) from inside the fiber so that `call()` rethrows it.
     */
    private void yieldImpl()
    {
        // Poll without blocking so the resuming side keeps control.
        while (!event.wait(Duration.zero))
        {
            Fiber.yield();
        }

        if (resolveState == ResolveState.Rejected)
        {
            throw this.throwable;
        }
    }

    static if (is(T == void))
    {
        /// Marks the future as fulfilled and signals the outcome event.
        private void setResult()
        {
            resolveState = ResolveState.Fulfilled;
            event.set();
        }
    }
    else
    {
        /// Stores `obj`, marks the future as fulfilled and signals the outcome event.
        private void setResult(T obj)
        {
            value = obj;
            resolveState = ResolveState.Fulfilled;
            event.set();
        }
    }

    /// Stores `t`, marks the future as rejected and signals the outcome event.
    private void setReject(Throwable t)
    {
        // A null rejection could not be thrown later; substitute a real one.
        this.throwable = t !is null ? t : new Exception("Future rejected without a Throwable");
        resolveState = ResolveState.Rejected;
        event.set();
    }
}
235 
@("Future: Just like regular Fiber")
unittest
{
    // A future that resolves synchronously behaves like an ordinary fiber:
    // nothing runs before call(), and one call() runs it to termination.
    int marker;
    auto future = new Future!int(resolve => {
        marker = 1;
        resolve(10);
    });

    assert(marker == 0);

    future.call();

    assert(marker == 1);
    assert(future.state == Fiber.State.TERM);
    assert(future.result == 10);
}
250 
@("Future: Nested")
unittest
{
    // The outer future awaits the inner one and resolves with twice its value.
    auto source = new Future!int(resolve => {
        resolve(10);
    });
    auto doubled = new Future!int(resolve => {
        const inner = await(source);
        resolve(inner * 2);
    });

    const outcome = await(doubled);
    assert(outcome == 20);
}
265 
@("Future: try-catch can handle internal Fiber exceptions")
unittest
{
    // A fiber whose body throws as soon as it is resumed.
    auto failing = new Fiber({
        throw new Exception("");
    });

    // The exception raised while awaiting `failing` is caught inside the
    // future's body, which then resolves with the fallback value.
    auto guarded = new Future!int(resolve => {
        try
        {
            await(failing); // throws, so the next line is skipped
            resolve(1);
        }
        catch (Exception)
        {
            resolve(2);
        }
    });

    const outcome = await(guarded);
    assert(outcome == 2);
}
286 
@("Future.result: Works as a force wait")
unittest
{
    // Reading `result` on a future that has not been started yet runs it.
    auto greeting = new Future!string(resolve => {
        resolve("FUTURE");
    });

    string received = greeting.result;
    assert(received == "FUTURE");
}
296 
@("Future: async resolve")
unittest
{
    import core.thread : Thread;
    import core.time : msecs;
    import std.parallelism : task, taskPool;

    // The future is resolved from a task-pool thread after a short sleep.
    auto pending = new Future!string(resolve => {
        auto job = task({
            Thread.sleep(20.msecs);
            resolve("OK");
        });
        taskPool.put(job);
    });

    const string received = await(pending);
    assert(received == "OK");
}
313 
@("Future: async reject")
unittest
{
    import core.thread : Thread;
    import core.time : msecs;
    import std.parallelism : task, taskPool;

    // The future is rejected from a task-pool thread after a short sleep.
    auto pending = new Future!string((resolve, reject) => {
        auto job = task({
            Thread.sleep(20.msecs);
            reject(new Exception("REJECT"));
        });
        taskPool.put(job);
    });

    // Awaiting must raise the rejection; completing normally is a failure.
    try
    {
        await(pending);
    }
    catch (Exception rejection)
    {
        assert(rejection.msg == "REJECT");
        return;
    }
    assert(false);
}
338 
@("Future: return value")
unittest
{
    import core.thread : Thread;
    import core.time : msecs;
    import std.parallelism : task, taskPool;

    // Resolved with 10 from a task-pool thread after roughly 10 ms.
    auto tenMsecs = new Future!int(resolve => {
        auto job = task({
            Thread.sleep(10.msecs);
            resolve(10);
        });
        taskPool.put(job);
    });

    // A future built from a value-returning delegate is fulfilled with
    // whatever that delegate returns.
    auto plusTen = new Future!int({
        return await(tenMsecs) + 10;
    });

    const total = await(plusTen);
    assert(total == 20);
}
357 
/// Namespace-like struct (default construction is disabled) for creating futures.
struct FutureFactory
{
    @disable this();

    /**
     * Creates a future whose outcome is produced by `action` running on a
     * newly started thread, and resumes the future once before returning it.
     *
     * The worker fulfils the future with the return value of `action` (or
     * with nothing when `T` is `void`). Any `Throwable` escaping `action` is
     * caught on the worker thread and forwarded as a rejection.
     *
     * Params:
     *   action = the work to run on the new thread
     *
     * Returns: the already-started future.
     */
    static Future!T startNew(T)(T delegate() action)
    {
        import core.thread : Thread;

        auto future = new Future!T((resolve, reject) => {
            auto worker = new Thread({
                try
                {
                    static if (is(T == void))
                    {
                        action();
                        resolve();
                    }
                    else
                    {
                        resolve(action());
                    }
                }
                catch (Throwable failure)
                {
                    reject(failure);
                }
            });
            worker.start();
        });

        // Run the future's body now so that the worker thread gets spawned.
        future.call();
        return future;
    }
}
391 
@("FutureFactory: startNew run new thread")
unittest
{
    import core.thread;
    import core.time;

    // The action sleeps on its own thread and then produces the value.
    auto started = FutureFactory.startNew!int({
        Thread.sleep(10.msecs);
        return 10;
    });

    const produced = await(started);
    assert(produced == 10);
}
406 
@("Overview: delay and await some futures")
unittest
{
    import core.thread : Thread;
    import core.time : msecs;
    import std.format : format;
    import std.parallelism : task, taskPool;

    // A Future!void that a task-pool thread resolves after sleeping for `dur`.
    Fiber delay(Duration dur)
    {
        return new Future!void(resolve => {
            auto job = task({
                Thread.sleep(dur);
                resolve();
            });
            taskPool.put(job);
        });
    }

    // Two futures that each wait 10 ms before producing their word.
    auto futureHello = new Future!string({
        await(delay(10.msecs));
        return "Hello";
    });
    auto futureWorld = new Future!string({
        await(delay(10.msecs));
        return "world";
    });

    // Both are awaited from the thread started by the factory.
    auto futureFormat = FutureFactory.startNew!string({
        const first = await(futureHello);
        const second = await(futureWorld);
        return format!"%s, %s!"(first, second);
    });

    const sentence = await(futureFormat);
    assert(sentence == "Hello, world!");
}
443 
@("Overview: delay and await some logic")
unittest
{
    import core.thread : Thread;
    import core.time : msecs;
    import std.parallelism : task, taskPool;

    // A Future!void that a task-pool thread resolves after sleeping for `dur`.
    Fiber delay(Duration dur)
    {
        return new Future!void(resolve => {
            auto job = task({
                Thread.sleep(dur);
                resolve();
            });
            taskPool.put(job);
        });
    }

    // A fiber that waits 10 ms and then runs `action`, `n` times in a row.
    Fiber times(size_t n, void delegate() action)
    {
        return new Fiber({
            foreach (_; 0 .. n)
            {
                await(delay(10.msecs));
                action();
            }
        });
    }

    int invocations = 0;
    await(times(3, { invocations++; }));
    assert(invocations == 3);
}
476 
/**
 * A minimal round-robin scheduler for fibers.
 *
 * Fibers are kept in a FIFO queue. `run` resumes them one after another and
 * re-queues every fiber that has not terminated, until the queue is empty.
 * Copying is disabled.
 */
struct EventLoop
{
    import std.container.dlist;

    /// Queue of fibers waiting for their next turn.
    DList!Fiber fibers;

    @disable this(this);

    /// Appends `fiber` to the end of the queue.
    void schedule(Fiber fiber)
    {
        fibers.insertBack(fiber);
    }

    /// Resumes queued fibers in order until none of them is left.
    void run()
    {
        for (; !fibers.empty;)
        {
            auto current = fibers.front;
            fibers.removeFront();

            // Already finished: drop it without resuming.
            if (current.state == Fiber.State.TERM)
                continue;

            current.call();

            // Finished during this turn: drop it.
            if (current.state == Fiber.State.TERM)
                continue;

            // Still has work left, so it gets another turn later.
            fibers.insertBack(current);
        }
    }
}
507 
@("EventLoop: simple usage")
unittest
{
    import core.thread : Thread;
    import core.time;
    import std.format;

    // A future that completes once a freshly started thread has slept for `time`.
    Fiber delay(Duration time)
    {
        return FutureFactory.startNew!void({
            Thread.sleep(time);
        });
    }

    int[] ns;

    // A fiber appending first, first + 1 and first + 2 to `ns`, waiting
    // 10 ms before every value except the first one.
    Fiber appender(int first)
    {
        return new Fiber({
            foreach (int i; first .. first + 3)
            {
                if (i > first)
                    await(delay(10.msecs));
                ns ~= i;
            }
        });
    }

    EventLoop loop;
    loop.schedule(appender(0));
    loop.schedule(appender(10));

    loop.run();

    assert(ns.length == 6, format!"ns.length? : %s"(ns));
}
545 
@("Overview: use with some algorithms")
unittest
{
    import core.time : MonoTime, msecs;
    import std.algorithm : map, sum;
    import std.conv : to;
    import std.range : iota;

    // A cooperative delay: the fiber keeps yielding until `time` has elapsed
    // since the moment delay() was called.
    Fiber delay(Duration time)
    {
        const startedAt = MonoTime.currTime;
        return new Fiber({
            for (;;)
            {
                if (MonoTime.currTime - startedAt >= time)
                    break;
                Fiber.yield();
            }
        });
    }

    // Resolves with the decimal text of `n` after waiting n * 10 ms.
    Future!string toText(size_t n)
    {
        return new Future!string(resolve => {
            const pause = (n * 10).to!int().msecs;
            await(delay(pause));
            resolve(n.to!string());
        });
    }

    auto sumTextLength = new Future!size_t({
        // Each element is awaited inside map, so the fiber suspends and
        // resumes in the middle of the range pipeline.
        auto lengths = iota(20).map!(a => toText(size_t(a)).await.length);
        return lengths.sum();
    });

    auto total = await(sumTextLength);
    assert(total > 0);
}
578 
/**
 * Creates a fiber that drives all of `fibers` to termination.
 *
 * On every resumption the returned fiber resumes each not-yet-terminated
 * element of `fibers` once, in array order. If any of them is still unfinished
 * after that pass it yields and repeats on the next resumption; as soon as a
 * pass leaves every element terminated, the returned fiber terminates too,
 * without an additional yield.
 *
 * Params:
 *   fibers = the fibers to run; an empty array gives a fiber that terminates
 *            on its first resumption
 *
 * Returns: a new, not yet started `Fiber`.
 */
Fiber whenAll(T : Fiber)(T[] fibers)
{
    return new Fiber({
        for (;;)
        {
            bool pending = false;
            foreach (fiber; fibers)
            {
                if (fiber.state == Fiber.State.TERM)
                    continue;

                fiber.call();

                // Judge completion after the resumption, so that a fiber
                // finishing in this pass does not cost one more round.
                if (fiber.state != Fiber.State.TERM)
                    pending = true;
            }

            if (!pending)
                break;

            // This delegate always runs inside the fiber created above, so
            // yielding is always possible here.
            Fiber.yield();
        }
    });
}
605 
@("whenAll: use whenAll with some algorithms")
unittest
{
    import core.time : MonoTime, msecs;
    import std.algorithm : map, sum;
    import std.conv : to;

    // A cooperative delay: the fiber keeps yielding until `time` has elapsed
    // since the moment delay() was called.
    Fiber delay(Duration time)
    {
        const startedAt = MonoTime.currTime;
        return new Fiber({
            for (;;)
            {
                if (MonoTime.currTime - startedAt >= time)
                    break;
                Fiber.yield();
            }
        });
    }

    // Resolves with the decimal text of `n` after waiting n * 10 ms.
    Future!string toText(size_t n)
    {
        return new Future!string(resolve => {
            const pause = (n * 10).to!int().msecs;
            await(delay(pause));
            resolve(n.to!string());
        });
    }

    Future!string[] pendingTexts;
    foreach (i; 0 .. 20)
    {
        pendingTexts ~= toText(size_t(i));
    }

    // Drive all of them together; afterwards every future has terminated.
    await(whenAll(pendingTexts));

    auto total = pendingTexts.map!(a => a.result.length).sum();
    assert(total != 0);
}