记---串口读取数据阻塞不能释放线程资源问题

1.问题与场景
在处理串口读取数据时遇到的一个问题。读取串口数据启动了一个线程去进行轮询读取,如果串口中有数据就正常读取,并且返回读取到的字节长度,如果串口没有数据写入时去进行读取的话这个操作是被阻塞住的。如果一直没有数据写入,那就一直不能读取返回。这就导致了一个问题,就是线程永远都会被阻塞在读取的那个地方。想要结束释放这个线程的资源显然是做不到的,因为线程被阻塞住了。

2.个人思考
思考1:既然线程是被阻塞住了,那么我就想能不能通过线程中断来进行线程的停止。经测试:调用线程的中断方法(Thread.interrupted()),线程并没有接收到中断信号,所以此方法行不通。

思考2:经过上面的方法不行之后,又尝试了使用线程池去执行读取数据,然后调用线程池的shutdownNow()方法企图想把线程资源释放,结果还是不行。后又尝试了AsyncTask去进行读取,然后使用AsyncTask的cancel()方法去进行线程资源释放,结果还是不行。这些方法都不能使被阻塞的线程得到资源的释放。

思考3:后询问同事这种场景的问题该怎么解决,有什么方案推荐一下?同事说用select(我以为说的是Selector,NIO里面的一个类java.nio.channels包下面的)我就去查阅Selector的使用方法,后经研究发现,这个Selector是实现非阻塞IO的一种方式,我看到的都是处理Sockect的,与我读取串口数据使用场景不同。至此还是没有想到解决办法。(因为当时没有想到select这个技术,本人也没了解过select,所以没解决。)

思考4:问题得不到解决,但是又想解决这个资源释放的问题。没办法就百度了selector.select()的方法是怎么实现的,虽然我的场景不是sockect,但是我就想知道这个selector.select()被阻塞之后是怎么能返回的。百度了selector.select()之后发现很多前辈都提到select、poll、epoll。前两者select、poll我没听过,但是epoll这个我知道,在安卓的Handler机制的MessageQueue.java的next()方法的nativePollOnce()方法的native源码里面用的就是epoll机制。epoll机制大概的原理就是监听一个fd。如果fd有修改,那nativePollOnce()就会返回继续执行下面的代码,同时nativePollOnce()还支持超时返回。好家伙这不正是我想要实现的功能吗?

思考5:既然网上都把select和poll和epoll进行比较,我就去看了很多它们的介绍。其实三者都是可以实现我想要的功能。最终我选择了select,因为同事给我指的路是select而且它实现起来相对容易,并且它虽然监听fd的数量上有限制,但是已经可以满足我的需求了。

3.代码实现
因为select只有c代码的实现,所以在android上只能使用JNI的方式。

3.1创建一个 MySelect.h 文件,内容如下。

#include

#include

#include

#include

#include

#include

#include

JNIEXPORT jint

Java_myselect_MySelect_nativeRead(JNIEnv *env, jclass clazz, jobject file_descriptor,

jbyteArray data, jint datalen) ;

JNIEXPORT jint

Java_myselect_MySelect_nativeWrite(JNIEnv *env, jclass clazz, jobject file_descriptor,

jbyteArray data, jint datalen) ;

3.2创建一个MySelect.c文件,内容如下:

#include "Myselect.h"

#include "android/log.h"

static const char *TAG="caifeng";

#define LOGI(fmt, args...) __android_log_print(ANDROID_LOG_INFO,  TAG, fmt, ##args)

#define LOGD(fmt, args...) __android_log_print(ANDROID_LOG_DEBUG, TAG, fmt, ##args)

#define LOGE(fmt, args...) __android_log_print(ANDROID_LOG_ERROR, TAG, fmt, ##args)

JNIEXPORT jint Java_myselect_MySelect_nativeRead(JNIEnv *env,jclass clazz,jobject file_descriptor,

jbyteArray data,jint datalen) {

jclass FileDescriptorClass = (*env)->FindClass(env,"java/io/FileDescriptor");

jfieldID descriptorID = (*env)->GetFieldID(env,FileDescriptorClass,"descriptor","I");

jint descriptor = (*env)->GetIntField(env, file_descriptor,descriptorID);

return uart_recv(descriptor,data,datalen);

}

