module libfuture.common;

import core.thread : Fiber;
import core.sync.event : Event;
import core.time : Duration;

/**
 * Drives `fiber` until it terminates.
 *
 * The fiber is resumed with `call()` in a loop. When `await` itself runs
 * inside a fiber and the awaited fiber is still pending, the current fiber
 * yields between resumptions so its own caller can make progress. Outside of
 * any fiber the loop simply keeps resuming the awaited fiber (a busy
 * "force wait").
 *
 * Params:
 *     fiber = the fiber (or `Future`) to run to completion
 *
 * Returns: `fiber.result` when the fiber type exposes a `result` member,
 *          otherwise nothing.
 *
 * Throws: whatever `fiber.call()` rethrows from the awaited fiber.
 */
auto await(T : Fiber)(auto ref T fiber)
{
    while (fiber.state != Fiber.State.TERM)
    {
        fiber.call();
        // Only yield when we are inside a fiber ourselves; Fiber.yield()
        // is not usable from plain thread context.
        if (fiber.state != Fiber.State.TERM && Fiber.getThis())
        {
            Fiber.yield();
        }
    }

    static if (__traits(compiles, { return fiber.result; }))
    {
        return fiber.result;
    }
}

@("await: Works as a force wait if not in Fiber")
unittest
{
    int n = 0;
    auto t = new Fiber({
        n = 1;
        Fiber.yield();
        n = 2;
        Fiber.yield();
        n = 3;
    });

    assert(n == 0);
    t.await; // force wait
    assert(n == 3);
}

@("await: 内部のFiber.yield()を直接書いたときと同じ動作(yield 1回)")
unittest
{
    int n = 1;
    auto inner = new Fiber({
        n += 1;
        Fiber.yield();
        n += 1;
    });
    auto outer = new Fiber({
        n *= 2;
        inner.await;
        n *= 2;
    });
    assert(n == 1);
    outer.call(); // *= 2; += 1; before Fiber.yield()
    assert(n == 3);
    outer.call(); // += 1; *= 2; after Fiber.yield()
    assert(n == 8);
    assert(inner.state == Fiber.State.TERM);
    assert(outer.state == Fiber.State.TERM);
}

@("await: 内部のFiber.yield()を直接書いたときと同じ動作(yield 2回)")
unittest
{
    int n = 1;
    auto inner = new Fiber({
        n += 1;
        Fiber.yield();
        n += 1;
        Fiber.yield();
        n += 1;
    });
    auto outer = new Fiber({
        n *= 2;
        inner.await;
        n *= 2;
    });
    assert(n == 1);
    outer.call(); // *= 2; += 1; before Fiber.yield()
    assert(n == 3);
    outer.call(); // += 1; between Fiber.yield()
    assert(n == 4);
    outer.call(); // += 1; *= 2; after Fiber.yield()
    assert(n == 10);
    assert(inner.state == Fiber.State.TERM);
    assert(outer.state == Fiber.State.TERM);
}

/**
 * A fiber that completes with a value of type `T` (or with nothing when `T`
 * is `void`), or with a `Throwable` when rejected.
 *
 * Completion is signalled through a manual-reset `Event`, so the resolve and
 * reject callbacks may be invoked after the constructor's delegate has
 * returned, including from another thread (see the unittests below).
 */
class Future(T) : Fiber
{
    /// Completion state of the future.
    enum ResolveState
    {
        Running,
        Fulfilled,
        Rejected
    }

    private Event event;
    private ResolveState resolveState = ResolveState.Running;
    static if (!is(T == void))
    {
        private T value;
    }
    private Throwable throwable;

