Staging
v0.8.1
v0.8.1
https://github.com/python/cpython
Tip revision: 74f4bd53e03ded8408bcc2de67cf0f5a4ac5b1a1 authored by Barry Warsaw on 23 February 2012, 15:59:38 UTC
Bump some more copyright years (as per PEP 101), since this is the first
Bump some more copyright years (as per PEP 101), since this is the first
Tip revision: 74f4bd5
multibytecodec.c
/*
* multibytecodec.c: Common Multibyte Codec Implementation
*
* Written by Hye-Shik Chang <perky@FreeBSD.org>
*/
#define PY_SSIZE_T_CLEAN
#include "Python.h"
#include "structmember.h"
#include "multibytecodec.h"
/* Working state for one encode run: a read cursor over the unicode input
 * and a write cursor into the string object that receives the bytes. */
typedef struct {
    const Py_UNICODE *inbuf;      /* next input unit to consume */
    const Py_UNICODE *inbuf_top;  /* first input unit */
    const Py_UNICODE *inbuf_end;  /* one past the last input unit */
    unsigned char *outbuf;        /* next output byte to write */
    unsigned char *outbuf_end;    /* one past the end of outobj's storage */
    PyObject *excobj;             /* cached exception object, or NULL */
    PyObject *outobj;             /* string object that owns the output */
} MultibyteEncodeBuffer;
/* Working state for one decode run: a read cursor over the input bytes
 * and a write cursor into the unicode object that receives the result. */
typedef struct {
    const unsigned char *inbuf;      /* next input byte to consume */
    const unsigned char *inbuf_top;  /* first input byte */
    const unsigned char *inbuf_end;  /* one past the last input byte */
    Py_UNICODE *outbuf;              /* next output unit to write */
    Py_UNICODE *outbuf_end;          /* one past the end of outobj's storage */
    PyObject *excobj;                /* cached exception object, or NULL */
    PyObject *outobj;                /* unicode object that owns the output */
} MultibyteDecodeBuffer;
/* Docstring attached to the "encode" entry of multibytecodec_methods. */
PyDoc_STRVAR(MultibyteCodec_Encode__doc__,
"I.encode(unicode[, errors]) -> (string, length consumed)\n\
\n\
Return an encoded string version of `unicode'. errors may be given to\n\
set a different error handling scheme. Default is 'strict' meaning that\n\
encoding errors raise a UnicodeEncodeError. Other possible values are\n\
'ignore', 'replace' and 'xmlcharrefreplace' as well as any other name\n\
registered with codecs.register_error that can handle UnicodeEncodeErrors.");
/* Docstring attached to the "decode" entry of multibytecodec_methods.
 * The text talks about *decoding* errors, matching the UnicodeDecodeError
 * it names. */
PyDoc_STRVAR(MultibyteCodec_Decode__doc__,
"I.decode(string[, errors]) -> (unicodeobject, length consumed)\n\
\n\
Decodes `string' using I, a MultibyteCodec instance. errors may be given\n\
to set a different error handling scheme. Default is 'strict' meaning\n\
that decoding errors raise a UnicodeDecodeError. Other possible values\n\
are 'ignore' and 'replace' as well as any other name registered with\n\
codecs.register_error that is able to handle UnicodeDecodeErrors.");
/* Keyword-name tables handed to PyArg_ParseTupleAndKeywords. */
/* encode()/decode() on the stateless codec object */
static char *codeckwarglist[] = {"input", "errors", NULL};
/* constructors of the incremental encoder/decoder */
static char *incnewkwarglist[] = {"errors", NULL};
/* encode()/decode() on the incremental objects */
static char *incrementalkwarglist[] = {"input", "final", NULL};
/* stream object constructors */
static char *streamkwarglist[] = {"stream", "errors", NULL};
/* Forward declaration: multibytecodec_encerror() calls this before it is
 * defined. */
static PyObject *multibytecodec_encode(MultibyteCodec *,
MultibyteCodec_State *, const Py_UNICODE **, Py_ssize_t,
PyObject *, int);
/* reset after an encoding session.
 * The expansion is parenthesized so the shift cannot regroup with
 * whatever operators surround the macro at a use site. */
#define MBENC_RESET (MBENC_MAX << 1)
/* Pack `object` and the integer `len` into a new 2-tuple.
 *
 * The reference to `object` is consumed in every case: it is stored in
 * the tuple on success and released on failure.  A NULL `object` is
 * passed through as NULL so callers can chain this onto a call that may
 * have failed. */
static PyObject *
make_tuple(PyObject *object, Py_ssize_t len)
{
    PyObject *pair, *count;

    if (object == NULL)
        return NULL;

    pair = PyTuple_New(2);
    if (pair == NULL) {
        Py_DECREF(object);
        return NULL;
    }
    /* From here on the tuple owns `object`. */
    PyTuple_SET_ITEM(pair, 0, object);

    count = PyInt_FromSsize_t(len);
    if (count != NULL) {
        PyTuple_SET_ITEM(pair, 1, count);
        return pair;
    }

    Py_DECREF(pair);
    return NULL;
}
/* Translate an error-handler name into the internal representation.
 *
 * NULL, "strict", "ignore" and "replace" map to the ERROR_STRICT,
 * ERROR_IGNORE and ERROR_REPLACE values; any other name is returned as a
 * new string object (NULL if that allocation fails). */
static PyObject *
internal_error_callback(const char *errors)
{
    if (errors == NULL)
        return ERROR_STRICT;
    if (strcmp(errors, "strict") == 0)
        return ERROR_STRICT;
    if (strcmp(errors, "ignore") == 0)
        return ERROR_IGNORE;
    if (strcmp(errors, "replace") == 0)
        return ERROR_REPLACE;

    return PyString_FromString(errors);
}
/* Look up the error handler registered under the name held in the string
 * object `errors` and call it with `exc` as its only argument.
 *
 * Returns the handler's result as a new reference, or NULL when the
 * lookup, the argument tuple allocation or the call itself fails.  The
 * caller keeps its own reference to `exc`. */
static PyObject *
call_error_callback(PyObject *errors, PyObject *exc)
{
    PyObject *handler, *argtuple, *result;

    assert(PyString_Check(errors));

    handler = PyCodec_LookupError(PyString_AS_STRING(errors));
    if (handler == NULL)
        return NULL;

    argtuple = PyTuple_New(1);
    if (argtuple != NULL) {
        /* The tuple slot takes over a reference, so add one for it. */
        Py_INCREF(exc);
        PyTuple_SET_ITEM(argtuple, 0, exc);

        result = PyObject_CallObject(handler, argtuple);
        Py_DECREF(argtuple);
    }
    else {
        result = NULL;
    }

    Py_DECREF(handler);
    return result;
}
/* Getter for the "errors" attribute of the stateful codec objects.
 *
 * The three built-in modes are reported as freshly created strings; any
 * other value of self->errors is returned as-is with a new reference. */
static PyObject *
codecctx_errors_get(MultibyteStatefulCodecContext *self)
{
    if (self->errors == ERROR_STRICT)
        return PyString_FromString("strict");
    if (self->errors == ERROR_IGNORE)
        return PyString_FromString("ignore");
    if (self->errors == ERROR_REPLACE)
        return PyString_FromString("replace");

    Py_INCREF(self->errors);
    return self->errors;
}
/* Setter for the "errors" attribute of the stateful codec objects.
 *
 * `value` must be a string naming an error handler; it is converted with
 * internal_error_callback() and replaces the previous handler.
 *
 * Returns 0 on success.  Returns -1 with AttributeError when the
 * attribute is being deleted, with TypeError when `value` is not a
 * string, and -1 when the conversion fails. */
static int
codecctx_errors_set(MultibyteStatefulCodecContext *self, PyObject *value,
                    void *closure)
{
    PyObject *cb;

    /* A getset setter is invoked with value == NULL for `del obj.errors`;
     * reject that before PyString_Check() dereferences the pointer. */
    if (value == NULL) {
        PyErr_SetString(PyExc_AttributeError, "cannot delete attribute");
        return -1;
    }

    if (!PyString_Check(value)) {
        PyErr_SetString(PyExc_TypeError, "errors must be a string");
        return -1;
    }

    cb = internal_error_callback(PyString_AS_STRING(value));
    if (cb == NULL)
        return -1;

    /* Release the old handler only once the new one is in hand. */
    ERROR_DECREF(self->errors);
    self->errors = cb;
    return 0;
}
/* This getset handlers list is used by all the stateful codec objects.
 * It exposes a single attribute, "errors", backed by
 * codecctx_errors_get() and codecctx_errors_set(). */
static PyGetSetDef codecctx_getsets[] = {
{"errors", (getter)codecctx_errors_get,
(setter)codecctx_errors_set,
PyDoc_STR("how to treat errors")},
/* sentinel */
{NULL,}
};
/* Grow the output string of an encode buffer.
 *
 * The buffer grows by `esize` bytes, or by roughly half its current size
 * when that is larger (which is what a negative `esize` selects).  The
 * write cursor keeps its offset inside the resized string.
 *
 * Returns 0 on success and -1 with an exception set on failure. */
