A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Updated typing to help suppress linting errors on fyle bytes types

+8 -6
+8 -6
src/atdata/local.py
··· 59 59 Type, 60 60 TypeVar, 61 61 Generator, 62 + BinaryIO, 63 + cast, 62 64 ) 63 65 64 66 T = TypeVar( 'T', bound = PackableSample ) ··· 294 296 metadata_path.parent.mkdir( parents = True, exist_ok = True ) 295 297 296 298 if ds.metadata is not None: 297 - with hive_fs.open( metadata_path, 'wb' ) as f: 298 - # TODO Figure out how to make linting work better here 299 - f.write( msgpack.packb( ds.metadata ) ) 299 + with cast( BinaryIO, hive_fs.open( metadata_path, 'wb' ) ) as f: 300 + meta_packed = msgpack.packb( ds.metadata ) 301 + assert meta_packed is not None 302 + f.write( cast( bytes, meta_packed ) ) 300 303 301 304 302 305 # Write data ··· 320 323 # Copy to S3 321 324 print( 'Copying file to s3 ...', end = '' ) 322 325 with open( local_cache_path, 'rb' ) as f_in: 323 - with hive_fs.open( p, 'wb' ) as f_out: 324 - # TODO Linting issues 326 + with cast( BinaryIO, hive_fs.open( p, 'wb' ) ) as f_out: 325 327 f_out.write( f_in.read() ) 326 328 print( ' done.' ) 327 329 ··· 334 336 writer_post = _writer_post 335 337 336 338 else: 337 - writer_opener = lambda s: hive_fs.open( s, 'wb' ) 339 + writer_opener = lambda s: cast( BinaryIO, hive_fs.open( s, 'wb' ) ) 338 340 writer_post = lambda s: written_shards.append( s ) 339 341 340 342 written_shards = []