/** Various buffer implementations.
 *
 *  Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
 *  Copyright: © 2016 Adrian Matoga
 *  License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
 */
module flod.buffer;

/*
Rounds `n` up to the nearest multiple of `al`.

The bit-mask formula is only valid when `al` is a power of two.
*/
private size_t alignUp(size_t n, size_t al)
{
    return (n + al - 1) & ~(al - 1);
}
static assert(13.alignUp(4) == 16);
static assert(31337.alignUp(4096) == 32768);

/*
Returns a suitable allocation size for a request of `n` bytes.

If `allocator.goodAllocSize(n)` compiles and yields a `size_t`, its result is used;
otherwise `n` is rounded up to a multiple of `size_t.sizeof`.
*/
private size_t goodSize(Allocator)(ref Allocator allocator, size_t n)
{
    // The parameter has to be named: `is(typeof(...))` below refers to it, and with
    // an unnamed parameter the expression never compiles, so the allocator's own
    // size hint would never be consulted.
    static if (is(typeof(allocator.goodAllocSize(n)) : size_t))
        return allocator.goodAllocSize(n);
    else
        return n.alignUp(size_t.sizeof);
}

import std.experimental.allocator.mallocator : Mallocator;

/// A buffer that discards all data written to it and always returns empty slice.
struct NullBuffer {
private:
    // Never assigned anywhere in this struct, so it stays `null`.
    void[] buffer;
public:
    /// Passes `buffer` (always `null` here) to `Mallocator`'s `deallocate`.
    ~this() { Mallocator.instance.deallocate(buffer); }

    /// Returns a freshly allocated scratch array of `n` elements; nothing written to it is retained.
    T[] alloc(T)(size_t n)
    {
        return new T[n];
    }

    /// Does nothing: committed data is discarded.
    void commit(T)(size_t n) {}

    /// Always returns `null`, i.e. there is never any data to read.
    const(T)[] peek(T)() { return null; }

    /// Does nothing: there is never anything to consume.
    void consume(T)(size_t n) {}
}

/**
A buffer that relies on moving chunks of data in memory
to ensure that contiguous slices of any requested size can always be provided.
Params:
 Allocator = _Allocator used for internal storage allocation.
*/
struct MovingBuffer(Allocator = Mallocator) {
private:
    void[] buffer;       // backing storage obtained from `allocator`
    size_t peekOffset;   // byte offset of the first unconsumed byte
    size_t allocOffset;  // byte offset one past the last committed byte
    Allocator allocator;

    invariant {
        assert(peekOffset <= allocOffset);
        assert(allocOffset <= buffer.length);
    }

public:
    /**
    Constructs the buffer.
    Params:
     allocator   = allocator to store in the buffer and use for all storage requests.
     initialSize = if non-zero, storage of at least this many bytes is allocated up front.
    */
    this()(auto ref Allocator allocator, size_t initialSize = 0)
    {
        import flod.meta : moveIfNonCopyable;
        this.allocator = moveIfNonCopyable(allocator);
        // Allocate through the stored member, not the parameter: the parameter has
        // just been handed to moveIfNonCopyable (presumably moved from when the
        // allocator is non-copyable -- confirm against flod.meta).
        if (initialSize > 0)
            buffer = this.allocator.allocate(this.allocator.goodSize(initialSize));
    }

    /// Copying is only permitted while no storage has been allocated.
    this(this)
    {
        assert(buffer == null);
    }

    /// Returns the backing storage to the allocator, if the allocator supports deallocation.
    ~this()
    {
        static if (__traits(compiles, allocator.deallocate(buffer))) {
            if (buffer.ptr !is null) {
                allocator.deallocate(buffer);
                buffer = null;
            }
        }
    }

    /**
    Allocates space for at least `n` new objects of type `T` to be written to the buffer.

    If the free tail is too small, pending data is moved to the front of the storage
    and the storage is grown. Calls `onOutOfMemoryError` if growing fails.

    Returns: a slice over the whole free tail, trimmed to a whole number of `T`s.
    */
    T[] alloc(T)(size_t n)
    {
        import std.experimental.allocator : reallocate;
        import core.exception : onOutOfMemoryError;
        import core.stdc.string : memmove;

        immutable tn = T.sizeof * n;
        if (buffer.length - allocOffset < tn) {
            // Compact: slide pending data to the start of the storage.
            // Skipped when there is nothing in front of it (also avoids
            // calling memmove on a null buffer).
            if (peekOffset > 0) {
                memmove(buffer.ptr, buffer.ptr + peekOffset, allocOffset - peekOffset);
                allocOffset -= peekOffset;
                peekOffset = 0;
            }
            immutable newSize = goodSize(allocator, allocOffset + tn);
            // Do not ignore a failed growth: callers rely on getting at least `n` elements.
            if (buffer.length < newSize && !allocator.reallocate(buffer, newSize))
                onOutOfMemoryError();
            assert(buffer.length - allocOffset >= tn); // TODO: let it return smaller chunk and the user will handle it
        }
        // Trim the tail to a multiple of T.sizeof; casting a void[] whose length
        // is not such a multiple to T[] is an error.
        auto room = buffer[allocOffset .. $];
        return cast(T[]) room[0 .. room.length - room.length % T.sizeof];
    }