static int
expand_encodebuffer(MultibyteEncodeBuffer *buf, Py_ssize_t esize)
{
    Py_ssize_t orgpos, orgsize, incsize;

    orgpos = (Py_ssize_t)((char *)buf->outbuf -
                          PyString_AS_STRING(buf->outobj));
    orgsize = PyString_GET_SIZE(buf->outobj);
    incsize = (esize < (orgsize >> 1) ? (orgsize >> 1) | 1 : esize);

    if (orgsize > PY_SSIZE_T_MAX - incsize) {
        /* Report the overflow: returning -1 with no exception set would
         * make the caller hand NULL to the interpreter without an error. */
        PyErr_NoMemory();
        return -1;
    }

    if (_PyString_Resize(&buf->outobj, orgsize + incsize) == -1)
        return -1;

    /* The resize may have moved the string; rebuild both cursors. */
    buf->outbuf = (unsigned char *)PyString_AS_STRING(buf->outobj) + orgpos;
    buf->outbuf_end = (unsigned char *)PyString_AS_STRING(buf->outobj)
        + PyString_GET_SIZE(buf->outobj);

    return 0;
}
/* Make sure the encode buffer can take `s` more bytes, growing it with
 * expand_encodebuffer() when it cannot.  A value of `s` below 1 always
 * forces a growth step.  On failure control jumps to the `errorexit`
 * label, which the enclosing function must provide. */
#define REQUIRE_ENCODEBUFFER(buf, s) { \
if ((s) < 1 || (buf)->outbuf + (s) > (buf)->outbuf_end) \
if (expand_encodebuffer(buf, s) == -1) \
goto errorexit; \
}
/* Grow the output unicode object of a decode buffer.
 *
 * The buffer grows by `esize` units, or by roughly half its current size
 * when that is larger (which is what a negative `esize` selects).  The
 * write cursor keeps its offset inside the resized object.
 *
 * Returns 0 on success and -1 with an exception set on failure. */
static int
expand_decodebuffer(MultibyteDecodeBuffer *buf, Py_ssize_t esize)
{
    Py_ssize_t orgpos, orgsize, incsize;

    orgpos = (Py_ssize_t)(buf->outbuf - PyUnicode_AS_UNICODE(buf->outobj));
    orgsize = PyUnicode_GET_SIZE(buf->outobj);
    incsize = (esize < (orgsize >> 1) ? (orgsize >> 1) | 1 : esize);

    /* Same guard as expand_encodebuffer(): never let the new size wrap
     * around Py_ssize_t. */
    if (orgsize > PY_SSIZE_T_MAX - incsize) {
        PyErr_NoMemory();
        return -1;
    }

    if (PyUnicode_Resize(&buf->outobj, orgsize + incsize) == -1)
        return -1;

    /* The resize may have moved the storage; rebuild both cursors. */
    buf->outbuf = PyUnicode_AS_UNICODE(buf->outobj) + orgpos;
    buf->outbuf_end = PyUnicode_AS_UNICODE(buf->outobj)
        + PyUnicode_GET_SIZE(buf->outobj);

    return 0;
}
/* Make sure the decode buffer can take `s` more output units, growing it
 * with expand_decodebuffer() when it cannot.  A value of `s` below 1
 * always forces a growth step.  On failure control jumps to the
 * `errorexit` label, which the enclosing function must provide. */
#define REQUIRE_DECODEBUFFER(buf, s) { \
if ((s) < 1 || (buf)->outbuf + (s) > (buf)->outbuf_end) \
if (expand_decodebuffer(buf, s) == -1) \
goto errorexit; \
}
/**
* MultibyteCodec object
*/
/* React to a non-zero result `e` returned by the codec's encode or
 * encreset function.
 *
 * A positive `e` is the length of an illegal input sequence; negative
 * values are the MBERR_* codes.  `errors` selects the policy: one of the
 * ERROR_STRICT / ERROR_IGNORE / ERROR_REPLACE values, or a string object
 * naming a registered error handler.
 *
 * Returns 0 when the caller may continue (the output buffer was grown,
 * or the error was handled and buf->inbuf was advanced) and -1 when
 * encoding has to stop. */
static int
multibytecodec_encerror(MultibyteCodec *codec,
MultibyteCodec_State *state,
MultibyteEncodeBuffer *buf,
PyObject *errors, Py_ssize_t e)
{
PyObject *retobj = NULL, *retstr = NULL, *tobj;
Py_ssize_t retstrsize, newpos;
Py_ssize_t esize, start, end;
const char *reason;
if (e > 0) {
reason = "illegal multibyte sequence";
esize = e;
}
else {
switch (e) {
case MBERR_TOOSMALL:
/* not an input error: just enlarge the output and let the caller retry */
REQUIRE_ENCODEBUFFER(buf, -1);
return 0; /* retry it */
case MBERR_TOOFEW:
reason = "incomplete multibyte sequence";
/* everything left in the input belongs to the incomplete sequence */
esize = (Py_ssize_t)(buf->inbuf_end - buf->inbuf);
break;
case MBERR_INTERNAL:
PyErr_SetString(PyExc_RuntimeError,
"internal codec error");
return -1;
default:
PyErr_SetString(PyExc_RuntimeError,
"unknown runtime error");
return -1;
}
}
if (errors == ERROR_REPLACE) {
/* encode a '?' through the codec itself, growing the output as needed */
const Py_UNICODE replchar = '?', *inbuf = &replchar;
Py_ssize_t r;
for (;;) {
Py_ssize_t outleft;
outleft = (Py_ssize_t)(buf->outbuf_end - buf->outbuf);
r = codec->encode(state, codec->config, &inbuf, 1,
&buf->outbuf, outleft, 0);
if (r == MBERR_TOOSMALL) {
REQUIRE_ENCODEBUFFER(buf, -1);
continue;
}
else
break;
}
if (r != 0) {
/* the codec could not encode '?'; emit the raw byte instead */
REQUIRE_ENCODEBUFFER(buf, 1);
*buf->outbuf++ = '?';
}
}
if (errors == ERROR_IGNORE || errors == ERROR_REPLACE) {
/* skip the offending input and carry on */
buf->inbuf += esize;
return 0;
}
start = (Py_ssize_t)(buf->inbuf - buf->inbuf_top);
end = start + esize;
/* use cached exception object if available */
if (buf->excobj == NULL) {
buf->excobj = PyUnicodeEncodeError_Create(codec->encoding,
buf->inbuf_top,
buf->inbuf_end - buf->inbuf_top,
start, end, reason);
if (buf->excobj == NULL)
goto errorexit;
}
else
if (PyUnicodeEncodeError_SetStart(buf->excobj, start) != 0 ||
PyUnicodeEncodeError_SetEnd(buf->excobj, end) != 0 ||
PyUnicodeEncodeError_SetReason(buf->excobj, reason) != 0)
goto errorexit;
if (errors == ERROR_STRICT) {
PyCodec_StrictErrors(buf->excobj);
goto errorexit;
}
/* custom handler: it must return a (unicode, int) tuple */
retobj = call_error_callback(errors, buf->excobj);
if (retobj == NULL)
goto errorexit;
if (!PyTuple_Check(retobj) || PyTuple_GET_SIZE(retobj) != 2 ||
!PyUnicode_Check((tobj = PyTuple_GET_ITEM(retobj, 0))) ||
!(PyInt_Check(PyTuple_GET_ITEM(retobj, 1)) ||
PyLong_Check(PyTuple_GET_ITEM(retobj, 1)))) {
PyErr_SetString(PyExc_TypeError,
"encoding error handler must return "
"(unicode, int) tuple");
goto errorexit;
}
{
/* the replacement text is encoded with the strict policy */
const Py_UNICODE *uraw = PyUnicode_AS_UNICODE(tobj);
retstr = multibytecodec_encode(codec, state, &uraw,
PyUnicode_GET_SIZE(tobj), ERROR_STRICT,
MBENC_FLUSH);
if (retstr == NULL)
goto errorexit;
}
retstrsize = PyString_GET_SIZE(retstr);
REQUIRE_ENCODEBUFFER(buf, retstrsize);
memcpy(buf->outbuf, PyString_AS_STRING(retstr), retstrsize);
buf->outbuf += retstrsize;
newpos = PyInt_AsSsize_t(PyTuple_GET_ITEM(retobj, 1));
/* a negative position counts back from the end of the input */
if (newpos < 0 && !PyErr_Occurred())
newpos += (Py_ssize_t)(buf->inbuf_end - buf->inbuf_top);
if (newpos < 0 || buf->inbuf_top + newpos > buf->inbuf_end) {
/* drop any conversion error so that IndexError is what gets reported */
PyErr_Clear();
PyErr_Format(PyExc_IndexError,
"position %zd from error handler out of bounds",
newpos);
goto errorexit;
}
buf->inbuf = buf->inbuf_top + newpos;
Py_DECREF(retobj);
Py_DECREF(retstr);
return 0;
errorexit:
Py_XDECREF(retobj);
Py_XDECREF(retstr);
return -1;
}
/* React to a non-zero result `e` returned by the codec's decode function.
 *
 * A positive `e` is the length of an illegal input sequence; negative
 * values are the MBERR_* codes.  `errors` selects the policy: one of the
 * ERROR_STRICT / ERROR_IGNORE / ERROR_REPLACE values, or a string object
 * naming a registered error handler.
 *
 * Returns 0 when the caller may continue (the output buffer was grown,
 * or the error was handled and buf->inbuf was advanced) and -1 when
 * decoding has to stop. */