    static if (is(T == void))
    {
        /**
         * Builds a future from a factory that receives the resolve and
         * reject callbacks and returns the delegate to run in the fiber.
         * The fiber then stays pending until one of the callbacks is called.
         */
        this(void delegate() delegate(void delegate(), void delegate(Throwable)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult, &setReject)();
                yieldImpl();
            });
        }

        /// Same as above, for factories that only take the resolve callback.
        this(void delegate() delegate(void delegate()) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult)();
                yieldImpl();
            });
        }

        /// Runs `action` in the fiber and resolves once it returns.
        this(void delegate() action)
        {
            event.initialize(true, false);
            super({
                action();
                setResult();
            });
        }

        /// Runs `dg` with the resolve callback, then stays pending until resolved.
        this(void delegate(void delegate() resolve) dg)
        {
            event.initialize(true, false);
            super({
                dg(&setResult);
                yieldImpl();
            });
        }
    }
    else
    {
        /**
         * Builds a future from a factory that receives the resolve and
         * reject callbacks and returns the delegate to run in the fiber.
         * The fiber then stays pending until one of the callbacks is called.
         */
        this(void delegate() delegate(void delegate(T), void delegate(Throwable)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult, &setReject)();
                yieldImpl();
            });
        }

        /// Same as above, for factories that only take the resolve callback.
        this(void delegate() delegate(void delegate(T)) factory)
        {
            event.initialize(true, false);
            super({
                factory(&setResult)();
                yieldImpl();
            });
        }

        /// Runs `action` in the fiber and resolves with its return value.
        this(T delegate() action)
        {
            event.initialize(true, false);
            super({
                setResult(action());
            });
        }

        /// Runs `dg` with the resolve callback, then stays pending until resolved.
        this(void delegate(void delegate(T) resolve) dg)
        {
            event.initialize(true, false);
            super({
                dg(&setResult);
                yieldImpl();
            });
        }
    }

    static if (!is(T == void))
    {
        /**
         * Blocks until the future has terminated and returns its value.
         *
         * The fiber is resumed repeatedly until it reaches `TERM`. A single
         * `call()` followed by an unbounded wait on the event is not enough:
         * a fiber that suspends before resolving on its own (for example an
         * action that awaits another fiber) would never be resumed again and
         * the wait would never return.
         *
         * Returns: the resolved value.
         *
         * Throws: the rejection `Throwable` if the future was rejected, or
         *         anything else rethrown by `call()`.
         */
        T result()
        {
            import core.time : msecs;

            while (this.state != Fiber.State.TERM)
            {
                call();
                if (this.state != Fiber.State.TERM)
                {
                    // Returns immediately once the event is signalled,
                    // otherwise gives a resolver time to run before the
                    // fiber is resumed again.
                    event.wait(1.msecs);
                }
            }

            if (resolveState == ResolveState.Rejected && throwable !is null)
            {
                throw throwable;
            }
            return value;
        }
    }

    /**
     * Suspends the fiber until the future is resolved or rejected, then
     * propagates a rejection by throwing it from the fiber, so that the
     * `call()` which resumed the fiber rethrows it.
     */
    private void yieldImpl()
    {
        // Non-blocking poll of the completion event; yield while pending.
        while (!event.wait(Duration.zero))
        {
            Fiber.yield();
        }

        // A null throwable cannot be thrown; such a rejection completes silently.
        if (resolveState == ResolveState.Rejected && throwable !is null)
        {
            throw throwable;
        }
    }

    static if (is(T == void))
    {
        /// Marks the future fulfilled and signals the completion event.
        private void setResult()
        {
            resolveState = ResolveState.Fulfilled;
            event.set();
        }
    }
    else
    {
        /// Stores `obj`, marks the future fulfilled and signals the completion event.
        private void setResult(T obj)
        {
            value = obj;
            resolveState = ResolveState.Fulfilled;
            event.set();
        }
    }

    /// Stores `t`, marks the future rejected and signals the completion event.
    private void setReject(Throwable t)
    {
        this.throwable = t;
        resolveState = ResolveState.Rejected;
        event.set();
    }
}

@("Future: Just like regular Fiber")
unittest
{
    int n = 0;
    auto t1 = new Future!int(resolve => {
        n = 1;
        resolve(10);
    });
    assert(n == 0);
    t1.call();
    assert(n == 1);
    assert(t1.state == Fiber.State.TERM);
    assert(t1.result == 10);
}

@("Future: Nested")
unittest
{
    auto inner = new Future!int(resolve => {
        resolve(10);
    });
    auto outer = new Future!int(resolve => {
        const x = inner.await;
        resolve(x * 2);
    });

    const y = outer.await;
    assert(y == 20);
}

@("Future: try-catch can handle internal Fiber exceptions")
unittest
{
    auto t = new Fiber({
        throw new Exception("");
    });

    const x = new Future!int(resolve => {
        try
        {
            t.await; // throw and reject
            resolve(1);
        }
        catch (Exception ex)
        {
            resolve(2);
        }
    }).await;
    assert(x == 2);
}

@("Future.result: Works as a force wait")
unittest
{
    auto t = new Future!string(resolve => {
        resolve("FUTURE");
    });
    string text = t.result;
    assert(text == "FUTURE");
}