    /// Adds first `n` objects of type `T` stored in the slice previously obtained using `alloc`.
    /// Does not touch the remaining part of that slice.
    void commit(T)(size_t n)
    {
        size_t tn = T.sizeof * n;
        allocOffset += tn;
        assert(allocOffset <= buffer.length);
    }

    /// Returns a read-only slice, typed as `const(T)[]`, containing all data currently available in the buffer.
    const(T)[] peek(T)()
    {
        return cast(const(T)[]) buffer[peekOffset .. allocOffset];
    }

    /// Removes first `n` objects of type `T` from the buffer.
    void consume(T)(size_t n)
    {
        size_t tn = T.sizeof * n;
        peekOffset += tn;
        assert(peekOffset <= allocOffset);
        // Rewind only when the read position has reached the very end of the storage.
        // At that point allocOffset == buffer.length as well (see the invariant), so
        // no non-empty slice handed out by `alloc` can still be awaiting `commit`.
        if (peekOffset == buffer.length) {
            peekOffset = 0;
            allocOffset = 0;
        }
    }
}

/// Creates a `MovingBuffer` that uses the given allocator.
auto movingBuffer(Allocator)(auto ref Allocator allocator)
{
    return MovingBuffer!Allocator(allocator);
}

/// Creates a `MovingBuffer` that uses `Mallocator.instance`.
auto movingBuffer()
{
    import std.experimental.allocator.mallocator : Mallocator;
    return movingBuffer(Mallocator.instance);
}

version(unittest) {
    // Shared smoke test: exercises alloc/commit/peek/consume on any buffer type.
    private void testBuffer(Buffer)(auto ref Buffer b)
    {
        import std.range : iota, array, put;
        import std.algorithm : copy;
        static assert(is(typeof(b)));
        assert(b.peek!uint().length == 0);
        b.consume!uint(0);
        auto chunk = b.alloc!uint(1);
        assert(chunk.length >= 1);
        assert(b.peek!uint().length == 0);
        chunk = b.alloc!uint(31337);
        assert(chunk.length >= 31337);
        auto arr = iota!uint(0, chunk.length).array();
        iota!uint(0, cast(uint) chunk.length).copy(chunk[0 .. $]);
        b.commit!uint(1312);
        assert(b.peek!uint()[] == arr[0 .. 1312]);
        b.commit!uint(chunk.length - 1312);
        assert(b.peek!uint()[] == arr[]);
        b.consume!uint(0);
        assert(b.peek!uint()[] == arr[]);
        b.consume!uint(15);
        assert(b.peek!uint()[] == arr[15 .. $]);
        // TODO: put more stress on the buffer
    }
}

unittest {
    auto b = movingBuffer();
    testBuffer(b);
    // consume everything and check if b will reset its pointers.
    b.consume!uint(b.peek!uint().length);
    assert(b.peek!uint().length == 0);
    assert(b.allocOffset == 0);
    assert(b.peekOffset == 0);
}

/**
A circular buffer which avoids moving data around, but instead maps the same physical memory block twice
into two adjacent virtual memory blocks.
It $(U does) move data blocks when growing the buffer.
*/
struct MmappedBuffer {
private:
    // Mapping lengths are rounded up to a multiple of this value.
    enum pageSize = 4096;
    import flod.meta : NonCopyable;
    mixin NonCopyable;

    import core.sys.posix.stdlib : mkstemp;
    import core.sys.posix.unistd : close, unlink, ftruncate;
    import core.sys.posix.sys.mman : mmap, munmap,
        MAP_ANON, MAP_PRIVATE, MAP_FIXED, MAP_SHARED, MAP_FAILED, PROT_WRITE, PROT_READ;

    void[] buffer;          // first of the two adjacent mappings; the second one mirrors it
    size_t peekOffset;      // byte offset of the first unconsumed byte, always <= buffer.length
    size_t peekableLength;  // number of committed, not yet consumed bytes
    int fd = -1;            // descriptor of the backing file, -1 when there is none
    ulong wraps = 0;        // number of times peekOffset has wrapped around

    // Byte offset at which new data is written, wrapped into the first mapping.
    @property size_t allocOffset() const pure nothrow
    {
        auto ao = peekOffset + peekableLength;
        if (ao <= buffer.length)
            return ao;
        return ao - buffer.length;
    }

    // Number of bytes that can still be committed without growing the buffer.
    @property size_t allocableLength() const pure nothrow { return buffer.length - peekableLength; }

    invariant {
        assert(peekOffset <= buffer.length);
    }

    // Creates the backing file and, if `initialSize` is non-zero, the initial mapping.
    // On failure the buffer is left empty.
    this(size_t initialSize)
    {
        if (!createFile())
            return;
        if (initialSize)
            buffer = allocate(initialSize);
    }