static int
multibytecodec_decerror(MultibyteCodec *codec,
MultibyteCodec_State *state,
MultibyteDecodeBuffer *buf,
PyObject *errors, Py_ssize_t e)
{
PyObject *retobj = NULL, *retuni = NULL;
Py_ssize_t retunisize, newpos;
const char *reason;
Py_ssize_t esize, start, end;
if (e > 0) {
reason = "illegal multibyte sequence";
esize = e;
}
else {
switch (e) {
case MBERR_TOOSMALL:
/* not an input error: just enlarge the output and let the caller retry */
REQUIRE_DECODEBUFFER(buf, -1);
return 0; /* retry it */
case MBERR_TOOFEW:
reason = "incomplete multibyte sequence";
/* everything left in the input belongs to the incomplete sequence */
esize = (Py_ssize_t)(buf->inbuf_end - buf->inbuf);
break;
case MBERR_INTERNAL:
PyErr_SetString(PyExc_RuntimeError,
"internal codec error");
return -1;
default:
PyErr_SetString(PyExc_RuntimeError,
"unknown runtime error");
return -1;
}
}
if (errors == ERROR_REPLACE) {
REQUIRE_DECODEBUFFER(buf, 1);
*buf->outbuf++ = Py_UNICODE_REPLACEMENT_CHARACTER;
}
if (errors == ERROR_IGNORE || errors == ERROR_REPLACE) {
/* skip the offending input and carry on */
buf->inbuf += esize;
return 0;
}
start = (Py_ssize_t)(buf->inbuf - buf->inbuf_top);
end = start + esize;
/* use cached exception object if available */
if (buf->excobj == NULL) {
buf->excobj = PyUnicodeDecodeError_Create(codec->encoding,
(const char *)buf->inbuf_top,
(Py_ssize_t)(buf->inbuf_end - buf->inbuf_top),
start, end, reason);
if (buf->excobj == NULL)
goto errorexit;
}
else
if (PyUnicodeDecodeError_SetStart(buf->excobj, start) ||
PyUnicodeDecodeError_SetEnd(buf->excobj, end) ||
PyUnicodeDecodeError_SetReason(buf->excobj, reason))
goto errorexit;
if (errors == ERROR_STRICT) {
PyCodec_StrictErrors(buf->excobj);
goto errorexit;
}
/* custom handler: it must return a (unicode, int) tuple */
retobj = call_error_callback(errors, buf->excobj);
if (retobj == NULL)
goto errorexit;
/* retuni is borrowed from retobj, so it needs no release of its own */
if (!PyTuple_Check(retobj) || PyTuple_GET_SIZE(retobj) != 2 ||
!PyUnicode_Check((retuni = PyTuple_GET_ITEM(retobj, 0))) ||
!(PyInt_Check(PyTuple_GET_ITEM(retobj, 1)) ||
PyLong_Check(PyTuple_GET_ITEM(retobj, 1)))) {
PyErr_SetString(PyExc_TypeError,
"decoding error handler must return "
"(unicode, int) tuple");
goto errorexit;
}
retunisize = PyUnicode_GET_SIZE(retuni);
if (retunisize > 0) {
/* append the replacement text to the output */
REQUIRE_DECODEBUFFER(buf, retunisize);
memcpy((char *)buf->outbuf, PyUnicode_AS_DATA(retuni),
retunisize * Py_UNICODE_SIZE);
buf->outbuf += retunisize;
}
newpos = PyInt_AsSsize_t(PyTuple_GET_ITEM(retobj, 1));
/* a negative position counts back from the end of the input */
if (newpos < 0 && !PyErr_Occurred())
newpos += (Py_ssize_t)(buf->inbuf_end - buf->inbuf_top);
if (newpos < 0 || buf->inbuf_top + newpos > buf->inbuf_end) {
/* drop any conversion error so that IndexError is what gets reported */
PyErr_Clear();
PyErr_Format(PyExc_IndexError,
"position %zd from error handler out of bounds",
newpos);
goto errorexit;
}
buf->inbuf = buf->inbuf_top + newpos;
Py_DECREF(retobj);
return 0;
errorexit:
Py_XDECREF(retobj);
return -1;
}
/* Encode `datalen` units starting at `*data` with `codec`.
 *
 * `state` is the codec state to use, `errors` the error policy passed to
 * multibytecodec_encerror(), and `flags` is forwarded to the codec's
 * encode function.  On success `*data` is moved to the first unit that
 * was not consumed and a new string object is returned; on failure NULL
 * is returned and `*data` is left alone.
 *
 * NOTE(review): `flags` is only tested for MBENC_FLUSH here; MBENC_RESET
 * is never examined, so codec->encreset runs on every call that gets
 * past the empty-input shortcut.  Confirm that this is intended for
 * non-final incremental encoding. */
static PyObject *
multibytecodec_encode(MultibyteCodec *codec,
MultibyteCodec_State *state,
const Py_UNICODE **data, Py_ssize_t datalen,
PyObject *errors, int flags)
{
MultibyteEncodeBuffer buf;
Py_ssize_t finalsize, r = 0;
/* empty input: return "" without touching the codec state */
if (datalen == 0)
return PyString_FromString("");
buf.excobj = NULL;
buf.inbuf = buf.inbuf_top = *data;
buf.inbuf_end = buf.inbuf_top + datalen;
/* guard the `datalen * 2 + 16` size computed below against overflow */
if (datalen > (PY_SSIZE_T_MAX - 16) / 2) {
PyErr_NoMemory();
goto errorexit;
}
buf.outobj = PyString_FromStringAndSize(NULL, datalen * 2 + 16);
if (buf.outobj == NULL)
goto errorexit;
buf.outbuf = (unsigned char *)PyString_AS_STRING(buf.outobj);
buf.outbuf_end = buf.outbuf + PyString_GET_SIZE(buf.outobj);
while (buf.inbuf < buf.inbuf_end) {
Py_ssize_t inleft, outleft;
/* we don't reuse inleft and outleft here.
* error callbacks can relocate the cursor anywhere on buffer*/
inleft = (Py_ssize_t)(buf.inbuf_end - buf.inbuf);
outleft = (Py_ssize_t)(buf.outbuf_end - buf.outbuf);
r = codec->encode(state, codec->config, &buf.inbuf, inleft,
&buf.outbuf, outleft, flags);
/* without MBENC_FLUSH an incomplete tail is left for the caller to keep */
if ((r == 0) || (r == MBERR_TOOFEW && !(flags & MBENC_FLUSH)))
break;
else if (multibytecodec_encerror(codec, state, &buf, errors,r))
goto errorexit;
else if (r == MBERR_TOOFEW)
break;
}
if (codec->encreset != NULL)
for (;;) {
Py_ssize_t outleft;
outleft = (Py_ssize_t)(buf.outbuf_end - buf.outbuf);
r = codec->encreset(state, codec->config, &buf.outbuf,
outleft);
if (r == 0)
break;
else if (multibytecodec_encerror(codec, state,
&buf, errors, r))
goto errorexit;
}
/* trim the output string to the bytes actually written */
finalsize = (Py_ssize_t)((char *)buf.outbuf -
PyString_AS_STRING(buf.outobj));
if (finalsize != PyString_GET_SIZE(buf.outobj))
if (_PyString_Resize(&buf.outobj, finalsize) == -1)
goto errorexit;
*data = buf.inbuf;
Py_XDECREF(buf.excobj);
return buf.outobj;
errorexit:
Py_XDECREF(buf.excobj);
Py_XDECREF(buf.outobj);
return NULL;
}
/* Implementation of the codec object's encode(input, errors=None).
 *
 * A non-unicode `input` is first converted with PyObject_Unicode().  The
 * text is encoded with a fresh codec state and the result is returned as
 * a (string, length) tuple, where length is the size of the unicode
 * input.  Returns NULL on failure. */
static PyObject *
MultibyteCodec_Encode(MultibyteCodecObject *self,
PyObject *args, PyObject *kwargs)
{
MultibyteCodec_State state;
Py_UNICODE *data;
PyObject *errorcb, *r, *arg, *ucvt;
const char *errors = NULL;
Py_ssize_t datalen;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|z:encode",
codeckwarglist, &arg, &errors))
return NULL;
/* ucvt holds the converted object when a conversion was needed, so it
* can be released on every exit path; it stays NULL otherwise */
if (PyUnicode_Check(arg))
ucvt = NULL;
else {
arg = ucvt = PyObject_Unicode(arg);
if (arg == NULL)
return NULL;
else if (!PyUnicode_Check(arg)) {
PyErr_SetString(PyExc_TypeError,
"couldn't convert the object to unicode.");
Py_DECREF(ucvt);
return NULL;
}
}
data = PyUnicode_AS_UNICODE(arg);
datalen = PyUnicode_GET_SIZE(arg);
errorcb = internal_error_callback(errors);
if (errorcb == NULL) {
Py_XDECREF(ucvt);
return NULL;
}
if (self->codec->encinit != NULL &&
self->codec->encinit(&state, self->codec->config) != 0)
goto errorexit;
r = multibytecodec_encode(self->codec, &state,
(const Py_UNICODE **)&data, datalen, errorcb,
MBENC_FLUSH | MBENC_RESET);
if (r == NULL)
goto errorexit;
ERROR_DECREF(errorcb);
Py_XDECREF(ucvt);
return make_tuple(r, datalen);
errorexit:
ERROR_DECREF(errorcb);
Py_XDECREF(ucvt);
return NULL;
}
/* Implementation of the codec object's decode(input, errors=None).
 *
 * The input buffer is decoded with a fresh codec state and the result is
 * returned as a (unicode, length) tuple, where length is the size of the
 * input in bytes.  Returns NULL on failure. */