JNIEXPORT jint Java_myselect_MySelect_nativeWrite(JNIEnv *env,jclass clazz,jobject file_descriptor,

jbyteArray data,jint datalen) {

jclass FileDescriptorClass = (*env)->FindClass(env,"java/io/FileDescriptor");

jfieldID descriptorID = (*env)->GetFieldID(env,FileDescriptorClass,"descriptor","I");

jint descriptor = (*env)->GetIntField(env, file_descriptor,descriptorID);

LOGD("descriptor写的 = %d",descriptor);

return uart_send(descriptor,data,datalen);

}

/**

*串口接收数据*要求启动后,在pc端发送ascii文件*/

int uart_recv(int serial_fd,char *data,int datalen)

{

int len=0,ret = -1;

fd_set fs_read;

struct timeval tv_timeout;

FD_ZERO(&fs_read);

FD_SET(serial_fd, &fs_read);

tv_timeout.tv_sec  =40000;//这个是秒

    tv_timeout.tv_usec =0;//这个是微秒

    ret = select(serial_fd+1, &fs_read,NULL,NULL, &tv_timeout);

return ret;

//    while(FD_ISSET(serial_fd,&fs_read))

//    {

//

//        FD_ZERO(&fs_read);

//        FD_SET(serial_fd,&fs_read);

//        ret = select(serial_fd+1, &fs_read, NULL, NULL, &tv_timeout);

//        LOGD("descriptor select ret = %d",ret);

//        printf("ret = %d\n", ret);

//        //如果返回0,代表在描述符状态改变前已超过timeout时间,错误返回-1

//

////        if(FD_ISSET(serial_fd, &fs_read)) {

////            len = read(serial_fd, data, datalen);

////            LOGD("descriptor read len = %d",ret);

////            printf("len = %d\n", len);

////            return 1;

////        } else {

////            LOGD("descriptor select--select");

////            perror("select");

////        }

//    }

//    return ret;

}

/**

*串口发送数据*@fd:串口描述符*@data:待发送数据*@datalen:数据长度*/

int uart_send(int serial_fd,char *data,int datalen)

{

int len =0;

len = write(serial_fd, data, datalen);//实际写入的长度

    if(len == datalen) {

return len;

}else {

tcflush(serial_fd,TCOFLUSH);//TCOFLUSH刷新写入的数据但不传送

        return -1;

}

return 0;

}

3.3编译后加载so库:创建一个MySelect.java类:

package myselect;

import java.io.FileDescriptor;

public class MySelect {

static {

System.loadLibrary("myselect");

}

public static int read(FileDescriptor fileDescriptor,byte[] data,int datalen){

return nativeRead(fileDescriptor,data,datalen);

}

public static int write(FileDescriptor fileDescriptor,byte[] data,int datalen){

return nativeWrite(fileDescriptor,data,datalen);

}

public static native int nativeRead(FileDescriptor fileDescriptor,byte[] data,int datalen);

public static native int nativeWrite(FileDescriptor fileDescriptor,byte[] data,int datalen);

}

3.4使用MySelect.java里面的函数进行读取。

package com.rk.device.selecttest;

import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;

import android.os.Trace;

import android.util.Log;

import android.view.View;

import android.widget.Button;

import java.io.File;

import java.io.FileDescriptor;

import java.io.IOException;

import android_serialport_api.SerialPort;

import myselect.MySelect;

public class MainActivity extends AppCompatActivity {

private SerialPort mSerialPort;

private static final String TTYS4 ="/dev/ttyS4";

private static final int IBAUDRATE =115200;

FileDescriptor mFd;

Button read,write;

Thread t1;

@Override

    protected void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_main);

Log.d("MainActivity",Log.getStackTraceString(new Throwable()));

read = findViewById(R.id.read);

write = findViewById(R.id.write);

