My working unpac space for OCaml projects in development
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #4517 from Cyan4973/asyncio_revisit

Remove asyncio from the compression path

authored by

Yann Collet and committed by
GitHub
124a6f01 8df20e32

+375 -131
+375 -131
vendor/git/zstd-c/programs/fileio.c
··· 125 125 #define TEMPORARY_FILE_PERMISSIONS (0600) 126 126 #endif 127 127 128 + 129 + #ifndef ZSTD_NOCOMPRESS 130 + 131 + /* ************************************* 132 + * Synchronous compression IO helpers 133 + * Lightweight wrapper used by compression paths to manage buffered 134 + * reads/writes without the async job machinery. 135 + ***************************************/ 136 + typedef struct { 137 + const FIO_prefs_t* prefs; 138 + FILE* srcFile; 139 + FILE* dstFile; 140 + unsigned storedSkips; 141 + U8* inBuffer; 142 + size_t inCapacity; 143 + U8* srcBuffer; 144 + size_t srcBufferLoaded; 145 + U8* outBuffer; 146 + size_t outCapacity; 147 + } FIO_SyncCompressIO; 148 + 149 + static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io, 150 + const FIO_prefs_t* prefs, 151 + size_t inCapacity, 152 + size_t outCapacity); 153 + static void FIO_SyncCompressIO_destroy(FIO_SyncCompressIO* io); 154 + static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file); 155 + static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io); 156 + static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file); 157 + static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io); 158 + static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave); 159 + static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n); 160 + static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size); 161 + static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io); 162 + 163 + 164 + static unsigned FIO_sparseWrite(FILE* file, 165 + const void* buffer, size_t bufferSize, 166 + const FIO_prefs_t* const prefs, 167 + unsigned storedSkips) 168 + { 169 + const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ 170 + size_t bufferSizeT = bufferSize / sizeof(size_t); 171 + const size_t* const bufferTEnd = bufferT + bufferSizeT; 172 + const size_t* ptrT = bufferT; 173 + static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ 174 + 175 + if (prefs->testMode) return 0; /* do not output anything in test mode */ 176 + 177 + if (!prefs->sparseFileSupport) { /* normal write */ 178 + size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); 179 + if (sizeCheck != bufferSize) 180 + EXM_THROW(70, "Write error : cannot write block : %s", 181 + strerror(errno)); 182 + return 0; 183 + } 184 + 185 + /* avoid int overflow */ 186 + if (storedSkips > 1 GB) { 187 + if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) 188 + EXM_THROW(91, "1 GB skip error (sparse file support)"); 189 + storedSkips -= 1 GB; 190 + } 191 + 192 + while (ptrT < bufferTEnd) { 193 + size_t nb0T; 194 + 195 + /* adjust last segment if < 32 KB */ 196 + size_t seg0SizeT = segmentSizeT; 197 + if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; 198 + bufferSizeT -= seg0SizeT; 199 + 200 + /* count leading zeroes */ 201 + for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; 202 + storedSkips += (unsigned)(nb0T * sizeof(size_t)); 203 + 204 + if (nb0T != seg0SizeT) { /* not all 0s */ 205 + size_t const nbNon0ST = seg0SizeT - nb0T; 206 + /* skip leading zeros */ 207 + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) 208 + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); 209 + storedSkips = 0; 210 + /* write the rest */ 211 + if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) 212 + EXM_THROW(93, "Write error : cannot write block : %s", 213 + strerror(errno)); 214 + } 215 + ptrT += seg0SizeT; 216 + } 217 + 218 + { static size_t const maskT = sizeof(size_t)-1; 219 + if (bufferSize & maskT) { 220 + /* size not multiple of sizeof(size_t) : implies end of block */ 221 + const char* const restStart = (const char*)bufferTEnd; 222 + const char* restPtr = restStart; 223 + const char* const restEnd = (const char*)buffer + bufferSize; 224 + assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); 225 + for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; 226 + storedSkips += (unsigned) (restPtr - restStart); 227 + if (restPtr != restEnd) { 228 + /* not all remaining bytes are 0 */ 229 + size_t const restSize = (size_t)(restEnd - restPtr); 230 + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) 231 + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); 232 + if (fwrite(restPtr, 1, restSize, file) != restSize) 233 + EXM_THROW(95, "Write error : cannot write end of decoded block : %s", 234 + strerror(errno)); 235 + storedSkips = 0; 236 + } } } 237 + 238 + return storedSkips; 239 + } 240 + 241 + static void FIO_sparseWriteEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) 242 + { 243 + if (file == NULL) return; 244 + if (prefs->testMode) { 245 + assert(storedSkips == 0); 246 + return; 247 + } 248 + if (storedSkips>0) { 249 + assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ 250 + if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) 251 + EXM_THROW(69, "Final skip error (sparse file support)"); 252 + /* last zero must be explicitly written, 253 + * so that skipped ones get implicitly translated as zero by FS */ 254 + { const char lastZeroByte[1] = { 0 }; 255 + if (fwrite(lastZeroByte, 1, 1, file) != 1) 256 + EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); 257 + } 258 + } 259 + } 260 + 261 + static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io, 262 + const FIO_prefs_t* prefs, 263 + size_t inCapacity, 264 + size_t outCapacity) 265 + { 266 + memset(io, 0, sizeof(*io)); 267 + io->prefs = prefs; 268 + io->inCapacity = inCapacity; 269 + io->outCapacity = outCapacity; 270 + io->inBuffer = (U8*)malloc(inCapacity); 271 + if (!io->inBuffer) 272 + EXM_THROW(101, "Allocation error : not enough memory"); 273 + io->outBuffer = (U8*)malloc(outCapacity); 274 + if (!io->outBuffer) { 275 + free(io->inBuffer); 276 + io->inBuffer = NULL; 277 + EXM_THROW(101, "Allocation error : not enough memory"); 278 + } 279 + io->srcBuffer = io->inBuffer; 280 + io->srcBufferLoaded = 0; 281 + } 282 + 283 + static void FIO_SyncCompressIO_destroy(FIO_SyncCompressIO* io) 284 + { 285 + if (!io) return; 286 + free(io->inBuffer); 287 + free(io->outBuffer); 288 + io->inBuffer = NULL; 289 + io->outBuffer = NULL; 290 + io->srcBuffer = NULL; 291 + io->srcBufferLoaded = 0; 292 + io->srcFile = NULL; 293 + io->dstFile = NULL; 294 + io->storedSkips = 0; 295 + } 296 + 297 + static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file) 298 + { 299 + io->srcFile = file; 300 + io->srcBuffer = io->inBuffer; 301 + io->srcBufferLoaded = 0; 302 + } 303 + 304 + static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io) 305 + { 306 + io->srcFile = NULL; 307 + io->srcBuffer = io->inBuffer; 308 + io->srcBufferLoaded = 0; 309 + } 310 + 311 + static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file) 312 + { 313 + io->dstFile = file; 314 + io->storedSkips = 0; 315 + } 316 + 317 + static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io) 318 + { 319 + int result = 0; 320 + if (io->dstFile != NULL) { 321 + FIO_SyncCompressIO_finish(io); 322 + result = fclose(io->dstFile); 323 + io->dstFile = NULL; 324 + } 325 + return result; 326 + } 327 + 328 + static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave) 329 + { 330 + size_t added = 0; 331 + if (io->srcFile == NULL) 332 + return 0; 333 + 334 + if (minToHave > io->inCapacity) 335 + minToHave = io->inCapacity; 336 + 337 + if (io->srcBufferLoaded >= minToHave) 338 + return 0; 339 + 340 + if (io->srcBuffer != io->inBuffer) { 341 + if (io->srcBufferLoaded > 0) 342 + memmove(io->inBuffer, io->srcBuffer, io->srcBufferLoaded); 343 + io->srcBuffer = io->inBuffer; 344 + } 345 + 346 + while (io->srcBufferLoaded < minToHave) { 347 + size_t const toRead = io->inCapacity - io->srcBufferLoaded; 348 + size_t const readBytes = fread(io->inBuffer + io->srcBufferLoaded, 1, toRead, io->srcFile); 349 + if (readBytes == 0) { 350 + if (ferror(io->srcFile)) 351 + EXM_THROW(37, "Read error"); 352 + break; /* EOF */ 353 + } 354 + io->srcBufferLoaded += readBytes; 355 + added += readBytes; 356 + if (readBytes < toRead) 357 + break; 358 + } 359 + 360 + return added; 361 + } 362 + 363 + static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n) 364 + { 365 + assert(n <= io->srcBufferLoaded); 366 + io->srcBuffer += n; 367 + io->srcBufferLoaded -= n; 368 + if (io->srcBufferLoaded == 0) 369 + io->srcBuffer = io->inBuffer; 370 + } 371 + 372 + static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size) 373 + { 374 + if (size == 0) 375 + return; 376 + if (io->dstFile == NULL) { 377 + assert(io->prefs->testMode); 378 + return; 379 + } 380 + io->storedSkips = FIO_sparseWrite(io->dstFile, buffer, size, io->prefs, io->storedSkips); 381 + } 382 + 383 + static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io) 384 + { 385 + if (io->dstFile == NULL) 386 + return; 387 + FIO_sparseWriteEnd(io->prefs, io->dstFile, io->storedSkips); 388 + io->storedSkips = 0; 389 + } 390 + 391 + #endif /* ZSTD_NOCOMPRESS */ 392 + 128 393 /*-************************************ 129 394 * Signal (Ctrl-C trapping) 130 395 **************************************/ ··· 1078 1343 const char* dictFileName; 1079 1344 stat_t dictFileStat; 1080 1345 ZSTD_CStream* cctx; 1081 - WritePoolCtx_t *writeCtx; 1082 - ReadPoolCtx_t *readCtx; 1346 + FIO_SyncCompressIO io; 1083 1347 } cRess_t; 1084 1348 1085 1349 /** ZSTD_cycleLog() : ··· 1147 1411 dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict; 1148 1412 FIO_initDict(&ress.dict, dictFileName, prefs, &ress.dictFileStat, dictBufferType); /* works with dictFileName==NULL */ 1149 1413 1150 - ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize()); 1151 - ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize()); 1414 + FIO_SyncCompressIO_init(&ress.io, prefs, ZSTD_CStreamInSize(), ZSTD_CStreamOutSize()); 1152 1415 1153 1416 /* Advanced parameters, including dictionary */ 1154 1417 if (dictFileName && (ress.dict.dictBuffer==NULL)) ··· 1212 1475 static void FIO_freeCResources(cRess_t* const ress) 1213 1476 { 1214 1477 FIO_freeDict(&(ress->dict)); 1215 - AIO_WritePool_free(ress->writeCtx); 1216 - AIO_ReadPool_free(ress->readCtx); 1478 + FIO_SyncCompressIO_destroy(&ress->io); 1217 1479 ZSTD_freeCStream(ress->cctx); /* never fails */ 1218 1480 } 1219 1481 1220 1482 1221 1483 #ifdef ZSTD_GZCOMPRESS 1222 1484 static unsigned long long 1223 - FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but not changed */ 1485 + FIO_compressGzFrame(cRess_t* ress, 1224 1486 const char* srcFileName, U64 const srcFileSize, 1225 1487 int compressionLevel, U64* readsize) 1226 1488 { 1489 + FIO_SyncCompressIO* const syncIO = &ress->io; 1227 1490 unsigned long long inFileSize = 0, outFileSize = 0; 1228 1491 z_stream strm; 1229 - IOJob_t *writeJob = NULL; 1230 1492 1231 1493 if (compressionLevel > Z_BEST_COMPRESSION) 1232 1494 compressionLevel = Z_BEST_COMPRESSION; ··· 1242 1504 EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret); 1243 1505 } } 1244 1506 1245 - writeJob = AIO_WritePool_acquireJob(ress->writeCtx); 1246 1507 strm.next_in = 0; 1247 1508 strm.avail_in = 0; 1248 - strm.next_out = (Bytef*)writeJob->buffer; 1249 - strm.avail_out = (uInt)writeJob->bufferSize; 1509 + strm.next_out = (Bytef*)syncIO->outBuffer; 1510 + strm.avail_out = (uInt)syncIO->outCapacity; 1250 1511 1251 1512 while (1) { 1252 1513 int ret; 1253 1514 if (strm.avail_in == 0) { 1254 - AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); 1255 - if (ress->readCtx->srcBufferLoaded == 0) break; 1256 - inFileSize += ress->readCtx->srcBufferLoaded; 1257 - strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; 1258 - strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; 1515 + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); 1516 + if (syncIO->srcBufferLoaded == 0) break; 1517 + inFileSize += added; 1518 + *readsize += added; 1519 + strm.next_in = (z_const unsigned char*)syncIO->srcBuffer; 1520 + strm.avail_in = (uInt)syncIO->srcBufferLoaded; 1259 1521 } 1260 1522 1261 1523 { 1262 1524 size_t const availBefore = strm.avail_in; 1263 1525 ret = deflate(&strm, Z_NO_FLUSH); 1264 - AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); 1526 + FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in); 1265 1527 } 1266 1528 1267 1529 if (ret != Z_OK) 1268 1530 EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); 1269 - { size_t const cSize = writeJob->bufferSize - strm.avail_out; 1531 + { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out); 1270 1532 if (cSize) { 1271 - writeJob->usedBufferSize = cSize; 1272 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1533 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize); 1273 1534 outFileSize += cSize; 1274 - strm.next_out = (Bytef*)writeJob->buffer; 1275 - strm.avail_out = (uInt)writeJob->bufferSize; 1535 + strm.next_out = (Bytef*)syncIO->outBuffer; 1536 + strm.avail_out = (uInt)syncIO->outCapacity; 1276 1537 } } 1277 1538 if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { 1278 1539 DISPLAYUPDATE_PROGRESS( ··· 1288 1549 1289 1550 while (1) { 1290 1551 int const ret = deflate(&strm, Z_FINISH); 1291 - { size_t const cSize = writeJob->bufferSize - strm.avail_out; 1552 + { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out); 1292 1553 if (cSize) { 1293 - writeJob->usedBufferSize = cSize; 1294 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1554 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize); 1295 1555 outFileSize += cSize; 1296 - strm.next_out = (Bytef*)writeJob->buffer; 1297 - strm.avail_out = (uInt)writeJob->bufferSize; 1556 + strm.next_out = (Bytef*)syncIO->outBuffer; 1557 + strm.avail_out = (uInt)syncIO->outCapacity; 1298 1558 } } 1299 1559 if (ret == Z_STREAM_END) break; 1300 1560 if (ret != Z_BUF_ERROR) ··· 1306 1566 EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); 1307 1567 } } 1308 1568 *readsize = inFileSize; 1309 - AIO_WritePool_releaseIoJob(writeJob); 1310 - AIO_WritePool_sparseWriteEnd(ress->writeCtx); 1569 + FIO_SyncCompressIO_finish(syncIO); 1311 1570 return outFileSize; 1312 1571 } 1313 1572 #endif ··· 1319 1578 const char* srcFileName, U64 const srcFileSize, 1320 1579 int compressionLevel, U64* readsize, int plain_lzma) 1321 1580 { 1581 + FIO_SyncCompressIO* const syncIO = &ress->io; 1322 1582 unsigned long long inFileSize = 0, outFileSize = 0; 1323 1583 lzma_stream strm = LZMA_STREAM_INIT; 1324 1584 lzma_action action = LZMA_RUN; 1325 1585 lzma_ret ret; 1326 - IOJob_t *writeJob = NULL; 1327 1586 1328 1587 if (compressionLevel < 0) compressionLevel = 0; 1329 1588 if (compressionLevel > 9) compressionLevel = 9; ··· 1341 1600 EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); 1342 1601 } 1343 1602 1344 - writeJob =AIO_WritePool_acquireJob(ress->writeCtx); 1345 - strm.next_out = (BYTE*)writeJob->buffer; 1346 - strm.avail_out = writeJob->bufferSize; 1603 + strm.next_out = (BYTE*)syncIO->outBuffer; 1604 + strm.avail_out = syncIO->outCapacity; 1347 1605 strm.next_in = 0; 1348 1606 strm.avail_in = 0; 1349 1607 1350 1608 while (1) { 1351 1609 if (strm.avail_in == 0) { 1352 - size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); 1353 - if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; 1354 - inFileSize += inSize; 1355 - strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; 1356 - strm.avail_in = ress->readCtx->srcBufferLoaded; 1610 + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); 1611 + if (syncIO->srcBufferLoaded == 0) action = LZMA_FINISH; 1612 + inFileSize += added; 1613 + *readsize += added; 1614 + strm.next_in = (BYTE const*)syncIO->srcBuffer; 1615 + strm.avail_in = syncIO->srcBufferLoaded; 1357 1616 } 1358 1617 1359 1618 { 1360 1619 size_t const availBefore = strm.avail_in; 1361 1620 ret = lzma_code(&strm, action); 1362 - AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); 1621 + FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in); 1363 1622 } 1364 - 1365 1623 1366 1624 if (ret != LZMA_OK && ret != LZMA_STREAM_END) 1367 1625 EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); 1368 - { size_t const compBytes = writeJob->bufferSize - strm.avail_out; 1626 + { size_t const compBytes = syncIO->outCapacity - strm.avail_out; 1369 1627 if (compBytes) { 1370 - writeJob->usedBufferSize = compBytes; 1371 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1628 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, compBytes); 1372 1629 outFileSize += compBytes; 1373 - strm.next_out = (BYTE*)writeJob->buffer; 1374 - strm.avail_out = writeJob->bufferSize; 1630 + strm.next_out = (BYTE*)syncIO->outBuffer; 1631 + strm.avail_out = syncIO->outCapacity; 1375 1632 } } 1376 1633 if (srcFileSize == UTIL_FILESIZE_UNKNOWN) 1377 1634 DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%", ··· 1387 1644 lzma_end(&strm); 1388 1645 *readsize = inFileSize; 1389 1646 1390 - AIO_WritePool_releaseIoJob(writeJob); 1391 - AIO_WritePool_sparseWriteEnd(ress->writeCtx); 1647 + FIO_SyncCompressIO_finish(syncIO); 1392 1648 1393 1649 return outFileSize; 1394 1650 } ··· 1409 1665 int compressionLevel, int checksumFlag, 1410 1666 U64* readsize) 1411 1667 { 1668 + FIO_SyncCompressIO* const syncIO = &ress->io; 1412 1669 const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB); 1413 1670 unsigned long long inFileSize = 0, outFileSize = 0; 1414 1671 1415 1672 LZ4F_preferences_t prefs; 1416 1673 LZ4F_compressionContext_t ctx; 1417 1674 1418 - IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx); 1419 - 1420 1675 LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); 1421 1676 if (LZ4F_isError(errorCode)) 1422 1677 EXM_THROW(31, "zstd: failed to create lz4 compression context"); 1423 1678 1424 1679 memset(&prefs, 0, sizeof(prefs)); 1425 1680 1426 - assert(blockSize <= ress->readCtx->base.jobBufferSize); 1681 + assert(blockSize <= syncIO->inCapacity); 1427 1682 1428 1683 /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */ 1429 1684 prefs.autoFlush = 0; ··· 1434 1689 #if LZ4_VERSION_NUMBER >= 10600 1435 1690 prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; 1436 1691 #endif 1437 - assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize); 1692 + assert(LZ4F_compressBound(blockSize, &prefs) <= syncIO->outCapacity); 1438 1693 1439 1694 { 1440 - size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs); 1695 + size_t headerSize = LZ4F_compressBegin(ctx, syncIO->outBuffer, syncIO->outCapacity, &prefs); 1441 1696 if (LZ4F_isError(headerSize)) 1442 1697 EXM_THROW(33, "File header generation failed : %s", 1443 1698 LZ4F_getErrorName(headerSize)); 1444 - writeJob->usedBufferSize = headerSize; 1445 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1699 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize); 1446 1700 outFileSize += headerSize; 1447 1701 1448 - /* Read first block */ 1449 - inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); 1702 + { 1703 + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize); 1704 + inFileSize += added; 1705 + *readsize += added; 1706 + } 1450 1707 1451 - /* Main Loop */ 1452 - while (ress->readCtx->srcBufferLoaded) { 1453 - size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded); 1454 - size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize, 1455 - ress->readCtx->srcBuffer, inSize, NULL); 1708 + while (syncIO->srcBufferLoaded) { 1709 + size_t const inSize = MIN(blockSize, syncIO->srcBufferLoaded); 1710 + size_t const outSize = LZ4F_compressUpdate(ctx, syncIO->outBuffer, syncIO->outCapacity, 1711 + syncIO->srcBuffer, inSize, NULL); 1456 1712 if (LZ4F_isError(outSize)) 1457 1713 EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", 1458 1714 srcFileName, LZ4F_getErrorName(outSize)); ··· 1467 1723 (double)outFileSize/(double)inFileSize*100); 1468 1724 } 1469 1725 1470 - /* Write Block */ 1471 - writeJob->usedBufferSize = outSize; 1472 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1726 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outSize); 1473 1727 1474 - /* Read next block */ 1475 - AIO_ReadPool_consumeBytes(ress->readCtx, inSize); 1476 - inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); 1728 + FIO_SyncCompressIO_consumeBytes(syncIO, inSize); 1729 + { 1730 + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize); 1731 + inFileSize += added; 1732 + *readsize += added; 1733 + } 1477 1734 } 1478 1735 1479 - /* End of Stream mark */ 1480 - headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL); 1736 + headerSize = LZ4F_compressEnd(ctx, syncIO->outBuffer, syncIO->outCapacity, NULL); 1481 1737 if (LZ4F_isError(headerSize)) 1482 1738 EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", 1483 1739 srcFileName, LZ4F_getErrorName(headerSize)); 1484 1740 1485 - writeJob->usedBufferSize = headerSize; 1486 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1741 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize); 1487 1742 outFileSize += headerSize; 1488 1743 } 1489 1744 1490 - *readsize = inFileSize; 1491 1745 LZ4F_freeCompressionContext(ctx); 1492 - AIO_WritePool_releaseIoJob(writeJob); 1493 - AIO_WritePool_sparseWriteEnd(ress->writeCtx); 1746 + FIO_SyncCompressIO_finish(syncIO); 1494 1747 1495 1748 return outFileSize; 1496 1749 } ··· 1499 1752 static unsigned long long 1500 1753 FIO_compressZstdFrame(FIO_ctx_t* const fCtx, 1501 1754 FIO_prefs_t* const prefs, 1502 - const cRess_t* ressPtr, 1755 + cRess_t* ress, 1503 1756 const char* srcFileName, U64 fileSize, 1504 1757 int compressionLevel, U64* readsize) 1505 1758 { 1506 - cRess_t const ress = *ressPtr; 1507 - IOJob_t* writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx); 1759 + FIO_SyncCompressIO* const syncIO = &ress->io; 1508 1760 1509 1761 U64 compressedfilesize = 0; 1510 1762 ZSTD_EndDirective directive = ZSTD_e_continue; ··· 1529 1781 /* init */ 1530 1782 if (fileSize != UTIL_FILESIZE_UNKNOWN) { 1531 1783 pledgedSrcSize = fileSize; 1532 - CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize)); 1784 + CHECK(ZSTD_CCtx_setPledgedSrcSize(ress->cctx, fileSize)); 1533 1785 } else if (prefs->streamSrcSize > 0) { 1534 1786 /* unknown source size; use the declared stream size */ 1535 1787 pledgedSrcSize = prefs->streamSrcSize; 1536 - CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) ); 1788 + CHECK( ZSTD_CCtx_setPledgedSrcSize(ress->cctx, prefs->streamSrcSize) ); 1537 1789 } 1538 1790 1539 1791 { int windowLog; 1540 1792 UTIL_HumanReadableSize_t windowSize; 1541 - CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog)); 1793 + CHECK(ZSTD_CCtx_getParameter(ress->cctx, ZSTD_c_windowLog, &windowLog)); 1542 1794 if (windowLog == 0) { 1543 1795 if (prefs->ldmFlag) { 1544 1796 /* If long mode is set without a window size libzstd will set this size internally */ ··· 1556 1808 do { 1557 1809 size_t stillToFlush; 1558 1810 /* Fill input Buffer */ 1559 - size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize()); 1560 - ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 ); 1811 + size_t const inSize = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); 1812 + ZSTD_inBuffer inBuff = setInBuffer( syncIO->srcBuffer, syncIO->srcBufferLoaded, 0 ); 1561 1813 DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); 1562 1814 *readsize += inSize; 1563 1815 1564 - if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize)) 1816 + if ((syncIO->srcBufferLoaded == 0) || (*readsize == fileSize)) 1565 1817 directive = ZSTD_e_end; 1566 1818 1567 1819 stillToFlush = 1; ··· 1569 1821 || (directive == ZSTD_e_end && stillToFlush != 0) ) { 1570 1822 1571 1823 size_t const oldIPos = inBuff.pos; 1572 - ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 ); 1573 - size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx); 1574 - CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive)); 1575 - AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos); 1824 + ZSTD_outBuffer outBuff = setOutBuffer( syncIO->outBuffer, syncIO->outCapacity, 0 ); 1825 + size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx); 1826 + CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive)); 1827 + FIO_SyncCompressIO_consumeBytes(syncIO, inBuff.pos - oldIPos); 1576 1828 1577 1829 /* count stats */ 1578 1830 inputPresented++; ··· 1583 1835 DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", 1584 1836 (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); 1585 1837 if (outBuff.pos) { 1586 - writeJob->usedBufferSize = outBuff.pos; 1587 - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); 1838 + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outBuff.pos); 1588 1839 compressedfilesize += outBuff.pos; 1589 1840 } 1590 1841 1591 1842 /* adaptive mode : statistics measurement and speed correction */ 1592 1843 if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) { 1593 - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); 1844 + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx); 1594 1845 1595 1846 lastAdaptTime = UTIL_getTime(); 1596 1847 ··· 1663 1914 if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel(); 1664 1915 if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel; 1665 1916 compressionLevel += (compressionLevel == 0); /* skip 0 */ 1666 - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); 1917 + ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel); 1667 1918 } 1668 1919 if (speedChange == faster) { 1669 1920 DISPLAYLEVEL(6, "faster speed , lighter compression \n") 1670 1921 compressionLevel --; 1671 1922 if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel; 1672 1923 compressionLevel -= (compressionLevel == 0); /* skip 0 */ 1673 - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); 1924 + ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel); 1674 1925 } 1675 1926 speedChange = noChange; 1676 1927 ··· 1680 1931 1681 1932 /* display notification */ 1682 1933 if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) { 1683 - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); 1934 + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx); 1684 1935 double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; 1685 1936 UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed); 1686 1937 UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed); ··· 1727 1978 (unsigned long long)*readsize, (unsigned long long)fileSize); 1728 1979 } 1729 1980 1730 - AIO_WritePool_releaseIoJob(writeJob); 1731 - AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx); 1981 + FIO_SyncCompressIO_finish(syncIO); 1732 1982 1733 1983 return compressedfilesize; 1734 1984 } ··· 1741 1991 static int 1742 1992 FIO_compressFilename_internal(FIO_ctx_t* const fCtx, 1743 1993 FIO_prefs_t* const prefs, 1744 - cRess_t ress, 1994 + cRess_t* ress, 1745 1995 const char* dstFileName, const char* srcFileName, 1746 1996 int compressionLevel) 1747 1997 { ··· 1756 2006 switch (prefs->compressionType) { 1757 2007 default: 1758 2008 case FIO_zstdCompression: 1759 - compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, &ress, srcFileName, fileSize, compressionLevel, &readsize); 2009 + compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, ress, srcFileName, fileSize, compressionLevel, &readsize); 1760 2010 break; 1761 2011 1762 2012 case FIO_gzipCompression: 1763 2013 #ifdef ZSTD_GZCOMPRESS 1764 - compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize); 2014 + compressedfilesize = FIO_compressGzFrame(ress, srcFileName, fileSize, compressionLevel, &readsize); 1765 2015 #else 1766 2016 (void)compressionLevel; 1767 2017 EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n", ··· 1772 2022 case FIO_xzCompression: 1773 2023 case FIO_lzmaCompression: 1774 2024 #ifdef ZSTD_LZMACOMPRESS 1775 - compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression); 2025 + compressedfilesize = FIO_compressLzmaFrame(ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression); 1776 2026 #else 1777 2027 (void)compressionLevel; 1778 2028 EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n", ··· 1782 2032 1783 2033 case FIO_lz4Compression: 1784 2034 #ifdef ZSTD_LZ4COMPRESS 1785 - compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize); 2035 + compressedfilesize = FIO_compressLz4Frame(ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize); 1786 2036 #else 1787 2037 (void)compressionLevel; 1788 2038 EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n", ··· 1838 2088 */ 1839 2089 static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, 1840 2090 FIO_prefs_t* const prefs, 1841 - cRess_t ress, 2091 + cRess_t* ress, 1842 2092 const char* dstFileName, 1843 2093 const char* srcFileName, 1844 2094 const stat_t* srcFileStat, ··· 1849 2099 int transferStat = 0; 1850 2100 int dstFd = -1; 1851 2101 1852 - assert(AIO_ReadPool_getFile(ress.readCtx) != NULL); 1853 - if (AIO_WritePool_getFile(ress.writeCtx) == NULL) { 2102 + if (ress->io.dstFile == NULL) { 1854 2103 int dstFileInitialPermissions = DEFAULT_FILE_PERMISSIONS; 1855 2104 if ( strcmp (srcFileName, stdinmark) 1856 2105 && strcmp (dstFileName, stdoutmark) ··· 1861 2110 1862 2111 closeDstFile = 1; 1863 2112 DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName); 1864 - { FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions); 2113 + { 2114 + FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions); 1865 2115 if (dstFile==NULL) return 1; /* could not open dstFileName */ 1866 2116 dstFd = fileno(dstFile); 1867 - AIO_WritePool_setFile(ress.writeCtx, dstFile); 2117 + FIO_SyncCompressIO_setDst(&ress->io, dstFile); 1868 2118 } 1869 - /* Must only be added after FIO_openDstFile() succeeds. 1870 - * Otherwise we may delete the destination file if it already exists, 1871 - * and the user presses Ctrl-C when asked if they wish to overwrite. 1872 - */ 2119 + /* Must only be added after FIO_openDstFile() succeeds. */ 1873 2120 addHandler(dstFileName); 1874 2121 } 1875 2122 ··· 1883 2130 } 1884 2131 1885 2132 DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName); 1886 - if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */ 2133 + if (FIO_SyncCompressIO_closeDst(&ress->io)) { /* error closing file */ 1887 2134 DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); 1888 2135 result=1; 1889 2136 } ··· 1892 2139 UTIL_utime(dstFileName, srcFileStat); 1893 2140 } 1894 2141 1895 - if ( (result != 0) /* operation failure */ 1896 - && strcmp(dstFileName, stdoutmark) /* special case : don't remove() stdout */ 1897 - ) { 1898 - FIO_removeFile(dstFileName); /* remove compression artefact; note don't do anything special if remove() fails */ 2142 + if ( (result != 0) 2143 + && strcmp(dstFileName, stdoutmark) ) { 2144 + FIO_removeFile(dstFileName); 1899 2145 } 1900 2146 } 1901 2147 ··· 2029 2275 static int 2030 2276 FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, 2031 2277 FIO_prefs_t* const prefs, 2032 - cRess_t ress, 2278 + cRess_t* ress, 2033 2279 const char* dstFileName, 2034 2280 const char* srcFileName, 2035 2281 int compressionLevel) ··· 2051 2297 } 2052 2298 2053 2299 /* ensure src is not the same as dict (if present) */ 2054 - if (ress.dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress.dictFileName, &srcFileStat, &ress.dictFileStat)) { 2300 + if (ress->dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress->dictFileName, &srcFileStat, &ress->dictFileStat)) { 2055 2301 DISPLAYLEVEL(1, "zstd: cannot use %s as an input file and dictionary \n", srcFileName); 2056 2302 return 1; 2057 2303 } ··· 2070 2316 srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat); 2071 2317 if (srcFile == NULL) return 1; /* srcFile could not be opened */ 2072 2318 2073 - /* Don't use AsyncIO for small files */ 2074 2319 if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */ 2075 2320 fileSize = UTIL_getFileSizeStat(&srcFileStat); 2076 - if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) { 2077 - AIO_ReadPool_setAsync(ress.readCtx, 0); 2078 - AIO_WritePool_setAsync(ress.writeCtx, 0); 2079 - } else { 2080 - AIO_ReadPool_setAsync(ress.readCtx, 1); 2081 - AIO_WritePool_setAsync(ress.writeCtx, 1); 2082 - } 2321 + (void)fileSize; 2083 2322 2084 - AIO_ReadPool_setFile(ress.readCtx, srcFile); 2323 + FIO_SyncCompressIO_setSrc(&ress->io, srcFile); 2085 2324 result = FIO_compressFilename_dstFile( 2086 2325 fCtx, prefs, ress, 2087 2326 dstFileName, srcFileName, 2088 2327 &srcFileStat, compressionLevel); 2089 - AIO_ReadPool_closeFile(ress.readCtx); 2328 + FIO_SyncCompressIO_clearSrc(&ress->io); 2329 + 2330 + if (srcFile != NULL && fclose(srcFile)) { 2331 + DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno)); 2332 + return 1; 2333 + } 2090 2334 2091 2335 if ( prefs->removeSrcFile /* --rm */ 2092 2336 && result == 0 /* success */ ··· 2153 2397 int compressionLevel, ZSTD_compressionParameters comprParams) 2154 2398 { 2155 2399 cRess_t ress = FIO_createCResources(prefs, dictFileName, UTIL_getFileSize(srcFileName), compressionLevel, comprParams); 2156 - int const result = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); 2400 + int const result = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel); 2157 2401 2158 2402 #define DISPLAY_LEVEL_DEFAULT 2 2159 2403 ··· 2250 2494 if (dstFile == NULL) { /* could not open outFileName */ 2251 2495 error = 1; 2252 2496 } else { 2253 - AIO_WritePool_setFile(ress.writeCtx, dstFile); 2497 + FIO_SyncCompressIO_setDst(&ress.io, dstFile); 2254 2498 for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) { 2255 - status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); 2499 + status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); 2256 2500 if (!status) fCtx->nbFilesProcessed++; 2257 2501 error |= status; 2258 2502 } 2259 - if (AIO_WritePool_closeFile(ress.writeCtx)) 2503 + if (FIO_SyncCompressIO_closeDst(&ress.io)) 2260 2504 EXM_THROW(29, "Write error (%s) : cannot properly close %s", 2261 2505 strerror(errno), outFileName); 2262 2506 } ··· 2280 2524 } else { 2281 2525 dstFileName = FIO_determineCompressedName(srcFileName, outDirName, suffix); /* cannot fail */ 2282 2526 } 2283 - status = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); 2527 + status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel); 2284 2528 if (!status) fCtx->nbFilesProcessed++; 2285 2529 error |= status; 2286 2530 }