static PyObject *
MultibyteCodec_Decode(MultibyteCodecObject *self,
PyObject *args, PyObject *kwargs)
{
MultibyteCodec_State state;
MultibyteDecodeBuffer buf;
PyObject *errorcb;
Py_buffer pdata;
const char *data, *errors = NULL;
Py_ssize_t datalen, finalsize;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*|z:decode",
codeckwarglist, &pdata, &errors))
return NULL;
data = pdata.buf;
datalen = pdata.len;
errorcb = internal_error_callback(errors);
if (errorcb == NULL) {
PyBuffer_Release(&pdata);
return NULL;
}
/* empty input: return (u"", 0) without touching the codec */
if (datalen == 0) {
PyBuffer_Release(&pdata);
ERROR_DECREF(errorcb);
return make_tuple(PyUnicode_FromUnicode(NULL, 0), 0);
}
buf.excobj = NULL;
buf.inbuf = buf.inbuf_top = (unsigned char *)data;
buf.inbuf_end = buf.inbuf_top + datalen;
/* start with one output unit per input byte; the buffer grows on demand */
buf.outobj = PyUnicode_FromUnicode(NULL, datalen);
if (buf.outobj == NULL)
goto errorexit;
buf.outbuf = PyUnicode_AS_UNICODE(buf.outobj);
buf.outbuf_end = buf.outbuf + PyUnicode_GET_SIZE(buf.outobj);
if (self->codec->decinit != NULL &&
self->codec->decinit(&state, self->codec->config) != 0)
goto errorexit;
while (buf.inbuf < buf.inbuf_end) {
Py_ssize_t inleft, outleft, r;
inleft = (Py_ssize_t)(buf.inbuf_end - buf.inbuf);
outleft = (Py_ssize_t)(buf.outbuf_end - buf.outbuf);
r = self->codec->decode(&state, self->codec->config,
&buf.inbuf, inleft, &buf.outbuf, outleft);
if (r == 0)
break;
else if (multibytecodec_decerror(self->codec, &state,
&buf, errorcb, r))
goto errorexit;
}
/* trim the output object to the units actually written */
finalsize = (Py_ssize_t)(buf.outbuf -
PyUnicode_AS_UNICODE(buf.outobj));
if (finalsize != PyUnicode_GET_SIZE(buf.outobj))
if (PyUnicode_Resize(&buf.outobj, finalsize) == -1)
goto errorexit;
PyBuffer_Release(&pdata);
Py_XDECREF(buf.excobj);
ERROR_DECREF(errorcb);
return make_tuple(buf.outobj, datalen);
errorexit:
PyBuffer_Release(&pdata);
ERROR_DECREF(errorcb);
Py_XDECREF(buf.excobj);
Py_XDECREF(buf.outobj);
return NULL;
}
/* Method table of the MultibyteCodec type: encode() and decode(), both
 * accepting positional and keyword arguments. */
static struct PyMethodDef multibytecodec_methods[] = {
{"encode", (PyCFunction)MultibyteCodec_Encode,
METH_VARARGS | METH_KEYWORDS,
MultibyteCodec_Encode__doc__},
{"decode", (PyCFunction)MultibyteCodec_Decode,
METH_VARARGS | METH_KEYWORDS,
MultibyteCodec_Decode__doc__},
/* sentinel */
{NULL, NULL},
};
/* tp_dealloc of the MultibyteCodec type: release the object's memory. */
static void
multibytecodec_dealloc(MultibyteCodecObject *self)
{
    PyObject_Del(self);
}
/* Type object for the stateless codec: it only supplies a deallocator,
 * generic attribute lookup and the encode()/decode() method table. */
static PyTypeObject MultibyteCodec_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"MultibyteCodec", /* tp_name */
sizeof(MultibyteCodecObject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)multibytecodec_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
0, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
multibytecodec_methods, /* tp_methods */
};
/**
* Utility functions for stateful codec mechanism
*/
/* Cast an object pointer to the stateful decoder / encoder context
 * struct.  The casts are unchecked. */
#define STATEFUL_DCTX(o) ((MultibyteStatefulDecoderContext *)(o))
#define STATEFUL_ECTX(o) ((MultibyteStatefulEncoderContext *)(o))
/* Encode `unistr` using the state kept in `ctx`.
 *
 * Units left pending by an earlier call are placed in front of the new
 * input.  Input the codec did not consume is saved in ctx->pending for
 * the next call.  `final` non-zero passes MBENC_FLUSH to the encoder.
 *
 * Returns a new string object, or NULL on failure. */
static PyObject *
encoder_encode_stateful(MultibyteStatefulEncoderContext *ctx,
PyObject *unistr, int final)
{
PyObject *ucvt, *r = NULL;
Py_UNICODE *inbuf, *inbuf_end, *inbuf_tmp = NULL;
Py_ssize_t datalen, origpending;
/* ucvt holds the converted object when a conversion was needed, so it
* can be released on every exit path; it stays NULL otherwise */
if (PyUnicode_Check(unistr))
ucvt = NULL;
else {
unistr = ucvt = PyObject_Unicode(unistr);
if (unistr == NULL)
return NULL;
else if (!PyUnicode_Check(unistr)) {
PyErr_SetString(PyExc_TypeError,
"couldn't convert the object to unicode.");
Py_DECREF(ucvt);
return NULL;
}
}
datalen = PyUnicode_GET_SIZE(unistr);
origpending = ctx->pendingsize;
if (origpending > 0) {
/* build a temporary buffer holding pending units followed by the new input */
if (datalen > PY_SSIZE_T_MAX - ctx->pendingsize) {
PyErr_NoMemory();
/* inbuf_tmp == NULL */
goto errorexit;
}
inbuf_tmp = PyMem_New(Py_UNICODE, datalen + ctx->pendingsize);
if (inbuf_tmp == NULL)
goto errorexit;
memcpy(inbuf_tmp, ctx->pending,
Py_UNICODE_SIZE * ctx->pendingsize);
memcpy(inbuf_tmp + ctx->pendingsize,
PyUnicode_AS_UNICODE(unistr),
Py_UNICODE_SIZE * datalen);
datalen += ctx->pendingsize;
ctx->pendingsize = 0;
inbuf = inbuf_tmp;
}
else
inbuf = (Py_UNICODE *)PyUnicode_AS_UNICODE(unistr);
inbuf_end = inbuf + datalen;
/* on success inbuf is advanced past the consumed units */
r = multibytecodec_encode(ctx->codec, &ctx->state,
(const Py_UNICODE **)&inbuf,
datalen, ctx->errors, final ? MBENC_FLUSH : 0);
if (r == NULL) {
/* recover the original pending buffer */
if (origpending > 0)
memcpy(ctx->pending, inbuf_tmp,
Py_UNICODE_SIZE * origpending);
ctx->pendingsize = origpending;
goto errorexit;
}
if (inbuf < inbuf_end) {
/* keep the unconsumed tail for the next call */
ctx->pendingsize = (Py_ssize_t)(inbuf_end - inbuf);
if (ctx->pendingsize > MAXENCPENDING) {
/* normal codecs can't reach here */
ctx->pendingsize = 0;
PyErr_SetString(PyExc_UnicodeError,
"pending buffer overflow");
goto errorexit;
}
memcpy(ctx->pending, inbuf,
ctx->pendingsize * Py_UNICODE_SIZE);
}
if (inbuf_tmp != NULL)
PyMem_Del(inbuf_tmp);
Py_XDECREF(ucvt);
return r;
errorexit:
if (inbuf_tmp != NULL)
PyMem_Del(inbuf_tmp);
Py_XDECREF(r);
Py_XDECREF(ucvt);
return NULL;
}
/* Append the bytes the decoder has not consumed yet (buf->inbuf up to
 * buf->inbuf_end) to the context's pending buffer.
 *
 * Returns 0 on success.  Returns -1 with UnicodeError set when the bytes
 * would not fit into MAXDECPENDING. */
static int
decoder_append_pending(MultibyteStatefulDecoderContext *ctx,
                       MultibyteDecodeBuffer *buf)
{
    Py_ssize_t npendings;

    npendings = (Py_ssize_t)(buf->inbuf_end - buf->inbuf);

    /* The overflow guard has to be evaluated first: it is what makes the
     * addition in the second test safe to compute. */
    if (npendings > PY_SSIZE_T_MAX - ctx->pendingsize ||
        npendings + ctx->pendingsize > MAXDECPENDING) {
        PyErr_SetString(PyExc_UnicodeError, "pending buffer overflow");
        return -1;
    }

    memcpy(ctx->pending + ctx->pendingsize, buf->inbuf, npendings);
    ctx->pendingsize += npendings;
    return 0;
}
/* Point the decode buffer's input cursors at `data` (`size` bytes) and,
 * if the buffer has no output object yet, allocate one of `size` units.
 *
 * An existing output object and its cursors are left untouched, which
 * lets a caller feed several inputs into the same output.
 * Returns 0 on success, -1 if the allocation fails. */
static int
decoder_prepare_buffer(MultibyteDecodeBuffer *buf, const char *data,
                       Py_ssize_t size)
{
    const unsigned char *first = (const unsigned char *)data;

    buf->inbuf_top = first;
    buf->inbuf = first;
    buf->inbuf_end = first + size;

    if (buf->outobj != NULL)
        return 0;   /* output already allocated by an earlier call */

    buf->outobj = PyUnicode_FromUnicode(NULL, size);
    if (buf->outobj == NULL)
        return -1;

    buf->outbuf = PyUnicode_AS_UNICODE(buf->outobj);
    buf->outbuf_end = buf->outbuf + PyUnicode_GET_SIZE(buf->outobj);
    return 0;
}
/* Run the codec's decode function over the input held in `buf`, using
 * the state kept in `ctx`.
 *
 * Decoding stops when the codec reports success or MBERR_TOOFEW (the
 * unconsumed tail is left in the buffer for the caller).  Any other
 * result goes through multibytecodec_decerror(), after which decoding is
 * retried.  Returns 0 on success, -1 when the error handler fails. */