@("Future.result: drives a fiber that suspends before resolving")
unittest
{
    auto gate = new Fiber({
        Fiber.yield();
    });
    auto t = new Future!int({
        gate.await;
        return 7;
    });
    assert(t.result == 7);
    assert(t.state == Fiber.State.TERM);
}

@("Future: synchronous reject is rethrown")
unittest
{
    auto t = new Future!int((resolve, reject) => {
        reject(new Exception("REJECT"));
    });

    string caught;
    try
    {
        t.await;
    }
    catch (Exception ex)
    {
        caught = ex.msg;
    }
    assert(caught == "REJECT");
    assert(t.state == Fiber.State.TERM);
}

@("Future: async resolve")
unittest
{
    auto t = new Future!string(resolve => {
        import std.parallelism : taskPool, task;
        import core.thread : Thread;
        import core.time : msecs;
        taskPool.put(task({
            Thread.sleep(20.msecs);
            resolve("OK");
        }));
    });

    const string text = t.await;
    assert(text == "OK");
}

@("Future: async reject")
unittest
{
    auto t = new Future!string((resolve, reject) => {
        import std.parallelism : taskPool, task;
        import core.thread : Thread;
        import core.time : msecs;
        taskPool.put(task({
            Thread.sleep(20.msecs);
            reject(new Exception("REJECT"));
        }));
    });

    try
    {
        t.await;
    }
    catch (Exception ex)
    {
        assert(ex.msg == "REJECT");
        return;
    }
    assert(false);
}

@("Future: return value")
unittest
{
    auto tenMsecs = new Future!int(resolve => {
        import std.parallelism : taskPool, task;
        import core.thread : Thread;
        import core.time : msecs;
        taskPool.put(task({
            Thread.sleep(10.msecs);
            resolve(10);
        }));
    });

    const t = new Future!int({
        return tenMsecs.await + 10;
    }).await;
    assert(t == 20);
}

/// Helpers for creating futures that are already started.
struct FutureFactory
{
    @disable this();

    /**
     * Runs `action` on a newly started thread and returns a future for its
     * outcome.
     *
     * The returned future has already been resumed once, so the thread is
     * running by the time this function returns. Anything thrown by
     * `action` is caught on that thread and forwarded as a rejection.
     *
     * Params:
     *     action = the work to run on the new thread
     *
     * Returns: a future resolved with the return value of `action`
     *          (or with nothing when `T` is `void`).
     */
    static Future!T startNew(T)(T delegate() action)
    {
        Future!T future;
        future = new Future!T((resolve, reject) => {
            import core.thread : Thread;
            new Thread({
                try
                {
                    static if (is(T == void))
                    {
                        action();
                        resolve();
                    }
                    else
                    {
                        resolve(action());
                    }
                }
                catch (Throwable ex)
                {
                    reject(ex);
                }
            }).start();
        });
        // First resumption runs the factory delegate, which starts the thread.
        future.call();

        return future;
    }
}

@("FutureFactory: startNew run new thread")
unittest
{
    auto futureInt = FutureFactory.startNew!int({
        import core.thread;
        import core.time;

        Thread.sleep(10.msecs);
        return 10;
    });

    const n = futureInt.await;
    assert(n == 10);
}

@("Overview: delay and await some futures")
unittest
{
    import core.time : msecs;

    Fiber delay(Duration dur)
    {
        return new Future!void(resolve => {
            import std.parallelism : taskPool, task;
            import core.thread : Thread;
            taskPool.put(task({
                Thread.sleep(dur);
                resolve();
            }));
        });
    }

    auto futureHello = new Future!string({
        delay(10.msecs).await;
        return "Hello";
    });
    auto futureWorld = new Future!string({
        delay(10.msecs).await;
        return "world";
    });

    auto futureFormat = FutureFactory.startNew!string({
        const hello = futureHello.await;
        const world = futureWorld.await;
        import std.format : format;
        return format!"%s, %s!"(hello, world);
    });

    const text = futureFormat.await;
    assert(text == "Hello, world!");
}

