/*
 *Copyright (C) 2018 Laurent Tréguier
 *
 *This file is part of DLS.
 *
 *DLS is free software: you can redistribute it and/or modify
 *it under the terms of the GNU General Public License as published by
 *the Free Software Foundation, either version 3 of the License, or
 *(at your option) any later version.
 *
 *DLS is distributed in the hope that it will be useful,
 *but WITHOUT ANY WARRANTY; without even the implied warranty of
 *MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *GNU General Public License for more details.
 *
 *You should have received a copy of the GNU General Public License
 *along with DLS. If not, see <http://www.gnu.org/licenses/>.
 *
 */

module dls.util.communicator;

import std.stdio : File;

// Socket can't be used with a shared aliasing.
// However, the communicator's methods are all called either from the main
// thread, or from inside a synchronized block, so __gshared is ok.
private __gshared Communicator _communicator;
private __gshared File _stdin;
private __gshared File _stdout;

/// Saves the process's original stdin/stdout in private handles, then points
/// the global std.stdio handles at the platform's null device.
/// NOTE(review): presumably this keeps stray writeln/readln calls elsewhere
/// from touching the stream used by the communicator - confirm.
shared static this()
{
    import std.stdio : stdin, stdout;

    _stdin = stdin;
    _stdout = stdout;

    version (Windows)
    {
        stdin = File("NUL", "rb");
        stdout = File("NUL", "wb");
    }
    else version (Posix)
    {
        stdin = File("/dev/null", "rb");
        stdout = File("/dev/null", "wb");
    }
}

/// Destroys the registered communicator, if any, at program shutdown.
shared static ~this()
{
    if (_communicator !is null)
    {
        destroy(communicator);
    }
}

/// Returns the communicator registered for this process (null if none).
@property Communicator communicator()
{
    return _communicator;
}

/// Registers the communicator. May only be called while none is registered.
@property void communicator(Communicator c)
{
    assert(_communicator is null);
    _communicator = c;
}

/// Byte-oriented channel abstraction implemented below over stdio and TCP.
interface Communicator
{
    /// Whether the underlying channel is still usable (stdio: open and not at
    /// EOF; socket: alive).
    bool hasData();

    /// Whether some input is already available (stdio: the background
    /// one-byte read has completed; socket: a non-blocking peek found bytes).
    bool hasPendingData();

    /// Reads up to `size` bytes. The returned slice may be shorter than
    /// `size`, and both implementations return a slice of a reused static
    /// buffer, so the content is only valid until the next call to `read`.
    char[] read(const size_t size);

    /// Writes `data` to the channel.
    void write(const char[] data);

    /// Flushes buffered output, if the channel buffers any.
    void flush();
}

/// Communicator over the process's original stdin/stdout.
///
/// When constructed with `checkPendingData == true`, a single-threaded task
/// pool keeps one blocking one-byte read in flight so that `hasPendingData`
/// can report input availability without blocking the caller.
class StdioCommunicator : Communicator
{
    import std.parallelism : Task, TaskPool;

    private bool _checkPending;
    private TaskPool _pool;
    // Null whenever no background read is in flight or waiting to be consumed.
    private Task!(readChar)* _background;

    /// Blocks until one byte can be read from the original stdin.
    /// Throws: Exception when nothing could be read.
    static char readChar()
    {
        static char[1] buffer;
        auto result = _stdin.rawRead(buffer);

        if (result.length > 0)
        {
            return result[0];
        }

        throw new Exception("No input data");
    }

    /// Params:
    ///     checkPendingData = enable the background reader backing
    ///                        `hasPendingData`
    this(bool checkPendingData)
    {
        _checkPending = checkPendingData;

        if (checkPendingData)
        {
            _pool = new TaskPool(1);
            // Daemon workers do not keep the process alive while blocked in a read.
            _pool.isDaemon = true;
            startBackground();
        }
    }

    ~this()
    {
        if (_checkPending)
        {
            _pool.stop();
        }
    }

    bool hasData()
    {
        return _stdin.isOpen && !_stdin.eof;
    }

    bool hasPendingData()
    {
        // No task is running when pending checks are disabled, or when stdin
        // had no data left at the time the last task would have been started.
        if (!_checkPending || _background is null)
        {
            return false;
        }

        try
        {
            // `done` rethrows the exception of a failed task.
            return _background.done;
        }
        catch (Exception e)
        {
            return false;
        }
    }