static int
decoder_feed_buffer(MultibyteStatefulDecoderContext *ctx,
                    MultibyteDecodeBuffer *buf)
{
    while (buf->inbuf < buf->inbuf_end) {
        Py_ssize_t in_remaining = (Py_ssize_t)(buf->inbuf_end - buf->inbuf);
        Py_ssize_t out_remaining = (Py_ssize_t)(buf->outbuf_end - buf->outbuf);
        Py_ssize_t status;

        status = ctx->codec->decode(&ctx->state, ctx->codec->config,
                                    &buf->inbuf, in_remaining,
                                    &buf->outbuf, out_remaining);
        if (status == 0 || status == MBERR_TOOFEW)
            break;

        if (multibytecodec_decerror(ctx->codec, &ctx->state,
                                    buf, ctx->errors, status))
            return -1;
    }

    return 0;
}
/**
* MultibyteIncrementalEncoder object
*/
/* encode(input, final=0) of the incremental encoder: parse the arguments
 * and hand the work to encoder_encode_stateful(). */
static PyObject *
mbiencoder_encode(MultibyteIncrementalEncoderObject *self,
                  PyObject *args, PyObject *kwargs)
{
    PyObject *input;
    int is_final = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:encode",
                                     incrementalkwarglist,
                                     &input, &is_final))
        return NULL;

    return encoder_encode_stateful(STATEFUL_ECTX(self), input, is_final);
}
/* reset() of the incremental encoder.
 *
 * Resets the *encoder* state through the codec's encreset hook (the
 * decoder hook does not belong on an encoder object) and drops any
 * pending input.  Bytes produced by encreset are written to a scratch
 * buffer and discarded.
 *
 * Returns None, or NULL with an exception set if the hook fails. */
static PyObject *
mbiencoder_reset(MultibyteIncrementalEncoderObject *self)
{
    /* NOTE(review): 16 bytes is assumed to be more than any codec's reset
     * sequence needs -- confirm against the codec implementations. */
    unsigned char scratch[16];
    unsigned char *outbuf = scratch;

    if (self->codec->encreset != NULL &&
        self->codec->encreset(&self->state, self->codec->config,
                              &outbuf, (Py_ssize_t)sizeof(scratch)) != 0) {
        /* Never return NULL without an exception set. */
        if (!PyErr_Occurred())
            PyErr_SetString(PyExc_RuntimeError, "internal codec error");
        return NULL;
    }

    self->pendingsize = 0;
    Py_RETURN_NONE;
}
/* Method table of the incremental encoder: encode() and reset(). */
static struct PyMethodDef mbiencoder_methods[] = {
{"encode", (PyCFunction)mbiencoder_encode,
METH_VARARGS | METH_KEYWORDS, NULL},
{"reset", (PyCFunction)mbiencoder_reset,
METH_NOARGS, NULL},
/* sentinel */
{NULL, NULL},
};
/* tp_new of the incremental encoder.
 *
 * Accepts an optional `errors` name.  The codec is taken from the
 * "codec" attribute of `type`, which must be a MultibyteCodec object.
 * The new object starts with no pending input and a freshly initialised
 * encoder state.  Returns NULL on failure. */
static PyObject *
mbiencoder_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    MultibyteIncrementalEncoderObject *encoder;
    PyObject *codecobj = NULL;
    char *errors = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|s:IncrementalEncoder",
                                     incnewkwarglist, &errors))
        return NULL;

    encoder = (MultibyteIncrementalEncoderObject *)type->tp_alloc(type, 0);
    if (encoder == NULL)
        return NULL;

    codecobj = PyObject_GetAttrString((PyObject *)type, "codec");
    if (codecobj == NULL)
        goto fail;
    if (!MultibyteCodec_Check(codecobj)) {
        PyErr_SetString(PyExc_TypeError, "codec is unexpected type");
        goto fail;
    }

    encoder->codec = ((MultibyteCodecObject *)codecobj)->codec;
    encoder->pendingsize = 0;

    encoder->errors = internal_error_callback(errors);
    if (encoder->errors == NULL)
        goto fail;

    if (encoder->codec->encinit != NULL &&
        encoder->codec->encinit(&encoder->state,
                                encoder->codec->config) != 0)
        goto fail;

    Py_DECREF(codecobj);
    return (PyObject *)encoder;

fail:
    Py_XDECREF(encoder);
    Py_XDECREF(codecobj);
    return NULL;
}
/* tp_init of the incremental encoder: nothing to do, always succeeds.
 * All setup happens in mbiencoder_new(). */
static int
mbiencoder_init(PyObject *self, PyObject *args, PyObject *kwds)
{
    return 0;
}
/* tp_traverse of the incremental encoder: the error handler is the only
 * member visited, and only when ERROR_ISCUSTOM() says it is a custom
 * handler. */
static int
mbiencoder_traverse(MultibyteIncrementalEncoderObject *self,
                    visitproc visit, void *arg)
{
    if (!ERROR_ISCUSTOM(self->errors))
        return 0;

    Py_VISIT(self->errors);
    return 0;
}
/* tp_dealloc of the incremental encoder: stop GC tracking, drop the
 * error handler and free the object through the type's tp_free. */
static void
mbiencoder_dealloc(MultibyteIncrementalEncoderObject *self)
{
    PyObject_GC_UnTrack(self);
    ERROR_DECREF(self->errors);
    Py_TYPE(self)->tp_free(self);
}
/* Type object for the incremental encoder.  It is GC-aware, can be
 * subclassed, and exposes the encode()/reset() methods plus the shared
 * "errors" attribute. */
static PyTypeObject MultibyteIncrementalEncoder_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"MultibyteIncrementalEncoder", /* tp_name */
sizeof(MultibyteIncrementalEncoderObject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)mbiencoder_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC
| Py_TPFLAGS_BASETYPE, /* tp_flags */
0, /* tp_doc */
(traverseproc)mbiencoder_traverse, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
mbiencoder_methods, /* tp_methods */
0, /* tp_members */
codecctx_getsets, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
mbiencoder_init, /* tp_init */
0, /* tp_alloc */
mbiencoder_new, /* tp_new */
};
/**
* MultibyteIncrementalDecoder object
*/
/* decode(input, final=0) of the incremental decoder.
 *
 * Bytes left pending by an earlier call are placed in front of the new
 * input.  Input the codec could not consume yet is stored in the pending
 * buffer again, unless `final` is set, in which case it is reported as
 * an incomplete sequence through the error handler.
 *
 * Returns a new unicode object, or NULL on failure. */
static PyObject *
mbidecoder_decode(MultibyteIncrementalDecoderObject *self,
PyObject *args, PyObject *kwargs)
{
MultibyteDecodeBuffer buf;
char *data, *wdata = NULL;
Py_buffer pdata;
Py_ssize_t wsize, finalsize = 0, size, origpending;
int final = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*|i:decode",
incrementalkwarglist, &pdata, &final))
return NULL;
data = pdata.buf;
size = pdata.len;
buf.outobj = buf.excobj = NULL;
origpending = self->pendingsize;
if (self->pendingsize == 0) {
/* nothing pending: decode straight from the caller's buffer */
wsize = size;
wdata = data;
}
else {
/* build a working copy: pending bytes followed by the new input */
if (size > PY_SSIZE_T_MAX - self->pendingsize) {
PyErr_NoMemory();
goto errorexit;
}
wsize = size + self->pendingsize;
wdata = PyMem_Malloc(wsize);
if (wdata == NULL)
goto errorexit;
memcpy(wdata, self->pending, self->pendingsize);
memcpy(wdata + self->pendingsize, data, size);
self->pendingsize = 0;
}
if (decoder_prepare_buffer(&buf, wdata, wsize) != 0)
goto errorexit;
if (decoder_feed_buffer(STATEFUL_DCTX(self), &buf))
goto errorexit;
if (final && buf.inbuf < buf.inbuf_end) {
if (multibytecodec_decerror(self->codec, &self->state,
&buf, self->errors, MBERR_TOOFEW)) {
/* recover the original pending buffer */
memcpy(self->pending, wdata, origpending);
self->pendingsize = origpending;
goto errorexit;
}
}
if (buf.inbuf < buf.inbuf_end) { /* pending sequence still exists */
if (decoder_append_pending(STATEFUL_DCTX(self), &buf) != 0)
goto errorexit;
}
/* trim the output object to the units actually written */
finalsize = (Py_ssize_t)(buf.outbuf - PyUnicode_AS_UNICODE(buf.outobj));
if (finalsize != PyUnicode_GET_SIZE(buf.outobj))
if (PyUnicode_Resize(&buf.outobj, finalsize) == -1)
goto errorexit;
PyBuffer_Release(&pdata);
/* wdata is only owned here when it is the working copy */
if (wdata != data)
PyMem_Del(wdata);
Py_XDECREF(buf.excobj);
return buf.outobj;
errorexit:
PyBuffer_Release(&pdata);
if (wdata != NULL && wdata != data)
PyMem_Del(wdata);
Py_XDECREF(buf.excobj);
Py_XDECREF(buf.outobj);
return NULL;
}
/* reset() of the incremental decoder: run the codec's decreset hook, if
 * it has one, and drop any pending input.  Returns None, or NULL when
 * the hook reports failure. */