    // Creates an already-unlinked temporary file under /dev/shm and stores its descriptor in `fd`.
    // Returns false (leaving `fd` at -1) if the file cannot be created or unlinked.
    bool createFile()()
    {
        static immutable path = "/dev/shm/flod-buffer-XXXXXX";
        // + 1 copies the terminating zero that follows the string literal.
        char[path.length + 1] mutablePath = path.ptr[0 .. path.length + 1];
        fd = mkstemp(mutablePath.ptr);
        if (fd < 0)
            return false;
        if (unlink(mutablePath.ptr) != 0) {
            close(fd);
            fd = -1;
            return false;
        }
        return true;
    }

    // Resizes the backing file to `length` (rounded up to `pageSize`) and maps it twice,
    // back to back. Returns the first mapping, or null on any failure.
    void[] allocate(size_t length)
    {
        length = length.alignUp(pageSize);
        if (fd < 0)
            return null;
        if (ftruncate(fd, length) != 0)
            return null;

        // first, make sure that a contiguous virtual memory range of 2 * length bytes is available
        void* anon = mmap(null, length * 2, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0);
        if (anon == MAP_FAILED)
            return null;

        // then map the 2 halves inside to the same range of physical memory
        void* p = mmap(anon, length, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
        if (p == MAP_FAILED) {
            munmap(anon, length * 2);
            return null;
        }
        assert(p == anon);
        p = mmap(anon + length, length, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
        if (p == MAP_FAILED) {
            munmap(anon, length * 2);
            return null;
        }
        assert(p == anon + length);
        return anon[0 .. length];
    }

    // Grows the buffer to at least `length` bytes, preserving pending data.
    // `length` must be large enough to hold the pending data linearly, i.e.
    // at least peekOffset + peekableLength (this is what `alloc` requests).
    // Returns false if the new mapping could not be created; the old one is then kept.
    bool reallocate(size_t length)
    {
        length = length.alignUp(pageSize);
        // Never shrink: allocate() would truncate the file that is still mapped by `buffer`.
        if (length <= buffer.length)
            return true;
        assert(peekOffset + peekableLength <= length);
        auto newbuf = allocate(length);
        if (newbuf.ptr is null)
            return false;
        // The pending data is read through the old double mapping (so a wrapped region
        // is seen as one contiguous run) and written at the same offset in the new
        // mapping. It therefore stays at `peekOffset`, which must not be adjusted.
        newbuf.ptr[peekOffset .. peekOffset + peekableLength] = buffer.ptr[peekOffset .. peekOffset + peekableLength];
        deallocate(buffer);
        buffer = newbuf;
        return true;
    }

    // Unmaps both halves of a buffer returned by allocate() and nulls the slice.
    static void deallocate(ref void[] b)
    {
        if (b.ptr) {
            munmap(b.ptr, b.length << 1);
            b = null;
        }
    }

public:
    /// Unmaps the buffer and closes the backing file.
    ~this()
    {
        deallocate(buffer);
        if (fd >= 0) {
            close(fd);
            fd = -1;
        }
    }

    /// Returns a read-only slice, typed as `const(T)[]`, containing all data currently available in the buffer.
    const(T)[] peek(T)()
    {
        auto typed = cast(const(T*)) (buffer.ptr + peekOffset);
        auto count = peekableLength / T.sizeof;
        return typed[0 .. count];
    }

    /// Removes first `n` objects of type `T` from the buffer.
    void consume(T)(size_t n)
    {
        size_t tn = T.sizeof * n;
        assert(peekableLength >= tn);
        peekOffset += tn;
        peekableLength -= tn;
        // Wrap the read position back into the first mapping.
        if (peekOffset >= buffer.length) {
            peekOffset -= buffer.length;
            wraps++;
        }
    }

    /// Allocates space for at least `n` new objects of type `T` to be written to the buffer.
    /// Calls `onOutOfMemoryError` if the buffer has to grow and growing fails.
    T[] alloc(T)(size_t n)
    {
        import core.exception : onOutOfMemoryError;

        if (n > allocableLength / T.sizeof) {
            // make sure at least T[n] will be available behind what's currently peekable
            if (!reallocate(peekOffset + peekableLength + n * T.sizeof))
                onOutOfMemoryError();
        }
        auto typed = cast(T*) (buffer.ptr + allocOffset);
        auto count = allocableLength / T.sizeof;
        assert(count >= n); // TODO: let it return smaller chunk and the user will handle it
        return typed[0 .. count];
    }

    /// Adds first `n` objects of type `T` stored in the slice previously obtained using `alloc`.
    /// Does not touch the remaining part of that slice.
    void commit(T)(size_t n)
    {
        size_t tn = T.sizeof * n;
        assert(tn <= allocableLength);
        peekableLength += tn;
    }
}

/// Creates an `MmappedBuffer`, optionally with `initialSize` bytes mapped up front.
auto mmappedBuffer(size_t initialSize = 0)
{
    return MmappedBuffer(initialSize);
}

unittest {
    testBuffer(mmappedBuffer());
}