    char[] read(const size_t size)
    {
        if (size == 0)
        {
            return [];
        }

        static char[] buffer;
        buffer.length = size;

        if (!_checkPending)
        {
            return _stdin.rawRead(buffer);
        }

        // Whatever happens below consumes the current task (if any), so
        // always try to put a fresh one in flight before returning.
        scope (exit)
        {
            startBackground();
        }

        if (_background is null)
        {
            // No byte was pre-read; fall back to a direct read.
            return hasData() ? _stdin.rawRead(buffer) : [];
        }

        try
        {
            // The first byte was (or is being) read by the background task.
            buffer[0] = _background.yieldForce();
        }
        catch (Exception e)
        {
            return hasData() ? _stdin.rawRead(buffer) : [];
        }

        if (size > 1)
        {
            // Read the remainder directly and trim to what was actually obtained.
            buffer = buffer[0 .. _stdin.rawRead(buffer[1 .. $]).length + 1];
        }

        return buffer;
    }

    void write(const char[] data)
    {
        _stdout.rawWrite(data);
    }

    void flush()
    {
        _stdout.flush();
    }

    /// Puts a new one-byte read in flight if pending checks are enabled and
    /// stdin still has data; otherwise leaves no task registered.
    private void startBackground()
    {
        import std.parallelism : task;

        // Forget the previous (already consumed) task so that its result is
        // never reported or returned a second time.
        _background = null;

        if (_checkPending && hasData())
        {
            _background = task!readChar;
            _pool.put(_background);
        }
    }
}

/// Communicator over a TCP connection to `localhost`.
class SocketCommunicator : Communicator
{
    import std.socket : Socket;

    private Socket _socket;

    /// Connects to `localhost:port` and sets TCP_NODELAY on the socket.
    this(ushort port)
    {
        import std.socket : InternetAddress, SocketOption, SocketOptionLevel,
            TcpSocket;

        _socket = new TcpSocket(new InternetAddress("localhost", port));
        _socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
    }

    bool hasData()
    {
        synchronized (_socket)
        {
            return _socket.isAlive;
        }
    }

    bool hasPendingData()
    {
        import std.socket : SocketFlags;

        static char[1] buffer;
        ptrdiff_t result;

        synchronized (_socket)
        {
            // Peek in non-blocking mode so that an empty receive queue is
            // reported immediately instead of stalling the caller.
            _socket.blocking = false;

            scope (exit)
            {
                _socket.blocking = true;
            }

            result = _socket.receive(buffer, SocketFlags.PEEK);
        }

        // Socket.ERROR is negative and 0 means the peer closed the connection.
        return result > 0;
    }

    char[] read(const size_t size)
    {
        if (size == 0)
        {
            return [];
        }

        static char[] buffer;
        buffer.length = size;
        size_t totalBytesReceived;

        while (totalBytesReceived < size)
        {
            ptrdiff_t bytesReceived;

            synchronized (_socket)
            {
                // Append after what was already received instead of
                // overwriting the beginning of the buffer.
                bytesReceived = _socket.receive(buffer[totalBytesReceived .. $]);
            }

            if (bytesReceived == Socket.ERROR)
            {
                // Errors are retried, as before.
                // NOTE(review): a persistent error makes this loop spin - confirm
                // whether some error kinds should abort the read instead.
                continue;
            }

            if (bytesReceived == 0)
            {
                // The peer closed the connection: return what we have.
                break;
            }

            totalBytesReceived += bytesReceived;
        }

        return buffer[0 .. totalBytesReceived];
    }

    void write(const char[] data)
    {
        size_t totalBytesSent;

        // `send` may accept only part of the data; keep going until all of it
        // has been handed to the socket. Errors are retried, as before.
        while (totalBytesSent < data.length)
        {
            ptrdiff_t bytesSent;

            synchronized (_socket)
            {
                bytesSent = _socket.send(data[totalBytesSent .. $]);
            }

            if (bytesSent != Socket.ERROR)
            {
                totalBytesSent += bytesSent;
            }
        }
    }

    /// Nothing to flush: data is handed to the socket directly in `write`.
    void flush()
    {
    }
}