Preface
This article takes a look at Flink's BlobService.
BlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
/**
* A simple store and retrieve binary large objects (BLOBs).
*/
public interface BlobService extends Closeable {
/**
* Returns a BLOB service for accessing permanent BLOBs.
*
* @return BLOB service
*/
PermanentBlobService getPermanentBlobService();
/**
* Returns a BLOB service for accessing transient BLOBs.
*
* @return BLOB service
*/
TransientBlobService getTransientBlobService();
/**
* Returns the port of the BLOB server that this BLOB service is working with.
*
* @return the port the blob server.
*/
int getPort();
}
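- BlobService defines getPermanentBlobService, which returns the PermanentBlobService, and getTransientBlobService, which returns the TransientBlobService; getPort returns the port of the BLOB server that the service works with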
PermanentBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
/**
* A service to retrieve permanent binary large objects (BLOBs).
*
* <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
* JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
* or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
*/
public interface PermanentBlobService extends Closeable {
/**
* Returns the path to a local copy of the file associated with the provided job ID and blob
* key.
*
* @param jobId
* ID of the job this blob belongs to
* @param key
* BLOB key associated with the requested file
*
* @return The path to the file.
*
* @throws java.io.FileNotFoundException
* if the BLOB does not exist;
* @throws IOException
* if any other error occurs when retrieving the file
*/
File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
}
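- PermanentBlobService provides a single getFile method, which returns the local copy of a BLOB as a File given its JobID and PermanentBlobKey; it throws FileNotFoundException if the BLOB does not exist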
TransientBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java
/**
* A service to retrieve transient binary large objects (BLOBs) which are deleted on the
* {@link BlobServer} when they are retrieved.
*
* <p>These may include per-job BLOBs like files in the {@link
* org.apache.flink.api.common.cache.DistributedCache}, for example.
*
* <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
* {@link PermanentBlobService}.
*
* <p>TODO: change API to not rely on local files but return {@link InputStream} objects
*/
public interface TransientBlobService extends Closeable {
// --------------------------------------------------------------------------------------------
// GET
// --------------------------------------------------------------------------------------------
/**
* Returns the path to a local copy of the (job-unrelated) file associated with the provided
* blob key.
*
* @param key
* blob key associated with the requested file
*
* @return The path to the file.
*
* @throws java.io.FileNotFoundException
* when the path does not exist;
* @throws IOException
* if any other error occurs when retrieving the file
*/
File getFile(TransientBlobKey key) throws IOException;
/**
* Returns the path to a local copy of the file associated with the provided job ID and blob
* key.
*
* @param jobId
* ID of the job this blob belongs to
* @param key
* blob key associated with the requested file
*
* @return The path to the file.
*
* @throws java.io.FileNotFoundException
* when the path does not exist;
* @throws IOException
* if any other error occurs when retrieving the file
*/
File getFile(JobID jobId, TransientBlobKey key) throws IOException;
// --------------------------------------------------------------------------------------------
// PUT
// --------------------------------------------------------------------------------------------
/**
* Uploads the (job-unrelated) data of the given byte array to the BLOB server.
*
* @param value
* the buffer to upload
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
TransientBlobKey putTransient(byte[] value) throws IOException;
/**
* Uploads the data of the given byte array for the given job to the BLOB server.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;
/**
* Uploads the (job-unrelated) data from the given input stream to the BLOB server.
*
* @param inputStream
* the input stream to read the data from
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream or uploading the
* data to the BLOB server
*/
TransientBlobKey putTransient(InputStream inputStream) throws IOException;
/**
* Uploads the data from the given input stream for the given job to the BLOB server.
*
* @param jobId
* ID of the job this blob belongs to
* @param inputStream
* the input stream to read the data from
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream or uploading the
* data to the BLOB server
*/
TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;
// --------------------------------------------------------------------------------------------
// DELETE
// --------------------------------------------------------------------------------------------
/**
* Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
*
* @param key
* associated with the file to be deleted
*
* @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
* <tt>false</tt> otherwise
*/
boolean deleteFromCache(TransientBlobKey key);
/**
* Deletes the file associated with the provided job ID and blob key from the local cache.
*
* @param jobId
* ID of the job this blob belongs to
* @param key
* associated with the file to be deleted
*
* @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
* <tt>false</tt> otherwise
*/
boolean deleteFromCache(JobID jobId, TransientBlobKey key);
}
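- TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer as soon as they are retrieved and are not highly available; it provides getFile, putTransient, and deleteFromCache methods, each in a job-unrelated variant and a variant that takes a JobID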
BlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
/**
* A BLOB key uniquely identifies a BLOB.
*/
public abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L;
/** Size of the internal BLOB key in bytes. */
public static final int SIZE = 20;
/** The byte buffer storing the actual key data. */
private final byte[] key;
/**
* (Internal) BLOB type - to be reflected by the inheriting sub-class.
*/
private final BlobType type;
/**
* BLOB type, i.e. permanent or transient.
*/
enum BlobType {
/**
* Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
* available.
*/
PERMANENT_BLOB,
/**
* Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
* highly available.
*/
TRANSIENT_BLOB
}
/**
* Random component of the key.
*/
private final AbstractID random;
/**
* Constructs a new BLOB key.
*
* @param type
* whether the referenced BLOB is permanent or transient
*/
protected BlobKey(BlobType type) {
this.type = checkNotNull(type);
this.key = new byte[SIZE];
this.random = new AbstractID();
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
*/
protected BlobKey(BlobType type, byte[] key) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID();
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*/
protected BlobKey(BlobType type, byte[] key, byte[] random) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID(random);
}
/**
* Returns the right {@link BlobKey} subclass for the given parameters.
*
* @param type
* whether the referenced BLOB is permanent or transient
*
* @return BlobKey subclass
*/
@VisibleForTesting
static BlobKey createKey(BlobType type) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey();
} else {
return new TransientBlobKey();
}
}
/**
* Returns the right {@link BlobKey} subclass for the given parameters.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
*
* @return BlobKey subclass
*/
static BlobKey createKey(BlobType type, byte[] key) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key);
} else {
return new TransientBlobKey(key);
}
}
/**
* Returns the right {@link BlobKey} subclass for the given parameters.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*
* @return BlobKey subclass
*/
static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key, random);
} else {
return new TransientBlobKey(key, random);
}
}
/**
* Returns the hash component of this key.
*
* @return a 20 bit hash of the contents the key refers to
*/
@VisibleForTesting
public byte[] getHash() {
return key;
}
/**
* Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
*
* @return BLOB type, i.e. permanent or transient
*/
BlobType getType() {
return type;
}
/**
* Adds the BLOB key to the given {@link MessageDigest}.
*
* @param md
* the message digest to add the BLOB key to
*/
public void addToMessageDigest(MessageDigest md) {
md.update(this.key);
}
@Override
public boolean equals(final Object obj) {
if (!(obj instanceof BlobKey)) {
return false;
}
final BlobKey bk = (BlobKey) obj;
return Arrays.equals(this.key, bk.key) &&
this.type == bk.type &&
this.random.equals(bk.random);
}
@Override
public int hashCode() {
int result = Arrays.hashCode(this.key);
result = 37 * result + this.type.hashCode();
result = 37 * result + this.random.hashCode();
return result;
}
@Override
public String toString() {
final String typeString;
switch (this.type) {
case TRANSIENT_BLOB:
typeString = "t-";
break;
case PERMANENT_BLOB:
typeString = "p-";
break;
default:
// this actually never happens!
throw new IllegalStateException("Invalid BLOB type");
}
return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
}
@Override
public int compareTo(BlobKey o) {
// compare the hashes first
final byte[] aarr = this.key;
final byte[] barr = o.key;
final int len = Math.min(aarr.length, barr.length);
for (int i = 0; i < len; ++i) {
final int a = (aarr[i] & 0xff);
final int b = (barr[i] & 0xff);
if (a != b) {
return a - b;
}
}
if (aarr.length == barr.length) {
// same hash contents - compare the BLOB types
int typeCompare = this.type.compareTo(o.type);
if (typeCompare == 0) {
// same type - compare random components
return this.random.compareTo(o.random);
} else {
return typeCompare;
}
} else {
return aarr.length - barr.length;
}
}
// --------------------------------------------------------------------------------------------
/**
* Auxiliary method to read a BLOB key from an input stream.
*
* @param inputStream
* the input stream to read the BLOB key from
* @return the read BLOB key
* @throws IOException
* throw if an I/O error occurs while reading from the input stream
*/
static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
final byte[] key = new byte[BlobKey.SIZE];
final byte[] random = new byte[AbstractID.SIZE];
int bytesRead = 0;
// read key
while (bytesRead < key.length) {
final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}
// read BLOB type
final BlobType blobType;
{
final int read = inputStream.read();
if (read < 0) {
throw new EOFException("Read an incomplete BLOB type");
} else if (read == TRANSIENT_BLOB.ordinal()) {
blobType = TRANSIENT_BLOB;
} else if (read == PERMANENT_BLOB.ordinal()) {
blobType = PERMANENT_BLOB;
} else {
throw new IOException("Invalid data received for the BLOB type: " + read);
}
}
// read random component
bytesRead = 0;
while (bytesRead < AbstractID.SIZE) {
final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}
return createKey(blobType, key, random);
}
/**
* Auxiliary method to write this BLOB key to an output stream.
*
* @param outputStream
* the output stream to write the BLOB key to
* @throws IOException
* thrown if an I/O error occurs while writing the BLOB key
*/
void writeToOutputStream(final OutputStream outputStream) throws IOException {
outputStream.write(this.key);
outputStream.write(this.type.ordinal());
outputStream.write(this.random.getBytes());
}
}
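- BlobKey is an abstract class with three fields: key (a byte array of SIZE = 20 bytes holding the hash; the getHash Javadoc says "20 bit", but SIZE is counted in bytes), type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB), and random (an AbstractID). Its static createKey methods return the BlobKey subclass that matches the given BlobType; readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes a BlobKey to an OutputStream. It has two subclasses, PermanentBlobKey and TransientBlobKey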
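The byte layout written by writeToOutputStream and consumed by readFromInputStream can be sketched without any Flink dependency. The class below is a hypothetical stand-in (BlobKeyWireSketch, write, readFully and readType are names made up for this illustration, not Flink APIs); it only mirrors the order shown in the source above: 20 hash bytes, one byte holding the BlobType ordinal, then the 16 bytes of the random AbstractID.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/** Hypothetical stand-in mirroring the BlobKey byte layout; not Flink code. */
final class BlobKeyWireSketch {
    static final int HASH_SIZE = 20;   // same value as BlobKey.SIZE
    static final int RANDOM_SIZE = 16; // same value as AbstractID.SIZE
    static final int PERMANENT = 0;    // ordinal of BlobType.PERMANENT_BLOB (declared first)
    static final int TRANSIENT = 1;    // ordinal of BlobType.TRANSIENT_BLOB (declared second)

    /** Writes hash, type ordinal and random part in the order writeToOutputStream uses. */
    static byte[] write(byte[] hash, int typeOrdinal, byte[] random) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(HASH_SIZE + 1 + RANDOM_SIZE);
        out.write(hash, 0, hash.length);
        out.write(typeOrdinal);
        out.write(random, 0, random.length);
        return out.toByteArray();
    }

    /** Fills buf completely; loops because read() may return fewer bytes than requested. */
    static void readFully(InputStream in, byte[] buf) throws IOException {
        int bytesRead = 0;
        while (bytesRead < buf.length) {
            int read = in.read(buf, bytesRead, buf.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }
    }

    /** Reads the single type byte and rejects anything that is not a known ordinal. */
    static int readType(InputStream in) throws IOException {
        int read = in.read();
        if (read < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        }
        if (read != PERMANENT && read != TRANSIENT) {
            throw new IOException("Invalid data received for the BLOB type: " + read);
        }
        return read;
    }

    public static void main(String[] args) throws IOException {
        byte[] hash = new byte[HASH_SIZE];
        byte[] random = new byte[RANDOM_SIZE];
        Arrays.fill(hash, (byte) 0x11);
        Arrays.fill(random, (byte) 0x22);

        byte[] wire = write(hash, TRANSIENT, random);
        System.out.println("serialized length: " + wire.length); // 20 + 1 + 16 = 37

        InputStream in = new ByteArrayInputStream(wire);
        byte[] hashBack = new byte[HASH_SIZE];
        readFully(in, hashBack);
        int type = readType(in);
        byte[] randomBack = new byte[RANDOM_SIZE];
        readFully(in, randomBack);
        System.out.println(Arrays.equals(hash, hashBack)
                && type == TRANSIENT
                && Arrays.equals(random, randomBack));
    }
}
```

Because the type is stored as an enum ordinal, the layout depends on the declaration order of BlobType; reordering the enum constants would change the meaning of already serialized keys.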
PermanentBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
/**
* BLOB key referencing permanent BLOB files.
*/
public final class PermanentBlobKey extends BlobKey {
/**
* Constructs a new BLOB key.
*/
@VisibleForTesting
public PermanentBlobKey() {
super(BlobType.PERMANENT_BLOB);
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param key
* the actual key data
*/
PermanentBlobKey(byte[] key) {
super(BlobType.PERMANENT_BLOB, key);
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param key
* the actual key data
* @param random
* the random component of the key
*/
PermanentBlobKey(byte[] key, byte[] random) {
super(BlobType.PERMANENT_BLOB, key, random);
}
}
- PermanentBlobKey extends BlobKey; its BlobType is BlobType.PERMANENT_BLOB
TransientBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
/**
* BLOB key referencing transient BLOB files.
*/
public final class TransientBlobKey extends BlobKey {
/**
* Constructs a new BLOB key.
*/
@VisibleForTesting
public TransientBlobKey() {
super(BlobType.TRANSIENT_BLOB);
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param key
* the actual key data
*/
TransientBlobKey(byte[] key) {
super(BlobType.TRANSIENT_BLOB, key);
}
/**
* Constructs a new BLOB key from the given byte array.
*
* @param key
* the actual key data
* @param random
* the random component of the key
*/
TransientBlobKey(byte[] key, byte[] random) {
super(BlobType.TRANSIENT_BLOB, key, random);
}
}
- TransientBlobKey extends BlobKey; its BlobType is BlobType.TRANSIENT_BLOB
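The type carried by each subclass is also visible in the string form built by BlobKey.toString: a "p-" or "t-" prefix, the hash in hex, a dash, and the random component in hex. The following is a minimal sketch of that format; BlobKeyStringSketch, toHex and keyString are hypothetical names, and the lowercase hex encoding is an assumption about what StringUtils.byteToHexString produces.

```java
/** Hypothetical helper that rebuilds the string form of a BLOB key; not Flink code. */
final class BlobKeyStringSketch {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Lowercase hex encoding (assumed to match StringUtils.byteToHexString). */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            // High nibble first, then low nibble; the masks discard sign-extension bits.
            sb.append(HEX[(b >> 4) & 0x0f]).append(HEX[b & 0x0f]);
        }
        return sb.toString();
    }

    /** Type prefix, hex hash, dash, hex random component, as in BlobKey.toString. */
    static String keyString(boolean permanent, byte[] hash, byte[] random) {
        return (permanent ? "p-" : "t-") + toHex(hash) + "-" + toHex(random);
    }

    public static void main(String[] args) {
        byte[] hash = new byte[20];   // BlobKey.SIZE bytes
        byte[] random = new byte[16]; // AbstractID.SIZE bytes
        hash[19] = (byte) 0xab;
        random[0] = 0x01;
        String s = keyString(true, hash, random);
        System.out.println(s);
        System.out.println(s.length()); // 2 + 40 + 1 + 32 = 75
    }
}
```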
AbstractID
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
/**
* A statistically unique identification number.
*/
@PublicEvolving
public class AbstractID implements Comparable<AbstractID>, java.io.Serializable {
private static final long serialVersionUID = 1L;
private static final Random RND = new Random();
/** The size of a long in bytes. */
private static final int SIZE_OF_LONG = 8;
/** The size of the ID in byte. */
public static final int SIZE = 2 * SIZE_OF_LONG;
// ------------------------------------------------------------------------
/** The upper part of the actual ID. */
protected final long upperPart;
/** The lower part of the actual ID. */
protected final long lowerPart;
/** The memoized value returned by toString(). */
private transient String toString;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new ID with a specific bytes value.
*/
public AbstractID(byte[] bytes) {
if (bytes == null || bytes.length != SIZE) {
throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
}
this.lowerPart = byteArrayToLong(bytes, 0);
this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
}
/**
* Constructs a new abstract ID.
*
* @param lowerPart the lower bytes of the ID
* @param upperPart the higher bytes of the ID
*/
public AbstractID(long lowerPart, long upperPart) {
this.lowerPart = lowerPart;
this.upperPart = upperPart;
}
/**
* Copy constructor: Creates a new abstract ID from the given one.
*
* @param id the abstract ID to copy
*/
public AbstractID(AbstractID id) {
if (id == null) {
throw new IllegalArgumentException("Id must not be null.");
}
this.lowerPart = id.lowerPart;
this.upperPart = id.upperPart;
}
/**
* Constructs a new random ID from a uniform distribution.
*/
public AbstractID() {
this.lowerPart = RND.nextLong();
this.upperPart = RND.nextLong();
}
// --------------------------------------------------------------------------------------------
/**
* Gets the lower 64 bits of the ID.
*
* @return The lower 64 bits of the ID.
*/
public long getLowerPart() {
return lowerPart;
}
/**
* Gets the upper 64 bits of the ID.
*
* @return The upper 64 bits of the ID.
*/
public long getUpperPart() {
return upperPart;
}
/**
* Gets the bytes underlying this ID.
*
* @return The bytes underlying this ID.
*/
public byte[] getBytes() {
byte[] bytes = new byte[SIZE];
longToByteArray(lowerPart, bytes, 0);
longToByteArray(upperPart, bytes, SIZE_OF_LONG);
return bytes;
}
// --------------------------------------------------------------------------------------------
// Standard Utilities
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == getClass()) {
AbstractID that = (AbstractID) obj;
return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
} else {
return false;
}
}
@Override
public int hashCode() {
return ((int) this.lowerPart) ^
((int) (this.lowerPart >>> 32)) ^
((int) this.upperPart) ^
((int) (this.upperPart >>> 32));
}
@Override
public String toString() {
if (this.toString == null) {
final byte[] ba = new byte[SIZE];
longToByteArray(this.lowerPart, ba, 0);
longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
this.toString = StringUtils.byteToHexString(ba);
}
return this.toString;
}
@Override
public int compareTo(AbstractID o) {
int diff1 = Long.compare(this.upperPart, o.upperPart);
int diff2 = Long.compare(this.lowerPart, o.lowerPart);
return diff1 == 0 ? diff2 : diff1;
}
// --------------------------------------------------------------------------------------------
// Conversion Utilities
// --------------------------------------------------------------------------------------------
/**
* Converts the given byte array to a long.
*
* @param ba the byte array to be converted
* @param offset the offset indicating at which byte inside the array the conversion shall begin
* @return the long variable
*/
private static long byteArrayToLong(byte[] ba, int offset) {
long l = 0;
for (int i = 0; i < SIZE_OF_LONG; ++i) {
l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
}
return l;
}
/**
* Converts a long to a byte array.
*
* @param l the long variable to be converted
* @param ba the byte array to store the result the of the conversion
* @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
*/
private static void longToByteArray(long l, byte[] ba, int offset) {
for (int i = 0; i < SIZE_OF_LONG; ++i) {
final int shift = i << 3; // i * 8
ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
}
}
}
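- AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor fills both with Random.nextLong; the byte-array constructor parses lowerPart from the first 8 bytes and upperPart from the last 8 bytes; the (lowerPart, upperPart) constructor sets them directly; and the copy constructor takes them from another AbstractID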
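The conversion helpers store each long with its most significant byte first, lowerPart in bytes 0 to 7 and upperPart in bytes 8 to 15. A small sketch makes that layout concrete by comparing the loop from the source with java.nio.ByteBuffer, whose default byte order is big-endian. AbstractIdBytesSketch and getBytesWithByteBuffer are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical stand-in for the byte layout of AbstractID; not Flink code. */
final class AbstractIdBytesSketch {
    static final int SIZE_OF_LONG = 8;
    static final int SIZE = 2 * SIZE_OF_LONG;

    /** Same loop as AbstractID.longToByteArray: the most significant byte lands at the lowest index. */
    static void longToByteArray(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }

    /** Same loop as AbstractID.byteArrayToLong: the inverse of longToByteArray. */
    static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        return l;
    }

    /** Mirrors AbstractID.getBytes: lowerPart first, then upperPart. */
    static byte[] getBytes(long lowerPart, long upperPart) {
        byte[] bytes = new byte[SIZE];
        longToByteArray(lowerPart, bytes, 0);
        longToByteArray(upperPart, bytes, SIZE_OF_LONG);
        return bytes;
    }

    /** The same layout expressed with ByteBuffer (big-endian by default). */
    static byte[] getBytesWithByteBuffer(long lowerPart, long upperPart) {
        return ByteBuffer.allocate(SIZE).putLong(lowerPart).putLong(upperPart).array();
    }

    public static void main(String[] args) {
        long lower = 0x0102030405060708L;
        long upper = -1L;
        byte[] bytes = getBytes(lower, upper);
        // Both encodings agree, and parsing the bytes restores the original values.
        System.out.println(Arrays.equals(bytes, getBytesWithByteBuffer(lower, upper)));
        System.out.println(byteArrayToLong(bytes, 0) == lower
                && byteArrayToLong(bytes, SIZE_OF_LONG) == upper);
    }
}
```

Note that the byte order (lowerPart first) differs from the comparison order in compareTo, which looks at upperPart before lowerPart.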
Summary
- BlobService defines getPermanentBlobService and getTransientBlobService, which return the PermanentBlobService and the TransientBlobService respectively. PermanentBlobService provides a getFile method that returns a File for a given JobID and PermanentBlobKey. TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer as soon as they are retrieved; it provides getFile, putTransient, and deleteFromCache methods
- BlobKey is an abstract class with three fields: key, a BlobType, and an AbstractID; BlobType is either PERMANENT_BLOB or TRANSIENT_BLOB. Its static createKey methods create a BlobKey that matches the given BlobType; readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes a BlobKey to an OutputStream. Its two subclasses are PermanentBlobKey, whose BlobType is BlobType.PERMANENT_BLOB, and TransientBlobKey, whose BlobType is BlobType.TRANSIENT_BLOB
- AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor fills both with Random.nextLong; the byte-array constructor parses lowerPart and upperPart from the bytes; the (lowerPart, upperPart) constructor sets them directly