Signed-off-by: chixinze <xmdxcxz@gmail.com> (cherry picked from commit d6f3fb67431b24c824467959ead0f60b0305c7cd)
155 lines
5.9 KiB
Diff
155 lines
5.9 KiB
Diff
From c3f9c972297c4d73a901453e806c16044e570667 Mon Sep 17 00:00:00 2001
|
|
From: Rishabh Dave <rishabhddave@gmail.com>
|
|
Date: Thu, 7 Jun 2018 12:26:44 +0000
|
|
Subject: [PATCH 1/2] ceph-volume-client: allow atomic updates for RADOS
|
|
objects
|
|
|
|
put_object_versioned() takes the version of the object and verifies if
|
|
the version of the object is the expected one before updating the data
|
|
in the object. This verification of version before actually writing
|
|
makes put_objcet_version() atomic.
|
|
|
|
Rest of the changes include adding get_object_and_version() so that
|
|
current version of the object can be obtained and modification of
|
|
get_object() and put_object() to use get_object_and_version() and
|
|
put_object_versioned() respectively.
|
|
|
|
Fixes: http://tracker.ceph.com/issues/24173
|
|
Signed-off-by: Rishabh Dave <ridave@redhat.com>
|
|
(cherry picked from commit ca7253cff6cdac590bb14d0d297c02452bf75bf6)
|
|
---
|
|
src/pybind/ceph_volume_client.py | 46 +++++++++++++++++++++++++++++---
|
|
src/pybind/rados/rados.pyx | 14 ++++++++++
|
|
2 files changed, 57 insertions(+), 3 deletions(-)
|
|
|
|
diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py
|
|
index d38f72be9a3..c43681bef21 100644
|
|
--- a/src/pybind/ceph_volume_client.py
|
|
+++ b/src/pybind/ceph_volume_client.py
|
|
@@ -205,6 +205,7 @@ CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
|
|
* 1 - Initial version
|
|
* 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
|
|
* 3 - Allow volumes to be created without RADOS namespace isolation
|
|
+ * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
|
|
"""
|
|
|
|
|
|
@@ -228,7 +229,7 @@ class CephFSVolumeClient(object):
|
|
"""
|
|
|
|
# Current version
|
|
- version = 3
|
|
+ version = 4
|
|
|
|
# Where shall we create our volumes?
|
|
POOL_PREFIX = "fsvolume_"
|
|
@@ -1403,15 +1404,40 @@ class CephFSVolumeClient(object):
|
|
:param data: data to write
|
|
:type data: bytes
|
|
"""
|
|
+ return self.put_object_versioned(pool_name, object_name, data)
|
|
+
|
|
+ def put_object_versioned(self, pool_name, object_name, data, version=None):
|
|
+ """
|
|
+ Synchronously write data to an object only if version of the object
|
|
+ version matches the expected version.
|
|
+
|
|
+ :param pool_name: name of the pool
|
|
+ :type pool_name: str
|
|
+ :param object_name: name of the object
|
|
+ :type object_name: str
|
|
+ :param data: data to write
|
|
+ :type data: bytes
|
|
+ :param version: expected version of the object to write
|
|
+ :type version: int
|
|
+ """
|
|
ioctx = self.rados.open_ioctx(pool_name)
|
|
+
|
|
max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
|
|
if len(data) > max_size:
|
|
msg = ("Data to be written to object '{0}' exceeds "
|
|
"{1} bytes".format(object_name, max_size))
|
|
log.error(msg)
|
|
raise CephFSVolumeClientError(msg)
|
|
+
|
|
try:
|
|
- ioctx.write_full(object_name, data)
|
|
+ with rados.WriteOpCtx(ioctx) as wop:
|
|
+ if version is not None:
|
|
+ wop.assert_version(version)
|
|
+ wop.write_full(data)
|
|
+ ioctx.operate_write_op(wop, object_name)
|
|
+ except rados.OSError as e:
|
|
+ log.error(e)
|
|
+ raise e
|
|
finally:
|
|
ioctx.close()
|
|
|
|
@@ -1426,6 +1452,19 @@ class CephFSVolumeClient(object):
|
|
|
|
:returns: bytes - data read from object
|
|
"""
|
|
+ return self.get_object_and_version(pool_name, object_name)[0]
|
|
+
|
|
+ def get_object_and_version(self, pool_name, object_name):
|
|
+ """
|
|
+ Synchronously read data from object and get its version.
|
|
+
|
|
+ :param pool_name: name of the pool
|
|
+ :type pool_name: str
|
|
+ :param object_name: name of the object
|
|
+ :type object_name: str
|
|
+
|
|
+ :returns: tuple of object data and version
|
|
+ """
|
|
ioctx = self.rados.open_ioctx(pool_name)
|
|
max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
|
|
try:
|
|
@@ -1434,9 +1473,10 @@ class CephFSVolumeClient(object):
|
|
(ioctx.read(object_name, 1, offset=max_size))):
|
|
log.warning("Size of object {0} exceeds '{1}' bytes "
|
|
"read".format(object_name, max_size))
|
|
+ obj_version = ioctx.get_last_version()
|
|
finally:
|
|
ioctx.close()
|
|
- return bytes_read
|
|
+ return (bytes_read, obj_version)
|
|
|
|
def delete_object(self, pool_name, object_name):
|
|
ioctx = self.rados.open_ioctx(pool_name)
|
|
diff --git a/src/pybind/rados/rados.pyx b/src/pybind/rados/rados.pyx
|
|
index e9829937a11..c0df28645b8 100644
|
|
--- a/src/pybind/rados/rados.pyx
|
|
+++ b/src/pybind/rados/rados.pyx
|
|
@@ -284,6 +284,7 @@ cdef extern from "rados/librados.h" nogil:
|
|
void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
|
|
void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
|
|
void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
|
|
+ void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
|
|
void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
|
|
void rados_write_op_remove(rados_write_op_t write_op)
|
|
void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
|
|
@@ -1941,6 +1942,19 @@ cdef class WriteOp(object):
|
|
with nogil:
|
|
rados_write_op_write(self.write_op, _to_write, length, _offset)
|
|
|
|
+ @requires(('version', int))
|
|
+ def assert_version(self, version):
|
|
+ """
|
|
+ Check if object's version is the expected one.
|
|
+ :param version: expected version of the object
|
|
+ :param type: int
|
|
+ """
|
|
+ cdef:
|
|
+ uint64_t _version = version
|
|
+
|
|
+ with nogil:
|
|
+ rados_write_op_assert_version(self.write_op, _version)
|
|
+
|
|
@requires(('offset', int), ('length', int))
|
|
def zero(self, offset, length):
|
|
"""
|
|
--
|
|
2.23.0
|
|
|