static PyObject *
mbidecoder_reset(MultibyteIncrementalDecoderObject *self)
{
    if (self->codec->decreset != NULL) {
        if (self->codec->decreset(&self->state, self->codec->config) != 0)
            return NULL;
    }

    self->pendingsize = 0;
    Py_RETURN_NONE;
}
/* Method table of the incremental decoder: decode() and reset(). */
static struct PyMethodDef mbidecoder_methods[] = {
{"decode", (PyCFunction)mbidecoder_decode,
METH_VARARGS | METH_KEYWORDS, NULL},
{"reset", (PyCFunction)mbidecoder_reset,
METH_NOARGS, NULL},
/* sentinel */
{NULL, NULL},
};
/* tp_new of the incremental decoder.
 *
 * Accepts an optional `errors` name.  The codec is taken from the
 * "codec" attribute of `type`, which must be a MultibyteCodec object.
 * The new object starts with no pending input and a freshly initialised
 * decoder state.  Returns NULL on failure. */
static PyObject *
mbidecoder_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    MultibyteIncrementalDecoderObject *decoder;
    PyObject *codecobj = NULL;
    char *errors = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|s:IncrementalDecoder",
                                     incnewkwarglist, &errors))
        return NULL;

    decoder = (MultibyteIncrementalDecoderObject *)type->tp_alloc(type, 0);
    if (decoder == NULL)
        return NULL;

    codecobj = PyObject_GetAttrString((PyObject *)type, "codec");
    if (codecobj == NULL)
        goto fail;
    if (!MultibyteCodec_Check(codecobj)) {
        PyErr_SetString(PyExc_TypeError, "codec is unexpected type");
        goto fail;
    }

    decoder->codec = ((MultibyteCodecObject *)codecobj)->codec;
    decoder->pendingsize = 0;

    decoder->errors = internal_error_callback(errors);
    if (decoder->errors == NULL)
        goto fail;

    if (decoder->codec->decinit != NULL &&
        decoder->codec->decinit(&decoder->state,
                                decoder->codec->config) != 0)
        goto fail;

    Py_DECREF(codecobj);
    return (PyObject *)decoder;

fail:
    Py_XDECREF(decoder);
    Py_XDECREF(codecobj);
    return NULL;
}
/* tp_init of the incremental decoder: nothing to do, always succeeds.
 * All setup happens in mbidecoder_new(). */
static int
mbidecoder_init(PyObject *self, PyObject *args, PyObject *kwds)
{
    return 0;
}
/* tp_traverse of the incremental decoder: the error handler is the only
 * member visited, and only when ERROR_ISCUSTOM() says it is a custom
 * handler. */
static int
mbidecoder_traverse(MultibyteIncrementalDecoderObject *self,
                    visitproc visit, void *arg)
{
    if (!ERROR_ISCUSTOM(self->errors))
        return 0;

    Py_VISIT(self->errors);
    return 0;
}
/* tp_dealloc of the incremental decoder: stop GC tracking, drop the
 * error handler and free the object through the type's tp_free. */
static void
mbidecoder_dealloc(MultibyteIncrementalDecoderObject *self)
{
    PyObject_GC_UnTrack(self);
    ERROR_DECREF(self->errors);
    Py_TYPE(self)->tp_free(self);
}
/* Type object for the incremental decoder.  It is GC-aware, can be
 * subclassed, and exposes the decode()/reset() methods plus the shared
 * "errors" attribute. */
static PyTypeObject MultibyteIncrementalDecoder_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"MultibyteIncrementalDecoder", /* tp_name */
sizeof(MultibyteIncrementalDecoderObject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)mbidecoder_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC
| Py_TPFLAGS_BASETYPE, /* tp_flags */
0, /* tp_doc */
(traverseproc)mbidecoder_traverse, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
mbidecoder_methods, /* tp_methods */
0, /* tp_members */
codecctx_getsets, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
mbidecoder_init, /* tp_init */
0, /* tp_alloc */
mbidecoder_new, /* tp_new */
};
/**
* MultibyteStreamReader object
*/
/* Shared worker behind read(), readline() and readlines().
 *
 * Calls `method` on the underlying stream (with `sizehint` as its
 * argument when that is non-negative), decodes what comes back and
 * returns it as a new unicode object.  Bytes of an incomplete sequence
 * are kept in the pending buffer for the next call.  With a non-negative
 * `sizehint` the stream is read one more byte at a time until at least
 * one character has been decoded or the stream returns nothing.
 *
 * Returns NULL on failure. */
static PyObject *
mbstreamreader_iread(MultibyteStreamReaderObject *self,
                     const char *method, Py_ssize_t sizehint)
{
    MultibyteDecodeBuffer buf;
    PyObject *cres;
    Py_ssize_t rsize, finalsize = 0;

    if (sizehint == 0)
        return PyUnicode_FromUnicode(NULL, 0);

    buf.outobj = buf.excobj = NULL;
    cres = NULL;

    for (;;) {
        int endoffile;

        if (sizehint < 0)
            cres = PyObject_CallMethod(self->stream,
                                       (char *)method, NULL);
        else
            /* "n" is the format unit for Py_ssize_t; "i" would read the
             * argument back as an int, which does not match the type
             * that is actually passed. */
            cres = PyObject_CallMethod(self->stream,
                                       (char *)method, "n", sizehint);
        if (cres == NULL)
            goto errorexit;

        if (!PyString_Check(cres)) {
            PyErr_SetString(PyExc_TypeError,
                            "stream function returned a "
                            "non-string object");
            goto errorexit;
        }

        endoffile = (PyString_GET_SIZE(cres) == 0);

        if (self->pendingsize > 0) {
            /* Prepend the bytes left over from the previous call. */
            PyObject *ctr;
            char *ctrdata;

            if (PyString_GET_SIZE(cres) > PY_SSIZE_T_MAX - self->pendingsize) {
                PyErr_NoMemory();
                goto errorexit;
            }
            rsize = PyString_GET_SIZE(cres) + self->pendingsize;
            ctr = PyString_FromStringAndSize(NULL, rsize);
            if (ctr == NULL)
                goto errorexit;
            ctrdata = PyString_AS_STRING(ctr);
            memcpy(ctrdata, self->pending, self->pendingsize);
            memcpy(ctrdata + self->pendingsize,
                   PyString_AS_STRING(cres),
                   PyString_GET_SIZE(cres));
            Py_DECREF(cres);
            cres = ctr;
            self->pendingsize = 0;
        }

        rsize = PyString_GET_SIZE(cres);
        if (decoder_prepare_buffer(&buf, PyString_AS_STRING(cres),
                                   rsize) != 0)
            goto errorexit;

        if (rsize > 0 && decoder_feed_buffer(
                        (MultibyteStatefulDecoderContext *)self, &buf))
            goto errorexit;

        if (endoffile || sizehint < 0) {
            /* No more data will follow: an unconsumed tail is an
             * incomplete sequence. */
            if (buf.inbuf < buf.inbuf_end &&
                multibytecodec_decerror(self->codec, &self->state,
                                        &buf, self->errors, MBERR_TOOFEW))
                goto errorexit;
        }

        if (buf.inbuf < buf.inbuf_end) { /* pending sequence exists */
            if (decoder_append_pending(STATEFUL_DCTX(self),
                                       &buf) != 0)
                goto errorexit;
        }

        finalsize = (Py_ssize_t)(buf.outbuf -
                                 PyUnicode_AS_UNICODE(buf.outobj));
        Py_DECREF(cres);
        cres = NULL;

        if (sizehint < 0 || finalsize != 0 || rsize == 0)
            break;

        sizehint = 1; /* read 1 more byte and retry */
    }

    /* Trim the output object to the units actually written. */
    if (finalsize != PyUnicode_GET_SIZE(buf.outobj))
        if (PyUnicode_Resize(&buf.outobj, finalsize) == -1)
            goto errorexit;

    Py_XDECREF(cres);
    Py_XDECREF(buf.excobj);
    return buf.outobj;

errorexit:
    Py_XDECREF(cres);
    Py_XDECREF(buf.excobj);
    Py_XDECREF(buf.outobj);
    return NULL;
}
/* read([size]) of the stream reader.
 *
 * A missing or None `size` means -1; otherwise `size` must be an int.
 * The work is done by mbstreamreader_iread() using the stream's "read"
 * method. */
static PyObject *
mbstreamreader_read(MultibyteStreamReaderObject *self, PyObject *args)
{
    PyObject *sizeobj = NULL;
    Py_ssize_t size = -1;

    if (!PyArg_UnpackTuple(args, "read", 0, 1, &sizeobj))
        return NULL;

    if (sizeobj != NULL && sizeobj != Py_None) {
        if (!PyInt_Check(sizeobj)) {
            PyErr_SetString(PyExc_TypeError, "arg 1 must be an integer");
            return NULL;
        }
        size = PyInt_AsSsize_t(sizeobj);
    }

    return mbstreamreader_iread(self, "read", size);
}
/* StreamReader.readline([size]).
 *
 * Argument handling mirrors read(): nothing/None becomes -1, an int is
 * converted with PyInt_AsSsize_t(), anything else raises TypeError.
 * The work is delegated to mbstreamreader_iread() using the stream
 * method name "readline".
 */