@("Overview: delay and await some logic")
unittest
{
    import core.time : msecs;

    Fiber delay(Duration dur)
    {
        return new Future!void(resolve => {
            import std.parallelism : taskPool, task;
            import core.thread : Thread;
            taskPool.put(task({
                Thread.sleep(dur);
                resolve();
            }));
        });
    }

    Fiber times(size_t n, void delegate() action)
    {
        return new Fiber({
            while (n-- > 0)
            {
                delay(10.msecs).await;
                action();
            }
        });
    }

    int count = 0;
    times(3, { count++; }).await;
    assert(count == 3);
}

/**
 * A minimal round-robin scheduler for fibers.
 *
 * Scheduled fibers are kept in a queue; `run` resumes them one at a time and
 * re-queues every fiber that has not terminated yet.
 */
struct EventLoop
{
    import std.container.dlist;

    /// Queue of fibers waiting to be resumed.
    DList!Fiber fibers;

    @disable this(this);

    /// Appends `fiber` to the end of the queue.
    void schedule(Fiber fiber)
    {
        fibers.insertBack(fiber);
    }

    /// Resumes queued fibers in order until every one of them has terminated.
    void run()
    {
        while (!fibers.empty)
        {
            auto front = fibers.front;
            fibers.removeFront();
            if (front.state != Fiber.State.TERM)
            {
                front.call();
                // Still pending: give it another turn after the others.
                if (front.state != Fiber.State.TERM)
                {
                    fibers.insertBack(front);
                }
            }
        }
    }
}

@("EventLoop: simple usage")
unittest
{
    import core.time;

    Fiber delay(Duration time)
    {
        import core.thread : Thread;
        return FutureFactory.startNew!void({
            Thread.sleep(time);
        });
    }

    int[] ns;
    EventLoop loop;
    loop.schedule(new Fiber({
        foreach (int i; 0 .. 3)
        {
            if (i > 0)
                delay(10.msecs).await;
            ns ~= i;
        }
    }));
    loop.schedule(new Fiber({
        foreach (int i; 10 .. 13)
        {
            if (i > 10)
                delay(10.msecs).await;
            ns ~= i;
        }
    }));

    loop.run();

    import std.format;
    assert(ns.length == 6, format!"ns.length? : %s"(ns));
}

@("Overview: use with some algorithms")
unittest
{
    import core.time : MonoTime, msecs;
    import std.conv : to;

    Fiber delay(Duration time)
    {
        const begin = MonoTime.currTime;
        return new Fiber({
            while (MonoTime.currTime - begin < time)
                Fiber.yield();
        });
    }

    Future!string toText(size_t n)
    {
        return new Future!string(resolve => {
            delay((n * 10).to!int().msecs).await;
            resolve(n.to!string());
        });
    }

    auto sumTextLength = new Future!size_t({
        import std.range : iota;
        import std.algorithm : map, sum;
        return iota(20)
            .map!(a => toText(size_t(a)).await.length) // suspend and resume in map
            .sum();
    });

    auto length = sumTextLength.await;
    assert(length > 0);
}

/**
 * Creates a fiber that terminates once every fiber in `fibers` has terminated.
 *
 * On each pass every pending fiber is resumed once; the combined fiber then
 * yields and repeats until a pass finds no pending fiber left.
 *
 * Params:
 *     fibers = the fibers to run to completion
 *
 * Returns: a new, not yet started fiber that drives `fibers`.
 */
Fiber whenAll(T : Fiber)(T[] fibers)
{
    return new Fiber({
        for (;;)
        {
            bool completed = true;
            foreach (fiber; fibers)
            {
                if (fiber.state != Fiber.State.TERM)
                {
                    completed = false;
                    fiber.call();
                }
            }
            if (completed)
            {
                break;
            }
            else if (Fiber.getThis())
            {
                Fiber.yield();
            }
        }
    });
}

@("whenAll: use whenAll with some algorithms")
unittest
{
    import core.time : MonoTime, msecs;
    import std.conv : to;

    Fiber delay(Duration time)
    {
        const begin = MonoTime.currTime;
        return new Fiber({
            while (MonoTime.currTime - begin < time)
                Fiber.yield();
        });
    }

    Future!string toText(size_t n)
    {
        return new Future!string(resolve => {
            delay((n * 10).to!int().msecs).await;
            resolve(n.to!string());
        });
    }

    Future!string[] fibers;
    foreach (i; 0 .. 20)
    {
        fibers ~= toText(size_t(i));
    }
    fibers.whenAll().await;

    import std.algorithm : map, sum;

    auto length = fibers.map!(a => a.result.length).sum();
    assert(length);
}