try {

mSerialPort =new SerialPort(new File(TTYS4),IBAUDRATE,0);

mFd =mSerialPort.getmFd();

}catch (IOException e) {

e.printStackTrace();

}

read.setOnClickListener(new View.OnClickListener() {

@Override

            public void onClick(View v) {

t1 =new Thread(new Runnable() {

@Override

                    public void run() {

while (!Thread.interrupted()) {

int len =MySelect.read(mFd,new byte[1024],1024);

Log.d("MainActivity","len=" + len);

if (len >0) {

try {

byte[]buff =new byte[1024];

len =mSerialPort.getInputStream().read(buff);

for (int i =0; i < len; i++) {

Log.d("MainActivity","读取到的值=" +buff[i]);

}

}catch (IOException e) {

e.printStackTrace();

}

}

}

Log.d("MainActivity","结束");

}

});

t1.start();

}

});

write.setOnClickListener(new View.OnClickListener() {

@Override

            public void onClick(View v) {

//                t1.interrupt();

                new Thread(new Runnable() {

@Override

                    public void run() {

while (true) {

try {

Thread.sleep(50);

}catch (InterruptedException e) {

e.printStackTrace();

}

try {

mSerialPort.getOutputStream().write(new byte[]{10,12,53});

}catch (IOException e) {

e.printStackTrace();

}

}

}

}).start();

}

});

}

@Override

    protected void onDestroy() {

super.onDestroy();

Log.d("MainActivity",Log.getStackTraceString(new Throwable()));

}

}

3.5 下面是串口类SerialPort.java 的定义:

package android_serialport_api;

import android.util.Log;

import java.io.File;

import java.io.FileDescriptor;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

public class SerialPort {

private static final String TAG ="SerialPort";

/*

* Do not remove or rename the field mFd: it is used by native method close();

*/

    private FileDescriptor mFd;

private FileInputStream mFileInputStream;

private FileOutputStream mFileOutputStream;

public SerialPort(File device,int baudrate,int flags)throws SecurityException,IOException {

/* Check access permission */

/*if (!device.canRead() || !device.canWrite()) {

try {

Process su;

su = Runtime.getRuntime().exec("/system/bin/su");

String cmd = "chmod 666 " + device.getAbsolutePath() + "\n"

+ "exit\n";

su.getOutputStream().write(cmd.getBytes());

if ((su.waitFor() != 0) || !device.canRead()

|| !device.canWrite()) {

throw new SecurityException();

}

} catch (Exception e) {

e.printStackTrace();

throw new SecurityException();

}

}

*/

        mFd =open(device.getAbsolutePath(), baudrate, flags);

if (mFd ==null) {

Log.e(TAG,"native open returns null");

throw new IOException();

}

mFileInputStream =new FileInputStream(mFd);

mFileOutputStream =new FileOutputStream(mFd);

}

// Getters and setters

    public InputStream getInputStream() {

return mFileInputStream;

}

public OutputStream getOutputStream() {

return mFileOutputStream;

}

public FileDescriptor getmFd() {

return mFd;

}

// JNI

    private native static FileDescriptor open(String path,int baudrate,int flags);

public native void close();

static {

System.loadLibrary("serial_port");

}

}

System.loadLibrary("serial_port");这里加载的是我提前编译好打开串口的工具库。网上也有一些打开串口的其他方案。至此释放线程资源的办法就如上所述。原理就是:使用select去监听串口打开的那个fd,如果监听到有数据写入,那就返回len>0 如果返回len>0 我们再用串口打开的流去进行读取,而不是直接使用流的read()去读取。如果没有数据写入,并且超时了就返回len<0.超时之后我们就可以释放线程资源了,或者读取到数据之后也可以进行资源的释放。其实就是在getInputStream().read();之前添加select的监听。这样就可以防止线程长时间被阻塞了。如遇到IO阻塞问题,也可以使用这种方案。
项目工程链接:

链接:https://pan.baidu.com/s/1_VXa2ixy2N5FLbCfBjHLuA

提取码:x9gc

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容