static PyObject *
mbstreamreader_readline(MultibyteStreamReaderObject *self, PyObject *args)
{
    PyObject *arg = NULL;
    Py_ssize_t hint = -1;       /* default when no size is supplied */

    if (!PyArg_UnpackTuple(args, "readline", 0, 1, &arg))
        return NULL;

    if (arg != NULL && arg != Py_None) {
        if (!PyInt_Check(arg)) {
            PyErr_SetString(PyExc_TypeError, "arg 1 must be an integer");
            return NULL;
        }
        hint = PyInt_AsSsize_t(arg);
    }

    return mbstreamreader_iread(self, "readline", hint);
}
/* StreamReader.readlines([sizehint]).
 *
 * Parses the optional size hint the same way read() does, obtains the
 * decoded text through mbstreamreader_iread() (using the stream's
 * "read" method), and returns that text split into lines with the line
 * endings kept (PyUnicode_Splitlines with keepends=1).
 */
static PyObject *
mbstreamreader_readlines(MultibyteStreamReaderObject *self, PyObject *args)
{
    PyObject *hintobj = NULL;
    PyObject *text, *lines;
    Py_ssize_t hint = -1;       /* default when no hint is supplied */

    if (!PyArg_UnpackTuple(args, "readlines", 0, 1, &hintobj))
        return NULL;

    if (hintobj != NULL && hintobj != Py_None) {
        if (!PyInt_Check(hintobj)) {
            PyErr_SetString(PyExc_TypeError, "arg 1 must be an integer");
            return NULL;
        }
        hint = PyInt_AsSsize_t(hintobj);
    }

    text = mbstreamreader_iread(self, "read", hint);
    if (text == NULL)
        return NULL;

    /* The decoded text is only needed long enough to split it. */
    lines = PyUnicode_Splitlines(text, 1);
    Py_DECREF(text);
    return lines;
}
/* StreamReader.reset().
 *
 * Invokes the codec's decreset hook when the codec provides one and
 * then drops any buffered pending bytes.  Returns None, or NULL when
 * the hook reports failure (non-zero return).
 */
static PyObject *
mbstreamreader_reset(MultibyteStreamReaderObject *self)
{
    if (self->codec->decreset != NULL) {
        if (self->codec->decreset(&self->state, self->codec->config) != 0)
            return NULL;
    }

    self->pendingsize = 0;

    Py_INCREF(Py_None);
    return Py_None;
}
/* Method table for MultibyteStreamReader objects.
 * read/readline/readlines take an optional positional argument
 * (METH_VARARGS); reset takes none (METH_NOARGS).  No docstrings are
 * attached (the ml_doc field is NULL for every entry). */
static struct PyMethodDef mbstreamreader_methods[] = {
{"read", (PyCFunction)mbstreamreader_read,
METH_VARARGS, NULL},
{"readline", (PyCFunction)mbstreamreader_readline,
METH_VARARGS, NULL},
{"readlines", (PyCFunction)mbstreamreader_readlines,
METH_VARARGS, NULL},
{"reset", (PyCFunction)mbstreamreader_reset,
METH_NOARGS, NULL},
{NULL, NULL}, /* sentinel */
};
/* Member table for MultibyteStreamReader objects: exposes the wrapped
 * stream object as the read-only attribute "stream". */
static PyMemberDef mbstreamreader_members[] = {
{"stream", T_OBJECT,
offsetof(MultibyteStreamReaderObject, stream),
READONLY, NULL},
{NULL,} /* sentinel */
};
/* tp_new for MultibyteStreamReader: StreamReader(stream[, errors]).
 *
 * Allocates the instance, looks up the "codec" attribute on the type
 * (which must be a MultibyteCodec object), stores a new reference to
 * `stream', converts the `errors' name via internal_error_callback(),
 * and runs the codec's decinit hook when one is provided.
 *
 * Returns the new object, or NULL with an exception set on failure.
 */
static PyObject *
mbstreamreader_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    MultibyteStreamReaderObject *reader;
    PyObject *stream;
    PyObject *codecobj = NULL;
    char *errname = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|s:StreamReader",
                                     streamkwarglist, &stream, &errname))
        return NULL;

    reader = (MultibyteStreamReaderObject *)type->tp_alloc(type, 0);
    if (reader == NULL)
        return NULL;

    /* The concrete codec is published as a class attribute. */
    codecobj = PyObject_GetAttrString((PyObject *)type, "codec");
    if (codecobj == NULL)
        goto fail;
    if (!MultibyteCodec_Check(codecobj)) {
        PyErr_SetString(PyExc_TypeError, "codec is unexpected type");
        goto fail;
    }

    reader->codec = ((MultibyteCodecObject *)codecobj)->codec;
    reader->stream = stream;
    Py_INCREF(stream);
    reader->pendingsize = 0;

    reader->errors = internal_error_callback(errname);
    if (reader->errors == NULL)
        goto fail;

    if (reader->codec->decinit != NULL) {
        if (reader->codec->decinit(&reader->state,
                                   reader->codec->config) != 0)
            goto fail;
    }

    Py_DECREF(codecobj);
    return (PyObject *)reader;

fail:
    /* Dropping the instance runs its dealloc, which releases whatever
     * was stored on it so far. */
    Py_XDECREF(reader);
    Py_XDECREF(codecobj);
    return NULL;
}
/* tp_init for MultibyteStreamReader: intentionally does nothing and
 * always reports success; the arguments are ignored here. */
static int
mbstreamreader_init(PyObject *self, PyObject *args, PyObject *kwds)
{
    (void)self;
    (void)args;
    (void)kwds;
    return 0;
}
/* tp_traverse for MultibyteStreamReader: reports the objects this
 * instance references to the cycle collector.  `errors' is visited only
 * when ERROR_ISCUSTOM() says so (presumably it is otherwise a
 * non-object sentinel -- confirm against the macro definition). */
static int
mbstreamreader_traverse(MultibyteStreamReaderObject *self,
                        visitproc visit, void *arg)
{
    if (ERROR_ISCUSTOM(self->errors)) {
        Py_VISIT(self->errors);
    }
    Py_VISIT(self->stream);
    return 0;
}
/* tp_dealloc for MultibyteStreamReader: untracks the object from the
 * garbage collector, releases the error handler and the stream
 * reference, then frees the object through the type's tp_free slot. */
static void
mbstreamreader_dealloc(MultibyteStreamReaderObject *self)
{
/* Untrack before tearing down the fields the traverse function visits. */
PyObject_GC_UnTrack(self);
ERROR_DECREF(self->errors);
/* stream may still be NULL if construction failed early. */
Py_XDECREF(self->stream);
Py_TYPE(self)->tp_free(self);
}
/* Type object for MultibyteStreamReader.
 * GC-enabled and subclassable (Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE).
 * The slots left as 0 are not provided by this definition; the type is
 * passed to PyType_Ready() in init_multibytecodec(). */
static PyTypeObject MultibyteStreamReader_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"MultibyteStreamReader", /* tp_name */
sizeof(MultibyteStreamReaderObject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)mbstreamreader_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC
| Py_TPFLAGS_BASETYPE, /* tp_flags */
0, /* tp_doc */
(traverseproc)mbstreamreader_traverse, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
mbstreamreader_methods, /* tp_methods */
mbstreamreader_members, /* tp_members */
codecctx_getsets, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
mbstreamreader_init, /* tp_init */
0, /* tp_alloc */
mbstreamreader_new, /* tp_new */
};
/**
* MultibyteStreamWriter object
*/
/* Encode `unistr' with the writer's stateful encoder and hand the
 * result to the underlying stream's write() method.
 *
 * Returns 0 on success and -1 (with an exception set by the failing
 * call) when either the encoding step or the write() call fails.
 */
static int
mbstreamwriter_iwrite(MultibyteStreamWriterObject *self,
                      PyObject *unistr)
{
    PyObject *encoded;
    PyObject *result;
    int status;

    encoded = encoder_encode_stateful(STATEFUL_ECTX(self), unistr, 0);
    if (encoded == NULL)
        return -1;

    result = PyObject_CallMethod(self->stream, "write", "O", encoded);
    Py_DECREF(encoded);

    /* Only success/failure matters; write()'s return value is dropped. */
    status = (result != NULL) ? 0 : -1;
    Py_XDECREF(result);
    return status;
}
/* StreamWriter.write(obj): encode `strobj' and write it to the stream
 * via mbstreamwriter_iwrite().  Returns None, or NULL on failure. */
static PyObject *
mbstreamwriter_write(MultibyteStreamWriterObject *self, PyObject *strobj)
{
    if (mbstreamwriter_iwrite(self, strobj) != 0)
        return NULL;

    Py_INCREF(Py_None);
    return Py_None;
}
/* StreamWriter.writelines(lines).
 *
 * Encodes and writes every item of the sequence `lines' in order by
 * calling mbstreamwriter_iwrite() on each one.
 *
 * Returns None on success.  Returns NULL with an exception set when
 * `lines' is not a sequence (TypeError), when querying its length or
 * fetching an item fails, or when encoding/writing an item fails.
 */
