/** Adapters connecting stages with incompatible interfaces.
 *
 * Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
 * Copyright: © 2016 Adrian Matoga
 * License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
 */
module flod.adapter;

import flod.pipeline;
import flod.traits;

// Pull sink / peek source.
// Pulls elements from `source` into `buffer` on demand and exposes the
// buffered data through peek()/consume().
private template DefaultPullPeekAdapter(Buffer, E) {
    @pullSink!E @peekSource!E
    struct DefaultPullPeekAdapter(Source) {
        Source source;
        Buffer buffer;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        // Returns the buffered elements. If fewer than `size` are buffered,
        // a single pull() for the missing amount is issued first; the result
        // may still be shorter than `size` if that pull came up short.
        const(E)[] peek(size_t size)
        {
            auto ready = buffer.peek!E();
            if (ready.length >= size)
                return ready;
            auto chunk = buffer.alloc!E(size - ready.length);
            size_t r = source.pull(chunk);
            buffer.commit!E(r);
            return buffer.peek!E();
        }

        // Drops `size` elements from the front of the buffer.
        void consume(size_t size)
        {
            buffer.consume!E(size);
        }
    }
}

/**
 * Appends an adapter that lets a peek sink read from a pull pipeline.
 *
 * Params:
 *   pipeline = pipeline whose last stage is a pull source
 *   buffer   = storage for elements pulled but not yet consumed
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pullPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
    if (isPullPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPullPeekAdapter!(Buffer, Pipeline.ElementType))(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto pullPeek(Pipeline)(auto ref Pipeline pipeline)
    if (isPullPipeline!Pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.pullPeek(movingBuffer());
}

// Peek sink / pull source.
// Implements pull() by copying from the slice returned by source.peek().
private template DefaultPeekPullAdapter(E) {
    @peekSink!E @pullSource!E
    struct DefaultPeekPullAdapter(Source) {
        Source source;

        // Copies up to buf.length elements into `buf`, consumes them from
        // the source and returns the number of elements copied.
        size_t pull(E[] buf)
        {
            import std.algorithm : min;
            auto inbuf = source.peek(buf.length);
            // peek() may return more or fewer elements than requested.
            auto l = min(buf.length, inbuf.length);
            buf[0 .. l] = inbuf[0 .. l];
            source.consume(l);
            return l;
        }
    }
}

/**
 * Appends an adapter that lets a pull sink read from a peek pipeline.
 *
 * Params:
 *   pipeline = pipeline whose last stage is a peek source
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto peekPull(Pipeline)(auto ref Pipeline pipeline)
    if (isPeekPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPeekPullAdapter!(Pipeline.ElementType));
}

// Pull sink / push source.
// Drives the pipeline: repeatedly pulls a chunk from `source` and pushes it
// to `sink`.
private template DefaultPullPushAdapter(E) {
    @pullSink!E @pushSource!E
    struct DefaultPullPushAdapter(Source, Sink) {
        Source source;
        Sink sink;
        size_t chunkSize;

        this(size_t chunkSize)
        {
            this.chunkSize = chunkSize;
        }

        // Loops until the source yields no elements or the sink reports
        // fewer than chunkSize elements accepted.
        void run()()
        {
            import core.stdc.stdlib : alloca;
            // Temporary chunk obtained with alloca(); its size is fixed for
            // the whole run.
            auto buf = (cast(E*) alloca(E.sizeof * chunkSize))[0 .. chunkSize];
            for (;;) {
                size_t inp = source.pull(buf[]);
                if (inp == 0)
                    break;
                if (sink.push(buf[0 .. inp]) < chunkSize)
                    break;
            }
        }
    }
}

/**
 * Appends an adapter that moves data from a pull pipeline to a push sink.
 *
 * Params:
 *   pipeline  = pipeline whose last stage is a pull source
 *   chunkSize = number of elements requested from the source per iteration
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pullPush(Pipeline)(auto ref Pipeline pipeline, size_t chunkSize = 4096)
    if (isPullPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPullPushAdapter!(Pipeline.ElementType))(chunkSize);
}

// Pull sink / alloc source.
// Drives the pipeline: asks the sink for a buffer, pulls into it and commits.
private template DefaultPullAllocAdapter(E) {
    @pullSink!E @allocSource!E
    struct DefaultPullAllocAdapter(Source, Sink) {
        Source source;
        Sink sink;
        size_t chunkSize;

        this(size_t chunkSize)
        {
            this.chunkSize = chunkSize;
        }

        // Loops until the source yields no elements or the sink's commit()
        // reports fewer than chunkSize elements.
        // Throws: OutOfMemoryError if the sink cannot provide a buffer.
        void run()()
        {
            E[] buf;
            for (;;) {
                if (!sink.alloc(buf, chunkSize)) {
                    import core.exception : OutOfMemoryError;
                    throw new OutOfMemoryError();
                }
                size_t inp = source.pull(buf[]);
                if (inp == 0)
                    break;
                if (sink.commit(inp) < chunkSize)
                    break;
            }
        }
    }
}

/**
 * Appends an adapter that moves data from a pull pipeline to an alloc sink.
 *
 * Params:
 *   pipeline  = pipeline whose last stage is a pull source
 *   chunkSize = number of elements requested from the sink's allocator per
 *               iteration
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pullAlloc(Pipeline)(auto ref Pipeline pipeline, size_t chunkSize = 4096)
    if (isPullPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPullAllocAdapter!(Pipeline.ElementType))(chunkSize);
}

// Peek sink / push source.
// Drives the pipeline: peeks a slice from `source`, pushes it to `sink` and
// consumes what the sink accepted.
private template DefaultPeekPushAdapter(E) {
    @peekSink!E @pushSource!E
    struct DefaultPeekPushAdapter(Source, Sink) {
        Source source;
        Sink sink;
        size_t minSliceSize;

        this(size_t minSliceSize)
        {
            this.minSliceSize = minSliceSize;
        }

        // Loops until peek() returns an empty slice or the sink accepts
        // fewer than minSliceSize elements.
        void run()()
        {
            for (;;) {
                auto buf = source.peek(minSliceSize);
                if (buf.length == 0)
                    break;
                size_t w = sink.push(buf[]);
                // NOTE(review): on this exit path the `w` elements accepted
                // by the sink are not consumed from the source, whereas
                // DefaultPeekAllocAdapter consumes before breaking; confirm
                // the difference is intended.
                if (w < minSliceSize)
                    break;
                assert(w <= buf.length);
                source.consume(w);
            }
        }
    }
}

/**
 * Appends an adapter that moves data from a peek pipeline to a push sink.
 *
 * Params:
 *   pipeline     = pipeline whose last stage is a peek source
 *   minSliceSize = number of elements requested from the source per peek
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto peekPush(Pipeline)(auto ref Pipeline pipeline, size_t minSliceSize = size_t.sizeof)
    if (isPeekPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPeekPushAdapter!(Pipeline.ElementType))(minSliceSize);
}

// Peek sink / alloc source.
// Drives the pipeline: peeks a slice from `source`, copies it into a buffer
// obtained from `sink` and commits it.
private template DefaultPeekAllocAdapter(E) {
    @peekSink!E @allocSource!E
    struct DefaultPeekAllocAdapter(Source, Sink) {
        Source source;
        Sink sink;
        size_t minSliceSize;
        size_t maxSliceSize;

        this(size_t minSliceSize, size_t maxSliceSize)
        {
            this.minSliceSize = minSliceSize;
            this.maxSliceSize = maxSliceSize;
        }

        // Loops until peek() returns an empty slice or commit() reports
        // fewer than minSliceSize elements.
        // Throws: OutOfMemoryError if the sink cannot provide a buffer.
        void run()()
        {
            // `min` was used here without being imported in this scope.
            import std.algorithm : min;

            E[] ob;
            for (;;) {
                auto ib = source.peek(minSliceSize);
                if (ib.length == 0)
                    break;
                // Never copy more than maxSliceSize elements in one go.
                auto len = min(ib.length, maxSliceSize);
                if (!sink.alloc(ob, len)) {
                    import core.exception : OutOfMemoryError;
                    throw new OutOfMemoryError();
                }
                ob[0 .. len] = ib[0 .. len];
                size_t w = sink.commit(len);
                source.consume(w);
                if (w < minSliceSize)
                    break;
            }
        }
    }
}

/**
 * Appends an adapter that moves data from a peek pipeline to an alloc sink.
 *
 * Params:
 *   pipeline     = pipeline whose last stage is a peek source
 *   minSliceSize = number of elements requested from the source per peek
 *   maxSliceSize = upper bound on the number of elements copied per iteration
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto peekAlloc(Pipeline)(auto ref Pipeline pipeline, size_t minSliceSize = size_t.sizeof, size_t maxSliceSize = 4096)
    if (isPeekPipeline!Pipeline)
{
    return pipeline.pipe!(DefaultPeekAllocAdapter!(Pipeline.ElementType))(minSliceSize, maxSliceSize);
}

// Push sink / alloc source.
// Implements push() by copying the pushed slice into a buffer obtained from
// the sink.
private template DefaultPushAllocAdapter(E) {
    @pushSink!E @allocSource!E
    struct DefaultPushAllocAdapter(Sink) {
        Sink sink;

        // Copies `buf` into a sink-provided buffer and returns the result of
        // sink.commit().
        // Throws: OutOfMemoryError if the sink cannot provide a buffer.
        size_t push(const(E)[] buf)
        {
            E[] ob;
            if (!sink.alloc(ob, buf.length)) {
                import core.exception : OutOfMemoryError;
                throw new OutOfMemoryError();
            }
            ob[0 .. buf.length] = buf[];
            return sink.commit(buf.length);
        }
    }
}

/**
 * Appends an adapter that lets a push pipeline feed an alloc sink.
 *
 * Params:
 *   pipeline = pipeline whose last stage is a push source
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pushAlloc(Pipeline)(auto ref Pipeline pipeline)
    if (isPushPipeline!Pipeline)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultPushAllocAdapter!(E);
    return pipeline.pipe!PP();
}

// Push sink / pull source.
// Both sides are active, so control is handed over with yield(), which is
// provided by the mixed-in Scheduler (defined outside this module). The code
// below treats a true result from yield() as a request to stop.
private template DefaultPushPullAdapter(Buffer, E) {
    @pushSink!E @pullSource!E
    struct DefaultPushPullAdapter(alias Scheduler) {
        import std.algorithm : min;

        mixin Scheduler;

        Buffer buffer;
        // Slice most recently handed to push() that pull() has not yet
        // copied out or stashed in `buffer`.
        const(E)[] pushed;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        // Publishes `buf` for pull() and yields. Returns 0 without yielding
        // if a previously pushed slice is still pending.
        size_t push(const(E)[] buf)
        {
            if (pushed.length > 0)
                return 0;
            pushed = buf;
            yield();
            return buf.length;
        }

        // Copies as much as possible from `buffer` into `dest` and returns
        // the part of `dest` that is still unfilled.
        private E[] pullFromBuffer(E[] dest)
        {
            auto src = buffer.peek!E();
            auto len = min(src.length, dest.length);
            if (len > 0) {
                dest[0 .. len] = src[0 .. len];
                buffer.consume!E(len);
                return dest[len .. $];
            }
            return dest;
        }

        // Fills `dest` and returns the number of elements written.
        // Throws: OutOfMemoryError if leftover pushed data cannot be stored
        // in `buffer`.
        size_t pull(E[] dest)
        {
            size_t requestedLength = dest.length;
            // first, give off whatever was left from this.pushed on previous pull();
            dest = pullFromBuffer(dest);
            if (dest.length == 0)
                return requestedLength;
            // if not satisfied yet, switch to source fiber till push() is called again
            // enough times to fill dest[]
            do {
                if (yield())
                    break;
                // pushed is the slice of the original buffer passed to push() by the source.
                auto len = min(pushed.length, dest.length);
                assert(len > 0);
                dest[0 .. len] = pushed[0 .. len];
                dest = dest[len .. $];
                pushed = pushed[len .. $];
            } while (dest.length > 0);

            // whatever's left in pushed, keep it in buffer for the next time pull() is called
            while (pushed.length > 0) {
                auto b = buffer.alloc!E(pushed.length);
                if (b.length == 0) {
                    import core.exception : OutOfMemoryError;
                    throw new OutOfMemoryError();
                }
                // Copy only what fits: the buffer may grant fewer elements
                // than requested. (This used to be a comma expression that
                // always evaluated to pushed.length.)
                auto len = min(b.length, pushed.length);
                b[0 .. len] = pushed[0 .. len];
                buffer.commit!E(len);
                pushed = pushed[len .. $];
            }
            return requestedLength - dest.length;
        }
    }
}

/**
 * Appends an adapter that lets a pull sink read from a push pipeline.
 *
 * Params:
 *   pipeline = pipeline to attach the adapter to
 *   buffer   = storage for pushed elements that a pull() did not take
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pushPull(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultPushPullAdapter!(Buffer, E);
    return pipeline.pipe!PP(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto pushPull(Pipeline)(auto ref Pipeline pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.pushPull(movingBuffer());
}

// Shared peek()/consume() implementation for adapters that own a `buffer`
// and have yield() in scope at the mixin site.
private template ImplementPeekConsume(E) {
    // Yields until at least `n` elements are buffered or yield() returns
    // true, then returns whatever is buffered.
    const(E)[] peek(size_t n)
    {
        const(E)[] result;
        for (;;) {
            result = buffer.peek!E;
            if (result.length >= n)
                break;
            if (yield())
                break;
        }
        return result;
    }

    // Drops `n` elements from the front of the buffer.
    void consume(size_t n)
    {
        buffer.consume!E(n);
    }
}

// Shared alloc()/commit() implementation for adapters that own a `buffer`
// and have yield() in scope at the mixin site.
private template ImplementAllocCommit(E) {
    // Stores a writable slice in `buf`; returns false if the buffer could
    // not provide at least `n` elements.
    bool alloc(ref E[] buf, size_t n)
    {
        buf = buffer.alloc!E(n);
        if (!buf || buf.length < n)
            return false;
        return true;
    }

    // Commits `n` elements and yields. Returns 0 if yield() returned true,
    // `n` otherwise.
    size_t commit(size_t n)
    {
        buffer.commit!E(n);
        if (yield())
            return 0;
        return n;
    }
}

// Push sink / peek source.
// Copies pushed data into `buffer` and serves it through peek()/consume().
private template DefaultPushPeekAdapter(Buffer, E) {
    @pushSink!E @peekSource!E
    struct DefaultPushPeekAdapter(alias Scheduler) {
        import std.algorithm : min;
        mixin Scheduler;
        Buffer buffer;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        // Appends `buf` to the buffer and yields. Returns 0 if yield()
        // returned true, buf.length otherwise.
        // Throws: OutOfMemoryError if the buffer cannot hold all of `buf`.
        size_t push(const(E)[] buf)
        {
            size_t n = buf.length;
            auto ob = buffer.alloc!E(n);
            if (ob.length < n) {
                import core.exception : OutOfMemoryError;
                throw new OutOfMemoryError();
            }
            ob[0 .. n] = buf[0 .. n];
            buffer.commit!E(n);
            if (yield())
                return 0;
            return n;
        }

        mixin ImplementPeekConsume!E;
    }
}

/**
 * Appends an adapter that lets a peek sink read from a push pipeline.
 *
 * Params:
 *   pipeline = pipeline to attach the adapter to
 *   buffer   = storage for pushed elements awaiting consumption
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto pushPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultPushPeekAdapter!(Buffer, E);
    return pipeline.pipe!PP(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto pushPeek(Pipeline)(auto ref Pipeline pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.pushPeek(movingBuffer());
}

// Alloc sink / peek source.
// Both interfaces operate directly on the shared `buffer`.
private template DefaultAllocPeekAdapter(Buffer, E) {
    @allocSink!E @peekSource!E
    struct DefaultAllocPeekAdapter(alias Scheduler) {
        import std.algorithm : min;
        mixin Scheduler;
        Buffer buffer;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        mixin ImplementAllocCommit!E;
        mixin ImplementPeekConsume!E;
    }
}

/**
 * Appends an adapter that lets a peek sink read from an alloc pipeline.
 *
 * Params:
 *   pipeline = pipeline whose last stage is an alloc source
 *   buffer   = storage shared by the alloc and peek sides
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto allocPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
    if (isAllocPipeline!Pipeline)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultAllocPeekAdapter!(Buffer, E);
    return pipeline.pipe!PP(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto allocPeek(Pipeline)(auto ref Pipeline pipeline)
    if (isAllocPipeline!Pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.allocPeek(movingBuffer());
}

// Alloc sink / pull source.
// The alloc side writes into `buffer`; pull() copies out of it.
private template DefaultAllocPullAdapter(Buffer, E) {
    @allocSink!E @pullSource!E
    struct DefaultAllocPullAdapter(alias Scheduler) {
        import std.algorithm : min;
        mixin Scheduler;
        Buffer buffer;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        mixin ImplementAllocCommit!E;

        // Yields until buf.length elements are buffered or yield() returns
        // true, then copies what is available into `buf` and returns the
        // number of elements copied.
        size_t pull(E[] buf)
        {
            const(E)[] ib;
            for (;;) {
                ib = buffer.peek!E;
                if (ib.length >= buf.length)
                    break;
                if (yield())
                    break;
            }
            auto len = min(ib.length, buf.length);
            buf[0 .. len] = ib[0 .. len];
            buffer.consume!E(len);
            return len;
        }
    }
}

/**
 * Appends an adapter that lets a pull sink read from an alloc pipeline.
 *
 * Params:
 *   pipeline = pipeline whose last stage is an alloc source
 *   buffer   = storage shared by the alloc and pull sides
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto allocPull(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
    if (isAllocPipeline!Pipeline)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultAllocPullAdapter!(Buffer, E);
    return pipeline.pipe!PP(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto allocPull(Pipeline)(auto ref Pipeline pipeline)
    if (isAllocPipeline!Pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.allocPull(movingBuffer());
}

// Alloc sink / push source.
// Hands out slices of `buffer` and pushes each committed slice to `sink`.
private template DefaultAllocPushAdapter(Buffer, E) {
    @allocSink!E @pushSource!E
    struct DefaultAllocPushAdapter(Sink) {
        Sink sink;
        Buffer buffer;

        this()(auto ref Buffer buffer)
        {
            this.buffer = buffer;
        }

        // Stores a writable slice in `buf`; returns false if the buffer
        // could not provide at least `n` elements.
        bool alloc(ref E[] buf, size_t n)
        {
            buf = buffer.alloc!E(n);
            if (!buf || buf.length < n)
                return false;
            return true;
        }

        // Commits `n` elements, pushes them to the sink and removes them
        // from the buffer. Always returns `n`.
        size_t commit(size_t n)
        {
            buffer.commit!E(n);
            // NOTE(review): the value returned by sink.push() is discarded,
            // so a sink accepting fewer than `n` elements is not reported to
            // the caller; confirm this is intended.
            sink.push(buffer.peek!E[0 .. n]);
            buffer.consume!E(n);
            return n;
        }
    }
}

/**
 * Appends an adapter that lets an alloc pipeline feed a push sink.
 *
 * Params:
 *   pipeline = pipeline whose last stage is an alloc source
 *   buffer   = storage handed out to the alloc side before each push
 *
 * Returns: the result of `pipeline.pipe` with the adapter attached.
 */
auto allocPush(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
    if (isAllocPipeline!Pipeline)
{
    alias E = Pipeline.ElementType;
    alias PP = DefaultAllocPushAdapter!(Buffer, E);
    return pipeline.pipe!PP(buffer);
}

/// Ditto, using `flod.buffer.movingBuffer()` as the intermediate buffer.
auto allocPush(Pipeline)(auto ref Pipeline pipeline)
    if (isAllocPipeline!Pipeline)
{
    import flod.buffer : movingBuffer;
    return pipeline.allocPush(movingBuffer());
}