static PyObject *
mbstreamwriter_writelines(MultibyteStreamWriterObject *self, PyObject *lines)
{
    PyObject *strobj;
    Py_ssize_t i, len;  /* Py_ssize_t: an int index could overflow */
    int r;

    if (!PySequence_Check(lines)) {
        PyErr_SetString(PyExc_TypeError,
                        "arg must be a sequence object");
        return NULL;
    }

    for (i = 0; ; i++) {
        /* length can be changed even within this loop, so it is
         * queried again on every iteration */
        len = PySequence_Length(lines);
        if (len < 0)
            return NULL;        /* length query failed; error is set */
        if (i >= len)
            break;

        strobj = PySequence_GetItem(lines, i);
        if (strobj == NULL)
            return NULL;

        r = mbstreamwriter_iwrite(self, strobj);
        Py_DECREF(strobj);
        if (r == -1)
            return NULL;
    }

    Py_RETURN_NONE;
}
/* StreamWriter.reset().
 *
 * Runs the encoder over the pending input with MBENC_FLUSH | MBENC_RESET
 * and, if that produced any bytes, writes them to the underlying
 * stream.  The pending buffer is always emptied, even when encoding
 * fails.
 *
 * Returns None on success, or NULL with an exception set when the
 * encoding step or the stream's write() call fails.
 */
static PyObject *
mbstreamwriter_reset(MultibyteStreamWriterObject *self)
{
    const Py_UNICODE *pending;
    PyObject *pwrt;

    pending = self->pending;
    pwrt = multibytecodec_encode(self->codec, &self->state,
                                 &pending, self->pendingsize, self->errors,
                                 MBENC_FLUSH | MBENC_RESET);
    /* Part of the pending buffer may be lost when UnicodeEncodeError is
     * raised in 'strict' mode.  That is acceptable: 'reset' exists to
     * discard the pending buffer and state, so the failed sequence is
     * deliberately dropped. */
    self->pendingsize = 0;
    if (pwrt == NULL)
        return NULL;

    if (PyString_Size(pwrt) > 0) {
        PyObject *wr;

        wr = PyObject_CallMethod(self->stream, "write", "O", pwrt);
        if (wr == NULL) {
            Py_DECREF(pwrt);
            return NULL;
        }
        /* write() returned a new reference; release it so it does not
         * leak (same handling as in mbstreamwriter_iwrite). */
        Py_DECREF(wr);
    }
    Py_DECREF(pwrt);

    Py_RETURN_NONE;
}
/* tp_new for MultibyteStreamWriter: StreamWriter(stream[, errors]).
 *
 * Allocates the instance, looks up the "codec" attribute on the type
 * (which must be a MultibyteCodec object), stores a new reference to
 * `stream', converts the `errors' name via internal_error_callback(),
 * and runs the codec's encinit hook when one is provided.
 *
 * Returns the new object, or NULL with an exception set on failure.
 */
static PyObject *
mbstreamwriter_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    MultibyteStreamWriterObject *writer;
    PyObject *stream;
    PyObject *codecobj = NULL;
    char *errname = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|s:StreamWriter",
                                     streamkwarglist, &stream, &errname))
        return NULL;

    writer = (MultibyteStreamWriterObject *)type->tp_alloc(type, 0);
    if (writer == NULL)
        return NULL;

    /* The concrete codec is published as a class attribute. */
    codecobj = PyObject_GetAttrString((PyObject *)type, "codec");
    if (codecobj == NULL)
        goto fail;
    if (!MultibyteCodec_Check(codecobj)) {
        PyErr_SetString(PyExc_TypeError, "codec is unexpected type");
        goto fail;
    }

    writer->codec = ((MultibyteCodecObject *)codecobj)->codec;
    writer->stream = stream;
    Py_INCREF(stream);
    writer->pendingsize = 0;

    writer->errors = internal_error_callback(errname);
    if (writer->errors == NULL)
        goto fail;

    if (writer->codec->encinit != NULL) {
        if (writer->codec->encinit(&writer->state,
                                   writer->codec->config) != 0)
            goto fail;
    }

    Py_DECREF(codecobj);
    return (PyObject *)writer;

fail:
    /* Dropping the instance runs its dealloc, which releases whatever
     * was stored on it so far. */
    Py_XDECREF(writer);
    Py_XDECREF(codecobj);
    return NULL;
}
/* tp_init for MultibyteStreamWriter: intentionally does nothing and
 * always reports success; the arguments are ignored here. */
static int
mbstreamwriter_init(PyObject *self, PyObject *args, PyObject *kwds)
{
    (void)self;
    (void)args;
    (void)kwds;
    return 0;
}
/* tp_traverse for MultibyteStreamWriter: reports the objects this
 * instance references to the cycle collector.  `errors' is visited only
 * when ERROR_ISCUSTOM() says so (presumably it is otherwise a
 * non-object sentinel -- confirm against the macro definition). */
static int
mbstreamwriter_traverse(MultibyteStreamWriterObject *self,
                        visitproc visit, void *arg)
{
    if (ERROR_ISCUSTOM(self->errors)) {
        Py_VISIT(self->errors);
    }
    Py_VISIT(self->stream);
    return 0;
}
/* tp_dealloc for MultibyteStreamWriter: untracks the object from the
 * garbage collector, releases the error handler and the stream
 * reference, then frees the object through the type's tp_free slot. */
static void
mbstreamwriter_dealloc(MultibyteStreamWriterObject *self)
{
/* Untrack before tearing down the fields the traverse function visits. */
PyObject_GC_UnTrack(self);
ERROR_DECREF(self->errors);
/* stream may still be NULL if construction failed early. */
Py_XDECREF(self->stream);
Py_TYPE(self)->tp_free(self);
}
/* Method table for MultibyteStreamWriter objects.
 * write and writelines each take exactly one argument (METH_O); reset
 * takes none (METH_NOARGS).  No docstrings are attached (the ml_doc
 * field is NULL for every entry). */
static struct PyMethodDef mbstreamwriter_methods[] = {
{"write", (PyCFunction)mbstreamwriter_write,
METH_O, NULL},
{"writelines", (PyCFunction)mbstreamwriter_writelines,
METH_O, NULL},
{"reset", (PyCFunction)mbstreamwriter_reset,
METH_NOARGS, NULL},
{NULL, NULL}, /* sentinel */
};
/* Member table for MultibyteStreamWriter objects: exposes the wrapped
 * stream object as the read-only attribute "stream". */
static PyMemberDef mbstreamwriter_members[] = {
{"stream", T_OBJECT,
offsetof(MultibyteStreamWriterObject, stream),
READONLY, NULL},
{NULL,} /* sentinel */
};
/* Type object for MultibyteStreamWriter.
 * GC-enabled and subclassable (Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE).
 * The slots left as 0 are not provided by this definition; the type is
 * passed to PyType_Ready() in init_multibytecodec(). */
static PyTypeObject MultibyteStreamWriter_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"MultibyteStreamWriter", /* tp_name */
sizeof(MultibyteStreamWriterObject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)mbstreamwriter_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC
| Py_TPFLAGS_BASETYPE, /* tp_flags */
0, /* tp_doc */
(traverseproc)mbstreamwriter_traverse, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
mbstreamwriter_methods, /* tp_methods */
mbstreamwriter_members, /* tp_members */
codecctx_getsets, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
mbstreamwriter_init, /* tp_init */
0, /* tp_alloc */
mbstreamwriter_new, /* tp_new */
};
/**
* Exposed factory function
*/
/* _multibytecodec.__create_codec(cobject).
 *
 * `arg' must be a PyCObject wrapping a MultibyteCodec pointer;
 * otherwise ValueError is raised.  The codec's codecinit hook, when
 * present, is run first and a non-zero result aborts creation.  On
 * success a new MultibyteCodecObject referring to that codec is
 * returned.
 */
static PyObject *
__create_codec(PyObject *ignore, PyObject *arg)
{
    MultibyteCodec *impl;
    MultibyteCodecObject *wrapper;

    if (!PyCObject_Check(arg)) {
        PyErr_SetString(PyExc_ValueError, "argument type invalid");
        return NULL;
    }

    impl = PyCObject_AsVoidPtr(arg);
    if (impl->codecinit != NULL) {
        if (impl->codecinit(impl->config) != 0)
            return NULL;
    }

    wrapper = PyObject_New(MultibyteCodecObject, &MultibyteCodec_Type);
    if (wrapper == NULL)
        return NULL;

    wrapper->codec = impl;
    return (PyObject *)wrapper;
}
/* Module-level function table: only the codec factory is exported,
 * taking a single argument (METH_O). */
static struct PyMethodDef __methods[] = {
{"__create_codec", (PyCFunction)__create_codec, METH_O},
{NULL, NULL}, /* sentinel */
};
/* Module initialisation for _multibytecodec.
 *
 * Readies MultibyteCodec_Type, creates the module, then readies each
 * incremental/stream type and adds it to the module under its tp_name.
 * A failing PyType_Ready() or module creation returns early; any error
 * still pending at the end is treated as fatal.
 */
PyMODINIT_FUNC
init_multibytecodec(void)
{
    PyObject *module;
    PyTypeObject **tp;
    PyTypeObject *typelist[] = {
        &MultibyteIncrementalEncoder_Type,
        &MultibyteIncrementalDecoder_Type,
        &MultibyteStreamReader_Type,
        &MultibyteStreamWriter_Type,
        NULL
    };

    if (PyType_Ready(&MultibyteCodec_Type) < 0)
        return;

    module = Py_InitModule("_multibytecodec", __methods);
    if (module == NULL)
        return;

    /* Walk the NULL-terminated list of exported types. */
    for (tp = typelist; *tp != NULL; tp++) {
        if (PyType_Ready(*tp) < 0)
            return;
        Py_INCREF(*tp);
        PyModule_AddObject(module, (*tp)->tp_name, (PyObject *)*tp);
    }

    if (PyErr_Occurred())
        Py_FatalError("can't initialize the _multibytecodec module");
}